audio_file.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. use bit_set::BitSet;
  2. use byteorder::{ByteOrder, BigEndian, WriteBytesExt};
  3. use futures::Stream;
  4. use futures::sync::{oneshot, mpsc};
  5. use futures::{Poll, Async, Future};
  6. use futures::future::{self, FutureResult};
  7. use std::cmp::min;
  8. use std::fs;
  9. use std::io::{self, Read, Write, Seek, SeekFrom};
  10. use std::sync::{Arc, Condvar, Mutex};
  11. use tempfile::NamedTempFile;
  12. use channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
  13. use session::Session;
  14. use util::FileId;
  15. const CHUNK_SIZE: usize = 0x20000;
  16. component! {
  17. AudioFileManager : AudioFileManagerInner { }
  18. }
  19. pub enum AudioFile {
  20. Cached(fs::File),
  21. Streaming(AudioFileStreaming),
  22. }
  23. pub enum AudioFileOpen {
  24. Cached(FutureResult<fs::File, ChannelError>),
  25. Streaming(AudioFileOpenStreaming),
  26. }
  27. pub struct AudioFileOpenStreaming {
  28. session: Session,
  29. data_rx: Option<ChannelData>,
  30. headers: ChannelHeaders,
  31. file_id: FileId,
  32. complete_tx: Option<oneshot::Sender<NamedTempFile>>,
  33. }
  34. pub struct AudioFileStreaming {
  35. read_file: fs::File,
  36. position: u64,
  37. seek: mpsc::UnboundedSender<u64>,
  38. shared: Arc<AudioFileShared>,
  39. }
  40. struct AudioFileShared {
  41. file_id: FileId,
  42. chunk_count: usize,
  43. cond: Condvar,
  44. bitmap: Mutex<BitSet>,
  45. }
  46. impl AudioFileOpenStreaming {
  47. fn finish(&mut self, size: usize) -> AudioFileStreaming {
  48. let chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE;
  49. let shared = Arc::new(AudioFileShared {
  50. file_id: self.file_id,
  51. chunk_count: chunk_count,
  52. cond: Condvar::new(),
  53. bitmap: Mutex::new(BitSet::with_capacity(chunk_count)),
  54. });
  55. let mut write_file = NamedTempFile::new().unwrap();
  56. write_file.set_len(size as u64).unwrap();
  57. write_file.seek(SeekFrom::Start(0)).unwrap();
  58. let read_file = write_file.reopen().unwrap();
  59. let data_rx = self.data_rx.take().unwrap();
  60. let complete_tx = self.complete_tx.take().unwrap();
  61. let (seek_tx, seek_rx) = mpsc::unbounded();
  62. let fetcher = AudioFileFetch::new(
  63. self.session.clone(), shared.clone(), data_rx, write_file, seek_rx, complete_tx
  64. );
  65. self.session.spawn(move |_| fetcher);
  66. AudioFileStreaming {
  67. read_file: read_file,
  68. position: 0,
  69. seek: seek_tx,
  70. shared: shared,
  71. }
  72. }
  73. }
  74. impl Future for AudioFileOpen {
  75. type Item = AudioFile;
  76. type Error = ChannelError;
  77. fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
  78. match *self {
  79. AudioFileOpen::Streaming(ref mut open) => {
  80. let file = try_ready!(open.poll());
  81. Ok(Async::Ready(AudioFile::Streaming(file)))
  82. }
  83. AudioFileOpen::Cached(ref mut open) => {
  84. let file = try_ready!(open.poll());
  85. Ok(Async::Ready(AudioFile::Cached(file)))
  86. }
  87. }
  88. }
  89. }
  90. impl Future for AudioFileOpenStreaming {
  91. type Item = AudioFileStreaming;
  92. type Error = ChannelError;
  93. fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
  94. loop {
  95. let (id, data) = try_ready!(self.headers.poll()).unwrap();
  96. if id == 0x3 {
  97. let size = BigEndian::read_u32(&data) as usize * 4;
  98. let file = self.finish(size);
  99. return Ok(Async::Ready(file));
  100. }
  101. }
  102. }
  103. }
  104. impl AudioFileManager {
  105. pub fn open(&self, file_id: FileId) -> AudioFileOpen {
  106. let cache = self.session().cache().cloned();
  107. if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
  108. debug!("File {} already in cache", file_id);
  109. return AudioFileOpen::Cached(future::ok(file));
  110. }
  111. debug!("Downloading file {}", file_id);
  112. let (complete_tx, complete_rx) = oneshot::channel();
  113. let (headers, data) = request_chunk(&self.session(), file_id, 0).split();
  114. let open = AudioFileOpenStreaming {
  115. session: self.session(),
  116. file_id: file_id,
  117. headers: headers,
  118. data_rx: Some(data),
  119. complete_tx: Some(complete_tx),
  120. };
  121. let session = self.session();
  122. self.session().spawn(move |_| {
  123. complete_rx.map(move |mut file| {
  124. if let Some(cache) = session.cache() {
  125. cache.save_file(file_id, &mut file);
  126. debug!("File {} complete, saving to cache", file_id);
  127. } else {
  128. debug!("File {} complete", file_id);
  129. }
  130. }).or_else(|oneshot::Canceled| Ok(()))
  131. });
  132. AudioFileOpen::Streaming(open)
  133. }
  134. }
  135. fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel {
  136. trace!("requesting chunk {}", index);
  137. let start = (index * CHUNK_SIZE / 4) as u32;
  138. let end = ((index + 1) * CHUNK_SIZE / 4) as u32;
  139. let (id, channel) = session.channel().allocate();
  140. let mut data: Vec<u8> = Vec::new();
  141. data.write_u16::<BigEndian>(id).unwrap();
  142. data.write_u8(0).unwrap();
  143. data.write_u8(1).unwrap();
  144. data.write_u16::<BigEndian>(0x0000).unwrap();
  145. data.write_u32::<BigEndian>(0x00000000).unwrap();
  146. data.write_u32::<BigEndian>(0x00009C40).unwrap();
  147. data.write_u32::<BigEndian>(0x00020000).unwrap();
  148. data.write(&file.0).unwrap();
  149. data.write_u32::<BigEndian>(start).unwrap();
  150. data.write_u32::<BigEndian>(end).unwrap();
  151. session.send_packet(0x8, data);
  152. channel
  153. }
  154. struct AudioFileFetch {
  155. session: Session,
  156. shared: Arc<AudioFileShared>,
  157. output: Option<NamedTempFile>,
  158. index: usize,
  159. data_rx: ChannelData,
  160. seek_rx: mpsc::UnboundedReceiver<u64>,
  161. complete_tx: Option<oneshot::Sender<NamedTempFile>>,
  162. }
  163. impl AudioFileFetch {
  164. fn new(session: Session, shared: Arc<AudioFileShared>,
  165. data_rx: ChannelData, output: NamedTempFile,
  166. seek_rx: mpsc::UnboundedReceiver<u64>,
  167. complete_tx: oneshot::Sender<NamedTempFile>) -> AudioFileFetch
  168. {
  169. AudioFileFetch {
  170. session: session,
  171. shared: shared,
  172. output: Some(output),
  173. index: 0,
  174. data_rx: data_rx,
  175. seek_rx: seek_rx,
  176. complete_tx: Some(complete_tx),
  177. }
  178. }
  179. fn download(&mut self, mut new_index: usize) {
  180. assert!(new_index < self.shared.chunk_count);
  181. {
  182. let bitmap = self.shared.bitmap.lock().unwrap();
  183. while bitmap.contains(new_index) {
  184. new_index = (new_index + 1) % self.shared.chunk_count;
  185. }
  186. }
  187. if self.index != new_index {
  188. self.index = new_index;
  189. let offset = self.index * CHUNK_SIZE;
  190. self.output.as_mut().unwrap()
  191. .seek(SeekFrom::Start(offset as u64)).unwrap();
  192. let (_headers, data) = request_chunk(&self.session, self.shared.file_id, self.index).split();
  193. self.data_rx = data;
  194. }
  195. }
  196. fn finish(&mut self) {
  197. let mut output = self.output.take().unwrap();
  198. let complete_tx = self.complete_tx.take().unwrap();
  199. output.seek(SeekFrom::Start(0)).unwrap();
  200. complete_tx.complete(output);
  201. }
  202. }
  203. impl Future for AudioFileFetch {
  204. type Item = ();
  205. type Error = ();
  206. fn poll(&mut self) -> Poll<(), ()> {
  207. loop {
  208. let mut progress = false;
  209. match self.seek_rx.poll() {
  210. Ok(Async::Ready(None)) => {
  211. return Ok(Async::Ready(()));
  212. }
  213. Ok(Async::Ready(Some(offset))) => {
  214. progress = true;
  215. let index = offset as usize / CHUNK_SIZE;
  216. self.download(index);
  217. }
  218. Ok(Async::NotReady) => (),
  219. Err(()) => unreachable!(),
  220. }
  221. match self.data_rx.poll() {
  222. Ok(Async::Ready(Some(data))) => {
  223. progress = true;
  224. self.output.as_mut().unwrap()
  225. .write_all(data.as_ref()).unwrap();
  226. }
  227. Ok(Async::Ready(None)) => {
  228. progress = true;
  229. trace!("chunk {} / {} complete", self.index, self.shared.chunk_count);
  230. let full = {
  231. let mut bitmap = self.shared.bitmap.lock().unwrap();
  232. bitmap.insert(self.index as usize);
  233. self.shared.cond.notify_all();
  234. bitmap.len() >= self.shared.chunk_count
  235. };
  236. if full {
  237. self.finish();
  238. return Ok(Async::Ready(()));
  239. }
  240. let new_index = (self.index + 1) % self.shared.chunk_count;
  241. self.download(new_index);
  242. }
  243. Ok(Async::NotReady) => (),
  244. Err(ChannelError) => {
  245. warn!("error from channel");
  246. return Ok(Async::Ready(()));
  247. },
  248. }
  249. if !progress {
  250. return Ok(Async::NotReady);
  251. }
  252. }
  253. }
  254. }
  255. impl Read for AudioFileStreaming {
  256. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  257. let index = self.position as usize / CHUNK_SIZE;
  258. let offset = self.position as usize % CHUNK_SIZE;
  259. let len = min(output.len(), CHUNK_SIZE - offset);
  260. let mut bitmap = self.shared.bitmap.lock().unwrap();
  261. while !bitmap.contains(index) {
  262. bitmap = self.shared.cond.wait(bitmap).unwrap();
  263. }
  264. drop(bitmap);
  265. let read_len = try!(self.read_file.read(&mut output[..len]));
  266. self.position += read_len as u64;
  267. Ok(read_len)
  268. }
  269. }
  270. impl Seek for AudioFileStreaming {
  271. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  272. self.position = try!(self.read_file.seek(pos));
  273. // Notify the fetch thread to get the correct block
  274. // This can fail if fetch thread has completed, in which case the
  275. // block is ready. Just ignore the error.
  276. let _ = self.seek.send(self.position);
  277. Ok(self.position)
  278. }
  279. }
  280. impl Read for AudioFile {
  281. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  282. match *self {
  283. AudioFile::Cached(ref mut file) => file.read(output),
  284. AudioFile::Streaming(ref mut file) => file.read(output),
  285. }
  286. }
  287. }
  288. impl Seek for AudioFile {
  289. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  290. match *self {
  291. AudioFile::Cached(ref mut file) => file.seek(pos),
  292. AudioFile::Streaming(ref mut file) => file.seek(pos),
  293. }
  294. }
  295. }