Explorar o código

Merge pull request #393 from kaymes/dynamic-blocks

Downloading files with dnamic block sizes for faster seek and playback
Sasha Hilton %!s(int64=5) %!d(string=hai) anos
pai
achega
fc070fd73d
Modificáronse 8 ficheiros con 1158 adicións e 112 borrados
  1. 1 1
      .travis.yml
  2. 1 0
      Cargo.lock
  3. 1 0
      audio/Cargo.toml
  4. 764 104
      audio/src/fetch.rs
  5. 9 2
      audio/src/lib.rs
  6. 240 0
      audio/src/range_set.rs
  7. 22 0
      core/src/channel.rs
  8. 120 5
      playback/src/player.rs

+ 1 - 1
.travis.yml

@@ -1,6 +1,6 @@
 language: rust
 rust:
-  - 1.32.0
+  - 1.33.0
   - stable
   - beta
   - nightly

+ 1 - 0
Cargo.lock

@@ -811,6 +811,7 @@ dependencies = [
  "aes-ctr 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "bit-set 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
  "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)",
  "lewton 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "librespot-core 0.1.0",

+ 1 - 0
audio/Cargo.toml

@@ -12,6 +12,7 @@ version = "0.1.0"
 [dependencies]
 bit-set = "0.5"
 byteorder = "1.3"
+bytes = "0.4"
 futures = "0.1"
 lewton = "0.9"
 log = "0.4"

A diferenza do arquivo foi suprimida porque é demasiado grande
+ 764 - 104
audio/src/fetch.rs


+ 9 - 2
audio/src/lib.rs

@@ -3,12 +3,13 @@ extern crate futures;
 #[macro_use]
 extern crate log;
 
+extern crate aes_ctr;
 extern crate bit_set;
 extern crate byteorder;
+extern crate bytes;
 extern crate num_bigint;
 extern crate num_traits;
 extern crate tempfile;
-extern crate aes_ctr;
 
 extern crate librespot_core;
 
@@ -20,8 +21,14 @@ mod lewton_decoder;
 #[cfg(any(feature = "with-tremor", feature = "with-vorbis"))]
 mod libvorbis_decoder;
 
+mod range_set;
+
 pub use decrypt::AudioDecrypt;
-pub use fetch::{AudioFile, AudioFileOpen};
+pub use fetch::{AudioFile, AudioFileOpen, StreamLoaderController};
+pub use fetch::{
+    READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS,
+    READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS,
+};
 
 #[cfg(not(any(feature = "with-tremor", feature = "with-vorbis")))]
 pub use lewton_decoder::{VorbisDecoder, VorbisError, VorbisPacket};

+ 240 - 0
audio/src/range_set.rs

@@ -0,0 +1,240 @@
+use std::cmp::{max, min};
+use std::fmt;
+use std::slice::Iter;
+
+#[derive(Copy, Clone)]
+pub struct Range {
+    pub start: usize,
+    pub length: usize,
+}
+
+impl fmt::Display for Range {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        return write!(f, "[{}, {}]", self.start, self.start + self.length - 1);
+    }
+}
+
+impl Range {
+    pub fn new(start: usize, length: usize) -> Range {
+        return Range {
+            start: start,
+            length: length,
+        };
+    }
+
+    pub fn end(&self) -> usize {
+        return self.start + self.length;
+    }
+}
+
+#[derive(Clone)]
+pub struct RangeSet {
+    ranges: Vec<Range>,
+}
+
+impl fmt::Display for RangeSet {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "(").unwrap();
+        for range in self.ranges.iter() {
+            write!(f, "{}", range).unwrap();
+        }
+        write!(f, ")")
+    }
+}
+
+impl RangeSet {
+    pub fn new() -> RangeSet {
+        RangeSet {
+            ranges: Vec::<Range>::new(),
+        }
+    }
+
+    pub fn is_empty(&self) -> bool {
+        return self.ranges.is_empty();
+    }
+
+    pub fn len(&self) -> usize {
+        let mut result = 0;
+        for range in self.ranges.iter() {
+            result += range.length;
+        }
+        return result;
+    }
+
+    pub fn get_range(&self, index: usize) -> Range {
+        return self.ranges[index].clone();
+    }
+
+    pub fn iter(&self) -> Iter<Range> {
+        return self.ranges.iter();
+    }
+
+    pub fn contains(&self, value: usize) -> bool {
+        for range in self.ranges.iter() {
+            if value < range.start {
+                return false;
+            } else if range.start <= value && value < range.end() {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    pub fn contained_length_from_value(&self, value: usize) -> usize {
+        for range in self.ranges.iter() {
+            if value < range.start {
+                return 0;
+            } else if range.start <= value && value < range.end() {
+                return range.end() - value;
+            }
+        }
+        return 0;
+    }
+
+    #[allow(dead_code)]
+    pub fn contains_range_set(&self, other: &RangeSet) -> bool {
+        for range in other.ranges.iter() {
+            if self.contained_length_from_value(range.start) < range.length {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    pub fn add_range(&mut self, range: &Range) {
+        if range.length <= 0 {
+            // the interval is empty or invalid -> nothing to do.
+            return;
+        }
+
+        for index in 0..self.ranges.len() {
+            // the new range is clear of any ranges we already iterated over.
+            if range.end() < self.ranges[index].start {
+                // the new range starts after anything we already passed and ends before the next range starts (they don't touch) -> insert it.
+                self.ranges.insert(index, range.clone());
+                return;
+            } else if range.start <= self.ranges[index].end() && self.ranges[index].start <= range.end()
+            {
+                // the new range overlaps (or touches) the first range. They are to be merged.
+                // In addition we might have to merge further ranges in as well.
+
+                let mut new_range = range.clone();
+
+                while index < self.ranges.len() && self.ranges[index].start <= new_range.end() {
+                    let new_end = max(new_range.end(), self.ranges[index].end());
+                    new_range.start = min(new_range.start, self.ranges[index].start);
+                    new_range.length = new_end - new_range.start;
+                    self.ranges.remove(index);
+                }
+
+                self.ranges.insert(index, new_range);
+                return;
+            }
+        }
+
+        // the new range is after everything else -> just add it
+        self.ranges.push(range.clone());
+    }
+
+    #[allow(dead_code)]
+    pub fn add_range_set(&mut self, other: &RangeSet) {
+        for range in other.ranges.iter() {
+            self.add_range(range);
+        }
+    }
+
+    #[allow(dead_code)]
+    pub fn union(&self, other: &RangeSet) -> RangeSet {
+        let mut result = self.clone();
+        result.add_range_set(other);
+        return result;
+    }
+
+    pub fn subtract_range(&mut self, range: &Range) {
+        if range.length <= 0 {
+            return;
+        }
+
+        for index in 0..self.ranges.len() {
+            // the ranges we already passed don't overlap with the range to remove
+
+            if range.end() <= self.ranges[index].start {
+                // the remaining ranges are past the one to subtract. -> we're done.
+                return;
+            } else if range.start <= self.ranges[index].start && self.ranges[index].start < range.end() {
+                // the range to subtract started before the current range and reaches into the current range
+                // -> we have to remove the beginning of the range or the entire range and do the same for following ranges.
+
+                while index < self.ranges.len() && self.ranges[index].end() <= range.end() {
+                    self.ranges.remove(index);
+                }
+
+                if index < self.ranges.len() && self.ranges[index].start < range.end() {
+                    self.ranges[index].length -= range.end() - self.ranges[index].start;
+                    self.ranges[index].start = range.end();
+                }
+
+                return;
+            } else if range.end() < self.ranges[index].end() {
+                // the range to subtract punches a hole into the current range -> we need to create two smaller ranges.
+
+                let first_range = Range {
+                    start: self.ranges[index].start,
+                    length: range.start - self.ranges[index].start,
+                };
+
+                self.ranges[index].length -= range.end() - self.ranges[index].start;
+                self.ranges[index].start = range.end();
+
+                self.ranges.insert(index, first_range);
+
+                return;
+            } else if range.start < self.ranges[index].end() {
+                // the range truncates the existing range -> truncate the range. Let the for loop take care of overlaps with other ranges.
+                self.ranges[index].length = range.start - self.ranges[index].start;
+            }
+        }
+    }
+
+    pub fn subtract_range_set(&mut self, other: &RangeSet) {
+        for range in other.ranges.iter() {
+            self.subtract_range(range);
+        }
+    }
+
+    pub fn minus(&self, other: &RangeSet) -> RangeSet {
+        let mut result = self.clone();
+        result.subtract_range_set(other);
+        return result;
+    }
+
+    pub fn intersection(&self, other: &RangeSet) -> RangeSet {
+        let mut result = RangeSet::new();
+
+        let mut self_index: usize = 0;
+        let mut other_index: usize = 0;
+
+        while self_index < self.ranges.len() && other_index < other.ranges.len() {
+            if self.ranges[self_index].end() <= other.ranges[other_index].start {
+                // skip the interval
+                self_index += 1;
+            } else if other.ranges[other_index].end() <= self.ranges[self_index].start {
+                // skip the interval
+                other_index += 1;
+            } else {
+                // the two intervals overlap. Add the union and advance the index of the one that ends first.
+                let new_start = max(self.ranges[self_index].start, other.ranges[other_index].start);
+                let new_end = min(self.ranges[self_index].end(), other.ranges[other_index].end());
+                assert!(new_start <= new_end);
+                result.add_range(&Range::new(new_start, new_end - new_start));
+                if self.ranges[self_index].end() <= other.ranges[other_index].end() {
+                    self_index += 1;
+                } else {
+                    other_index += 1;
+                }
+            }
+        }
+
+        return result;
+    }
+}

+ 22 - 0
core/src/channel.rs

@@ -3,6 +3,7 @@ use bytes::Bytes;
 use futures::sync::{mpsc, BiLock};
 use futures::{Async, Poll, Stream};
 use std::collections::HashMap;
+use std::time::Instant;
 
 use util::SeqGenerator;
 
@@ -10,6 +11,9 @@ component! {
     ChannelManager : ChannelManagerInner {
         sequence: SeqGenerator<u16> = SeqGenerator::new(0),
         channels: HashMap<u16, mpsc::UnboundedSender<(u8, Bytes)>> = HashMap::new(),
+        download_rate_estimate: usize = 0,
+        download_measurement_start: Option<Instant> = None,
+        download_measurement_bytes: usize = 0,
     }
 }
 
@@ -60,11 +64,29 @@ impl ChannelManager {
         let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref());
 
         self.lock(|inner| {
+            let current_time = Instant::now();
+            if let Some(download_measurement_start) = inner.download_measurement_start {
+                if (current_time - download_measurement_start).as_millis() > 1000 {
+                    inner.download_rate_estimate = 1000 * inner.download_measurement_bytes
+                        / (current_time - download_measurement_start).as_millis() as usize;
+                    inner.download_measurement_start = Some(current_time);
+                    inner.download_measurement_bytes = 0;
+                }
+            } else {
+                inner.download_measurement_start = Some(current_time);
+            }
+
+            inner.download_measurement_bytes += data.len();
+
             if let Entry::Occupied(entry) = inner.channels.entry(id) {
                 let _ = entry.get().unbounded_send((cmd, data));
             }
         });
     }
+
+    pub fn get_download_rate_estimate(&self) -> usize {
+        return self.lock(|inner| inner.download_rate_estimate);
+    }
 }
 
 impl Channel {

+ 120 - 5
playback/src/player.rs

@@ -4,6 +4,7 @@ use futures::sync::oneshot;
 use futures::{future, Future};
 use std;
 use std::borrow::Cow;
+use std::cmp::max;
 use std::io::{Read, Result, Seek, SeekFrom};
 use std::mem;
 use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError};
@@ -14,8 +15,12 @@ use config::{Bitrate, PlayerConfig};
 use librespot_core::session::Session;
 use librespot_core::spotify_id::SpotifyId;
 
-use audio::{AudioDecrypt, AudioFile};
+use audio::{AudioDecrypt, AudioFile, StreamLoaderController};
 use audio::{VorbisDecoder, VorbisPacket};
+use audio::{
+    READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS,
+    READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS,
+};
 use audio_backend::Sink;
 use metadata::{AudioItem, FileFormat};
 use mixer::AudioFilter;
@@ -202,12 +207,16 @@ enum PlayerState {
         decoder: Decoder,
         end_of_track: oneshot::Sender<()>,
         normalisation_factor: f32,
+        stream_loader_controller: StreamLoaderController,
+        bytes_per_second: usize,
     },
     Playing {
         track_id: SpotifyId,
         decoder: Decoder,
         end_of_track: oneshot::Sender<()>,
         normalisation_factor: f32,
+        stream_loader_controller: StreamLoaderController,
+        bytes_per_second: usize,
     },
     EndOfTrack {
         track_id: SpotifyId,
@@ -234,6 +243,22 @@ impl PlayerState {
         }
     }
 
+    fn stream_loader_controller(&mut self) -> Option<&mut StreamLoaderController> {
+        use self::PlayerState::*;
+        match *self {
+            Stopped | EndOfTrack { .. } => None,
+            Paused {
+                ref mut stream_loader_controller,
+                ..
+            }
+            | Playing {
+                ref mut stream_loader_controller,
+                ..
+            } => Some(stream_loader_controller),
+            Invalid => panic!("invalid state"),
+        }
+    }
+
     fn playing_to_end_of_track(&mut self) {
         use self::PlayerState::*;
         match mem::replace(self, Invalid) {
@@ -257,12 +282,16 @@ impl PlayerState {
                 decoder,
                 end_of_track,
                 normalisation_factor,
+                stream_loader_controller,
+                bytes_per_second,
             } => {
                 *self = Playing {
                     track_id: track_id,
                     decoder: decoder,
                     end_of_track: end_of_track,
                     normalisation_factor: normalisation_factor,
+                    stream_loader_controller: stream_loader_controller,
+                    bytes_per_second: bytes_per_second,
                 };
             }
             _ => panic!("invalid state"),
@@ -277,12 +306,16 @@ impl PlayerState {
                 decoder,
                 end_of_track,
                 normalisation_factor,
+                stream_loader_controller,
+                bytes_per_second,
             } => {
                 *self = Paused {
                     track_id: track_id,
                     decoder: decoder,
                     end_of_track: end_of_track,
                     normalisation_factor: normalisation_factor,
+                    stream_loader_controller: stream_loader_controller,
+                    bytes_per_second: bytes_per_second,
                 };
             }
             _ => panic!("invalid state"),
@@ -403,7 +436,12 @@ impl PlayerInternal {
                 }
 
                 match self.load_track(track_id, position as i64) {
-                    Some((decoder, normalisation_factor)) => {
+                    Some((
+                        decoder,
+                        normalisation_factor,
+                        stream_loader_controller,
+                        bytes_per_second,
+                    )) => {
                         if play {
                             match self.state {
                                 PlayerState::Playing {
@@ -427,6 +465,8 @@ impl PlayerInternal {
                                 decoder: decoder,
                                 end_of_track: end_of_track,
                                 normalisation_factor: normalisation_factor,
+                                stream_loader_controller: stream_loader_controller,
+                                bytes_per_second: bytes_per_second,
                             };
                         } else {
                             self.state = PlayerState::Paused {
@@ -434,6 +474,8 @@ impl PlayerInternal {
                                 decoder: decoder,
                                 end_of_track: end_of_track,
                                 normalisation_factor: normalisation_factor,
+                                stream_loader_controller: stream_loader_controller,
+                                bytes_per_second: bytes_per_second,
                             };
                             match self.state {
                                 PlayerState::Playing {
@@ -460,6 +502,9 @@ impl PlayerInternal {
             }
 
             PlayerCommand::Seek(position) => {
+                if let Some(stream_loader_controller) = self.state.stream_loader_controller() {
+                    stream_loader_controller.set_random_access_mode();
+                }
                 if let Some(decoder) = self.state.decoder() {
                     match decoder.seek(position as i64) {
                         Ok(_) => (),
@@ -468,6 +513,32 @@ impl PlayerInternal {
                 } else {
                     warn!("Player::seek called from invalid state");
                 }
+
+                // If we're playing, ensure, that we have enough data leaded to avoid a buffer underrun.
+                if let Some(stream_loader_controller) = self.state.stream_loader_controller() {
+                    stream_loader_controller.set_stream_mode();
+                }
+                if let PlayerState::Playing { bytes_per_second, .. } = self.state {
+                    if let Some(stream_loader_controller) = self.state.stream_loader_controller() {
+                        // Request our read ahead range
+                        let request_data_length = max(
+                            (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
+                                * (0.001 * stream_loader_controller.ping_time_ms() as f64)
+                                * bytes_per_second as f64) as usize,
+                            (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
+                        );
+                        stream_loader_controller.fetch_next(request_data_length);
+
+                        // Request the part we want to wait for blocking. This effecively means we wait for the previous request to partially complete.
+                        let wait_for_data_length = max(
+                            (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS
+                                * (0.001 * stream_loader_controller.ping_time_ms() as f64)
+                                * bytes_per_second as f64) as usize,
+                            (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
+                        );
+                        stream_loader_controller.fetch_next_blocking(wait_for_data_length);
+                    }
+                }
             }
 
             PlayerCommand::Play => {
@@ -528,7 +599,30 @@ impl PlayerInternal {
         }
     }
 
-    fn load_track(&self, spotify_id: SpotifyId, position: i64) -> Option<(Decoder, f32)> {
+    fn stream_data_rate(&self, format: FileFormat) -> usize {
+        match format {
+            FileFormat::OGG_VORBIS_96 => 12 * 1024,
+            FileFormat::OGG_VORBIS_160 => 20 * 1024,
+            FileFormat::OGG_VORBIS_320 => 40 * 1024,
+            FileFormat::MP3_256 => 32 * 1024,
+            FileFormat::MP3_320 => 40 * 1024,
+            FileFormat::MP3_160 => 20 * 1024,
+            FileFormat::MP3_96 => 12 * 1024,
+            FileFormat::MP3_160_ENC => 20 * 1024,
+            FileFormat::MP4_128_DUAL => 16 * 1024,
+            FileFormat::OTHER3 => 40 * 1024, // better some high guess than nothing
+            FileFormat::AAC_160 => 20 * 1024,
+            FileFormat::AAC_320 => 40 * 1024,
+            FileFormat::MP4_128 => 16 * 1024,
+            FileFormat::OTHER5 => 40 * 1024, // better some high guess than nothing
+        }
+    }
+
+    fn load_track(
+        &self,
+        spotify_id: SpotifyId,
+        position: i64,
+    ) -> Option<(Decoder, f32, StreamLoaderController, usize)> {
         let audio = AudioItem::get_audio_item(&self.session, spotify_id)
             .wait()
             .unwrap();
@@ -572,10 +666,25 @@ impl PlayerInternal {
             }
         };
 
+        let bytes_per_second = self.stream_data_rate(*format);
+        let play_from_beginning = position == 0;
+
         let key = self.session.audio_key().request(spotify_id, file_id);
-        let encrypted_file = AudioFile::open(&self.session, file_id);
+        let encrypted_file =
+            AudioFile::open(&self.session, file_id, bytes_per_second, play_from_beginning);
 
         let encrypted_file = encrypted_file.wait().unwrap();
+
+        let mut stream_loader_controller = encrypted_file.get_stream_loader_controller();
+
+        if play_from_beginning {
+            // No need to seek -> we stream from the beginning
+            stream_loader_controller.set_stream_mode();
+        } else {
+            // we need to seek -> we set stream mode after the initial seek.
+            stream_loader_controller.set_random_access_mode();
+        }
+
         let key = key.wait().unwrap();
         let mut decrypted_file = AudioDecrypt::new(key, encrypted_file);
 
@@ -596,9 +705,15 @@ impl PlayerInternal {
                 Ok(_) => (),
                 Err(err) => error!("Vorbis error: {:?}", err),
             }
+            stream_loader_controller.set_stream_mode();
         }
         info!("<{}> loaded", audio.name);
-        Some((decoder, normalisation_factor))
+        Some((
+            decoder,
+            normalisation_factor,
+            stream_loader_controller,
+            bytes_per_second,
+        ))
     }
 }
 

Algúns arquivos non se mostraron porque demasiados arquivos cambiaron neste cambio