audio_file.rs 7.0 KB


  1. use bit_set::BitSet;
  2. use byteorder::{ByteOrder, BigEndian};
  3. use std::cmp::min;
  4. use std::sync::{Arc, Condvar, Mutex};
  5. use std::sync::mpsc::{self, TryRecvError};
  6. use std::thread;
  7. use std::fs;
  8. use std::io::{self, Read, Write, Seek, SeekFrom};
  9. use std::path::PathBuf;
  10. use tempfile::NamedTempFile;
  11. use util::{FileId, IgnoreExt, mkdir_existing};
  12. use session::Session;
  13. use stream::StreamEvent;
  14. const CHUNK_SIZE: usize = 0x20000;
  15. pub enum AudioFile {
  16. Direct(fs::File),
  17. Loading(AudioFileLoading),
  18. }
  19. pub struct AudioFileLoading {
  20. read_file: fs::File,
  21. position: u64,
  22. seek: mpsc::Sender<u64>,
  23. shared: Arc<AudioFileShared>,
  24. }
  25. struct AudioFileShared {
  26. file_id: FileId,
  27. size: usize,
  28. chunk_count: usize,
  29. cond: Condvar,
  30. bitmap: Mutex<BitSet>,
  31. }
  32. impl AudioFileLoading {
  33. fn new(session: &Session, file_id: FileId) -> AudioFileLoading {
  34. let size = session.stream(file_id, 0, 1)
  35. .iter()
  36. .filter_map(|event| {
  37. match event {
  38. StreamEvent::Header(id, ref data) if id == 0x3 => {
  39. Some(BigEndian::read_u32(data) as usize * 4)
  40. }
  41. _ => None,
  42. }
  43. })
  44. .next()
  45. .unwrap();
  46. let chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE;
  47. let shared = Arc::new(AudioFileShared {
  48. file_id: file_id,
  49. size: size,
  50. chunk_count: chunk_count,
  51. cond: Condvar::new(),
  52. bitmap: Mutex::new(BitSet::with_capacity(chunk_count)),
  53. });
  54. let write_file = NamedTempFile::new().unwrap();
  55. write_file.set_len(size as u64).unwrap();
  56. let read_file = write_file.reopen().unwrap();
  57. let (seek_tx, seek_rx) = mpsc::channel();
  58. {
  59. let shared = shared.clone();
  60. let session = session.clone();
  61. thread::spawn(move || AudioFileLoading::fetch(&session, shared, write_file, seek_rx));
  62. }
  63. AudioFileLoading {
  64. read_file: read_file,
  65. position: 0,
  66. seek: seek_tx,
  67. shared: shared,
  68. }
  69. }
  70. fn fetch(session: &Session,
  71. shared: Arc<AudioFileShared>,
  72. mut write_file: NamedTempFile,
  73. seek_rx: mpsc::Receiver<u64>) {
  74. let mut index = 0;
  75. loop {
  76. match seek_rx.try_recv() {
  77. Ok(position) => {
  78. index = position as usize / CHUNK_SIZE;
  79. }
  80. Err(TryRecvError::Disconnected) => break,
  81. Err(TryRecvError::Empty) => (),
  82. }
  83. let bitmap = shared.bitmap.lock().unwrap();
  84. if bitmap.len() >= shared.chunk_count {
  85. drop(bitmap);
  86. AudioFileLoading::persist_to_cache(session, &shared, &mut write_file);
  87. break;
  88. }
  89. while bitmap.contains(&index) {
  90. index = (index + 1) % shared.chunk_count;
  91. }
  92. drop(bitmap);
  93. AudioFileLoading::fetch_chunk(session, &shared, &mut write_file, index);
  94. }
  95. }
  96. fn fetch_chunk(session: &Session,
  97. shared: &Arc<AudioFileShared>,
  98. write_file: &mut NamedTempFile,
  99. index: usize) {
  100. let rx = session.stream(shared.file_id,
  101. (index * CHUNK_SIZE / 4) as u32,
  102. (CHUNK_SIZE / 4) as u32);
  103. println!("Chunk {}", index);
  104. write_file.seek(SeekFrom::Start((index * CHUNK_SIZE) as u64)).unwrap();
  105. let mut size = 0usize;
  106. for event in rx.iter() {
  107. match event {
  108. StreamEvent::Header(..) => (),
  109. StreamEvent::Data(data) => {
  110. write_file.write_all(&data).unwrap();
  111. size += data.len();
  112. if size >= CHUNK_SIZE {
  113. break;
  114. }
  115. }
  116. }
  117. }
  118. let mut bitmap = shared.bitmap.lock().unwrap();
  119. bitmap.insert(index as usize);
  120. shared.cond.notify_all();
  121. }
  122. fn persist_to_cache(session: &Session, shared: &AudioFileShared, write_file: &mut NamedTempFile) {
  123. if let Some(path) = AudioFileManager::cache_path(session, shared.file_id) {
  124. write_file.seek(SeekFrom::Start(0)).unwrap();
  125. mkdir_existing(path.parent().unwrap()).unwrap();
  126. let mut cache_file = fs::File::create(path).unwrap();
  127. io::copy(write_file, &mut cache_file).unwrap();
  128. }
  129. }
  130. }
  131. impl Read for AudioFileLoading {
  132. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  133. let index = self.position as usize / CHUNK_SIZE;
  134. let offset = self.position as usize % CHUNK_SIZE;
  135. let len = min(output.len(), CHUNK_SIZE - offset);
  136. let mut bitmap = self.shared.bitmap.lock().unwrap();
  137. while !bitmap.contains(&index) {
  138. bitmap = self.shared.cond.wait(bitmap).unwrap();
  139. }
  140. drop(bitmap);
  141. let read_len = try!(self.read_file.read(&mut output[..len]));
  142. self.position += read_len as u64;
  143. Ok(read_len)
  144. }
  145. }
  146. impl Seek for AudioFileLoading {
  147. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  148. self.position = try!(self.read_file.seek(pos));
  149. // Notify the fetch thread to get the correct block
  150. // This can fail if fetch thread has completed, in which case the
  151. // block is ready. Just ignore the error.
  152. self.seek.send(self.position).ignore();
  153. Ok(self.position as u64)
  154. }
  155. }
  156. impl Read for AudioFile {
  157. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  158. match *self {
  159. AudioFile::Direct(ref mut file) => file.read(output),
  160. AudioFile::Loading(ref mut loading) => loading.read(output),
  161. }
  162. }
  163. }
  164. impl Seek for AudioFile {
  165. fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
  166. match *self {
  167. AudioFile::Direct(ref mut file) => file.seek(pos),
  168. AudioFile::Loading(ref mut loading) => loading.seek(pos),
  169. }
  170. }
  171. }
  172. pub struct AudioFileManager;
  173. impl AudioFileManager {
  174. pub fn new() -> AudioFileManager {
  175. AudioFileManager
  176. }
  177. pub fn cache_path(session: &Session, file_id: FileId) -> Option<PathBuf> {
  178. session.config().cache_location.as_ref().map(|cache| {
  179. let name = file_id.to_base16();
  180. cache.join(&name[0..2]).join(&name[2..])
  181. })
  182. }
  183. pub fn request(&mut self, session: &Session, file_id: FileId) -> AudioFile {
  184. let cache_path = AudioFileManager::cache_path(session, file_id);
  185. let cache_file = cache_path.and_then(|p| fs::File::open(p).ok());
  186. cache_file.map(AudioFile::Direct).unwrap_or_else(|| {
  187. AudioFile::Loading(AudioFileLoading::new(session, file_id))
  188. })
  189. }
  190. }