audio_file.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. use bit_set::BitSet;
  2. use byteorder::{ByteOrder, BigEndian, WriteBytesExt};
  3. use futures::Stream;
  4. use futures::sync::{oneshot, mpsc};
  5. use futures::{Poll, Async, Future};
  6. use futures::future::{self, FutureResult};
  7. use std::cmp::min;
  8. use std::fs;
  9. use std::io::{self, Read, Write, Seek, SeekFrom};
  10. use std::sync::{Arc, Condvar, Mutex};
  11. use tempfile::NamedTempFile;
  12. use core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
  13. use core::session::Session;
  14. use core::util::FileId;
  /// Size in bytes (128 KiB) of one download chunk. File offsets are mapped to
  /// chunk indices by dividing by this value.
  const CHUNK_SIZE: usize = 0x2_0000;
  16. pub enum AudioFile {
  17. Cached(fs::File),
  18. Streaming(AudioFileStreaming),
  19. }
  20. pub enum AudioFileOpen {
  21. Cached(FutureResult<fs::File, ChannelError>),
  22. Streaming(AudioFileOpenStreaming),
  23. }
  24. pub struct AudioFileOpenStreaming {
  25. session: Session,
  26. data_rx: Option<ChannelData>,
  27. headers: ChannelHeaders,
  28. file_id: FileId,
  29. complete_tx: Option<oneshot::Sender<NamedTempFile>>,
  30. }
  31. pub struct AudioFileStreaming {
  32. read_file: fs::File,
  33. position: u64,
  34. seek: mpsc::UnboundedSender<u64>,
  35. shared: Arc<AudioFileShared>,
  36. }
  37. struct AudioFileShared {
  38. file_id: FileId,
  39. chunk_count: usize,
  40. cond: Condvar,
  41. bitmap: Mutex<BitSet>,
  42. }
  43. impl AudioFileOpenStreaming {
  44. fn finish(&mut self, size: usize) -> AudioFileStreaming {
  45. let chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE;
  46. let shared = Arc::new(AudioFileShared {
  47. file_id: self.file_id,
  48. chunk_count: chunk_count,
  49. cond: Condvar::new(),
  50. bitmap: Mutex::new(BitSet::with_capacity(chunk_count)),
  51. });
  52. let mut write_file = NamedTempFile::new().unwrap();
  53. write_file.set_len(size as u64).unwrap();
  54. write_file.seek(SeekFrom::Start(0)).unwrap();
  55. let read_file = write_file.reopen().unwrap();
  56. let data_rx = self.data_rx.take().unwrap();
  57. let complete_tx = self.complete_tx.take().unwrap();
  58. let (seek_tx, seek_rx) = mpsc::unbounded();
  59. let fetcher = AudioFileFetch::new(
  60. self.session.clone(), shared.clone(), data_rx, write_file, seek_rx, complete_tx
  61. );
  62. self.session.spawn(move |_| fetcher);
  63. AudioFileStreaming {
  64. read_file: read_file,
  65. position: 0,
  66. seek: seek_tx,
  67. shared: shared,
  68. }
  69. }
  70. }
  71. impl Future for AudioFileOpen {
  72. type Item = AudioFile;
  73. type Error = ChannelError;
  74. fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
  75. match *self {
  76. AudioFileOpen::Streaming(ref mut open) => {
  77. let file = try_ready!(open.poll());
  78. Ok(Async::Ready(AudioFile::Streaming(file)))
  79. }
  80. AudioFileOpen::Cached(ref mut open) => {
  81. let file = try_ready!(open.poll());
  82. Ok(Async::Ready(AudioFile::Cached(file)))
  83. }
  84. }
  85. }
  86. }
  87. impl Future for AudioFileOpenStreaming {
  88. type Item = AudioFileStreaming;
  89. type Error = ChannelError;
  90. fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
  91. loop {
  92. let (id, data) = try_ready!(self.headers.poll()).unwrap();
  93. if id == 0x3 {
  94. let size = BigEndian::read_u32(&data) as usize * 4;
  95. let file = self.finish(size);
  96. return Ok(Async::Ready(file));
  97. }
  98. }
  99. }
  100. }
  101. impl AudioFile {
  102. pub fn open(session: &Session, file_id: FileId) -> AudioFileOpen {
  103. let cache = session.cache().cloned();
  104. if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
  105. debug!("File {} already in cache", file_id);
  106. return AudioFileOpen::Cached(future::ok(file));
  107. }
  108. debug!("Downloading file {}", file_id);
  109. let (complete_tx, complete_rx) = oneshot::channel();
  110. let (headers, data) = request_chunk(session, file_id, 0).split();
  111. let open = AudioFileOpenStreaming {
  112. session: session.clone(),
  113. file_id: file_id,
  114. headers: headers,
  115. data_rx: Some(data),
  116. complete_tx: Some(complete_tx),
  117. };
  118. let session_ = session.clone();
  119. session.spawn(move |_| {
  120. complete_rx.map(move |mut file| {
  121. if let Some(cache) = session_.cache() {
  122. cache.save_file(file_id, &mut file);
  123. debug!("File {} complete, saving to cache", file_id);
  124. } else {
  125. debug!("File {} complete", file_id);
  126. }
  127. }).or_else(|oneshot::Canceled| Ok(()))
  128. });
  129. AudioFileOpen::Streaming(open)
  130. }
  131. }
  132. fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel {
  133. trace!("requesting chunk {}", index);
  134. let start = (index * CHUNK_SIZE / 4) as u32;
  135. let end = ((index + 1) * CHUNK_SIZE / 4) as u32;
  136. let (id, channel) = session.channel().allocate();
  137. let mut data: Vec<u8> = Vec::new();
  138. data.write_u16::<BigEndian>(id).unwrap();
  139. data.write_u8(0).unwrap();
  140. data.write_u8(1).unwrap();
  141. data.write_u16::<BigEndian>(0x0000).unwrap();
  142. data.write_u32::<BigEndian>(0x00000000).unwrap();
  143. data.write_u32::<BigEndian>(0x00009C40).unwrap();
  144. data.write_u32::<BigEndian>(0x00020000).unwrap();
  145. data.write(&file.0).unwrap();
  146. data.write_u32::<BigEndian>(start).unwrap();
  147. data.write_u32::<BigEndian>(end).unwrap();
  148. session.send_packet(0x8, data);
  149. channel
  150. }
  151. struct AudioFileFetch {
  152. session: Session,
  153. shared: Arc<AudioFileShared>,
  154. output: Option<NamedTempFile>,
  155. index: usize,
  156. data_rx: ChannelData,
  157. seek_rx: mpsc::UnboundedReceiver<u64>,
  158. complete_tx: Option<oneshot::Sender<NamedTempFile>>,
  159. }
  160. impl AudioFileFetch {
  161. fn new(session: Session, shared: Arc<AudioFileShared>,
  162. data_rx: ChannelData, output: NamedTempFile,
  163. seek_rx: mpsc::UnboundedReceiver<u64>,
  164. complete_tx: oneshot::Sender<NamedTempFile>) -> AudioFileFetch
  165. {
  166. AudioFileFetch {
  167. session: session,
  168. shared: shared,
  169. output: Some(output),
  170. index: 0,
  171. data_rx: data_rx,
  172. seek_rx: seek_rx,
  173. complete_tx: Some(complete_tx),
  174. }
  175. }
  176. fn download(&mut self, mut new_index: usize) {
  177. assert!(new_index < self.shared.chunk_count);
  178. {
  179. let bitmap = self.shared.bitmap.lock().unwrap();
  180. while bitmap.contains(new_index) {
  181. new_index = (new_index + 1) % self.shared.chunk_count;
  182. }
  183. }
  184. if self.index != new_index {
  185. self.index = new_index;
  186. let offset = self.index * CHUNK_SIZE;
  187. self.output.as_mut().unwrap()
  188. .seek(SeekFrom::Start(offset as u64)).unwrap();
  189. let (_headers, data) = request_chunk(&self.session, self.shared.file_id, self.index).split();
  190. self.data_rx = data;
  191. }
  192. }
  193. fn finish(&mut self) {
  194. let mut output = self.output.take().unwrap();
  195. let complete_tx = self.complete_tx.take().unwrap();
  196. output.seek(SeekFrom::Start(0)).unwrap();
  197. complete_tx.complete(output);
  198. }
  199. }
  200. impl Future for AudioFileFetch {
  201. type Item = ();
  202. type Error = ();
  203. fn poll(&mut self) -> Poll<(), ()> {
  204. loop {
  205. let mut progress = false;
  206. match self.seek_rx.poll() {
  207. Ok(Async::Ready(None)) => {
  208. return Ok(Async::Ready(()));
  209. }
  210. Ok(Async::Ready(Some(offset))) => {
  211. progress = true;
  212. let index = offset as usize / CHUNK_SIZE;
  213. self.download(index);
  214. }
  215. Ok(Async::NotReady) => (),
  216. Err(()) => unreachable!(),
  217. }
  218. match self.data_rx.poll() {
  219. Ok(Async::Ready(Some(data))) => {
  220. progress = true;
  221. self.output.as_mut().unwrap()
  222. .write_all(data.as_ref()).unwrap();
  223. }
  224. Ok(Async::Ready(None)) => {
  225. progress = true;
  226. trace!("chunk {} / {} complete", self.index, self.shared.chunk_count);
  227. let full = {
  228. let mut bitmap = self.shared.bitmap.lock().unwrap();
  229. bitmap.insert(self.index as usize);
  230. self.shared.cond.notify_all();
  231. bitmap.len() >= self.shared.chunk_count
  232. };
  233. if full {
  234. self.finish();
  235. return Ok(Async::Ready(()));
  236. }
  237. let new_index = (self.index + 1) % self.shared.chunk_count;
  238. self.download(new_index);
  239. }
  240. Ok(Async::NotReady) => (),
  241. Err(ChannelError) => {
  242. warn!("error from channel");
  243. return Ok(Async::Ready(()));
  244. },
  245. }
  246. if !progress {
  247. return Ok(Async::NotReady);
  248. }
  249. }
  250. }
  251. }
  252. impl Read for AudioFileStreaming {
  253. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  254. let index = self.position as usize / CHUNK_SIZE;
  255. let offset = self.position as usize % CHUNK_SIZE;
  256. let len = min(output.len(), CHUNK_SIZE - offset);
  257. let mut bitmap = self.shared.bitmap.lock().unwrap();
  258. while !bitmap.contains(index) {
  259. bitmap = self.shared.cond.wait(bitmap).unwrap();
  260. }
  261. drop(bitmap);
  262. let read_len = try!(self.read_file.read(&mut output[..len]));
  263. self.position += read_len as u64;
  264. Ok(read_len)
  265. }
  266. }
  267. impl Seek for AudioFileStreaming {
  268. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  269. self.position = try!(self.read_file.seek(pos));
  270. // Notify the fetch thread to get the correct block
  271. // This can fail if fetch thread has completed, in which case the
  272. // block is ready. Just ignore the error.
  273. let _ = self.seek.send(self.position);
  274. Ok(self.position)
  275. }
  276. }
  277. impl Read for AudioFile {
  278. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  279. match *self {
  280. AudioFile::Cached(ref mut file) => file.read(output),
  281. AudioFile::Streaming(ref mut file) => file.read(output),
  282. }
  283. }
  284. }
  285. impl Seek for AudioFile {
  286. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  287. match *self {
  288. AudioFile::Cached(ref mut file) => file.seek(pos),
  289. AudioFile::Streaming(ref mut file) => file.seek(pos),
  290. }
  291. }
  292. }