audio_file.rs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. use byteorder::{ByteOrder, BigEndian};
  2. use std::cmp::min;
  3. use std::collections::BitSet;
  4. use std::io::{self, SeekFrom};
  5. use std::slice::bytes::copy_memory;
  6. use std::sync::{Arc, Condvar, Mutex};
  7. use std::sync::mpsc::{self, TryRecvError};
  8. use stream::{StreamRequest, StreamEvent};
  9. use util::FileId;
  10. use std::thread;
  11. const CHUNK_SIZE : usize = 0x40000;
  12. #[derive(Clone)]
  13. pub struct AudioFile {
  14. position: usize,
  15. seek: mpsc::Sender<u64>,
  16. shared: Arc<AudioFileShared>,
  17. }
  18. struct AudioFileShared {
  19. fileid: FileId,
  20. size: usize,
  21. data: Mutex<AudioFileData>,
  22. cond: Condvar
  23. }
  24. struct AudioFileData {
  25. buffer: Vec<u8>,
  26. bitmap: BitSet,
  27. }
  28. impl AudioFile {
  29. pub fn new(fileid: FileId, streams: mpsc::Sender<StreamRequest>) -> AudioFile {
  30. let (tx, rx) = mpsc::channel();
  31. streams.send(StreamRequest {
  32. id: fileid,
  33. offset: 0,
  34. size: 1,
  35. callback: tx
  36. }).unwrap();
  37. let size = {
  38. let mut size = None;
  39. for event in rx.iter() {
  40. match event {
  41. StreamEvent::Header(id, data) => {
  42. if id == 0x3 {
  43. size = Some(BigEndian::read_u32(&data) * 4);
  44. break;
  45. }
  46. },
  47. StreamEvent::Data(_) => break
  48. }
  49. }
  50. size.unwrap() as usize
  51. };
  52. let bufsize = size + (CHUNK_SIZE - size % CHUNK_SIZE);
  53. let (tx, rx) = mpsc::channel();
  54. let shared = Arc::new(AudioFileShared {
  55. fileid: fileid,
  56. size: size,
  57. data: Mutex::new(AudioFileData {
  58. buffer: vec![0u8; bufsize],
  59. bitmap: BitSet::with_capacity(bufsize / CHUNK_SIZE as usize)
  60. }),
  61. cond: Condvar::new(),
  62. });
  63. let file = AudioFile {
  64. position: 0,
  65. seek: tx,
  66. shared: shared.clone(),
  67. };
  68. thread::spawn( move || { AudioFile::fetch(shared, streams, rx); });
  69. file
  70. }
  71. fn fetch_chunk(shared: &Arc<AudioFileShared>, streams: &mpsc::Sender<StreamRequest>, index: usize) {
  72. let (tx, rx) = mpsc::channel();
  73. streams.send(StreamRequest {
  74. id: shared.fileid,
  75. offset: (index * CHUNK_SIZE / 4) as u32,
  76. size: (CHUNK_SIZE / 4) as u32,
  77. callback: tx
  78. }).unwrap();
  79. let mut offset = 0usize;
  80. for event in rx.iter() {
  81. match event {
  82. StreamEvent::Header(_,_) => (),
  83. StreamEvent::Data(data) => {
  84. let mut handle = shared.data.lock().unwrap();
  85. copy_memory(&data, &mut handle.buffer[index * CHUNK_SIZE + offset ..]);
  86. offset += data.len();
  87. if offset >= CHUNK_SIZE {
  88. break
  89. }
  90. }
  91. }
  92. }
  93. {
  94. let mut handle = shared.data.lock().unwrap();
  95. handle.bitmap.insert(index as usize);
  96. shared.cond.notify_all();
  97. }
  98. }
  99. fn fetch(shared: Arc<AudioFileShared>, streams: mpsc::Sender<StreamRequest>, seek: mpsc::Receiver<u64>) {
  100. let mut index = 0;
  101. loop {
  102. index = if index * CHUNK_SIZE < shared.size {
  103. match seek.try_recv() {
  104. Ok(position) => position as usize / CHUNK_SIZE,
  105. Err(TryRecvError::Empty) => index,
  106. Err(TryRecvError::Disconnected) => break
  107. }
  108. } else {
  109. match seek.recv() {
  110. Ok(position) => position as usize / CHUNK_SIZE,
  111. Err(_) => break
  112. }
  113. };
  114. {
  115. let handle = shared.data.lock().unwrap();
  116. while handle.bitmap.contains(&index) && index * CHUNK_SIZE < shared.size {
  117. index += 1;
  118. }
  119. }
  120. if index * CHUNK_SIZE < shared.size {
  121. AudioFile::fetch_chunk(&shared, &streams, index)
  122. }
  123. }
  124. }
  125. }
  126. impl io::Read for AudioFile {
  127. fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
  128. let index = self.position / CHUNK_SIZE;
  129. let offset = self.position % CHUNK_SIZE;
  130. let len = min(output.len(), CHUNK_SIZE-offset);
  131. let mut handle = self.shared.data.lock().unwrap();
  132. while !handle.bitmap.contains(&index) {
  133. handle = self.shared.cond.wait(handle).unwrap();
  134. }
  135. copy_memory(&handle.buffer[self.position..self.position+len], output);
  136. self.position += len;
  137. Ok(len)
  138. }
  139. }
  140. impl io::Seek for AudioFile {
  141. fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
  142. let newpos = match pos {
  143. SeekFrom::Start(offset) => offset as i64,
  144. SeekFrom::End(offset) => self.shared.size as i64 + offset,
  145. SeekFrom::Current(offset) => self.position as i64 + offset,
  146. };
  147. self.position = min(newpos as usize, self.shared.size);
  148. self.seek.send(self.position as u64).unwrap();
  149. Ok(self.position as u64)
  150. }
  151. }