audio_file.rs 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. use bit_set::BitSet;
  2. use byteorder::{ByteOrder, BigEndian};
  3. use std::cmp::min;
  4. use std::sync::{Arc, Condvar, Mutex};
  5. use std::sync::mpsc::{self, TryRecvError};
  6. use std::thread;
  7. use std::fs;
  8. use std::io::{self, Read, Write, Seek, SeekFrom};
  9. use std::path::PathBuf;
  10. use tempfile::NamedTempFile;
  11. use util::{FileId, IgnoreExt, mkdir_existing};
  12. use session::Session;
  13. use stream::StreamEvent;
  14. const CHUNK_SIZE: usize = 0x20000;
  15. pub enum AudioFile {
  16. Direct(fs::File),
  17. Loading(AudioFileLoading),
  18. }
  19. pub struct AudioFileLoading {
  20. read_file: fs::File,
  21. position: u64,
  22. seek: mpsc::Sender<u64>,
  23. shared: Arc<AudioFileShared>,
  24. }
  25. struct AudioFileShared {
  26. file_id: FileId,
  27. size: usize,
  28. chunk_count: usize,
  29. cond: Condvar,
  30. bitmap: Mutex<BitSet>,
  31. }
  32. impl AudioFileLoading {
  33. fn new(session: &Session, file_id: FileId) -> AudioFileLoading {
  34. let size = session.stream(file_id, 0, 1)
  35. .into_iter()
  36. .filter_map(|event| {
  37. match event {
  38. StreamEvent::Header(id, ref data) if id == 0x3 => {
  39. Some(BigEndian::read_u32(data) as usize * 4)
  40. }
  41. _ => None,
  42. }
  43. })
  44. .next()
  45. .unwrap();
  46. let chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE;
  47. let shared = Arc::new(AudioFileShared {
  48. file_id: file_id,
  49. size: size,
  50. chunk_count: chunk_count,
  51. cond: Condvar::new(),
  52. bitmap: Mutex::new(BitSet::with_capacity(chunk_count)),
  53. });
  54. let write_file = NamedTempFile::new().unwrap();
  55. write_file.set_len(size as u64).unwrap();
  56. let read_file = write_file.reopen().unwrap();
  57. let (seek_tx, seek_rx) = mpsc::channel();
  58. {
  59. let shared = shared.clone();
  60. let session = session.clone();
  61. thread::spawn(move || AudioFileLoading::fetch(&session, shared, write_file, seek_rx));
  62. }
  63. AudioFileLoading {
  64. read_file: read_file,
  65. position: 0,
  66. seek: seek_tx,
  67. shared: shared,
  68. }
  69. }
  70. fn fetch(session: &Session,
  71. shared: Arc<AudioFileShared>,
  72. mut write_file: NamedTempFile,
  73. seek_rx: mpsc::Receiver<u64>) {
  74. let mut index = 0;
  75. loop {
  76. match seek_rx.try_recv() {
  77. Ok(position) => {
  78. index = position as usize / CHUNK_SIZE;
  79. }
  80. Err(TryRecvError::Disconnected) => break,
  81. Err(TryRecvError::Empty) => (),
  82. }
  83. let bitmap = shared.bitmap.lock().unwrap();
  84. if bitmap.len() >= shared.chunk_count {
  85. drop(bitmap);
  86. AudioFileLoading::store(session, &shared, &mut write_file);
  87. break;
  88. }
  89. while bitmap.contains(&index) {
  90. index = (index + 1) % shared.chunk_count;
  91. }
  92. drop(bitmap);
  93. AudioFileLoading::fetch_chunk(session, &shared, &mut write_file, index);
  94. }
  95. }
  96. fn fetch_chunk(session: &Session,
  97. shared: &Arc<AudioFileShared>,
  98. write_file: &mut NamedTempFile,
  99. index: usize) {
  100. let rx = session.stream(shared.file_id,
  101. (index * CHUNK_SIZE / 4) as u32,
  102. (CHUNK_SIZE / 4) as u32);
  103. println!("Chunk {}", index);
  104. write_file.seek(SeekFrom::Start((index * CHUNK_SIZE) as u64)).unwrap();
  105. let mut size = 0usize;
  106. for event in rx.iter() {
  107. match event {
  108. StreamEvent::Header(..) => (),
  109. StreamEvent::Data(data) => {
  110. write_file.write_all(&data).unwrap();
  111. size += data.len();
  112. if size >= CHUNK_SIZE {
  113. break;
  114. }
  115. }
  116. }
  117. }
  118. let mut bitmap = shared.bitmap.lock().unwrap();
  119. bitmap.insert(index as usize);
  120. shared.cond.notify_all();
  121. }
  122. fn store(session: &Session, shared: &AudioFileShared, write_file: &mut NamedTempFile) {
  123. write_file.seek(SeekFrom::Start(0)).unwrap();
  124. mkdir_existing(&AudioFileManager::cache_dir(session, shared.file_id)).unwrap();
  125. let mut f = fs::File::create(AudioFileManager::cache_path(session, shared.file_id))
  126. .unwrap();
  127. io::copy(write_file, &mut f).unwrap();
  128. }
  129. }
  130. impl Read for AudioFileLoading {
  131. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  132. let index = self.position as usize / CHUNK_SIZE;
  133. let offset = self.position as usize % CHUNK_SIZE;
  134. let len = min(output.len(), CHUNK_SIZE - offset);
  135. let mut bitmap = self.shared.bitmap.lock().unwrap();
  136. while !bitmap.contains(&index) {
  137. bitmap = self.shared.cond.wait(bitmap).unwrap();
  138. }
  139. drop(bitmap);
  140. let read_len = try!(self.read_file.read(&mut output[..len]));
  141. self.position += read_len as u64;
  142. Ok(read_len)
  143. }
  144. }
  145. impl Seek for AudioFileLoading {
  146. fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
  147. self.position = try!(self.read_file.seek(pos));
  148. // Notify the fetch thread to get the correct block
  149. // This can fail if fetch thread has completed, in which case the
  150. // block is ready. Just ignore the error.
  151. self.seek.send(self.position).ignore();
  152. Ok(self.position as u64)
  153. }
  154. }
  155. impl Read for AudioFile {
  156. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  157. match *self {
  158. AudioFile::Direct(ref mut file) => file.read(output),
  159. AudioFile::Loading(ref mut loading) => loading.read(output),
  160. }
  161. }
  162. }
  163. impl Seek for AudioFile {
  164. fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
  165. match *self {
  166. AudioFile::Direct(ref mut file) => file.seek(pos),
  167. AudioFile::Loading(ref mut loading) => loading.seek(pos),
  168. }
  169. }
  170. }
  171. pub struct AudioFileManager;
  172. impl AudioFileManager {
  173. pub fn new() -> AudioFileManager {
  174. AudioFileManager
  175. }
  176. pub fn cache_dir(session: &Session, file_id: FileId) -> PathBuf {
  177. let name = file_id.to_base16();
  178. session.0.config.cache_location.join(&name[0..2])
  179. }
  180. pub fn cache_path(session: &Session, file_id: FileId) -> PathBuf {
  181. let name = file_id.to_base16();
  182. AudioFileManager::cache_dir(session, file_id).join(&name[2..])
  183. }
  184. pub fn request(&mut self, session: &Session, file_id: FileId) -> AudioFile {
  185. match fs::File::open(AudioFileManager::cache_path(session, file_id)) {
  186. Ok(f) => AudioFile::Direct(f),
  187. Err(..) => AudioFile::Loading(AudioFileLoading::new(session, file_id)),
  188. }
  189. }
  190. }