12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897 |
- use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
- use bytes::Bytes;
- use futures::sync::{mpsc, oneshot};
- use futures::Stream;
- use futures::{Async, Future, Poll};
- use std::cmp::min;
- use std::fs;
- use std::io::{self, Read, Seek, SeekFrom, Write};
- use std::sync::{Arc, Condvar, Mutex};
- use std::time::{Duration, Instant};
- use tempfile::NamedTempFile;
- use range_set::{Range, RangeSet};
- use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
- use librespot_core::session::Session;
- use librespot_core::spotify_id::FileId;
- use futures::sync::mpsc::unbounded;
- use std::sync::atomic;
- use std::sync::atomic::AtomicUsize;
/// Smallest range, in bytes, requested from the server in one go. `AudioFile::open` uses it
/// as the length of the initial request and `download_range` rounds shorter requests up to it.
const MINIMUM_CHUNK_SIZE: usize = 16 * 1024;
/// Largest range, in bytes, that a single pre-fetch request may cover.
const MAXIMUM_CHUNK_SIZE: usize = 128 * 1024;
/// Upper bound, in seconds, applied to a measured response time before it is reported.
const MAXIMUM_ASSUMED_PING_TIME_SECONDS: u64 = 5;
- pub enum AudioFile {
- Cached(fs::File),
- Streaming(AudioFileStreaming),
- }
- pub enum AudioFileOpen {
- Cached(Option<fs::File>),
- Streaming(AudioFileOpenStreaming),
- }
- pub struct AudioFileOpenStreaming {
- session: Session,
- initial_data_rx: Option<ChannelData>,
- initial_data_length: Option<usize>,
- initial_request_sent_time: Instant,
- headers: ChannelHeaders,
- file_id: FileId,
- complete_tx: Option<oneshot::Sender<NamedTempFile>>,
- }
- enum StreamLoaderCommand{
- Fetch(Range), // signal the stream loader to fetch a range of the file
- RandomAccessMode(), // optimise download strategy for random access
- StreamMode(), // optimise download strategy for streaming
- StreamDataRate(usize), // when optimising for streaming, assume a streaming rate of this many bytes per second.
- Close(), // terminate and don't load any more data
- }
- #[derive(Clone)]
- pub struct StreamLoaderController {
- channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
- stream_shared: Option<Arc<AudioFileShared>>,
- file_size: usize,
- }
- impl StreamLoaderController {
- pub fn len(&self) -> usize {
- return self.file_size;
- }
- pub fn range_available(&self, range: Range) -> bool {
- if let Some(ref shared) = self.stream_shared {
- let download_status = shared.download_status.lock().unwrap();
- if range.length <= download_status.downloaded.contained_length_from_value(range.start) {
- return true;
- } else {
- return false;
- }
- } else {
- if range.length <= self.len() - range.start {
- return true;
- } else {
- return false;
- }
- }
- }
- pub fn ping_time_ms(&self) -> usize {
- if let Some(ref shared) = self.stream_shared {
- return shared.ping_time_ms.load(atomic::Ordering::Relaxed);
- } else {
- return 0;
- }
- }
- fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) {
- if let Some(ref mut channel) = self.channel_tx {
- // ignore the error in case the channel has been closed already.
- let _ = channel.unbounded_send(command);
- }
- }
- pub fn fetch(&mut self, range: Range) {
- // signal the stream loader to fetch a range of the file
- self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
- }
- pub fn fetch_blocking(&mut self, mut range: Range) {
- // signal the stream loader to tech a range of the file and block until it is loaded.
- // ensure the range is within the file's bounds.
- if range.start >= self.len() {
- range.length = 0;
- } else if range.end() > self.len() {
- range.length = self.len() - range.start;
- }
- self.fetch(range);
- if let Some(ref shared) = self.stream_shared {
- let mut download_status = shared.download_status.lock().unwrap();
- while range.length > download_status.downloaded.contained_length_from_value(range.start) {
- download_status = shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0;
- if range.length > (download_status.downloaded.union(&download_status.requested).contained_length_from_value(range.start)) {
- // For some reason, the requested range is neither downloaded nor requested.
- // This could be due to a network error. Request it again.
- // We can't use self.fetch here because self can't borrowed mutably, so we access the channel directly.
- if let Some(ref mut channel) = self.channel_tx {
- // ignore the error in case the channel has been closed already.
- let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range));
- }
- }
- }
- }
- }
- pub fn fetch_next(&mut self, length: usize) {
- let range:Range = if let Some(ref shared) = self.stream_shared {
- Range {
- start: shared.read_position.load(atomic::Ordering::Relaxed),
- length: length,
- }
- } else {
- return;
- };
- self.fetch(range);
- }
- pub fn fetch_next_blocking(&mut self, length: usize) {
- let range:Range = if let Some(ref shared) = self.stream_shared {
- Range {
- start: shared.read_position.load(atomic::Ordering::Relaxed),
- length: length,
- }
- } else {
- return;
- };
- self.fetch_blocking(range);
- }
- pub fn set_random_access_mode(&mut self) {
- // optimise download strategy for random access
- self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode());
- }
- pub fn set_stream_mode(&mut self) {
- // optimise download strategy for streaming
- self.send_stream_loader_command(StreamLoaderCommand::StreamMode());
- }
- pub fn set_stream_data_rate(&mut self, bytes_per_second: usize) {
- // when optimising for streaming, assume a streaming rate of this many bytes per second.
- self.send_stream_loader_command(StreamLoaderCommand::StreamDataRate(bytes_per_second));
- }
- pub fn close(&mut self) {
- // terminate stream loading and don't load any more data for this file.
- self.send_stream_loader_command(StreamLoaderCommand::Close());
- }
- }
- pub struct AudioFileStreaming {
- read_file: fs::File,
- position: u64,
- stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
- shared: Arc<AudioFileShared>,
- }
- struct AudioFileDownloadStatus {
- requested: RangeSet,
- downloaded: RangeSet,
- }
- struct AudioFileShared {
- file_id: FileId,
- file_size: usize,
- cond: Condvar,
- download_status: Mutex<AudioFileDownloadStatus>,
- ping_time_ms: AtomicUsize,
- read_position: AtomicUsize,
- }
- impl AudioFileOpenStreaming {
- fn finish(&mut self, size: usize) -> AudioFileStreaming {
- let shared = Arc::new(AudioFileShared {
- file_id: self.file_id,
- file_size: size,
- cond: Condvar::new(),
- download_status: Mutex::new(AudioFileDownloadStatus {requested: RangeSet::new(), downloaded: RangeSet::new()}),
- ping_time_ms: AtomicUsize::new(0),
- read_position: AtomicUsize::new(0),
- });
- let mut write_file = NamedTempFile::new().unwrap();
- write_file.as_file().set_len(size as u64).unwrap();
- write_file.seek(SeekFrom::Start(0)).unwrap();
- let read_file = write_file.reopen().unwrap();
- let initial_data_rx = self.initial_data_rx.take().unwrap();
- let initial_data_length = self.initial_data_length.take().unwrap();
- let complete_tx = self.complete_tx.take().unwrap();
- //let (seek_tx, seek_rx) = mpsc::unbounded();
- let (stream_loader_command_tx, stream_loader_command_rx) = mpsc::unbounded::<StreamLoaderCommand>();
- let fetcher = AudioFileFetch::new(
- self.session.clone(),
- shared.clone(),
- initial_data_rx,
- self.initial_request_sent_time,
- initial_data_length,
- write_file,
- stream_loader_command_rx,
- complete_tx,
- );
- self.session.spawn(move |_| fetcher);
- AudioFileStreaming {
- read_file: read_file,
- position: 0,
- //seek: seek_tx,
- stream_loader_command_tx: stream_loader_command_tx,
- shared: shared,
- }
- }
- }
- impl Future for AudioFileOpen {
- type Item = AudioFile;
- type Error = ChannelError;
- fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
- match *self {
- AudioFileOpen::Streaming(ref mut open) => {
- let file = try_ready!(open.poll());
- Ok(Async::Ready(AudioFile::Streaming(file)))
- }
- AudioFileOpen::Cached(ref mut file) => {
- let file = file.take().unwrap();
- Ok(Async::Ready(AudioFile::Cached(file)))
- }
- }
- }
- }
- impl Future for AudioFileOpenStreaming {
- type Item = AudioFileStreaming;
- type Error = ChannelError;
- fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
- loop {
- let (id, data) = try_ready!(self.headers.poll()).unwrap();
- if id == 0x3 {
- let size = BigEndian::read_u32(&data) as usize * 4;
- let file = self.finish(size);
- return Ok(Async::Ready(file));
- }
- }
- }
- }
- impl AudioFile {
- pub fn open(session: &Session, file_id: FileId) -> AudioFileOpen {
- let cache = session.cache().cloned();
- if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
- debug!("File {} already in cache", file_id);
- return AudioFileOpen::Cached(Some(file));
- }
- debug!("Downloading file {}", file_id);
- let (complete_tx, complete_rx) = oneshot::channel();
- let initial_data_length = MINIMUM_CHUNK_SIZE;
- let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();
- let open = AudioFileOpenStreaming {
- session: session.clone(),
- file_id: file_id,
- headers: headers,
- initial_data_rx: Some(data),
- initial_data_length: Some(initial_data_length),
- initial_request_sent_time: Instant::now(),
- complete_tx: Some(complete_tx),
- };
- let session_ = session.clone();
- session.spawn(move |_| {
- complete_rx
- .map(move |mut file| {
- if let Some(cache) = session_.cache() {
- cache.save_file(file_id, &mut file);
- debug!("File {} complete, saving to cache", file_id);
- } else {
- debug!("File {} complete", file_id);
- }
- })
- .or_else(|oneshot::Canceled| Ok(()))
- });
- AudioFileOpen::Streaming(open)
- }
- pub fn get_stream_loader_controller(&self) -> StreamLoaderController {
- match self {
- AudioFile::Streaming(stream) => {
- return StreamLoaderController {
- channel_tx: Some(stream.stream_loader_command_tx.clone()),
- stream_shared: Some(stream.shared.clone()),
- file_size: stream.shared.file_size,
- }
- }
- AudioFile::Cached(ref file) => {
- return StreamLoaderController {
- channel_tx: None,
- stream_shared: None,
- file_size: file.metadata().unwrap().len() as usize,
- }
- }
- }
- }
- }
- fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel {
- trace!("requesting range starting at {} of length {}", offset, length);
- let start = offset / 4;
- let mut end = (offset+length) / 4;
- if (offset+length) % 4 != 0 {
- end += 1;
- }
- let (id, channel) = session.channel().allocate();
- let mut data: Vec<u8> = Vec::new();
- data.write_u16::<BigEndian>(id).unwrap();
- data.write_u8(0).unwrap();
- data.write_u8(1).unwrap();
- data.write_u16::<BigEndian>(0x0000).unwrap();
- data.write_u32::<BigEndian>(0x00000000).unwrap();
- data.write_u32::<BigEndian>(0x00009C40).unwrap();
- data.write_u32::<BigEndian>(0x00020000).unwrap();
- data.write(&file.0).unwrap();
- data.write_u32::<BigEndian>(start as u32).unwrap();
- data.write_u32::<BigEndian>(end as u32).unwrap();
- session.send_packet(0x8, data);
- channel
- }
- struct PartialFileData {
- offset: usize,
- data: Bytes,
- }
- enum ReceivedData {
- ResponseTimeMs(usize),
- Data(PartialFileData),
- }
- struct AudioFileFetchDataReceiver {
- shared: Arc<AudioFileShared>,
- file_data_tx: mpsc::UnboundedSender<ReceivedData>,
- data_rx: ChannelData,
- data_offset: usize,
- request_length: usize,
- request_sent_time: Option<Instant>,
- }
- impl AudioFileFetchDataReceiver {
- fn new(
- shared: Arc<AudioFileShared>,
- file_data_tx: mpsc::UnboundedSender<ReceivedData>,
- data_rx: ChannelData,
- data_offset: usize,
- request_length: usize,
- request_sent_time: Instant,
- ) -> AudioFileFetchDataReceiver {
- AudioFileFetchDataReceiver {
- shared: shared,
- data_rx: data_rx,
- file_data_tx: file_data_tx,
- data_offset: data_offset,
- request_length: request_length,
- request_sent_time: Some(request_sent_time),
- }
- }
- }
- impl AudioFileFetchDataReceiver {
- fn finish(&mut self) {
- if self.request_length > 0 {
- let missing_range = Range::new(self.data_offset, self.request_length);
- let mut download_status = self.shared.download_status.lock().unwrap();
- download_status.requested.subtract_range(&missing_range);
- self.shared.cond.notify_all();
- }
- }
- }
- impl Future for AudioFileFetchDataReceiver {
- type Item = ();
- type Error = ();
- fn poll(&mut self) -> Poll<(), ()> {
- loop {
- trace!("Looping data_receiver for offset {} and length {}", self.data_offset, self.request_length);
- match self.data_rx.poll() {
- Ok(Async::Ready(Some(data))) => {
- if let Some(request_sent_time) = self.request_sent_time {
- let duration = Instant::now() - request_sent_time;
- let duration_ms: u64;
- if duration.as_secs() > MAXIMUM_ASSUMED_PING_TIME_SECONDS {
- duration_ms = MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000;
- }else {
- duration_ms = duration.as_secs() *1000 + duration.subsec_millis() as u64;
- }
- let _ = self.file_data_tx.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
- }
- let data_size = data.len();
- trace!("data_receiver got {} bytes of data", data_size);
- let _ = self.file_data_tx.unbounded_send(ReceivedData::Data(PartialFileData { offset: self.data_offset, data: data, }));
- self.data_offset += data_size;
- if self.request_length < data_size {
- warn!("Received more data from server than requested.");
- self.request_length = 0;
- } else {
- self.request_length -= data_size;
- }
- if self.request_length == 0 {
- trace!("Data receiver completed at position {}", self.data_offset);
- return Ok(Async::Ready(()));
- }
- }
- Ok(Async::Ready(None)) => {
- if self.request_length > 0 {
- warn!("Received less data from server than requested.");
- self.finish();
- }
- return Ok(Async::Ready(()));
- }
- Ok(Async::NotReady) => {
- //trace!("No more data for data_receiver at the moment.");
- return Ok(Async::NotReady);
- }
- Err(ChannelError) => {
- warn!("error from channel");
- self.finish();
- return Ok(Async::Ready(()));
- }
- }
- }
- }
- }
/// How the fetcher decides what to download beyond explicitly requested ranges.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DownloadStrategy {
    /// Download only what is explicitly requested.
    RandomAccess(),
    /// Additionally pre-fetch data ahead of the read position.
    Streaming(),
}
- struct AudioFileFetch {
- session: Session,
- shared: Arc<AudioFileShared>,
- output: Option<NamedTempFile>,
- file_data_tx: mpsc::UnboundedSender<ReceivedData>,
- file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,
- stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
- complete_tx: Option<oneshot::Sender<NamedTempFile>>,
- download_strategy: DownloadStrategy,
- streaming_data_rate: usize,
- network_response_times_ms: Vec<usize>,
- }
- impl AudioFileFetch {
- fn new(
- session: Session,
- shared: Arc<AudioFileShared>,
- initial_data_rx: ChannelData,
- initial_request_sent_time: Instant,
- initial_data_length: usize,
- output: NamedTempFile,
- stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
- complete_tx: oneshot::Sender<NamedTempFile>,
- ) -> AudioFileFetch {
- let (file_data_tx, file_data_rx) = unbounded::<ReceivedData>();
- {
- let requested_range = Range::new(0, initial_data_length);
- let mut download_status = shared.download_status.lock().unwrap();
- download_status.requested.add_range(&requested_range);
- }
- let initial_data_receiver = AudioFileFetchDataReceiver::new(
- shared.clone(),
- file_data_tx.clone(),
- initial_data_rx,
- 0,
- initial_data_length,
- initial_request_sent_time,
- );
- session.spawn(move |_| initial_data_receiver);
- AudioFileFetch {
- session: session,
- shared: shared,
- output: Some(output),
- file_data_tx: file_data_tx,
- file_data_rx: file_data_rx,
- stream_loader_command_rx: stream_loader_command_rx,
- complete_tx: Some(complete_tx),
- download_strategy: DownloadStrategy::RandomAccess(), // start with random access mode until someone tells us otherwise
- streaming_data_rate: 40, // assume 360 kbit per second unless someone tells us otherwise.
- network_response_times_ms: Vec::new(),
- }
- }
- fn download_range(&mut self, mut offset: usize, mut length: usize) {
- if length < MINIMUM_CHUNK_SIZE {
- length = MINIMUM_CHUNK_SIZE;
- }
- // ensure the values are within the bounds and align them by 4 for the spotify protocol.
- if offset >= self.shared.file_size {
- return;
- }
- if length <= 0 {
- return;
- }
- if offset + length > self.shared.file_size {
- length = self.shared.file_size - offset;
- }
- if offset % 4 != 0 {
- length += offset % 4;
- offset -= offset % 4;
- }
- if length % 4 != 0 {
- length += 4 - (length % 4);
- }
- let mut ranges_to_request = RangeSet::new();
- ranges_to_request.add_range(&Range::new(offset, length));
- let mut download_status = self.shared.download_status.lock().unwrap();
- ranges_to_request.subtract_range_set(&download_status.downloaded);
- ranges_to_request.subtract_range_set(&download_status.requested);
- for range in ranges_to_request.iter() {
- let (_headers, data) = request_range(&self.session, self.shared.file_id, range.start, range.length).split();
- download_status.requested.add_range(range);
- let receiver = AudioFileFetchDataReceiver::new(
- self.shared.clone(),
- self.file_data_tx.clone(),
- data,
- range.start,
- range.length,
- Instant::now(),
- );
- self.session.spawn(move |_| receiver);
- }
- }
- fn pre_fetch_more_data(&mut self) {
- // determine what is still missing
- let mut missing_data = RangeSet::new();
- missing_data.add_range(&Range::new(0,self.shared.file_size));
- {
- let download_status = self.shared.download_status.lock().unwrap();
- missing_data.subtract_range_set(&download_status.downloaded);
- missing_data.subtract_range_set(&download_status.requested);
- }
- // download data from after the current read position first
- let mut tail_end = RangeSet::new();
- let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed);
- tail_end.add_range(&Range::new(read_position, self.shared.file_size - read_position));
- let tail_end = tail_end.intersection(&missing_data);
- if ! tail_end.is_empty() {
- let range = tail_end.get_range(0);
- let offset = range.start;
- let length = min(range.length, MAXIMUM_CHUNK_SIZE);
- self.download_range(offset, length);
- } else if ! missing_data.is_empty() {
- // ok, the tail is downloaded, download something fom the beginning.
- let range = missing_data.get_range(0);
- let offset = range.start;
- let length = min(range.length, MAXIMUM_CHUNK_SIZE);
- self.download_range(offset, length);
- }
- }
- fn poll_file_data_rx(&mut self) -> Poll<(), ()> {
- loop {
- match self.file_data_rx.poll() {
- Ok(Async::Ready(None)) => {
- trace!("File data channel closed.");
- return Ok(Async::Ready(()));
- }
- Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => {
- trace!("Received ping time information: {} ms.", response_time_ms);
- // record the response time
- self.network_response_times_ms.push(response_time_ms);
- // prone old response times. Keep at most three.
- while self.network_response_times_ms.len() > 3 {
- self.network_response_times_ms.remove(0);
- }
- // stats::median is experimental. So we calculate the median of up to three ourselves.
- let ping_time_ms: usize = match self.network_response_times_ms.len() {
- 1 => self.network_response_times_ms[0] as usize,
- 2 => ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) / 2) as usize,
- 3 => {
- let mut times = self.network_response_times_ms.clone();
- times.sort();
- times[1]
- }
- _ => unreachable!(),
- };
- // store our new estimate for everyone to see
- self.shared.ping_time_ms.store(ping_time_ms, atomic::Ordering::Relaxed);
- },
- Ok(Async::Ready(Some(ReceivedData::Data(data)))) => {
- trace!("Writing data to file: offset {}, length {}", data.offset, data.data.len());
- self.output
- .as_mut()
- .unwrap()
- .seek(SeekFrom::Start(data.offset as u64))
- .unwrap();
- self.output.as_mut().unwrap().write_all(data.data.as_ref()).unwrap();
- let mut full = false;
- {
- let mut download_status = self.shared.download_status.lock().unwrap();
- let received_range = Range::new(data.offset, data.data.len());
- download_status.downloaded.add_range(&received_range);
- self.shared.cond.notify_all();
- if download_status.downloaded.contained_length_from_value(0) >= self.shared.file_size {
- full = true;
- }
- drop(download_status);
- }
- if full {
- self.finish();
- return Ok(Async::Ready(()));
- }
- }
- Ok(Async::NotReady) => {
- return Ok(Async::NotReady);
- },
- Err(()) => unreachable!(),
- }
- }
- }
- fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> {
- loop {
- match self.stream_loader_command_rx.poll() {
- Ok(Async::Ready(None)) => {}
- Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => {
- self.download_range(request.start, request.length);
- }
- Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => {
- self.download_strategy = DownloadStrategy::RandomAccess();
- }
- Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => {
- self.download_strategy = DownloadStrategy::Streaming();
- }
- Ok(Async::Ready(Some(StreamLoaderCommand::StreamDataRate(rate)))) => {
- self.streaming_data_rate = rate;
- }
- Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => {
- return Ok(Async::Ready(()));
- }
- Ok(Async::NotReady) => {
- return Ok(Async::NotReady)
- },
- Err(()) => unreachable!(),
- }
- }
- }
- fn finish(&mut self) {
- trace!("====== FINISHED DOWNLOADING FILE! ======");
- let mut output = self.output.take().unwrap();
- let complete_tx = self.complete_tx.take().unwrap();
- output.seek(SeekFrom::Start(0)).unwrap();
- let _ = complete_tx.send(output);
- }
- }
- impl Future for AudioFileFetch {
- type Item = ();
- type Error = ();
- fn poll(&mut self) -> Poll<(), ()> {
- trace!("Polling AudioFileFetch");
- match self.poll_stream_loader_command_rx() {
- Ok(Async::NotReady) => (),
- Ok(Async::Ready(_)) => {
- return Ok(Async::Ready(()));
- }
- Err(()) => unreachable!(),
- }
- match self.poll_file_data_rx() {
- Ok(Async::NotReady) => (),
- Ok(Async::Ready(_)) => {
- return Ok(Async::Ready(()));
- }
- Err(()) => unreachable!(),
- }
- if let DownloadStrategy::Streaming() = self.download_strategy {
- let bytes_pending: usize = {
- let download_status = self.shared.download_status.lock().unwrap();
- download_status.requested.minus(&download_status.downloaded).len()
- };
- let ping_time = self.shared.ping_time_ms.load(atomic::Ordering::Relaxed);
- if bytes_pending < 2 * ping_time * self.streaming_data_rate {
- self.pre_fetch_more_data();
- }
- }
- return Ok(Async::NotReady)
- }
- }
- impl Read for AudioFileStreaming {
- fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
- let offset = self.position as usize;
- if offset >= self.shared.file_size {
- return Ok(0);
- }
- let length = min(output.len(), self.shared.file_size - offset);
- if length == 0 {
- return Ok(0);
- }
- let mut ranges_to_request = RangeSet::new();
- ranges_to_request.add_range(&Range::new(offset, length));
- let mut download_status = self.shared.download_status.lock().unwrap();
- ranges_to_request.subtract_range_set(&download_status.downloaded);
- ranges_to_request.subtract_range_set(&download_status.requested);
- for range in ranges_to_request.iter() {
- debug!("requesting data at position {} (length : {})", range.start, range.length);
- self.stream_loader_command_tx.unbounded_send(StreamLoaderCommand::Fetch(range.clone())).unwrap();
- }
- while !download_status.downloaded.contains(offset) {
- download_status = self.shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0;
- }
- let available_length = download_status.downloaded.contained_length_from_value(offset);
- assert!(available_length > 0);
- drop(download_status);
- self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap();
- let read_len = min(length, available_length);
- let read_len = try!(self.read_file.read(&mut output[..read_len]));
- self.position += read_len as u64;
- self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed);
- return Ok(read_len);
- }
- }
- impl Seek for AudioFileStreaming {
- fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
- self.position = try!(self.read_file.seek(pos));
- // Do not seek past EOF
- self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed);
- Ok(self.position)
- }
- }
- impl Read for AudioFile {
- fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
- match *self {
- AudioFile::Cached(ref mut file) => file.read(output),
- AudioFile::Streaming(ref mut file) => file.read(output),
- }
- }
- }
- impl Seek for AudioFile {
- fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
- match *self {
- AudioFile::Cached(ref mut file) => file.seek(pos),
- AudioFile::Streaming(ref mut file) => file.seek(pos),
- }
- }
- }
|