audio_file.rs 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. use bit_set::BitSet;
  2. use byteorder::{ByteOrder, BigEndian};
  3. use eventual;
  4. use std::cmp::min;
  5. use std::sync::{Arc, Condvar, Mutex};
  6. use std::sync::mpsc::{self, TryRecvError};
  7. use std::fs;
  8. use std::io::{self, Read, Write, Seek, SeekFrom};
  9. use tempfile::NamedTempFile;
  10. use util::{FileId, IgnoreExt};
  11. use session::Session;
  12. use audio_file2;
  13. const CHUNK_SIZE: usize = 0x20000;
  14. pub struct AudioFile {
  15. read_file: fs::File,
  16. position: u64,
  17. seek: mpsc::Sender<u64>,
  18. shared: Arc<AudioFileShared>,
  19. }
  20. struct AudioFileInternal {
  21. partial_tx: Option<eventual::Complete<fs::File, ()>>,
  22. complete_tx: eventual::Complete<NamedTempFile, ()>,
  23. write_file: NamedTempFile,
  24. seek_rx: mpsc::Receiver<u64>,
  25. shared: Arc<AudioFileShared>,
  26. chunk_count: usize,
  27. }
  28. struct AudioFileShared {
  29. cond: Condvar,
  30. bitmap: Mutex<BitSet>,
  31. }
  32. impl AudioFile {
  33. pub fn new(session: &Session, file_id: FileId)
  34. -> (eventual::Future<AudioFile, ()>, eventual::Future<NamedTempFile, ()>) {
  35. let shared = Arc::new(AudioFileShared {
  36. cond: Condvar::new(),
  37. bitmap: Mutex::new(BitSet::new()),
  38. });
  39. let (seek_tx, seek_rx) = mpsc::channel();
  40. let (partial_tx, partial_rx) = eventual::Future::pair();
  41. let (complete_tx, complete_rx) = eventual::Future::pair();
  42. let internal = AudioFileInternal {
  43. shared: shared.clone(),
  44. write_file: NamedTempFile::new().unwrap(),
  45. seek_rx: seek_rx,
  46. partial_tx: Some(partial_tx),
  47. complete_tx: complete_tx,
  48. chunk_count: 0,
  49. };
  50. audio_file2::AudioFile::new(file_id, 0, internal, session);
  51. let file_rx = partial_rx.map(|read_file| {
  52. AudioFile {
  53. read_file: read_file,
  54. position: 0,
  55. seek: seek_tx,
  56. shared: shared,
  57. }
  58. });
  59. (file_rx, complete_rx)
  60. }
  61. }
  62. impl audio_file2::Handler for AudioFileInternal {
  63. fn on_header(mut self, header_id: u8, header_data: &[u8], _session: &Session) -> audio_file2::Response<Self> {
  64. if header_id == 0x3 {
  65. if let Some(tx) = self.partial_tx.take() {
  66. let size = BigEndian::read_u32(header_data) as usize * 4;
  67. self.write_file.set_len(size as u64).unwrap();
  68. let read_file = self.write_file.reopen().unwrap();
  69. self.chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE;
  70. self.shared.bitmap.lock().unwrap().reserve_len(self.chunk_count);
  71. tx.complete(read_file)
  72. }
  73. }
  74. audio_file2::Response::Continue(self)
  75. }
  76. fn on_data(mut self, offset: usize, data: &[u8], _session: &Session) -> audio_file2::Response<Self> {
  77. self.write_file.seek(SeekFrom::Start(offset as u64)).unwrap();
  78. self.write_file.write_all(&data).unwrap();
  79. // We've crossed a chunk boundary
  80. // Mark the previous one as complete in the bitmap and notify the reader
  81. let seek = if (offset + data.len()) % CHUNK_SIZE < data.len() {
  82. let mut index = offset / CHUNK_SIZE;
  83. let mut bitmap = self.shared.bitmap.lock().unwrap();
  84. bitmap.insert(index);
  85. self.shared.cond.notify_all();
  86. // If all blocks are complete when can stop
  87. if bitmap.len() >= self.chunk_count {
  88. drop(bitmap);
  89. self.write_file.seek(SeekFrom::Start(0)).unwrap();
  90. self.complete_tx.complete(self.write_file);
  91. return audio_file2::Response::Close;
  92. }
  93. // Find the next undownloaded block
  94. index = (index + 1) % self.chunk_count;
  95. while bitmap.contains(index) {
  96. index = (index + 1) % self.chunk_count;
  97. }
  98. Some(index)
  99. } else {
  100. None
  101. };
  102. match self.seek_rx.try_recv() {
  103. Ok(seek_offset) => audio_file2::Response::Seek(self, seek_offset as usize / CHUNK_SIZE * CHUNK_SIZE),
  104. Err(TryRecvError::Disconnected) => audio_file2::Response::Close,
  105. Err(TryRecvError::Empty) => match seek {
  106. Some(index) => audio_file2::Response::Seek(self, index * CHUNK_SIZE),
  107. None => audio_file2::Response::Continue(self),
  108. },
  109. }
  110. }
  111. fn on_eof(mut self, _session: &Session) -> audio_file2::Response<Self> {
  112. let index = {
  113. let mut index = self.chunk_count - 1;
  114. let mut bitmap = self.shared.bitmap.lock().unwrap();
  115. bitmap.insert(index);
  116. self.shared.cond.notify_all();
  117. // If all blocks are complete when can stop
  118. if bitmap.len() >= self.chunk_count {
  119. drop(bitmap);
  120. self.write_file.seek(SeekFrom::Start(0)).unwrap();
  121. self.complete_tx.complete(self.write_file);
  122. return audio_file2::Response::Close;
  123. }
  124. // Find the next undownloaded block
  125. index = (index + 1) % self.chunk_count;
  126. while bitmap.contains(index) {
  127. index = (index + 1) % self.chunk_count;
  128. }
  129. index
  130. };
  131. audio_file2::Response::Seek(self, index * CHUNK_SIZE)
  132. }
  133. fn on_error(self, _session: &Session) {
  134. }
  135. }
  136. impl Read for AudioFile {
  137. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  138. let index = self.position as usize / CHUNK_SIZE;
  139. let offset = self.position as usize % CHUNK_SIZE;
  140. let len = min(output.len(), CHUNK_SIZE - offset);
  141. let mut bitmap = self.shared.bitmap.lock().unwrap();
  142. while !bitmap.contains(index) {
  143. bitmap = self.shared.cond.wait(bitmap).unwrap();
  144. }
  145. drop(bitmap);
  146. let read_len = try!(self.read_file.read(&mut output[..len]));
  147. self.position += read_len as u64;
  148. Ok(read_len)
  149. }
  150. }
  151. impl Seek for AudioFile {
  152. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  153. self.position = try!(self.read_file.seek(pos));
  154. // Notify the fetch thread to get the correct block
  155. // This can fail if fetch thread has completed, in which case the
  156. // block is ready. Just ignore the error.
  157. self.seek.send(self.position).ignore();
  158. Ok(self.position as u64)
  159. }
  160. }