audio_file.rs 7.0 KB


  1. use bit_set::BitSet;
  2. use byteorder::{ByteOrder, BigEndian};
  3. use std::cmp::min;
  4. use std::sync::{Arc, Condvar, Mutex};
  5. use std::sync::mpsc::{self, TryRecvError};
  6. use std::thread;
  7. use std::fs;
  8. use std::io::{self, Read, Write, Seek, SeekFrom};
  9. use std::path::PathBuf;
  10. use tempfile::TempFile;
  11. use util::{FileId, IgnoreExt, ZeroFile, mkdir_existing};
  12. use session::Session;
  13. use stream::StreamEvent;
  14. const CHUNK_SIZE: usize = 0x20000;
  15. pub enum AudioFile {
  16. Direct(fs::File),
  17. Loading(AudioFileLoading),
  18. }
  19. pub struct AudioFileLoading {
  20. read_file: TempFile,
  21. position: u64,
  22. seek: mpsc::Sender<u64>,
  23. shared: Arc<AudioFileShared>,
  24. }
  25. struct AudioFileShared {
  26. file_id: FileId,
  27. size: usize,
  28. chunk_count: usize,
  29. cond: Condvar,
  30. bitmap: Mutex<BitSet>,
  31. }
  32. impl AudioFileLoading {
  33. fn new(session: &Session, file_id: FileId) -> AudioFileLoading {
  34. let mut files_iter = TempFile::shared(2).unwrap().into_iter();
  35. let read_file = files_iter.next().unwrap();
  36. let mut write_file = files_iter.next().unwrap();
  37. let size = session.stream(file_id, 0, 1)
  38. .into_iter()
  39. .filter_map(|event| {
  40. match event {
  41. StreamEvent::Header(id, ref data) if id == 0x3 => {
  42. Some(BigEndian::read_u32(data) as usize * 4)
  43. }
  44. _ => None,
  45. }
  46. })
  47. .next()
  48. .unwrap();
  49. let chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE;
  50. let shared = Arc::new(AudioFileShared {
  51. file_id: file_id,
  52. size: size,
  53. chunk_count: chunk_count,
  54. cond: Condvar::new(),
  55. bitmap: Mutex::new(BitSet::with_capacity(chunk_count)),
  56. });
  57. io::copy(&mut ZeroFile::new(size as u64), &mut write_file).unwrap();
  58. let (seek_tx, seek_rx) = mpsc::channel();
  59. let _shared = shared.clone();
  60. let _session = session.clone();
  61. thread::spawn(move || AudioFileLoading::fetch(&_session, _shared, write_file, seek_rx));
  62. AudioFileLoading {
  63. read_file: read_file,
  64. position: 0,
  65. seek: seek_tx,
  66. shared: shared,
  67. }
  68. }
  69. fn fetch(session: &Session,
  70. shared: Arc<AudioFileShared>,
  71. mut write_file: TempFile,
  72. seek_rx: mpsc::Receiver<u64>) {
  73. let mut index = 0;
  74. loop {
  75. match seek_rx.try_recv() {
  76. Ok(position) => {
  77. index = position as usize / CHUNK_SIZE;
  78. }
  79. Err(TryRecvError::Disconnected) => break,
  80. Err(TryRecvError::Empty) => (),
  81. }
  82. let bitmap = shared.bitmap.lock().unwrap();
  83. if bitmap.len() >= shared.chunk_count {
  84. drop(bitmap);
  85. AudioFileLoading::store(session, &shared, &mut write_file);
  86. break;
  87. }
  88. while bitmap.contains(&index) {
  89. index = (index + 1) % shared.chunk_count;
  90. }
  91. drop(bitmap);
  92. AudioFileLoading::fetch_chunk(session, &shared, &mut write_file, index);
  93. }
  94. }
  95. fn fetch_chunk(session: &Session,
  96. shared: &Arc<AudioFileShared>,
  97. write_file: &mut TempFile,
  98. index: usize) {
  99. let rx = session.stream(shared.file_id,
  100. (index * CHUNK_SIZE / 4) as u32,
  101. (CHUNK_SIZE / 4) as u32);
  102. println!("Chunk {}", index);
  103. write_file.seek(SeekFrom::Start((index * CHUNK_SIZE) as u64)).unwrap();
  104. let mut size = 0usize;
  105. for event in rx.iter() {
  106. match event {
  107. StreamEvent::Header(..) => (),
  108. StreamEvent::Data(data) => {
  109. write_file.write_all(&data).unwrap();
  110. size += data.len();
  111. if size >= CHUNK_SIZE {
  112. break;
  113. }
  114. }
  115. }
  116. }
  117. let mut bitmap = shared.bitmap.lock().unwrap();
  118. bitmap.insert(index as usize);
  119. shared.cond.notify_all();
  120. }
  121. fn store(session: &Session, shared: &AudioFileShared, write_file: &mut TempFile) {
  122. write_file.seek(SeekFrom::Start(0)).unwrap();
  123. mkdir_existing(&AudioFileManager::cache_dir(session, shared.file_id)).unwrap();
  124. let mut f = fs::File::create(AudioFileManager::cache_path(session, shared.file_id))
  125. .unwrap();
  126. io::copy(write_file, &mut f).unwrap();
  127. }
  128. }
  129. impl Read for AudioFileLoading {
  130. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  131. let index = self.position as usize / CHUNK_SIZE;
  132. let offset = self.position as usize % CHUNK_SIZE;
  133. let len = min(output.len(), CHUNK_SIZE - offset);
  134. let mut bitmap = self.shared.bitmap.lock().unwrap();
  135. while !bitmap.contains(&index) {
  136. bitmap = self.shared.cond.wait(bitmap).unwrap();
  137. }
  138. drop(bitmap);
  139. let read_len = try!(self.read_file.read(&mut output[..len]));
  140. self.position += read_len as u64;
  141. Ok(read_len)
  142. }
  143. }
  144. impl Seek for AudioFileLoading {
  145. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  146. self.position = try!(self.read_file.seek(pos));
  147. // Notify the fetch thread to get the correct block
  148. // This can fail if fetch thread has completed, in which case the
  149. // block is ready. Just ignore the error.
  150. self.seek.send(self.position).ignore();
  151. Ok(self.position as u64)
  152. }
  153. }
  154. impl Read for AudioFile {
  155. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  156. match *self {
  157. AudioFile::Direct(ref mut file) => file.read(output),
  158. AudioFile::Loading(ref mut loading) => loading.read(output),
  159. }
  160. }
  161. }
  162. impl Seek for AudioFile {
  163. fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
  164. match *self {
  165. AudioFile::Direct(ref mut file) => file.seek(pos),
  166. AudioFile::Loading(ref mut loading) => loading.seek(pos),
  167. }
  168. }
  169. }
  170. pub struct AudioFileManager;
  171. impl AudioFileManager {
  172. pub fn new() -> AudioFileManager {
  173. AudioFileManager
  174. }
  175. pub fn cache_dir(session: &Session, file_id: FileId) -> PathBuf {
  176. let name = file_id.to_base16();
  177. session.0.config.cache_location.join(&name[0..2])
  178. }
  179. pub fn cache_path(session: &Session, file_id: FileId) -> PathBuf {
  180. let name = file_id.to_base16();
  181. AudioFileManager::cache_dir(session, file_id).join(&name[2..])
  182. }
  183. pub fn request(&mut self, session: &Session, file_id: FileId) -> AudioFile {
  184. match fs::File::open(AudioFileManager::cache_path(session, file_id)) {
  185. Ok(f) => AudioFile::Direct(f),
  186. Err(..) => AudioFile::Loading(AudioFileLoading::new(session, file_id)),
  187. }
  188. }
  189. }