Browse Source

Run cargo fmt for my code.

Konstantin Seiler 5 years ago
parent
commit
d2d6df0e24
5 changed files with 229 additions and 179 deletions
  1. 164 116
      audio/src/fetch.rs
  2. 5 2
      audio/src/lib.rs
  3. 11 34
      audio/src/range_set.rs
  4. 4 8
      core/src/channel.rs
  5. 45 19
      playback/src/player.rs

+ 164 - 116
audio/src/fetch.rs

@@ -3,28 +3,27 @@ use bytes::Bytes;
 use futures::sync::{mpsc, oneshot};
 use futures::Stream;
 use futures::{Async, Future, Poll};
-use std::cmp::{min, max};
+use range_set::{Range, RangeSet};
+use std::cmp::{max, min};
 use std::fs;
 use std::io::{self, Read, Seek, SeekFrom, Write};
 use std::sync::{Arc, Condvar, Mutex};
 use std::time::{Duration, Instant};
 use tempfile::NamedTempFile;
-use range_set::{Range, RangeSet};
 
+use futures::sync::mpsc::unbounded;
 use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
 use librespot_core::session::Session;
 use librespot_core::spotify_id::FileId;
-use futures::sync::mpsc::unbounded;
 use std::sync::atomic;
 use std::sync::atomic::AtomicUsize;
 
-
 const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16;
 // The minimum size of a block that is requested from the Spotify servers in one request.
 // This is the block size that is typically requested while doing a seek() on a file.
 // Note: smaller requests can happen if part of the block is downloaded already.
 
-const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16; // MUST be divisible by four!!!
+const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16;
 // The amount of data that is requested when initially opening a file.
 // Note: if the file is opened to play from the beginning, the amount of data to
 // read ahead is requested in addition to this amount. If the file is opened to seek to
@@ -78,8 +77,6 @@ const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5;
 // performed while downloading. Values smaller than 1 cause the download rate to collapse and effectively
 // only PREFETCH_THRESHOLD_FACTOR is in effect. Thus, set to zero if bandwidth saturation is not wanted.
 
