|
@@ -1,3 +1,4 @@
|
|
|
+//use bit_set::BitSet;
|
|
|
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
|
|
|
use bytes::Bytes;
|
|
|
use futures::sync::{mpsc, oneshot};
|
|
@@ -301,8 +302,10 @@ impl AudioFile {
|
|
|
debug!("Downloading file {}", file_id);
|
|
|
|
|
|
let (complete_tx, complete_rx) = oneshot::channel();
|
|
|
+ debug!("calling request_chunk");
|
|
|
let initial_data_length = MINIMUM_CHUNK_SIZE;
|
|
|
let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();
|
|
|
+ debug!("returned from request_chunk");
|
|
|
|
|
|
let open = AudioFileOpenStreaming {
|
|
|
session: session.clone(),
|
|
@@ -316,6 +319,7 @@ impl AudioFile {
|
|
|
complete_tx: Some(complete_tx),
|
|
|
};
|
|
|
|
|
|
+ debug!("cloning into cache session");
|
|
|
let session_ = session.clone();
|
|
|
session.spawn(move |_| {
|
|
|
complete_rx
|
|
@@ -330,6 +334,7 @@ impl AudioFile {
|
|
|
.or_else(|oneshot::Canceled| Ok(()))
|
|
|
});
|
|
|
|
|
|
+ debug!("returning open stream");
|
|
|
AudioFileOpen::Streaming(open)
|
|
|
}
|
|
|
|
|
@@ -365,6 +370,8 @@ fn request_range(session: &Session, file: FileId, offset: usize, length: usize)
|
|
|
|
|
|
let (id, channel) = session.channel().allocate();
|
|
|
|
|
|
+ trace!("allocated channel {}", id);
|
|
|
+
|
|
|
let mut data: Vec<u8> = Vec::new();
|
|
|
data.write_u16::<BigEndian>(id).unwrap();
|
|
|
data.write_u8(0).unwrap();
|
|
@@ -382,6 +389,32 @@ fn request_range(session: &Session, file: FileId, offset: usize, length: usize)
|
|
|
channel
|
|
|
}
|
|
|
|
|
|
+//fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel {
|
|
|
+// trace!("requesting chunk {}", index);
|
|
|
+//
|
|
|
+// let start = (index * CHUNK_SIZE / 4) as u32;
|
|
|
+// let end = ((index + 1) * CHUNK_SIZE / 4) as u32;
|
|
|
+//
|
|
|
+// let (id, channel) = session.channel().allocate();
|
|
|
+//
|
|
|
+// trace!("allocated channel {}", id);
|
|
|
+//
|
|
|
+// let mut data: Vec<u8> = Vec::new();
|
|
|
+// data.write_u16::<BigEndian>(id).unwrap();
|
|
|
+// data.write_u8(0).unwrap();
|
|
|
+// data.write_u8(1).unwrap();
|
|
|
+// data.write_u16::<BigEndian>(0x0000).unwrap();
|
|
|
+// data.write_u32::<BigEndian>(0x00000000).unwrap();
|
|
|
+// data.write_u32::<BigEndian>(0x00009C40).unwrap();
|
|
|
+// data.write_u32::<BigEndian>(0x00020000).unwrap();
|
|
|
+// data.write(&file.0).unwrap();
|
|
|
+// data.write_u32::<BigEndian>(start).unwrap();
|
|
|
+// data.write_u32::<BigEndian>(end).unwrap();
|
|
|
+//
|
|
|
+// session.send_packet(0x8, data);
|
|
|
+//
|
|
|
+// channel
|
|
|
+//}
|
|
|
|
|
|
|
|
|
struct PartialFileData {
|
|
@@ -508,6 +541,7 @@ struct AudioFileFetch {
|
|
|
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
|
|
|
file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,
|
|
|
|
|
|
+ //seek_rx: mpsc::UnboundedReceiver<u64>,
|
|
|
stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
|
|
|
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
|
|
|
download_strategy: DownloadStrategy,
|
|
@@ -654,6 +688,35 @@ impl AudioFileFetch {
|
|
|
|
|
|
}
|
|
|
|
|
|
+// fn download(&mut self, mut new_index: usize) {
|
|
|
+// assert!(new_index < self.shared.chunk_count);
|
|
|
+//
|
|
|
+// {
|
|
|
+// let download_status = self.shared.download_status.lock().unwrap();
|
|
|
+// while download_status.downloaded.contains(new_index) {
|
|
|
+// new_index = (new_index + 1) % self.shared.chunk_count;
|
|
|
+// debug!("Download iterated to new_index {}", new_index);
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// trace!("== download called for chunk {} of {}", new_index, self.shared.chunk_count);
|
|
|
+//
|
|
|
+// if self.index != new_index {
|
|
|
+// self.index = new_index;
|
|
|
+//
|
|
|
+// let offset = self.index * CHUNK_SIZE;
|
|
|
+//
|
|
|
+// self.output
|
|
|
+// .as_mut()
|
|
|
+// .unwrap()
|
|
|
+// .seek(SeekFrom::Start(offset as u64))
|
|
|
+// .unwrap();
|
|
|
+//
|
|
|
+// let (_headers, data) = request_chunk(&self.session, self.shared.file_id, self.index).split();
|
|
|
+// self.data_rx = data;
|
|
|
+// }
|
|
|
+// }
|
|
|
+
|
|
|
fn poll_file_data_rx(&mut self) -> Poll<(), ()> {
|
|
|
|
|
|
loop {
|
|
@@ -837,6 +900,7 @@ impl Read for AudioFileStreaming {
|
|
|
let mut ranges_to_request = RangeSet::new();
|
|
|
ranges_to_request.add_range(&Range::new(offset, length));
|
|
|
|
|
|
+ debug!("reading at postion {} (length : {})", offset, length);
|
|
|
|
|
|
let mut download_status = self.shared.download_status.lock().unwrap();
|
|
|
ranges_to_request.subtract_range_set(&download_status.downloaded);
|
|
@@ -849,7 +913,9 @@ impl Read for AudioFileStreaming {
|
|
|
}
|
|
|
|
|
|
while !download_status.downloaded.contains(offset) {
|
|
|
+ debug!("waiting for download");
|
|
|
download_status = self.shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0;
|
|
|
+ debug!("re-checking data availability at offset {}.", offset);
|
|
|
}
|
|
|
let available_length = download_status.downloaded.contained_length_from_value(offset);
|
|
|
assert!(available_length > 0);
|
|
@@ -860,6 +926,7 @@ impl Read for AudioFileStreaming {
|
|
|
let read_len = min(length, available_length);
|
|
|
let read_len = try!(self.read_file.read(&mut output[..read_len]));
|
|
|
|
|
|
+ debug!("read at postion {} (length : {})", offset, read_len);
|
|
|
|
|
|
self.position += read_len as u64;
|
|
|
self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed);
|