|
@@ -3,6 +3,7 @@ use byteorder::{ByteOrder, BigEndian, WriteBytesExt};
|
|
|
use futures::Stream;
|
|
|
use futures::sync::{oneshot, mpsc};
|
|
|
use futures::{Poll, Async, Future};
|
|
|
+use futures::future::{self, FutureResult};
|
|
|
use std::cmp::min;
|
|
|
use std::fs;
|
|
|
use std::io::{self, Read, Write, Seek, SeekFrom};
|
|
@@ -19,16 +20,17 @@ component! {
|
|
|
AudioFileManager : AudioFileManagerInner { }
|
|
|
}
|
|
|
|
|
|
-pub struct AudioFile {
|
|
|
- read_file: fs::File,
|
|
|
-
|
|
|
- position: u64,
|
|
|
- seek: mpsc::UnboundedSender<u64>,
|
|
|
+pub enum AudioFile {
|
|
|
+ Cached(fs::File),
|
|
|
+ Streaming(AudioFileStreaming),
|
|
|
+}
|
|
|
|
|
|
- shared: Arc<AudioFileShared>,
|
|
|
+pub enum AudioFileOpen {
|
|
|
+ Cached(FutureResult<fs::File, ChannelError>),
|
|
|
+ Streaming(AudioFileOpenStreaming),
|
|
|
}
|
|
|
|
|
|
-pub struct AudioFileOpen {
|
|
|
+pub struct AudioFileOpenStreaming {
|
|
|
session: Session,
|
|
|
data_rx: Option<ChannelData>,
|
|
|
headers: ChannelHeaders,
|
|
@@ -36,6 +38,15 @@ pub struct AudioFileOpen {
|
|
|
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
|
|
|
}
|
|
|
|
|
|
+pub struct AudioFileStreaming {
|
|
|
+ read_file: fs::File,
|
|
|
+
|
|
|
+ position: u64,
|
|
|
+ seek: mpsc::UnboundedSender<u64>,
|
|
|
+
|
|
|
+ shared: Arc<AudioFileShared>,
|
|
|
+}
|
|
|
+
|
|
|
struct AudioFileShared {
|
|
|
file_id: FileId,
|
|
|
chunk_count: usize,
|
|
@@ -43,8 +54,8 @@ struct AudioFileShared {
|
|
|
bitmap: Mutex<BitSet>,
|
|
|
}
|
|
|
|
|
|
-impl AudioFileOpen {
|
|
|
- fn finish(&mut self, size: usize) -> AudioFile {
|
|
|
+impl AudioFileOpenStreaming {
|
|
|
+ fn finish(&mut self, size: usize) -> AudioFileStreaming {
|
|
|
let chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE;
|
|
|
|
|
|
let shared = Arc::new(AudioFileShared {
|
|
@@ -69,7 +80,7 @@ impl AudioFileOpen {
|
|
|
);
|
|
|
self.session.spawn(move |_| fetcher);
|
|
|
|
|
|
- AudioFile {
|
|
|
+ AudioFileStreaming {
|
|
|
read_file: read_file,
|
|
|
|
|
|
position: 0,
|
|
@@ -85,6 +96,24 @@ impl Future for AudioFileOpen {
|
|
|
type Error = ChannelError;
|
|
|
|
|
|
fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
|
|
|
+ match *self {
|
|
|
+ AudioFileOpen::Streaming(ref mut open) => {
|
|
|
+ let file = try_ready!(open.poll());
|
|
|
+ Ok(Async::Ready(AudioFile::Streaming(file)))
|
|
|
+ }
|
|
|
+ AudioFileOpen::Cached(ref mut open) => {
|
|
|
+ let file = try_ready!(open.poll());
|
|
|
+ Ok(Async::Ready(AudioFile::Cached(file)))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl Future for AudioFileOpenStreaming {
|
|
|
+ type Item = AudioFileStreaming;
|
|
|
+ type Error = ChannelError;
|
|
|
+
|
|
|
+ fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
|
|
|
loop {
|
|
|
let (id, data) = try_ready!(self.headers.poll()).unwrap();
|
|
|
|
|
@@ -99,11 +128,17 @@ impl Future for AudioFileOpen {
|
|
|
}
|
|
|
|
|
|
impl AudioFileManager {
|
|
|
- pub fn open(&self, file_id: FileId) -> (AudioFileOpen, oneshot::Receiver<NamedTempFile>) {
|
|
|
+ pub fn open(&self, file_id: FileId) -> AudioFileOpen {
|
|
|
+ let cache = self.session().cache().cloned();
|
|
|
+
|
|
|
+ if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
|
|
|
+ return AudioFileOpen::Cached(future::ok(file));
|
|
|
+ }
|
|
|
+
|
|
|
let (complete_tx, complete_rx) = oneshot::channel();
|
|
|
let (headers, data) = request_chunk(&self.session(), file_id, 0).split();
|
|
|
|
|
|
- let open = AudioFileOpen {
|
|
|
+ let open = AudioFileOpenStreaming {
|
|
|
session: self.session(),
|
|
|
file_id: file_id,
|
|
|
|
|
@@ -113,12 +148,21 @@ impl AudioFileManager {
|
|
|
complete_tx: Some(complete_tx),
|
|
|
};
|
|
|
|
|
|
- (open, complete_rx)
|
|
|
+ let session = self.session();
|
|
|
+ self.session().spawn(move |_| {
|
|
|
+ complete_rx.map(move |mut file| {
|
|
|
+ if let Some(cache) = session.cache() {
|
|
|
+ cache.save_file(file_id, &mut file);
|
|
|
+ }
|
|
|
+ }).or_else(|oneshot::Canceled| Ok(()))
|
|
|
+ });
|
|
|
+
|
|
|
+ AudioFileOpen::Streaming(open)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel {
|
|
|
- debug!("requesting chunk {}", index);
|
|
|
+ trace!("requesting chunk {}", index);
|
|
|
|
|
|
let start = (index * CHUNK_SIZE / 4) as u32;
|
|
|
let end = ((index + 1) * CHUNK_SIZE / 4) as u32;
|
|
@@ -236,7 +280,7 @@ impl Future for AudioFileFetch {
|
|
|
Ok(Async::Ready(None)) => {
|
|
|
progress = true;
|
|
|
|
|
|
- debug!("chunk {} / {} complete", self.index, self.shared.chunk_count);
|
|
|
+ trace!("chunk {} / {} complete", self.index, self.shared.chunk_count);
|
|
|
|
|
|
let full = {
|
|
|
let mut bitmap = self.shared.bitmap.lock().unwrap();
|
|
@@ -268,7 +312,7 @@ impl Future for AudioFileFetch {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl Read for AudioFile {
|
|
|
+impl Read for AudioFileStreaming {
|
|
|
fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
|
|
|
let index = self.position as usize / CHUNK_SIZE;
|
|
|
let offset = self.position as usize % CHUNK_SIZE;
|
|
@@ -288,7 +332,7 @@ impl Read for AudioFile {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl Seek for AudioFile {
|
|
|
+impl Seek for AudioFileStreaming {
|
|
|
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
|
|
|
self.position = try!(self.read_file.seek(pos));
|
|
|
|
|
@@ -299,3 +343,21 @@ impl Seek for AudioFile {
|
|
|
Ok(self.position)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+impl Read for AudioFile {
|
|
|
+ fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
|
|
|
+ match *self {
|
|
|
+ AudioFile::Cached(ref mut file) => file.read(output),
|
|
|
+ AudioFile::Streaming(ref mut file) => file.read(output),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl Seek for AudioFile {
|
|
|
+ fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
|
|
|
+ match *self {
|
|
|
+ AudioFile::Cached(ref mut file) => file.seek(pos),
|
|
|
+ AudioFile::Streaming(ref mut file) => file.seek(pos),
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|