-
-
 pub enum AudioFile {
     Cached(fs::File),
     Streaming(AudioFileStreaming),
@@ -101,15 +98,13 @@ pub struct AudioFileOpenStreaming {
     streaming_data_rate: usize,
 }
 
-
-enum StreamLoaderCommand{
-    Fetch(Range), // signal the stream loader to fetch a range of the file
+enum StreamLoaderCommand {
+    Fetch(Range),       // signal the stream loader to fetch a range of the file
     RandomAccessMode(), // optimise download strategy for random access
-    StreamMode(), // optimise download strategy for streaming
-    Close(), // terminate and don't load any more data
+    StreamMode(),       // optimise download strategy for streaming
+    Close(),            // terminate and don't load any more data
 }
 
-
 #[derive(Clone)]
 pub struct StreamLoaderController {
     channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
@@ -117,7 +112,6 @@ pub struct StreamLoaderController {
     file_size: usize,
 }
 
-
 impl StreamLoaderController {
     pub fn len(&self) -> usize {
         return self.file_size;
@@ -126,7 +120,11 @@ impl StreamLoaderController {
     pub fn range_available(&self, range: Range) -> bool {
         if let Some(ref shared) = self.stream_shared {
             let download_status = shared.download_status.lock().unwrap();
-            if range.length <= download_status.downloaded.contained_length_from_value(range.start) {
+            if range.length
+                <= download_status
+                    .downloaded
+                    .contained_length_from_value(range.start)
+            {
                 return true;
             } else {
                 return false;
@@ -174,9 +172,22 @@ impl StreamLoaderController {
 
         if let Some(ref shared) = self.stream_shared {
             let mut download_status = shared.download_status.lock().unwrap();
-            while range.length > download_status.downloaded.contained_length_from_value(range.start) {
-                download_status = shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0;
-                if range.length > (download_status.downloaded.union(&download_status.requested).contained_length_from_value(range.start)) {
+            while range.length
+                > download_status
+                    .downloaded
+                    .contained_length_from_value(range.start)
+            {
+                download_status = shared
+                    .cond
+                    .wait_timeout(download_status, Duration::from_millis(1000))
+                    .unwrap()
+                    .0;
+                if range.length
+                    > (download_status
+                        .downloaded
+                        .union(&download_status.requested)
+                        .contained_length_from_value(range.start))
+                {
                     // For some reason, the requested range is neither downloaded nor requested.
                     // This could be due to a network error. Request it again.
                     // We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly.
@@ -187,11 +198,10 @@ impl StreamLoaderController {
                 }
             }
         }
-
     }
 
     pub fn fetch_next(&mut self, length: usize) {
-        let range:Range = if let Some(ref shared) = self.stream_shared {
+        let range: Range = if let Some(ref shared) = self.stream_shared {
             Range {
                 start: shared.read_position.load(atomic::Ordering::Relaxed),
                 length: length,
@@ -203,7 +213,7 @@ impl StreamLoaderController {
     }
 
     pub fn fetch_next_blocking(&mut self, length: usize) {
-        let range:Range = if let Some(ref shared) = self.stream_shared {
+        let range: Range = if let Some(ref shared) = self.stream_shared {
             Range {
                 start: shared.read_position.load(atomic::Ordering::Relaxed),
                 length: length,
@@ -228,11 +238,8 @@ impl StreamLoaderController {
         // terminate stream loading and don't load any more data for this file.
         self.send_stream_loader_command(StreamLoaderCommand::Close());
     }
-
-
 }
 
-
 pub struct AudioFileStreaming {
     read_file: fs::File,
 
@@ -243,7 +250,6 @@ pub struct AudioFileStreaming {
     shared: Arc<AudioFileShared>,
 }
 
-
 struct AudioFileDownloadStatus {
     requested: RangeSet,
     downloaded: RangeSet,
@@ -269,13 +275,15 @@ struct AudioFileShared {
 
 impl AudioFileOpenStreaming {
     fn finish(&mut self, size: usize) -> AudioFileStreaming {
-
         let shared = Arc::new(AudioFileShared {
             file_id: self.file_id,
             file_size: size,
             stream_data_rate: self.streaming_data_rate,
             cond: Condvar::new(),
-            download_status: Mutex::new(AudioFileDownloadStatus {requested: RangeSet::new(), downloaded: RangeSet::new()}),
+            download_status: Mutex::new(AudioFileDownloadStatus {
+                requested: RangeSet::new(),
+                downloaded: RangeSet::new(),
+            }),
             download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
             number_of_open_requests: AtomicUsize::new(0),
             ping_time_ms: AtomicUsize::new(0),
@@ -292,7 +300,8 @@ impl AudioFileOpenStreaming {
         let initial_data_length = self.initial_data_length.take().unwrap();
         let complete_tx = self.complete_tx.take().unwrap();
         //let (seek_tx, seek_rx) = mpsc::unbounded();
-        let (stream_loader_command_tx, stream_loader_command_rx) = mpsc::unbounded::<StreamLoaderCommand>();
+        let (stream_loader_command_tx, stream_loader_command_rx) =
+            mpsc::unbounded::<StreamLoaderCommand>();
 
         let fetcher = AudioFileFetch::new(
             self.session.clone(),
@@ -355,7 +364,12 @@ impl Future for AudioFileOpenStreaming {
 }
 
 impl AudioFile {
-    pub fn open(session: &Session, file_id: FileId, bytes_per_second: usize, play_from_beginning: bool) -> AudioFileOpen {
+    pub fn open(
+        session: &Session,
+        file_id: FileId,
+        bytes_per_second: usize,
+        play_from_beginning: bool,
+    ) -> AudioFileOpen {
         let cache = session.cache().cloned();
 
         if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
@@ -367,10 +381,16 @@ impl AudioFile {
 
         let (complete_tx, complete_rx) = oneshot::channel();
         let mut initial_data_length = if play_from_beginning {
-                INITIAL_DOWNLOAD_SIZE + max((READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, (INITIAL_PING_TIME_ESTIMATE_SECONDS * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * bytes_per_second as f64) as usize)
-            } else {
-                INITIAL_DOWNLOAD_SIZE
-            };
+            INITIAL_DOWNLOAD_SIZE
+                + max(
+                    (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
+                    (INITIAL_PING_TIME_ESTIMATE_SECONDS
+                        * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
+                        * bytes_per_second as f64) as usize,
+                )
+        } else {
+            INITIAL_DOWNLOAD_SIZE
+        };
         if initial_data_length % 4 != 0 {
             initial_data_length += 4 - (initial_data_length % 4);
         }
@@ -387,7 +407,6 @@ impl AudioFile {
 
             complete_tx: Some(complete_tx),
             streaming_data_rate: bytes_per_second,
-
         };
 
         let session_ = session.clone();
@@ -427,17 +446,26 @@ impl AudioFile {
     }
 }
 
-
 fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel {
-
-    assert!(offset % 4 == 0, "Range request start positions must be aligned by 4 bytes.");
-    assert!(length % 4 == 0, "Range request range lengths must be aligned by 4 bytes.");
+    assert!(
+        offset % 4 == 0,
+        "Range request start positions must be aligned by 4 bytes."
+    );
+    assert!(
+        length % 4 == 0,
+        "Range request range lengths must be aligned by 4 bytes."
+    );
     let start = offset / 4;
-    let end = (offset+length) / 4;
+    let end = (offset + length) / 4;
 
     let (id, channel) = session.channel().allocate();
 
-    trace!("requesting range starting at {} of length {} on channel {}.", offset, length, id);
+    trace!(
+        "requesting range starting at {} of length {} on channel {}.",
+        offset,
+        length,
+        id
+    );
 
     let mut data: Vec<u8> = Vec::new();
     data.write_u16::<BigEndian>(id).unwrap();
@@ -456,8 +484,6 @@ fn request_range(session: &Session, file: FileId, offset: usize, length: usize)
     channel
 }
 
-
-
 struct PartialFileData {
     offset: usize,
     data: Bytes,
@@ -489,10 +515,11 @@ impl AudioFileFetchDataReceiver {
         request_length: usize,
         request_sent_time: Instant,
     ) -> AudioFileFetchDataReceiver {
-
         let measure_ping_time = shared.number_of_open_requests.load(atomic::Ordering::SeqCst) == 0;
 
-        shared.number_of_open_requests.fetch_add(1, atomic::Ordering::SeqCst);
+        shared
+            .number_of_open_requests
+            .fetch_add(1, atomic::Ordering::SeqCst);
 
         AudioFileFetchDataReceiver {
             shared: shared,
@@ -508,12 +535,9 @@ impl AudioFileFetchDataReceiver {
     }
 }
 
-
-
 impl AudioFileFetchDataReceiver {
     fn finish(&mut self) {
         if self.request_length > 0 {
-
             let missing_range = Range::new(self.data_offset, self.request_length);
 
             let mut download_status = self.shared.download_status.lock().unwrap();
@@ -521,8 +545,9 @@ impl AudioFileFetchDataReceiver {
             self.shared.cond.notify_all();
         }
 
-        self.shared.number_of_open_requests.fetch_sub(1, atomic::Ordering::SeqCst);
-
+        self.shared
+            .number_of_open_requests
+            .fetch_sub(1, atomic::Ordering::SeqCst);
     }
 }
 
@@ -538,18 +563,26 @@ impl Future for AudioFileFetchDataReceiver {
                         if let Some(request_sent_time) = self.request_sent_time {
                             let duration = Instant::now() - request_sent_time;
                             let duration_ms: u64;
-                            if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS {
+                            if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS
+                            {
                                 duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
                             } else {
                                 duration_ms = duration.as_millis() as u64;
                             }
-                            let _ = self.file_data_tx.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
+                            let _ = self
+                                .file_data_tx
+                                .unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
                             self.measure_ping_time = false;
                         }
                     }
                     let data_size = data.len();
                     trace!("data_receiver for range {} (+{}) got {} bytes of data starting at {}. ({} bytes pending).", self.initial_data_offset, self.initial_request_length, data_size, self.data_offset, self.request_length - data_size);
-                    let _ = self.file_data_tx.unbounded_send(ReceivedData::Data(PartialFileData { offset: self.data_offset, data: data, }));
+                    let _ = self
+                        .file_data_tx
+                        .unbounded_send(ReceivedData::Data(PartialFileData {
+                            offset: self.data_offset,
+                            data: data,
+                        }));
                     self.data_offset += data_size;
                     if self.request_length < data_size {
                         warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length);
@@ -558,7 +591,11 @@ impl Future for AudioFileFetchDataReceiver {
                         self.request_length -= data_size;
                     }
                     if self.request_length == 0 {
-                        trace!("Data receiver for range {} (+{}) completed.", self.initial_data_offset, self.initial_request_length);
+                        trace!(
+                            "Data receiver for range {} (+{}) completed.",
+                            self.initial_data_offset,
+                            self.initial_request_length
+                        );
                         self.finish();
                         return Ok(Async::Ready(()));
                     }
@@ -574,7 +611,10 @@ impl Future for AudioFileFetchDataReceiver {
                     return Ok(Async::NotReady);
                 }
                 Err(ChannelError) => {
-                    warn!("Error from channel for data receiver for range {} (+{}).", self.initial_data_offset, self.initial_request_length);
+                    warn!(
+                        "Error from channel for data receiver for range {} (+{}).",
+                        self.initial_data_offset, self.initial_request_length
+                    );
                     self.finish();
                     return Ok(Async::Ready(()));
                 }
@@ -583,7 +623,6 @@ impl Future for AudioFileFetchDataReceiver {
     }
 }
 
-
 struct AudioFileFetch {
     session: Session,
     shared: Arc<AudioFileShared>,
@@ -609,7 +648,6 @@ impl AudioFileFetch {
         stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
         complete_tx: oneshot::Sender<NamedTempFile>,
     ) -> AudioFileFetch {
-
         let (file_data_tx, file_data_rx) = unbounded::<ReceivedData>();
 
         {
@@ -618,7 +656,6 @@ impl AudioFileFetch {
             download_status.requested.add_range(&requested_range);
         }
 
-
         let initial_data_receiver = AudioFileFetchDataReceiver::new(
             shared.clone(),
             file_data_tx.clone(),
@@ -649,7 +686,6 @@ impl AudioFileFetch {
     }
 
     fn download_range(&mut self, mut offset: usize, mut length: usize) {
-
         if length < MINIMUM_DOWNLOAD_SIZE {
             length = MINIMUM_DOWNLOAD_SIZE;
         }
@@ -684,13 +720,12 @@ impl AudioFileFetch {
         ranges_to_request.subtract_range_set(&download_status.downloaded);
         ranges_to_request.subtract_range_set(&download_status.requested);
 
-
         for range in ranges_to_request.iter() {
-            let (_headers, data) = request_range(&self.session, self.shared.file_id, range.start, range.length).split();
+            let (_headers, data) =
+                request_range(&self.session, self.shared.file_id, range.start, range.length).split();
 
             download_status.requested.add_range(range);
 
-
             let receiver = AudioFileFetchDataReceiver::new(
                 self.shared.clone(),
                 self.file_data_tx.clone(),
@@ -702,15 +737,12 @@ impl AudioFileFetch {
 
             self.session.spawn(move |_| receiver);
         }
-
     }
 
     fn pre_fetch_more_data(&mut self, bytes: usize) {
-
         let mut bytes_to_go = bytes;
 
         while bytes_to_go > 0 {
-
             // determine what is still missing
             let mut missing_data = RangeSet::new();
             missing_data.add_range(&Range::new(0, self.shared.file_size));
@@ -743,12 +775,9 @@ impl AudioFileFetch {
                 return;
             }
         }
-
     }
 
-
     fn poll_file_data_rx(&mut self) -> Poll<(), ()> {
-
         loop {
             match self.file_data_rx.poll() {
                 Ok(Async::Ready(None)) => {
@@ -768,7 +797,10 @@ impl AudioFileFetch {
                     // stats::median is experimental. So we calculate the median of up to three ourselves.
                     let ping_time_ms: usize = match self.network_response_times_ms.len() {
                         1 => self.network_response_times_ms[0] as usize,
-                        2 => ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) / 2) as usize,
+                        2 => {
+                            ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) / 2)
+                                as usize
+                        }
                         3 => {
                             let mut times = self.network_response_times_ms.clone();
                             times.sort();
@@ -778,20 +810,21 @@ impl AudioFileFetch {
                     };
 
                     // store our new estimate for everyone to see
-                    self.shared.ping_time_ms.store(ping_time_ms, atomic::Ordering::Relaxed);
-
-                },
+                    self.shared
+                        .ping_time_ms
+                        .store(ping_time_ms, atomic::Ordering::Relaxed);
+                }
                 Ok(Async::Ready(Some(ReceivedData::Data(data)))) => {
-
-
                     self.output
                         .as_mut()
                         .unwrap()
                         .seek(SeekFrom::Start(data.offset as u64))
                         .unwrap();
-                    self.output.as_mut().unwrap().write_all(data.data.as_ref()).unwrap();
-
-
+                    self.output
+                        .as_mut()
+                        .unwrap()
+                        .write_all(data.data.as_ref())
+                        .unwrap();
 
                     let mut full = false;
 
@@ -802,11 +835,17 @@ impl AudioFileFetch {
                         download_status.downloaded.add_range(&received_range);
                         self.shared.cond.notify_all();
 
-                        if download_status.downloaded.contained_length_from_value(0) >= self.shared.file_size {
+                        if download_status.downloaded.contained_length_from_value(0)
+                            >= self.shared.file_size
+                        {
                             full = true;
                         }
 
-                        trace!("Downloaded: {} Requested: {}", download_status.downloaded, download_status.requested.minus(&download_status.downloaded));
+                        trace!(
+                            "Downloaded: {} Requested: {}",
+                            download_status.downloaded,
+                            download_status.requested.minus(&download_status.downloaded)
+                        );
 
                         drop(download_status);
                     }
@@ -815,22 +854,16 @@ impl AudioFileFetch {
                         self.finish();
                         return Ok(Async::Ready(()));
                     }
-
-
                 }
                 Ok(Async::NotReady) => {
                     return Ok(Async::NotReady);
-                },
+                }
                 Err(()) => unreachable!(),
             }
-
         }
-
     }
 
-
     fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> {
-
         loop {
             match self.stream_loader_command_rx.poll() {
                 Ok(Async::Ready(None)) => {
@@ -848,13 +881,10 @@ impl AudioFileFetch {
                 Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => {
                     return Ok(Async::Ready(()));
                 }
-                Ok(Async::NotReady) => {
-                    return Ok(Async::NotReady)
-                },
+                Ok(Async::NotReady) => return Ok(Async::NotReady),
                 Err(()) => unreachable!(),
             }
         }
-
     }
 
     fn finish(&mut self) {
@@ -865,7 +895,6 @@ impl AudioFileFetch {
         output.seek(SeekFrom::Start(0)).unwrap();
         let _ = complete_tx.send(output);
     }
-
 }
 
 impl Future for AudioFileFetch {
@@ -873,7 +902,6 @@ impl Future for AudioFileFetch {
     type Error = ();
 
     fn poll(&mut self) -> Poll<(), ()> {
-
         match self.poll_stream_loader_command_rx() {
             Ok(Async::NotReady) => (),
             Ok(Async::Ready(_)) => {
@@ -896,22 +924,29 @@ impl Future for AudioFileFetch {
                 download_status.requested.minus(&download_status.downloaded).len()
             };
 
-            let ping_time_seconds = 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
+            let ping_time_seconds =
+                0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
             let download_rate = self.session.channel().get_download_rate_estimate();
 
             let desired_pending_bytes = max(
-                (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64) as usize,
-                (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize
+                (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64)
+                    as usize,
+                (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize,
             );
 
             if bytes_pending < desired_pending_bytes {
-                trace!("Prefetching more data. pending: {}, desired: {}, ping: {}, rate: {}", bytes_pending, desired_pending_bytes, ping_time_seconds, download_rate);
+                trace!(
+                    "Prefetching more data. pending: {}, desired: {}, ping: {}, rate: {}",
+                    bytes_pending,
+                    desired_pending_bytes,
+                    ping_time_seconds,
+                    download_rate
+                );
                 self.pre_fetch_more_data(desired_pending_bytes - bytes_pending);
             }
         }
 
-
-        return Ok(Async::NotReady)
+        return Ok(Async::NotReady);
     }
 }
 
@@ -925,23 +960,25 @@ impl Read for AudioFileStreaming {
 
         let length = min(output.len(), self.shared.file_size - offset);
 
-
         let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) {
-            DownloadStrategy::RandomAccess() => { length }
+            DownloadStrategy::RandomAccess() => length,
             DownloadStrategy::Streaming() => {
                 // Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded.
-                let ping_time_seconds = 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
-
-                let length_to_request = length + max(
-                    (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64) as usize,
-                    (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * ping_time_seconds * self.shared.stream_data_rate as f64) as usize
-                );
+                let ping_time_seconds =
+                    0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
+
+                let length_to_request = length
+                    + max(
+                        (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64)
+                            as usize,
+                        (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
+                            * ping_time_seconds
+                            * self.shared.stream_data_rate as f64) as usize,
+                    );
                 min(length_to_request, self.shared.file_size - offset)
             }
         };
 
-
-
         let mut ranges_to_request = RangeSet::new();
         ranges_to_request.add_range(&Range::new(offset, length_to_request));
 
@@ -951,27 +988,35 @@ impl Read for AudioFileStreaming {
         ranges_to_request.subtract_range_set(&download_status.downloaded);
         ranges_to_request.subtract_range_set(&download_status.requested);
 
-
         for range in ranges_to_request.iter() {
-            trace!("requesting data at position {} (length : {})", range.start, range.length);
-            self.stream_loader_command_tx.unbounded_send(StreamLoaderCommand::Fetch(range.clone())).unwrap();
+            trace!(
+                "requesting data at position {} (length : {})",
+                range.start,
+                range.length
+            );
+            self.stream_loader_command_tx
+                .unbounded_send(StreamLoaderCommand::Fetch(range.clone()))
+                .unwrap();
         }
 
-
         if length == 0 {
             return Ok(0);
         }
 
         while !download_status.downloaded.contains(offset) {
             trace!("waiting for download");
-            download_status = self.shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0;
+            download_status = self
+                .shared
+                .cond
+                .wait_timeout(download_status, Duration::from_millis(1000))
+                .unwrap()
+                .0;
             trace!("re-checking data availability at offset {}.", offset);
         }
         let available_length = download_status.downloaded.contained_length_from_value(offset);
         assert!(available_length > 0);
         drop(download_status);
 
-
         self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap();
         let read_len = min(length, available_length);
         let read_len = try!(self.read_file.read(&mut output[..read_len]));
@@ -979,8 +1024,9 @@ impl Read for AudioFileStreaming {
         trace!("read successfully at postion {} (length : {})", offset, read_len);
 
         self.position += read_len as u64;
-        self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed);
-
+        self.shared
+            .read_position
+            .store(self.position as usize, atomic::Ordering::Relaxed);
 
         return Ok(read_len);
     }
@@ -990,7 +1036,9 @@ impl Seek for AudioFileStreaming {
     fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
         self.position = try!(self.read_file.seek(pos));
         // Do not seek past EOF
-        self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed);
+        self.shared
+            .read_position
+            .store(self.position as usize, atomic::Ordering::Relaxed);
         Ok(self.position)
     }
 }

+ 5 - 2
audio/src/lib.rs

@@ -3,13 +3,13 @@ extern crate futures;
 #[macro_use]
 extern crate log;
 
+extern crate aes_ctr;
 extern crate bit_set;
 extern crate byteorder;
 extern crate bytes;
 extern crate num_bigint;
 extern crate num_traits;
 extern crate tempfile;
-extern crate aes_ctr;
 
 extern crate librespot_core;
 
@@ -25,7 +25,10 @@ mod range_set;
 
 pub use decrypt::AudioDecrypt;
 pub use fetch::{AudioFile, AudioFileOpen, StreamLoaderController};
-pub use fetch::{READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS};
+pub use fetch::{
+    READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS,
+    READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS,
+};
 
 #[cfg(not(any(feature = "with-tremor", feature = "with-vorbis")))]
 pub use lewton_decoder::{VorbisDecoder, VorbisError, VorbisPacket};

+ 11 - 34
audio/src/range_set.rs

@@ -1,9 +1,6 @@
-
-use std::cmp::{max,min};
-use std::slice::Iter;
+use std::cmp::{max, min};
 use std::fmt;
-
-
+use std::slice::Iter;
 
 #[derive(Copy, Clone)]
 pub struct Range {
@@ -13,27 +10,23 @@ pub struct Range {
 
 impl fmt::Display for Range {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        return write!(f, "[{}, {}]", self.start, self.start+self.length-1);
+        return write!(f, "[{}, {}]", self.start, self.start + self.length - 1);
     }
 }
 
-
 impl Range {
-
     pub fn new(start: usize, length: usize) -> Range {
         return Range {
             start: start,
             length: length,
-        }
+        };
     }
 
     pub fn end(&self) -> usize {
         return self.start + self.length;
     }
-
 }
 
-
 #[derive(Clone)]
 pub struct RangeSet {
     ranges: Vec<Range>,
@@ -49,11 +42,9 @@ impl fmt::Display for RangeSet {
     }
 }
 
-
-
 impl RangeSet {
     pub fn new() -> RangeSet {
-        RangeSet{
+        RangeSet {
             ranges: Vec::<Range>::new(),
         }
     }
@@ -98,7 +89,6 @@ impl RangeSet {
             }
         }
         return 0;
-
     }
 
     #[allow(dead_code)]
@@ -111,23 +101,20 @@ impl RangeSet {
         return true;
     }
 
-
-    pub fn add_range(&mut self, range:&Range) {
-
+    pub fn add_range(&mut self, range: &Range) {
         if range.length <= 0 {
             // the interval is empty or invalid -> nothing to do.
             return;
         }
 
-
         for index in 0..self.ranges.len() {
             // the new range is clear of any ranges we already iterated over.
-            if  range.end() < self.ranges[index].start{
+            if range.end() < self.ranges[index].start {
                 // the new range starts after anything we already passed and ends before the next range starts (they don't touch) -> insert it.
                 self.ranges.insert(index, range.clone());
                 return;
-
-            } else if range.start <= self.ranges[index].end() &&  self.ranges[index].start <= range.end() {
+            } else if range.start <= self.ranges[index].end() && self.ranges[index].start <= range.end()
+            {
                 // the new range overlaps (or touches) the first range. They are to be merged.
                 // In addition we might have to merge further ranges in as well.
 
@@ -142,7 +129,6 @@ impl RangeSet {
 
                 self.ranges.insert(index, new_range);
                 return;
-
             }
         }
 
@@ -165,7 +151,6 @@ impl RangeSet {
     }
 
     pub fn subtract_range(&mut self, range: &Range) {
-
         if range.length <= 0 {
             return;
         }
@@ -175,8 +160,7 @@ impl RangeSet {
 
             if range.end() <= self.ranges[index].start {
                 // the remaining ranges are past the one to subtract. -> we're done.
-                return
-
+                return;
             } else if range.start <= self.ranges[index].start && self.ranges[index].start < range.end() {
                 // the range to subtract started before the current range and reaches into the current range
                 // -> we have to remove the beginning of the range or the entire range and do the same for following ranges.
@@ -191,7 +175,6 @@ impl RangeSet {
                 }
 
                 return;
-
             } else if range.end() < self.ranges[index].end() {
                 // the range to subtract punches a hole into the current range -> we need to create two smaller ranges.
 
@@ -206,11 +189,9 @@ impl RangeSet {
                 self.ranges.insert(index, first_range);
 
                 return;
-
             } else if range.start < self.ranges[index].end() {
                 // the range truncates the existing range -> truncate the range. Let the for loop take care of overlaps with other ranges.
                 self.ranges[index].length = range.start - self.ranges[index].start;
-
             }
         }
     }
@@ -245,19 +226,15 @@ impl RangeSet {
                 let new_start = max(self.ranges[self_index].start, other.ranges[other_index].start);
                 let new_end = min(self.ranges[self_index].end(), other.ranges[other_index].end());
                 assert!(new_start <= new_end);
-                result.add_range(&Range::new(new_start, new_end-new_start));
+                result.add_range(&Range::new(new_start, new_end - new_start));
                 if self.ranges[self_index].end() <= other.ranges[other_index].end() {
                     self_index += 1;
                 } else {
                     other_index += 1;
                 }
-
             }
-
         }
 
         return result;
     }
-
 }
-

+ 4 - 8
core/src/channel.rs

@@ -3,7 +3,7 @@ use bytes::Bytes;
 use futures::sync::{mpsc, BiLock};
 use futures::{Async, Poll, Stream};
 use std::collections::HashMap;
-use std::time::{Instant};
+use std::time::Instant;
 
 use util::SeqGenerator;
 
@@ -64,11 +64,11 @@ impl ChannelManager {
         let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref());
 
         self.lock(|inner| {
-
             let current_time = Instant::now();
             if let Some(download_measurement_start) = inner.download_measurement_start {
                 if (current_time - download_measurement_start).as_millis() > 1000 {
-                    inner.download_rate_estimate = 1000 * inner.download_measurement_bytes / (current_time - download_measurement_start).as_millis() as usize;
+                    inner.download_rate_estimate = 1000 * inner.download_measurement_bytes
+                        / (current_time - download_measurement_start).as_millis() as usize;
                     inner.download_measurement_start = Some(current_time);
                     inner.download_measurement_bytes = 0;
                 }
@@ -85,12 +85,8 @@ impl ChannelManager {
     }
 
     pub fn get_download_rate_estimate(&self) -> usize {
-        return self.lock(|inner| {
-            inner.download_rate_estimate
-        });
-
+        return self.lock(|inner| inner.download_rate_estimate);
     }
-
 }
 
 impl Channel {

+ 45 - 19
playback/src/player.rs

@@ -4,20 +4,23 @@ use futures::sync::oneshot;
 use futures::{future, Future};
 use std;
 use std::borrow::Cow;
+use std::cmp::max;
 use std::io::{Read, Result, Seek, SeekFrom};
 use std::mem;
 use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError};
 use std::thread;
 use std::time::Duration;
-use std::cmp::max;
 
 use config::{Bitrate, PlayerConfig};
 use librespot_core::session::Session;
 use librespot_core::spotify_id::SpotifyId;
 
 use audio::{AudioDecrypt, AudioFile, StreamLoaderController};
-use audio::{READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS};
 use audio::{VorbisDecoder, VorbisPacket};
+use audio::{
+    READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS,
+    READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS,
+};
 use audio_backend::Sink;
 use metadata::{AudioItem, FileFormat};
 use mixer::AudioFilter;
@@ -244,7 +247,14 @@ impl PlayerState {
         use self::PlayerState::*;
         match *self {
             Stopped | EndOfTrack { .. } => None,
-            Paused { ref mut stream_loader_controller, .. } | Playing { ref mut stream_loader_controller, .. } => Some(stream_loader_controller),
+            Paused {
+                ref mut stream_loader_controller,
+                ..
+            }
+            | Playing {
+                ref mut stream_loader_controller,
+                ..
+            } => Some(stream_loader_controller),
             Invalid => panic!("invalid state"),
         }
     }
@@ -273,7 +283,7 @@ impl PlayerState {
                 end_of_track,
                 normalisation_factor,
                 stream_loader_controller,
-                bytes_per_second
+                bytes_per_second,
             } => {
                 *self = Playing {
                     track_id: track_id,
@@ -426,7 +436,12 @@ impl PlayerInternal {
                 }
 
                 match self.load_track(track_id, position as i64) {
-                    Some((decoder, normalisation_factor, stream_loader_controller, bytes_per_second)) => {
+                    Some((
+                        decoder,
+                        normalisation_factor,
+                        stream_loader_controller,
+                        bytes_per_second,
+                    )) => {
                         if play {
                             match self.state {
                                 PlayerState::Playing {
@@ -503,25 +518,27 @@ impl PlayerInternal {
                 if let Some(stream_loader_controller) = self.state.stream_loader_controller() {
                     stream_loader_controller.set_stream_mode();
                 }
-                if let PlayerState::Playing{bytes_per_second, ..} = self.state {
+                if let PlayerState::Playing { bytes_per_second, .. } = self.state {
                     if let Some(stream_loader_controller) = self.state.stream_loader_controller() {
-
                         // Request our read ahead range
                         let request_data_length = max(
-                            (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * (0.001 * stream_loader_controller.ping_time_ms() as f64) * bytes_per_second as f64) as usize,
-                            (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize
+                            (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
+                                * (0.001 * stream_loader_controller.ping_time_ms() as f64)
+                                * bytes_per_second as f64) as usize,
+                            (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
                         );
                         stream_loader_controller.fetch_next(request_data_length);
 
                         // Request the part we want to wait for blocking. This effecively means we wait for the previous request to partially complete.
                         let wait_for_data_length = max(
-                            (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS * (0.001 * stream_loader_controller.ping_time_ms() as f64) * bytes_per_second as f64) as usize,
-                            (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize
+                            (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS
+                                * (0.001 * stream_loader_controller.ping_time_ms() as f64)
+                                * bytes_per_second as f64) as usize,
+                            (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
                         );
                         stream_loader_controller.fetch_next_blocking(wait_for_data_length);
                     }
                 }
-
             }
 
             PlayerCommand::Play => {
@@ -584,9 +601,9 @@ impl PlayerInternal {
 
     fn stream_data_rate(&self, format: FileFormat) -> usize {
         match format {
-            FileFormat::OGG_VORBIS_96 =>  12 * 1024,
+            FileFormat::OGG_VORBIS_96 => 12 * 1024,
             FileFormat::OGG_VORBIS_160 => 20 * 1024,
-            FileFormat::OGG_VORBIS_320=> 40 * 1024,
+            FileFormat::OGG_VORBIS_320 => 40 * 1024,
             FileFormat::MP3_256 => 32 * 1024,
             FileFormat::MP3_320 => 40 * 1024,
             FileFormat::MP3_160 => 20 * 1024,
@@ -601,7 +618,11 @@ impl PlayerInternal {
         }
     }
 
-    fn load_track(&self, spotify_id: SpotifyId, position: i64) -> Option<(Decoder, f32, StreamLoaderController, usize)> {
+    fn load_track(
+        &self,
+        spotify_id: SpotifyId,
+        position: i64,
+    ) -> Option<(Decoder, f32, StreamLoaderController, usize)> {
         let audio = AudioItem::get_audio_item(&self.session, spotify_id)
             .wait()
             .unwrap();
@@ -646,10 +667,11 @@ impl PlayerInternal {
         };
 
         let bytes_per_second = self.stream_data_rate(*format);
-        let play_from_beginning = position==0;
+        let play_from_beginning = position == 0;
 
         let key = self.session.audio_key().request(spotify_id, file_id);
-        let encrypted_file = AudioFile::open(&self.session, file_id, bytes_per_second, play_from_beginning);
+        let encrypted_file =
+            AudioFile::open(&self.session, file_id, bytes_per_second, play_from_beginning);
 
         let encrypted_file = encrypted_file.wait().unwrap();
 
@@ -663,7 +685,6 @@ impl PlayerInternal {
             stream_loader_controller.set_random_access_mode();
         }
 
-
         let key = key.wait().unwrap();
         let mut decrypted_file = AudioDecrypt::new(key, encrypted_file);
 
@@ -687,7 +708,12 @@ impl PlayerInternal {
             stream_loader_controller.set_stream_mode();
         }
         info!("<{}> loaded", audio.name);
-        Some((decoder, normalisation_factor, stream_loader_controller, bytes_per_second))
+        Some((
+            decoder,
+            normalisation_factor,
+            stream_loader_controller,
+            bytes_per_second,
+        ))
     }
 }