fetch.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. use bit_set::BitSet;
  2. use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
  3. use futures::sync::{mpsc, oneshot};
  4. use futures::Stream;
  5. use futures::{Async, Future, Poll};
  6. use std::cmp::min;
  7. use std::fs;
  8. use std::io::{self, Read, Seek, SeekFrom, Write};
  9. use std::sync::{Arc, Condvar, Mutex};
  10. use tempfile::NamedTempFile;
  11. use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
  12. use librespot_core::session::Session;
  13. use librespot_core::spotify_id::FileId;
  14. const CHUNK_SIZE: usize = 0x20000;
  15. pub enum AudioFile {
  16. Cached(fs::File),
  17. Streaming(AudioFileStreaming),
  18. }
  19. pub enum AudioFileOpen {
  20. Cached(Option<fs::File>),
  21. Streaming(AudioFileOpenStreaming),
  22. }
  23. pub struct AudioFileOpenStreaming {
  24. session: Session,
  25. data_rx: Option<ChannelData>,
  26. headers: ChannelHeaders,
  27. file_id: FileId,
  28. complete_tx: Option<oneshot::Sender<NamedTempFile>>,
  29. }
  30. pub struct AudioFileStreaming {
  31. read_file: fs::File,
  32. position: u64,
  33. seek: mpsc::UnboundedSender<u64>,
  34. shared: Arc<AudioFileShared>,
  35. }
  36. struct AudioFileShared {
  37. file_id: FileId,
  38. chunk_count: usize,
  39. cond: Condvar,
  40. bitmap: Mutex<BitSet>,
  41. }
  42. impl AudioFileOpenStreaming {
  43. fn finish(&mut self, size: usize) -> AudioFileStreaming {
  44. let chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE;
  45. let shared = Arc::new(AudioFileShared {
  46. file_id: self.file_id,
  47. chunk_count: chunk_count,
  48. cond: Condvar::new(),
  49. bitmap: Mutex::new(BitSet::with_capacity(chunk_count)),
  50. });
  51. let mut write_file = NamedTempFile::new().unwrap();
  52. write_file.as_file().set_len(size as u64).unwrap();
  53. write_file.seek(SeekFrom::Start(0)).unwrap();
  54. let read_file = write_file.reopen().unwrap();
  55. let data_rx = self.data_rx.take().unwrap();
  56. let complete_tx = self.complete_tx.take().unwrap();
  57. let (seek_tx, seek_rx) = mpsc::unbounded();
  58. let fetcher = AudioFileFetch::new(
  59. self.session.clone(),
  60. shared.clone(),
  61. data_rx,
  62. write_file,
  63. seek_rx,
  64. complete_tx,
  65. );
  66. self.session.spawn(move |_| fetcher);
  67. AudioFileStreaming {
  68. read_file: read_file,
  69. position: 0,
  70. seek: seek_tx,
  71. shared: shared,
  72. }
  73. }
  74. }
  75. impl Future for AudioFileOpen {
  76. type Item = AudioFile;
  77. type Error = ChannelError;
  78. fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
  79. match *self {
  80. AudioFileOpen::Streaming(ref mut open) => {
  81. let file = try_ready!(open.poll());
  82. Ok(Async::Ready(AudioFile::Streaming(file)))
  83. }
  84. AudioFileOpen::Cached(ref mut file) => {
  85. let file = file.take().unwrap();
  86. Ok(Async::Ready(AudioFile::Cached(file)))
  87. }
  88. }
  89. }
  90. }
  91. impl Future for AudioFileOpenStreaming {
  92. type Item = AudioFileStreaming;
  93. type Error = ChannelError;
  94. fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
  95. loop {
  96. let (id, data) = try_ready!(self.headers.poll()).unwrap();
  97. if id == 0x3 {
  98. let size = BigEndian::read_u32(&data) as usize * 4;
  99. let file = self.finish(size);
  100. return Ok(Async::Ready(file));
  101. }
  102. }
  103. }
  104. }
  105. impl AudioFile {
  106. pub fn open(session: &Session, file_id: FileId) -> AudioFileOpen {
  107. let cache = session.cache().cloned();
  108. if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
  109. debug!("File {} already in cache", file_id);
  110. return AudioFileOpen::Cached(Some(file));
  111. }
  112. debug!("Downloading file {}", file_id);
  113. let (complete_tx, complete_rx) = oneshot::channel();
  114. let (headers, data) = request_chunk(session, file_id, 0).split();
  115. let open = AudioFileOpenStreaming {
  116. session: session.clone(),
  117. file_id: file_id,
  118. headers: headers,
  119. data_rx: Some(data),
  120. complete_tx: Some(complete_tx),
  121. };
  122. let session_ = session.clone();
  123. session.spawn(move |_| {
  124. complete_rx
  125. .map(move |mut file| {
  126. if let Some(cache) = session_.cache() {
  127. cache.save_file(file_id, &mut file);
  128. debug!("File {} complete, saving to cache", file_id);
  129. } else {
  130. debug!("File {} complete", file_id);
  131. }
  132. })
  133. .or_else(|oneshot::Canceled| Ok(()))
  134. });
  135. AudioFileOpen::Streaming(open)
  136. }
  137. }
  138. fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel {
  139. trace!("requesting chunk {}", index);
  140. let start = (index * CHUNK_SIZE / 4) as u32;
  141. let end = ((index + 1) * CHUNK_SIZE / 4) as u32;
  142. let (id, channel) = session.channel().allocate();
  143. let mut data: Vec<u8> = Vec::new();
  144. data.write_u16::<BigEndian>(id).unwrap();
  145. data.write_u8(0).unwrap();
  146. data.write_u8(1).unwrap();
  147. data.write_u16::<BigEndian>(0x0000).unwrap();
  148. data.write_u32::<BigEndian>(0x00000000).unwrap();
  149. data.write_u32::<BigEndian>(0x00009C40).unwrap();
  150. data.write_u32::<BigEndian>(0x00020000).unwrap();
  151. data.write(&file.0).unwrap();
  152. data.write_u32::<BigEndian>(start).unwrap();
  153. data.write_u32::<BigEndian>(end).unwrap();
  154. session.send_packet(0x8, data);
  155. channel
  156. }
  157. struct AudioFileFetch {
  158. session: Session,
  159. shared: Arc<AudioFileShared>,
  160. output: Option<NamedTempFile>,
  161. index: usize,
  162. data_rx: ChannelData,
  163. seek_rx: mpsc::UnboundedReceiver<u64>,
  164. complete_tx: Option<oneshot::Sender<NamedTempFile>>,
  165. }
  166. impl AudioFileFetch {
  167. fn new(
  168. session: Session,
  169. shared: Arc<AudioFileShared>,
  170. data_rx: ChannelData,
  171. output: NamedTempFile,
  172. seek_rx: mpsc::UnboundedReceiver<u64>,
  173. complete_tx: oneshot::Sender<NamedTempFile>,
  174. ) -> AudioFileFetch {
  175. AudioFileFetch {
  176. session: session,
  177. shared: shared,
  178. output: Some(output),
  179. index: 0,
  180. data_rx: data_rx,
  181. seek_rx: seek_rx,
  182. complete_tx: Some(complete_tx),
  183. }
  184. }
  185. fn download(&mut self, mut new_index: usize) {
  186. assert!(new_index < self.shared.chunk_count);
  187. {
  188. let bitmap = self.shared.bitmap.lock().unwrap();
  189. while bitmap.contains(new_index) {
  190. new_index = (new_index + 1) % self.shared.chunk_count;
  191. }
  192. }
  193. if self.index != new_index {
  194. self.index = new_index;
  195. let offset = self.index * CHUNK_SIZE;
  196. self.output
  197. .as_mut()
  198. .unwrap()
  199. .seek(SeekFrom::Start(offset as u64))
  200. .unwrap();
  201. let (_headers, data) = request_chunk(&self.session, self.shared.file_id, self.index).split();
  202. self.data_rx = data;
  203. }
  204. }
  205. fn finish(&mut self) {
  206. let mut output = self.output.take().unwrap();
  207. let complete_tx = self.complete_tx.take().unwrap();
  208. output.seek(SeekFrom::Start(0)).unwrap();
  209. let _ = complete_tx.send(output);
  210. }
  211. }
  212. impl Future for AudioFileFetch {
  213. type Item = ();
  214. type Error = ();
  215. fn poll(&mut self) -> Poll<(), ()> {
  216. loop {
  217. let mut progress = false;
  218. match self.seek_rx.poll() {
  219. Ok(Async::Ready(None)) => {
  220. return Ok(Async::Ready(()));
  221. }
  222. Ok(Async::Ready(Some(offset))) => {
  223. progress = true;
  224. let index = offset as usize / CHUNK_SIZE;
  225. self.download(index);
  226. }
  227. Ok(Async::NotReady) => (),
  228. Err(()) => unreachable!(),
  229. }
  230. match self.data_rx.poll() {
  231. Ok(Async::Ready(Some(data))) => {
  232. progress = true;
  233. self.output.as_mut().unwrap().write_all(data.as_ref()).unwrap();
  234. }
  235. Ok(Async::Ready(None)) => {
  236. progress = true;
  237. trace!("chunk {} / {} complete", self.index, self.shared.chunk_count);
  238. let full = {
  239. let mut bitmap = self.shared.bitmap.lock().unwrap();
  240. bitmap.insert(self.index as usize);
  241. self.shared.cond.notify_all();
  242. bitmap.len() >= self.shared.chunk_count
  243. };
  244. if full {
  245. self.finish();
  246. return Ok(Async::Ready(()));
  247. }
  248. let new_index = (self.index + 1) % self.shared.chunk_count;
  249. self.download(new_index);
  250. }
  251. Ok(Async::NotReady) => (),
  252. Err(ChannelError) => {
  253. warn!("error from channel");
  254. return Ok(Async::Ready(()));
  255. }
  256. }
  257. if !progress {
  258. return Ok(Async::NotReady);
  259. }
  260. }
  261. }
  262. }
  263. impl Read for AudioFileStreaming {
  264. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  265. let index = self.position as usize / CHUNK_SIZE;
  266. let offset = self.position as usize % CHUNK_SIZE;
  267. let len = min(output.len(), CHUNK_SIZE - offset);
  268. let mut bitmap = self.shared.bitmap.lock().unwrap();
  269. while !bitmap.contains(index) {
  270. bitmap = self.shared.cond.wait(bitmap).unwrap();
  271. }
  272. drop(bitmap);
  273. let read_len = try!(self.read_file.read(&mut output[..len]));
  274. self.position += read_len as u64;
  275. Ok(read_len)
  276. }
  277. }
  278. impl Seek for AudioFileStreaming {
  279. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  280. self.position = try!(self.read_file.seek(pos));
  281. // Do not seek past EOF
  282. if (self.position as usize % CHUNK_SIZE) != 0 {
  283. // Notify the fetch thread to get the correct block
  284. // This can fail if fetch thread has completed, in which case the
  285. // block is ready. Just ignore the error.
  286. let _ = self.seek.unbounded_send(self.position);
  287. } else {
  288. warn!("Trying to seek past EOF");
  289. }
  290. Ok(self.position)
  291. }
  292. }
  293. impl Read for AudioFile {
  294. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  295. match *self {
  296. AudioFile::Cached(ref mut file) => file.read(output),
  297. AudioFile::Streaming(ref mut file) => file.read(output),
  298. }
  299. }
  300. }
  301. impl Seek for AudioFile {
  302. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  303. match *self {
  304. AudioFile::Cached(ref mut file) => file.seek(pos),
  305. AudioFile::Streaming(ref mut file) => file.seek(pos),
  306. }
  307. }
  308. }