fetch.rs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. use bit_set::BitSet;
  2. use byteorder::{ByteOrder, BigEndian, WriteBytesExt};
  3. use futures::Stream;
  4. use futures::sync::{oneshot, mpsc};
  5. use futures::{Poll, Async, Future};
  6. use std::cmp::min;
  7. use std::fs;
  8. use std::io::{self, Read, Write, Seek, SeekFrom};
  9. use std::sync::{Arc, Condvar, Mutex};
  10. use tempfile::NamedTempFile;
  11. use core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
  12. use core::session::Session;
  13. use core::util::FileId;
  14. const CHUNK_SIZE: usize = 0x20000;
  15. pub enum AudioFile {
  16. Cached(fs::File),
  17. Streaming(AudioFileStreaming),
  18. }
  19. pub enum AudioFileOpen {
  20. Cached(Option<fs::File>),
  21. Streaming(AudioFileOpenStreaming),
  22. }
  23. pub struct AudioFileOpenStreaming {
  24. session: Session,
  25. data_rx: Option<ChannelData>,
  26. headers: ChannelHeaders,
  27. file_id: FileId,
  28. complete_tx: Option<oneshot::Sender<NamedTempFile>>,
  29. }
  30. pub struct AudioFileStreaming {
  31. read_file: fs::File,
  32. position: u64,
  33. seek: mpsc::UnboundedSender<u64>,
  34. shared: Arc<AudioFileShared>,
  35. }
  36. struct AudioFileShared {
  37. file_id: FileId,
  38. chunk_count: usize,
  39. cond: Condvar,
  40. bitmap: Mutex<BitSet>,
  41. }
  42. impl AudioFileOpenStreaming {
  43. fn finish(&mut self, size: usize) -> AudioFileStreaming {
  44. let chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE;
  45. let shared = Arc::new(AudioFileShared {
  46. file_id: self.file_id,
  47. chunk_count: chunk_count,
  48. cond: Condvar::new(),
  49. bitmap: Mutex::new(BitSet::with_capacity(chunk_count)),
  50. });
  51. let mut write_file = NamedTempFile::new().unwrap();
  52. write_file.set_len(size as u64).unwrap();
  53. write_file.seek(SeekFrom::Start(0)).unwrap();
  54. let read_file = write_file.reopen().unwrap();
  55. let data_rx = self.data_rx.take().unwrap();
  56. let complete_tx = self.complete_tx.take().unwrap();
  57. let (seek_tx, seek_rx) = mpsc::unbounded();
  58. let fetcher = AudioFileFetch::new(
  59. self.session.clone(), shared.clone(), data_rx, write_file, seek_rx, complete_tx
  60. );
  61. self.session.spawn(move |_| fetcher);
  62. AudioFileStreaming {
  63. read_file: read_file,
  64. position: 0,
  65. seek: seek_tx,
  66. shared: shared,
  67. }
  68. }
  69. }
  70. impl Future for AudioFileOpen {
  71. type Item = AudioFile;
  72. type Error = ChannelError;
  73. fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
  74. match *self {
  75. AudioFileOpen::Streaming(ref mut open) => {
  76. let file = try_ready!(open.poll());
  77. Ok(Async::Ready(AudioFile::Streaming(file)))
  78. }
  79. AudioFileOpen::Cached(ref mut file) => {
  80. let file = file.take().unwrap();
  81. Ok(Async::Ready(AudioFile::Cached(file)))
  82. }
  83. }
  84. }
  85. }
  86. impl Future for AudioFileOpenStreaming {
  87. type Item = AudioFileStreaming;
  88. type Error = ChannelError;
  89. fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
  90. loop {
  91. let (id, data) = try_ready!(self.headers.poll()).unwrap();
  92. if id == 0x3 {
  93. let size = BigEndian::read_u32(&data) as usize * 4;
  94. let file = self.finish(size);
  95. return Ok(Async::Ready(file));
  96. }
  97. }
  98. }
  99. }
  100. impl AudioFile {
  101. pub fn open(session: &Session, file_id: FileId) -> AudioFileOpen {
  102. let cache = session.cache().cloned();
  103. if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
  104. debug!("File {} already in cache", file_id);
  105. return AudioFileOpen::Cached(Some(file));
  106. }
  107. debug!("Downloading file {}", file_id);
  108. let (complete_tx, complete_rx) = oneshot::channel();
  109. let (headers, data) = request_chunk(session, file_id, 0).split();
  110. let open = AudioFileOpenStreaming {
  111. session: session.clone(),
  112. file_id: file_id,
  113. headers: headers,
  114. data_rx: Some(data),
  115. complete_tx: Some(complete_tx),
  116. };
  117. let session_ = session.clone();
  118. session.spawn(move |_| {
  119. complete_rx.map(move |mut file| {
  120. if let Some(cache) = session_.cache() {
  121. cache.save_file(file_id, &mut file);
  122. debug!("File {} complete, saving to cache", file_id);
  123. } else {
  124. debug!("File {} complete", file_id);
  125. }
  126. }).or_else(|oneshot::Canceled| Ok(()))
  127. });
  128. AudioFileOpen::Streaming(open)
  129. }
  130. }
  131. fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel {
  132. trace!("requesting chunk {}", index);
  133. let start = (index * CHUNK_SIZE / 4) as u32;
  134. let end = ((index + 1) * CHUNK_SIZE / 4) as u32;
  135. let (id, channel) = session.channel().allocate();
  136. let mut data: Vec<u8> = Vec::new();
  137. data.write_u16::<BigEndian>(id).unwrap();
  138. data.write_u8(0).unwrap();
  139. data.write_u8(1).unwrap();
  140. data.write_u16::<BigEndian>(0x0000).unwrap();
  141. data.write_u32::<BigEndian>(0x00000000).unwrap();
  142. data.write_u32::<BigEndian>(0x00009C40).unwrap();
  143. data.write_u32::<BigEndian>(0x00020000).unwrap();
  144. data.write(&file.0).unwrap();
  145. data.write_u32::<BigEndian>(start).unwrap();
  146. data.write_u32::<BigEndian>(end).unwrap();
  147. session.send_packet(0x8, data);
  148. channel
  149. }
  150. struct AudioFileFetch {
  151. session: Session,
  152. shared: Arc<AudioFileShared>,
  153. output: Option<NamedTempFile>,
  154. index: usize,
  155. data_rx: ChannelData,
  156. seek_rx: mpsc::UnboundedReceiver<u64>,
  157. complete_tx: Option<oneshot::Sender<NamedTempFile>>,
  158. }
  159. impl AudioFileFetch {
  160. fn new(session: Session, shared: Arc<AudioFileShared>,
  161. data_rx: ChannelData, output: NamedTempFile,
  162. seek_rx: mpsc::UnboundedReceiver<u64>,
  163. complete_tx: oneshot::Sender<NamedTempFile>) -> AudioFileFetch
  164. {
  165. AudioFileFetch {
  166. session: session,
  167. shared: shared,
  168. output: Some(output),
  169. index: 0,
  170. data_rx: data_rx,
  171. seek_rx: seek_rx,
  172. complete_tx: Some(complete_tx),
  173. }
  174. }
  175. fn download(&mut self, mut new_index: usize) {
  176. assert!(new_index < self.shared.chunk_count);
  177. {
  178. let bitmap = self.shared.bitmap.lock().unwrap();
  179. while bitmap.contains(new_index) {
  180. new_index = (new_index + 1) % self.shared.chunk_count;
  181. }
  182. }
  183. if self.index != new_index {
  184. self.index = new_index;
  185. let offset = self.index * CHUNK_SIZE;
  186. self.output.as_mut().unwrap()
  187. .seek(SeekFrom::Start(offset as u64)).unwrap();
  188. let (_headers, data) = request_chunk(&self.session, self.shared.file_id, self.index).split();
  189. self.data_rx = data;
  190. }
  191. }
  192. fn finish(&mut self) {
  193. let mut output = self.output.take().unwrap();
  194. let complete_tx = self.complete_tx.take().unwrap();
  195. output.seek(SeekFrom::Start(0)).unwrap();
  196. let _ = complete_tx.send(output);
  197. }
  198. }
  199. impl Future for AudioFileFetch {
  200. type Item = ();
  201. type Error = ();
  202. fn poll(&mut self) -> Poll<(), ()> {
  203. loop {
  204. let mut progress = false;
  205. match self.seek_rx.poll() {
  206. Ok(Async::Ready(None)) => {
  207. return Ok(Async::Ready(()));
  208. }
  209. Ok(Async::Ready(Some(offset))) => {
  210. progress = true;
  211. let index = offset as usize / CHUNK_SIZE;
  212. self.download(index);
  213. }
  214. Ok(Async::NotReady) => (),
  215. Err(()) => unreachable!(),
  216. }
  217. match self.data_rx.poll() {
  218. Ok(Async::Ready(Some(data))) => {
  219. progress = true;
  220. self.output.as_mut().unwrap()
  221. .write_all(data.as_ref()).unwrap();
  222. }
  223. Ok(Async::Ready(None)) => {
  224. progress = true;
  225. trace!("chunk {} / {} complete", self.index, self.shared.chunk_count);
  226. let full = {
  227. let mut bitmap = self.shared.bitmap.lock().unwrap();
  228. bitmap.insert(self.index as usize);
  229. self.shared.cond.notify_all();
  230. bitmap.len() >= self.shared.chunk_count
  231. };
  232. if full {
  233. self.finish();
  234. return Ok(Async::Ready(()));
  235. }
  236. let new_index = (self.index + 1) % self.shared.chunk_count;
  237. self.download(new_index);
  238. }
  239. Ok(Async::NotReady) => (),
  240. Err(ChannelError) => {
  241. warn!("error from channel");
  242. return Ok(Async::Ready(()));
  243. },
  244. }
  245. if !progress {
  246. return Ok(Async::NotReady);
  247. }
  248. }
  249. }
  250. }
  251. impl Read for AudioFileStreaming {
  252. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  253. let index = self.position as usize / CHUNK_SIZE;
  254. let offset = self.position as usize % CHUNK_SIZE;
  255. let len = min(output.len(), CHUNK_SIZE - offset);
  256. let mut bitmap = self.shared.bitmap.lock().unwrap();
  257. while !bitmap.contains(index) {
  258. bitmap = self.shared.cond.wait(bitmap).unwrap();
  259. }
  260. drop(bitmap);
  261. let read_len = try!(self.read_file.read(&mut output[..len]));
  262. self.position += read_len as u64;
  263. Ok(read_len)
  264. }
  265. }
  266. impl Seek for AudioFileStreaming {
  267. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  268. self.position = try!(self.read_file.seek(pos));
  269. // Notify the fetch thread to get the correct block
  270. // This can fail if fetch thread has completed, in which case the
  271. // block is ready. Just ignore the error.
  272. let _ = self.seek.send(self.position);
  273. Ok(self.position)
  274. }
  275. }
  276. impl Read for AudioFile {
  277. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  278. match *self {
  279. AudioFile::Cached(ref mut file) => file.read(output),
  280. AudioFile::Streaming(ref mut file) => file.read(output),
  281. }
  282. }
  283. }
  284. impl Seek for AudioFile {
  285. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  286. match *self {
  287. AudioFile::Cached(ref mut file) => file.seek(pos),
  288. AudioFile::Streaming(ref mut file) => file.seek(pos),
  289. }
  290. }
  291. }