|
@@ -1,26 +1,21 @@
|
|
|
use bit_set::BitSet;
|
|
|
use byteorder::{ByteOrder, BigEndian};
|
|
|
+use eventual;
|
|
|
use std::cmp::min;
|
|
|
use std::sync::{Arc, Condvar, Mutex};
|
|
|
use std::sync::mpsc::{self, TryRecvError};
|
|
|
use std::thread;
|
|
|
use std::fs;
|
|
|
use std::io::{self, Read, Write, Seek, SeekFrom};
|
|
|
-use std::path::PathBuf;
|
|
|
use tempfile::NamedTempFile;
|
|
|
|
|
|
-use util::{FileId, IgnoreExt, mkdir_existing};
|
|
|
+use util::{FileId, IgnoreExt};
|
|
|
use session::Session;
|
|
|
use stream::StreamEvent;
|
|
|
|
|
|
const CHUNK_SIZE: usize = 0x20000;
|
|
|
|
|
|
-pub enum AudioFile {
|
|
|
- Direct(fs::File),
|
|
|
- Loading(AudioFileLoading),
|
|
|
-}
|
|
|
-
|
|
|
-pub struct AudioFileLoading {
|
|
|
+pub struct AudioFile {
|
|
|
read_file: fs::File,
|
|
|
|
|
|
position: u64,
|
|
@@ -31,14 +26,15 @@ pub struct AudioFileLoading {
|
|
|
|
|
|
struct AudioFileShared {
|
|
|
file_id: FileId,
|
|
|
- size: usize,
|
|
|
chunk_count: usize,
|
|
|
cond: Condvar,
|
|
|
bitmap: Mutex<BitSet>,
|
|
|
}
|
|
|
|
|
|
-impl AudioFileLoading {
|
|
|
- fn new(session: &Session, file_id: FileId) -> AudioFileLoading {
|
|
|
+impl AudioFile {
|
|
|
+ pub fn new(session: &Session, file_id: FileId)
|
|
|
+ -> (AudioFile, eventual::Future<NamedTempFile, ()>) {
|
|
|
+
|
|
|
let size = session.stream(file_id, 0, 1)
|
|
|
.iter()
|
|
|
.filter_map(|event| {
|
|
@@ -54,10 +50,8 @@ impl AudioFileLoading {
|
|
|
|
|
|
let chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE;
|
|
|
|
|
|
-
|
|
|
let shared = Arc::new(AudioFileShared {
|
|
|
file_id: file_id,
|
|
|
- size: size,
|
|
|
chunk_count: chunk_count,
|
|
|
cond: Condvar::new(),
|
|
|
bitmap: Mutex::new(BitSet::with_capacity(chunk_count)),
|
|
@@ -68,27 +62,29 @@ impl AudioFileLoading {
|
|
|
let read_file = write_file.reopen().unwrap();
|
|
|
|
|
|
let (seek_tx, seek_rx) = mpsc::channel();
|
|
|
+ let (complete_tx, complete_rx) = eventual::Future::pair();
|
|
|
|
|
|
{
|
|
|
let shared = shared.clone();
|
|
|
let session = session.clone();
|
|
|
- thread::spawn(move || AudioFileLoading::fetch(&session, shared, write_file, seek_rx));
|
|
|
+ thread::spawn(move || AudioFile::fetch(&session, shared, write_file, seek_rx, complete_tx));
|
|
|
}
|
|
|
|
|
|
- AudioFileLoading {
|
|
|
+ (AudioFile {
|
|
|
read_file: read_file,
|
|
|
|
|
|
position: 0,
|
|
|
seek: seek_tx,
|
|
|
|
|
|
shared: shared,
|
|
|
- }
|
|
|
+ }, complete_rx)
|
|
|
}
|
|
|
|
|
|
fn fetch(session: &Session,
|
|
|
shared: Arc<AudioFileShared>,
|
|
|
mut write_file: NamedTempFile,
|
|
|
- seek_rx: mpsc::Receiver<u64>) {
|
|
|
+ seek_rx: mpsc::Receiver<u64>,
|
|
|
+ complete_tx: eventual::Complete<NamedTempFile, ()>) {
|
|
|
let mut index = 0;
|
|
|
|
|
|
loop {
|
|
@@ -103,7 +99,8 @@ impl AudioFileLoading {
|
|
|
let bitmap = shared.bitmap.lock().unwrap();
|
|
|
if bitmap.len() >= shared.chunk_count {
|
|
|
drop(bitmap);
|
|
|
- AudioFileLoading::persist_to_cache(session, &shared, &mut write_file);
|
|
|
+ write_file.seek(SeekFrom::Start(0)).unwrap();
|
|
|
+ complete_tx.complete(write_file);
|
|
|
break;
|
|
|
}
|
|
|
|
|
@@ -112,7 +109,7 @@ impl AudioFileLoading {
|
|
|
}
|
|
|
drop(bitmap);
|
|
|
|
|
|
- AudioFileLoading::fetch_chunk(session, &shared, &mut write_file, index);
|
|
|
+ AudioFile::fetch_chunk(session, &shared, &mut write_file, index);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -149,19 +146,9 @@ impl AudioFileLoading {
|
|
|
|
|
|
shared.cond.notify_all();
|
|
|
}
|
|
|
-
|
|
|
- fn persist_to_cache(session: &Session, shared: &AudioFileShared, write_file: &mut NamedTempFile) {
|
|
|
- if let Some(path) = AudioFileManager::cache_path(session, shared.file_id) {
|
|
|
- write_file.seek(SeekFrom::Start(0)).unwrap();
|
|
|
- mkdir_existing(path.parent().unwrap()).unwrap();
|
|
|
-
|
|
|
- let mut cache_file = fs::File::create(path).unwrap();
|
|
|
- io::copy(write_file, &mut cache_file).unwrap();
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
-impl Read for AudioFileLoading {
|
|
|
+impl Read for AudioFile {
|
|
|
fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
|
|
|
let index = self.position as usize / CHUNK_SIZE;
|
|
|
let offset = self.position as usize % CHUNK_SIZE;
|
|
@@ -181,7 +168,7 @@ impl Read for AudioFileLoading {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl Seek for AudioFileLoading {
|
|
|
+impl Seek for AudioFile {
|
|
|
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
|
|
|
self.position = try!(self.read_file.seek(pos));
|
|
|
|
|
@@ -192,44 +179,3 @@ impl Seek for AudioFileLoading {
|
|
|
Ok(self.position as u64)
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-impl Read for AudioFile {
|
|
|
- fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
|
|
|
- match *self {
|
|
|
- AudioFile::Direct(ref mut file) => file.read(output),
|
|
|
- AudioFile::Loading(ref mut loading) => loading.read(output),
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-impl Seek for AudioFile {
|
|
|
- fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
|
|
|
- match *self {
|
|
|
- AudioFile::Direct(ref mut file) => file.seek(pos),
|
|
|
- AudioFile::Loading(ref mut loading) => loading.seek(pos),
|
|
|
- }
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-pub struct AudioFileManager;
|
|
|
-impl AudioFileManager {
|
|
|
- pub fn new() -> AudioFileManager {
|
|
|
- AudioFileManager
|
|
|
- }
|
|
|
-
|
|
|
- pub fn cache_path(session: &Session, file_id: FileId) -> Option<PathBuf> {
|
|
|
- session.config().cache_location.as_ref().map(|cache| {
|
|
|
- let name = file_id.to_base16();
|
|
|
- cache.join(&name[0..2]).join(&name[2..])
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- pub fn request(&mut self, session: &Session, file_id: FileId) -> AudioFile {
|
|
|
- let cache_path = AudioFileManager::cache_path(session, file_id);
|
|
|
- let cache_file = cache_path.and_then(|p| fs::File::open(p).ok());
|
|
|
-
|
|
|
- cache_file.map(AudioFile::Direct).unwrap_or_else(|| {
|
|
|
- AudioFile::Loading(AudioFileLoading::new(session, file_id))
|
|
|
- })
|
|
|
- }
|
|
|
-}
|