瀏覽代碼

Merge pull request #430 from kaymes/gapless_play

Gapless play and improved notifications
Sasha Hilton 5 年之前
父節點
當前提交
670170bd23
共有 7 個文件被更改,包括 1470 次插入463 次删除
  1. 9 0
      audio/src/fetch.rs
  2. 370 123
      connect/src/spirc.rs
  3. 4 2
      examples/play.rs
  4. 3 0
      metadata/src/lib.rs
  5. 1067 324
      playback/src/player.rs
  6. 12 10
      src/main.rs
  7. 5 4
      src/player_event_handler.rs

+ 9 - 0
audio/src/fetch.rs

@@ -144,6 +144,15 @@ impl StreamLoaderController {
         }
     }
 
+    pub fn range_to_end_available(&self) -> bool {
+        if let Some(ref shared) = self.stream_shared {
+            let read_position = shared.read_position.load(atomic::Ordering::Relaxed);
+            self.range_available(Range::new(read_position, self.len() - read_position))
+        } else {
+            true
+        }
+    }
+
     pub fn ping_time_ms(&self) -> usize {
         if let Some(ref shared) = self.stream_shared {
             return shared.ping_time_ms.load(atomic::Ordering::Relaxed);

+ 370 - 123
connect/src/spirc.rs

@@ -2,7 +2,7 @@ use std;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use futures::future;
-use futures::sync::{mpsc, oneshot};
+use futures::sync::mpsc;
 use futures::{Async, Future, Poll, Sink, Stream};
 use protobuf::{self, Message};
 use rand;
@@ -11,7 +11,7 @@ use serde_json;
 
 use crate::context::StationContext;
 use crate::playback::mixer::Mixer;
-use crate::playback::player::Player;
+use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel};
 use crate::protocol;
 use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef};
 use librespot_core::config::ConnectConfig;
@@ -22,6 +22,24 @@ use librespot_core::util::SeqGenerator;
 use librespot_core::version;
 use librespot_core::volume::Volume;
 
+enum SpircPlayStatus {
+    Stopped,
+    LoadingPlay {
+        position_ms: u32,
+    },
+    LoadingPause {
+        position_ms: u32,
+    },
+    Playing {
+        nominal_start_time: i64,
+        preloading_of_next_track_triggered: bool,
+    },
+    Paused {
+        position_ms: u32,
+        preloading_of_next_track_triggered: bool,
+    },
+}
+
 pub struct SpircTask {
     player: Player,
     mixer: Box<dyn Mixer>,
@@ -32,11 +50,14 @@ pub struct SpircTask {
     ident: String,
     device: DeviceState,
     state: State,
+    play_request_id: Option<u64>,
+    mixer_started: bool,
+    play_status: SpircPlayStatus,
 
     subscription: Box<dyn Stream<Item = Frame, Error = MercuryError>>,
     sender: Box<dyn Sink<SinkItem = Frame, SinkError = MercuryError>>,
     commands: mpsc::UnboundedReceiver<SpircCommand>,
-    end_of_track: Box<dyn Future<Item = (), Error = oneshot::Canceled>>,
+    player_events: PlayerEventChannel,
 
     shutdown: bool,
     session: Session,
@@ -255,6 +276,8 @@ impl Spirc {
         };
         let device = initial_device_state(config);
 
+        let player_events = player.get_player_event_channel();
+
         let mut task = SpircTask {
             player: player,
             mixer: mixer,
@@ -266,11 +289,14 @@ impl Spirc {
 
             device: device,
             state: initial_state(),
+            play_request_id: None,
+            mixer_started: false,
+            play_status: SpircPlayStatus::Stopped,
 
             subscription: subscription,
             sender: sender,
             commands: cmd_rx,
-            end_of_track: Box::new(future::empty()),
+            player_events: player_events,
 
             shutdown: false,
             session: session.clone(),
@@ -350,13 +376,14 @@ impl Future for SpircTask {
                     Async::NotReady => (),
                 }
 
-                match self.end_of_track.poll() {
-                    Ok(Async::Ready(())) => {
+                match self.player_events.poll() {
+                    Ok(Async::NotReady) => (),
+                    Ok(Async::Ready(None)) => (),
+                    Err(_) => (),
+                    Ok(Async::Ready(Some(event))) => {
                         progress = true;
-                        self.handle_end_of_track();
+                        self.handle_player_event(event);
                     }
-                    Ok(Async::NotReady) => (),
-                    Err(oneshot::Canceled) => self.end_of_track = Box::new(future::empty()),
                 }
                 // TODO: Refactor
                 match self.context_fut.poll() {
@@ -431,13 +458,33 @@ impl SpircTask {
             + (dur.subsec_nanos() / 1000_000) as i64)
     }
 
+    fn ensure_mixer_started(&mut self) {
+        if !self.mixer_started {
+            self.mixer.start();
+            self.mixer_started = true;
+        }
+    }
+
+    fn ensure_mixer_stopped(&mut self) {
+        if self.mixer_started {
+            self.mixer.stop();
+            self.mixer_started = false;
+        }
+    }
+
+    fn update_state_position(&mut self, position_ms: u32) {
+        let now = self.now_ms();
+        self.state.set_position_measured_at(now as u64);
+        self.state.set_position_ms(position_ms);
+    }
+
     fn handle_command(&mut self, cmd: SpircCommand) {
         let active = self.device.get_is_active();
         match cmd {
             SpircCommand::Play => {
                 if active {
                     self.handle_play();
-                    self.notify(None);
+                    self.notify(None, true);
                 } else {
                     CommandSender::new(self, MessageType::kMessageTypePlay).send();
                 }
@@ -445,7 +492,7 @@ impl SpircTask {
             SpircCommand::PlayPause => {
                 if active {
                     self.handle_play_pause();
-                    self.notify(None);
+                    self.notify(None, true);
                 } else {
                     CommandSender::new(self, MessageType::kMessageTypePlayPause).send();
                 }
@@ -453,7 +500,7 @@ impl SpircTask {
             SpircCommand::Pause => {
                 if active {
                     self.handle_pause();
-                    self.notify(None);
+                    self.notify(None, true);
                 } else {
                     CommandSender::new(self, MessageType::kMessageTypePause).send();
                 }
@@ -461,7 +508,7 @@ impl SpircTask {
             SpircCommand::Prev => {
                 if active {
                     self.handle_prev();
-                    self.notify(None);
+                    self.notify(None, true);
                 } else {
                     CommandSender::new(self, MessageType::kMessageTypePrev).send();
                 }
@@ -469,7 +516,7 @@ impl SpircTask {
             SpircCommand::Next => {
                 if active {
                     self.handle_next();
-                    self.notify(None);
+                    self.notify(None, true);
                 } else {
                     CommandSender::new(self, MessageType::kMessageTypeNext).send();
                 }
@@ -477,7 +524,7 @@ impl SpircTask {
             SpircCommand::VolumeUp => {
                 if active {
                     self.handle_volume_up();
-                    self.notify(None);
+                    self.notify(None, true);
                 } else {
                     CommandSender::new(self, MessageType::kMessageTypeVolumeUp).send();
                 }
@@ -485,7 +532,7 @@ impl SpircTask {
             SpircCommand::VolumeDown => {
                 if active {
                     self.handle_volume_down();
-                    self.notify(None);
+                    self.notify(None, true);
                 } else {
                     CommandSender::new(self, MessageType::kMessageTypeVolumeDown).send();
                 }
@@ -498,14 +545,122 @@ impl SpircTask {
         }
     }
 
+    fn handle_player_event(&mut self, event: PlayerEvent) {
+        // we only process events if the play_request_id matches. If it doesn't, it is
+        // an event that belongs to a previous track and only arrives now due to a race
+        // condition. In this case we have updated the state already and don't want to
+        // mess with it.
+        if let Some(play_request_id) = event.get_play_request_id() {
+            if Some(play_request_id) == self.play_request_id {
+                match event {
+                    PlayerEvent::EndOfTrack { .. } => self.handle_end_of_track(),
+                    PlayerEvent::Loading { .. } => self.notify(None, false),
+                    PlayerEvent::Playing { position_ms, .. } => {
+                        let new_nominal_start_time = self.now_ms() - position_ms as i64;
+                        match self.play_status {
+                            SpircPlayStatus::Playing {
+                                ref mut nominal_start_time,
+                                ..
+                            } => {
+                                if (*nominal_start_time - new_nominal_start_time).abs() > 100 {
+                                    *nominal_start_time = new_nominal_start_time;
+                                    self.update_state_position(position_ms);
+                                    self.notify(None, true);
+                                }
+                            }
+                            SpircPlayStatus::LoadingPlay { .. }
+                            | SpircPlayStatus::LoadingPause { .. } => {
+                                self.state.set_status(PlayStatus::kPlayStatusPlay);
+                                self.update_state_position(position_ms);
+                                self.notify(None, true);
+                                self.play_status = SpircPlayStatus::Playing {
+                                    nominal_start_time: new_nominal_start_time,
+                                    preloading_of_next_track_triggered: false,
+                                };
+                            }
+                            _ => (),
+                        };
+                        trace!("==> kPlayStatusPlay");
+                    }
+                    PlayerEvent::Paused {
+                        position_ms: new_position_ms,
+                        ..
+                    } => {
+                        match self.play_status {
+                            SpircPlayStatus::Paused {
+                                ref mut position_ms,
+                                ..
+                            } => {
+                                if *position_ms != new_position_ms {
+                                    *position_ms = new_position_ms;
+                                    self.update_state_position(new_position_ms);
+                                    self.notify(None, true);
+                                }
+                            }
+                            SpircPlayStatus::LoadingPlay { .. }
+                            | SpircPlayStatus::LoadingPause { .. } => {
+                                self.state.set_status(PlayStatus::kPlayStatusPause);
+                                self.update_state_position(new_position_ms);
+                                self.notify(None, true);
+                                self.play_status = SpircPlayStatus::Paused {
+                                    position_ms: new_position_ms,
+                                    preloading_of_next_track_triggered: false,
+                                };
+                            }
+                            _ => (),
+                        }
+                        trace!("==> kPlayStatusPause");
+                    }
+                    PlayerEvent::Stopped { .. } => match self.play_status {
+                        SpircPlayStatus::Stopped => (),
+                        _ => {
+                            warn!("The player has stopped unexpectedly.");
+                            self.state.set_status(PlayStatus::kPlayStatusStop);
+                            self.ensure_mixer_stopped();
+                            self.notify(None, true);
+                            self.play_status = SpircPlayStatus::Stopped;
+                        }
+                    },
+                    PlayerEvent::TimeToPreloadNextTrack { .. } => match self.play_status {
+                        SpircPlayStatus::Paused {
+                            ref mut preloading_of_next_track_triggered,
+                            ..
+                        }
+                        | SpircPlayStatus::Playing {
+                            ref mut preloading_of_next_track_triggered,
+                            ..
+                        } => {
+                            *preloading_of_next_track_triggered = true;
+                            if let Some(track_id) = self.preview_next_track() {
+                                self.player.preload(track_id);
+                            }
+                        }
+                        SpircPlayStatus::LoadingPause { .. }
+                        | SpircPlayStatus::LoadingPlay { .. }
+                        | SpircPlayStatus::Stopped => (),
+                    },
+                    _ => (),
+                }
+            }
+        }
+    }
+
     fn handle_frame(&mut self, frame: Frame) {
+        let state_string = match frame.get_state().get_status() {
+            PlayStatus::kPlayStatusLoading => "kPlayStatusLoading",
+            PlayStatus::kPlayStatusPause => "kPlayStatusPause",
+            PlayStatus::kPlayStatusStop => "kPlayStatusStop",
+            PlayStatus::kPlayStatusPlay => "kPlayStatusPlay",
+        };
+
         debug!(
-            "{:?} {:?} {} {} {}",
+            "{:?} {:?} {} {} {} {}",
             frame.get_typ(),
             frame.get_device_state().get_name(),
             frame.get_ident(),
             frame.get_seq_nr(),
-            frame.get_state_update_id()
+            frame.get_state_update_id(),
+            state_string,
         );
 
         if frame.get_ident() == self.ident
@@ -516,7 +671,7 @@ impl SpircTask {
 
         match frame.get_typ() {
             MessageType::kMessageTypeHello => {
-                self.notify(Some(frame.get_ident()));
+                self.notify(Some(frame.get_ident()), true);
             }
 
             MessageType::kMessageTypeLoad => {
@@ -529,61 +684,58 @@ impl SpircTask {
                 self.update_tracks(&frame);
 
                 if self.state.get_track().len() > 0 {
-                    let now = self.now_ms();
-                    self.state
-                        .set_position_ms(frame.get_state().get_position_ms());
-                    self.state.set_position_measured_at(now as u64);
-
-                    let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
-                    self.load_track(play);
+                    let start_playing =
+                        frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
+                    self.load_track(start_playing, frame.get_state().get_position_ms());
                 } else {
                     info!("No more tracks left in queue");
                     self.state.set_status(PlayStatus::kPlayStatusStop);
                     self.player.stop();
                     self.mixer.stop();
+                    self.play_status = SpircPlayStatus::Stopped;
                 }
 
-                self.notify(None);
+                self.notify(None, true);
             }
 
             MessageType::kMessageTypePlay => {
                 self.handle_play();
-                self.notify(None);
+                self.notify(None, true);
             }
 
             MessageType::kMessageTypePlayPause => {
                 self.handle_play_pause();
-                self.notify(None);
+                self.notify(None, true);
             }
 
             MessageType::kMessageTypePause => {
                 self.handle_pause();
-                self.notify(None);
+                self.notify(None, true);
             }
 
             MessageType::kMessageTypeNext => {
                 self.handle_next();
-                self.notify(None);
+                self.notify(None, true);
             }
 
             MessageType::kMessageTypePrev => {
                 self.handle_prev();
-                self.notify(None);
+                self.notify(None, true);
             }
 
             MessageType::kMessageTypeVolumeUp => {
                 self.handle_volume_up();
-                self.notify(None);
+                self.notify(None, true);
             }
 
             MessageType::kMessageTypeVolumeDown => {
                 self.handle_volume_down();
-                self.notify(None);
+                self.notify(None, true);
             }
 
             MessageType::kMessageTypeRepeat => {
                 self.state.set_repeat(frame.get_state().get_repeat());
-                self.notify(None);
+                self.notify(None, true);
             }
 
             MessageType::kMessageTypeShuffle => {
@@ -603,27 +755,38 @@ impl SpircTask {
                     let context = self.state.get_context_uri();
                     debug!("{:?}", context);
                 }
-                self.notify(None);
+                self.notify(None, true);
             }
 
             MessageType::kMessageTypeSeek => {
-                let position = frame.get_position();
-
-                let now = self.now_ms();
-                self.state.set_position_ms(position);
-                self.state.set_position_measured_at(now as u64);
-                self.player.seek(position);
-                self.notify(None);
+                self.handle_seek(frame.get_position());
+                self.notify(None, true);
             }
 
             MessageType::kMessageTypeReplace => {
                 self.update_tracks(&frame);
-                self.notify(None);
+                self.notify(None, true);
+
+                if let SpircPlayStatus::Playing {
+                    preloading_of_next_track_triggered,
+                    ..
+                }
+                | SpircPlayStatus::Paused {
+                    preloading_of_next_track_triggered,
+                    ..
+                } = self.play_status
+                {
+                    if preloading_of_next_track_triggered {
+                        if let Some(track_id) = self.preview_next_track() {
+                            self.player.preload(track_id);
+                        }
+                    }
+                }
             }
 
             MessageType::kMessageTypeVolume => {
                 self.set_volume(frame.get_volume() as u16);
-                self.notify(None);
+                self.notify(None, true);
             }
 
             MessageType::kMessageTypeNotify => {
@@ -631,7 +794,8 @@ impl SpircTask {
                     self.device.set_is_active(false);
                     self.state.set_status(PlayStatus::kPlayStatusStop);
                     self.player.stop();
-                    self.mixer.stop();
+                    self.ensure_mixer_stopped();
+                    self.play_status = SpircPlayStatus::Stopped;
                 }
             }
 
@@ -640,39 +804,87 @@ impl SpircTask {
     }
 
     fn handle_play(&mut self) {
-        if self.state.get_status() == PlayStatus::kPlayStatusPause {
-            self.mixer.start();
-            self.player.play();
-            self.state.set_status(PlayStatus::kPlayStatusPlay);
-            let now = self.now_ms();
-            self.state.set_position_measured_at(now as u64);
+        match self.play_status {
+            SpircPlayStatus::Paused {
+                position_ms,
+                preloading_of_next_track_triggered,
+            } => {
+                self.ensure_mixer_started();
+                self.player.play();
+                self.state.set_status(PlayStatus::kPlayStatusPlay);
+                self.update_state_position(position_ms);
+                self.play_status = SpircPlayStatus::Playing {
+                    nominal_start_time: self.now_ms() as i64 - position_ms as i64,
+                    preloading_of_next_track_triggered,
+                };
+            }
+            SpircPlayStatus::LoadingPause { position_ms } => {
+                self.ensure_mixer_started();
+                self.player.play();
+                self.play_status = SpircPlayStatus::LoadingPlay { position_ms };
+            }
+            _ => (),
         }
     }
 
     fn handle_play_pause(&mut self) {
-        match self.state.get_status() {
-            PlayStatus::kPlayStatusPlay => self.handle_pause(),
-            PlayStatus::kPlayStatusPause => self.handle_play(),
+        match self.play_status {
+            SpircPlayStatus::Paused { .. } | SpircPlayStatus::LoadingPause { .. } => {
+                self.handle_play()
+            }
+            SpircPlayStatus::Playing { .. } | SpircPlayStatus::LoadingPlay { .. } => {
+                self.handle_play()
+            }
             _ => (),
         }
     }
 
     fn handle_pause(&mut self) {
-        if self.state.get_status() == PlayStatus::kPlayStatusPlay {
-            self.player.pause();
-            self.mixer.stop();
-            self.state.set_status(PlayStatus::kPlayStatusPause);
-
-            let now = self.now_ms() as u64;
-            let position = self.state.get_position_ms();
-
-            let diff = now - self.state.get_position_measured_at();
-
-            self.state.set_position_ms(position + diff as u32);
-            self.state.set_position_measured_at(now);
+        match self.play_status {
+            SpircPlayStatus::Playing {
+                nominal_start_time,
+                preloading_of_next_track_triggered,
+            } => {
+                self.player.pause();
+                self.state.set_status(PlayStatus::kPlayStatusPause);
+                let position_ms = (self.now_ms() - nominal_start_time) as u32;
+                self.update_state_position(position_ms);
+                self.play_status = SpircPlayStatus::Paused {
+                    position_ms,
+                    preloading_of_next_track_triggered,
+                };
+            }
+            SpircPlayStatus::LoadingPlay { position_ms } => {
+                self.player.pause();
+                self.play_status = SpircPlayStatus::LoadingPause { position_ms };
+            }
+            _ => (),
         }
     }
 
+    fn handle_seek(&mut self, position_ms: u32) {
+        self.update_state_position(position_ms);
+        self.player.seek(position_ms);
+        let now = self.now_ms();
+        match self.play_status {
+            SpircPlayStatus::Stopped => (),
+            SpircPlayStatus::LoadingPause {
+                position_ms: ref mut position,
+            }
+            | SpircPlayStatus::LoadingPlay {
+                position_ms: ref mut position,
+            }
+            | SpircPlayStatus::Paused {
+                position_ms: ref mut position,
+                ..
+            } => *position = position_ms,
+            SpircPlayStatus::Playing {
+                ref mut nominal_start_time,
+                ..
+            } => *nominal_start_time = now - position_ms as i64,
+        };
+    }
+
     fn consume_queued_track(&mut self) -> usize {
         // Removes current track if it is queued
         // Returns the index of the next track
@@ -687,6 +899,11 @@ impl SpircTask {
         }
     }
 
+    fn preview_next_track(&mut self) -> Option<SpotifyId> {
+        self.get_track_id_to_play_from_playlist(self.state.get_playing_track_index() + 1)
+            .and_then(|(track_id, _)| Some(track_id))
+    }
+
     fn handle_next(&mut self) {
         let mut new_index = self.consume_queued_track() as u32;
         let mut continue_playing = true;
@@ -720,17 +937,14 @@ impl SpircTask {
 
         if tracks_len > 0 {
             self.state.set_playing_track_index(new_index);
-            self.state.set_position_ms(0);
-            let now = self.now_ms();
-            self.state.set_position_measured_at(now as u64);
-
-            self.load_track(continue_playing);
+            self.load_track(continue_playing, 0);
         } else {
             info!("Not playing next track because there are no more tracks left in queue.");
             self.state.set_playing_track_index(0);
             self.state.set_status(PlayStatus::kPlayStatusStop);
             self.player.stop();
-            self.mixer.stop();
+            self.ensure_mixer_stopped();
+            self.play_status = SpircPlayStatus::Stopped;
         }
     }
 
@@ -765,17 +979,11 @@ impl SpircTask {
                 pos += 1;
             }
 
-            let now = self.now_ms();
             self.state.set_playing_track_index(new_index);
-            self.state.set_position_ms(0);
-            self.state.set_position_measured_at(now as u64);
 
-            self.load_track(true);
+            self.load_track(true, 0);
         } else {
-            let now = self.now_ms();
-            self.state.set_position_ms(0);
-            self.state.set_position_measured_at(now as u64);
-            self.player.seek(0);
+            self.handle_seek(0);
         }
     }
 
@@ -797,12 +1005,19 @@ impl SpircTask {
 
     fn handle_end_of_track(&mut self) {
         self.handle_next();
-        self.notify(None);
+        self.notify(None, true);
     }
 
     fn position(&mut self) -> u32 {
-        let diff = self.now_ms() as u64 - self.state.get_position_measured_at();
-        self.state.get_position_ms() + diff as u32
+        match self.play_status {
+            SpircPlayStatus::Stopped => 0,
+            SpircPlayStatus::LoadingPlay { position_ms }
+            | SpircPlayStatus::LoadingPause { position_ms }
+            | SpircPlayStatus::Paused { position_ms, .. } => position_ms,
+            SpircPlayStatus::Playing {
+                nominal_start_time, ..
+            } => (self.now_ms() - nominal_start_time) as u32,
+        }
     }
 
     fn resolve_station(
@@ -920,60 +1135,91 @@ impl SpircTask {
         })
     }
 
-    fn load_track(&mut self, play: bool) {
-        let context_uri = self.state.get_context_uri().to_owned();
-        let mut index = self.state.get_playing_track_index();
-        let start_index = index;
+    fn get_track_id_to_play_from_playlist(&self, index: u32) -> Option<(SpotifyId, u32)> {
         let tracks_len = self.state.get_track().len() as u32;
-        debug!(
-            "Loading context: <{}> index: [{}] of {}",
-            context_uri, index, tracks_len
-        );
+
+        let mut new_playlist_index = index;
+
+        if new_playlist_index >= tracks_len {
+            new_playlist_index = 0;
+        }
+
+        let start_index = new_playlist_index;
+
         // Cycle through all tracks, break if we don't find any playable tracks
-        // TODO: This will panic if no playable tracks are found!
         // tracks in each frame either have a gid or uri (that may or may not be a valid track)
         // E.g - context based frames sometimes contain tracks with <spotify:meta:page:>
-        let track = {
-            let mut track_ref = self.state.get_track()[index as usize].clone();
-            let mut track_id = self.get_spotify_id_for_track(&track_ref);
-            while track_id.is_err() || track_id.unwrap().audio_type == SpotifyAudioType::NonPlayable
-            {
-                warn!(
-                    "Skipping track <{:?}> at position [{}] of {}",
-                    track_ref.get_uri(),
-                    index,
-                    tracks_len
-                );
-                index = if index + 1 < tracks_len { index + 1 } else { 0 };
-                self.state.set_playing_track_index(index);
-                if index == start_index {
-                    warn!("No playable track found in state: {:?}", self.state);
-                    break;
-                }
-                track_ref = self.state.get_track()[index as usize].clone();
-                track_id = self.get_spotify_id_for_track(&track_ref);
+
+        let mut track_ref = self.state.get_track()[new_playlist_index as usize].clone();
+        let mut track_id = self.get_spotify_id_for_track(&track_ref);
+        while track_id.is_err() || track_id.unwrap().audio_type == SpotifyAudioType::NonPlayable {
+            warn!(
+                "Skipping track <{:?}> at position [{}] of {}",
+                track_ref.get_uri(),
+                new_playlist_index,
+                tracks_len
+            );
+
+            new_playlist_index += 1;
+            if new_playlist_index >= tracks_len {
+                new_playlist_index = 0;
             }
-            track_id
-        }
-        .expect("Invalid SpotifyId");
 
-        let position = self.state.get_position_ms();
-        let end_of_track = self.player.load(track, play, position);
+            if new_playlist_index == start_index {
+                warn!("No playable track found in state: {:?}", self.state);
+                return None;
+            }
+            track_ref = self.state.get_track()[index as usize].clone();
+            track_id = self.get_spotify_id_for_track(&track_ref);
+        }
 
-        if play {
-            self.state.set_status(PlayStatus::kPlayStatusPlay);
-        } else {
-            self.state.set_status(PlayStatus::kPlayStatusPause);
+        match track_id {
+            Ok(track_id) => Some((track_id, new_playlist_index)),
+            Err(_) => None,
         }
+    }
 
-        self.end_of_track = Box::new(end_of_track);
+    fn load_track(&mut self, start_playing: bool, position_ms: u32) {
+        let index = self.state.get_playing_track_index();
+
+        match self.get_track_id_to_play_from_playlist(index) {
+            Some((track, index)) => {
+                self.state.set_playing_track_index(index);
+
+                self.play_request_id = Some(self.player.load(track, start_playing, position_ms));
+
+                self.update_state_position(position_ms);
+                self.state.set_status(PlayStatus::kPlayStatusLoading);
+                if start_playing {
+                    self.play_status = SpircPlayStatus::LoadingPlay { position_ms };
+                } else {
+                    self.play_status = SpircPlayStatus::LoadingPause { position_ms };
+                }
+            }
+            None => {
+                self.state.set_status(PlayStatus::kPlayStatusStop);
+                self.player.stop();
+                self.ensure_mixer_stopped();
+                self.play_status = SpircPlayStatus::Stopped;
+            }
+        }
     }
 
     fn hello(&mut self) {
         CommandSender::new(self, MessageType::kMessageTypeHello).send();
     }
 
-    fn notify(&mut self, recipient: Option<&str>) {
+    fn notify(&mut self, recipient: Option<&str>, suppress_loading_status: bool) {
+        if suppress_loading_status && (self.state.get_status() == PlayStatus::kPlayStatusLoading) {
+            return;
+        };
+        let status_string = match self.state.get_status() {
+            PlayStatus::kPlayStatusLoading => "kPlayStatusLoading",
+            PlayStatus::kPlayStatusPause => "kPlayStatusPause",
+            PlayStatus::kPlayStatusStop => "kPlayStatusStop",
+            PlayStatus::kPlayStatusPlay => "kPlayStatusPlay",
+        };
+        trace!("Sending status to server: [{}]", status_string);
         let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify);
         if let Some(s) = recipient {
             cs = cs.recipient(&s);
@@ -988,6 +1234,7 @@ impl SpircTask {
         if let Some(cache) = self.session.cache() {
             cache.save_volume(Volume { volume })
         }
+        self.player.emit_volume_set_event(volume);
     }
 }
 

+ 4 - 2
examples/play.rs

@@ -34,12 +34,14 @@ fn main() {
         .run(Session::connect(session_config, credentials, None, handle))
         .unwrap();
 
-    let (player, _) = Player::new(player_config, session.clone(), None, move || {
+    let (mut player, _) = Player::new(player_config, session.clone(), None, move || {
         (backend)(None)
     });
 
+    player.load(track, true, 0);
+
     println!("Playing...");
-    core.run(player.load(track, true, 0)).unwrap();
+    core.run(player.get_end_of_track_future()).unwrap();
 
     println!("Done");
 }

+ 3 - 0
metadata/src/lib.rs

@@ -63,6 +63,7 @@ pub struct AudioItem {
     pub uri: String,
     pub files: LinearMap<FileFormat, FileId>,
     pub name: String,
+    pub duration: i32,
     pub available: bool,
     pub alternatives: Option<Vec<SpotifyId>>,
 }
@@ -100,6 +101,7 @@ impl AudioFiles for Track {
                 uri: format!("spotify:track:{}", id.to_base62()),
                 files: item.files,
                 name: item.name,
+                duration: item.duration,
                 available: item.available,
                 alternatives: Some(item.alternatives),
             })
@@ -118,6 +120,7 @@ impl AudioFiles for Episode {
                 uri: format!("spotify:episode:{}", id.to_base62()),
                 files: item.files,
                 name: item.name,
+                duration: item.duration,
                 available: item.available,
                 alternatives: None,
             })

+ 1067 - 324
playback/src/player.rs

@@ -1,20 +1,20 @@
 use byteorder::{LittleEndian, ReadBytesExt};
 use futures;
-use futures::sync::oneshot;
-use futures::{future, Future};
+use futures::{future, Async, Future, Poll, Stream};
 use std;
 use std::borrow::Cow;
 use std::cmp::max;
 use std::io::{Read, Result, Seek, SeekFrom};
 use std::mem;
-use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError};
 use std::thread;
-use std::time::Duration;
+use std::time::{Duration, Instant};
 
 use crate::config::{Bitrate, PlayerConfig};
 use librespot_core::session::Session;
 use librespot_core::spotify_id::SpotifyId;
 
+use librespot_core::util::SeqGenerator;
+
 use crate::audio::{AudioDecrypt, AudioFile, StreamLoaderController};
 use crate::audio::{VorbisDecoder, VorbisPacket};
 use crate::audio::{
@@ -25,48 +25,121 @@ use crate::audio_backend::Sink;
 use crate::metadata::{AudioItem, FileFormat};
 use crate::mixer::AudioFilter;
 
+const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000;
+
 pub struct Player {
-    commands: Option<std::sync::mpsc::Sender<PlayerCommand>>,
+    commands: Option<futures::sync::mpsc::UnboundedSender<PlayerCommand>>,
     thread_handle: Option<thread::JoinHandle<()>>,
+    play_request_id_generator: SeqGenerator<u64>,
 }
 
 struct PlayerInternal {
     session: Session,
     config: PlayerConfig,
-    commands: std::sync::mpsc::Receiver<PlayerCommand>,
+    commands: futures::sync::mpsc::UnboundedReceiver<PlayerCommand>,
 
     state: PlayerState,
+    preload: PlayerPreload,
     sink: Box<dyn Sink>,
     sink_running: bool,
     audio_filter: Option<Box<dyn AudioFilter + Send>>,
-    event_sender: futures::sync::mpsc::UnboundedSender<PlayerEvent>,
+    event_senders: Vec<futures::sync::mpsc::UnboundedSender<PlayerEvent>>,
 }
 
 enum PlayerCommand {
-    Load(SpotifyId, bool, u32, oneshot::Sender<()>),
+    Load {
+        track_id: SpotifyId,
+        play_request_id: u64,
+        play: bool,
+        position_ms: u32,
+    },
+    Preload {
+        track_id: SpotifyId,
+    },
     Play,
     Pause,
     Stop,
     Seek(u32),
+    AddEventSender(futures::sync::mpsc::UnboundedSender<PlayerEvent>),
+    EmitVolumeSetEvent(u16),
 }
 
 #[derive(Debug, Clone)]
 pub enum PlayerEvent {
+    Stopped {
+        play_request_id: u64,
+        track_id: SpotifyId,
+    },
+    Loading {
+        play_request_id: u64,
+        track_id: SpotifyId,
+        position_ms: u32,
+    },
     Started {
+        play_request_id: u64,
         track_id: SpotifyId,
+        position_ms: u32,
     },
-
     Changed {
         old_track_id: SpotifyId,
         new_track_id: SpotifyId,
     },
-
-    Stopped {
+    Playing {
+        play_request_id: u64,
+        track_id: SpotifyId,
+        position_ms: u32,
+        duration_ms: u32,
+    },
+    Paused {
+        play_request_id: u64,
         track_id: SpotifyId,
+        position_ms: u32,
+        duration_ms: u32,
     },
+    TimeToPreloadNextTrack {
+        play_request_id: u64,
+        track_id: SpotifyId,
+    },
+    EndOfTrack {
+        play_request_id: u64,
+        track_id: SpotifyId,
+    },
+    VolumeSet {
+        volume: u16,
+    },
+}
+
+impl PlayerEvent {
+    pub fn get_play_request_id(&self) -> Option<u64> {
+        use PlayerEvent::*;
+        match self {
+            Loading {
+                play_request_id, ..
+            }
+            | Started {
+                play_request_id, ..
+            }
+            | Playing {
+                play_request_id, ..
+            }
+            | TimeToPreloadNextTrack {
+                play_request_id, ..
+            }
+            | EndOfTrack {
+                play_request_id, ..
+            }
+            | Paused {
+                play_request_id, ..
+            }
+            | Stopped {
+                play_request_id, ..
+            } => Some(*play_request_id),
+            Changed { .. } | VolumeSet { .. } => None,
+        }
+    }
 }
 
-type PlayerEventChannel = futures::sync::mpsc::UnboundedReceiver<PlayerEvent>;
+pub type PlayerEventChannel = futures::sync::mpsc::UnboundedReceiver<PlayerEvent>;
 
 #[derive(Clone, Copy, Debug)]
 struct NormalisationData {
@@ -125,7 +198,7 @@ impl Player {
     where
         F: FnOnce() -> Box<dyn Sink> + Send + 'static,
     {
-        let (cmd_tx, cmd_rx) = std::sync::mpsc::channel();
+        let (cmd_tx, cmd_rx) = futures::sync::mpsc::unbounded();
         let (event_sender, event_receiver) = futures::sync::mpsc::unbounded();
 
         let handle = thread::spawn(move || {
@@ -137,38 +210,47 @@ impl Player {
                 commands: cmd_rx,
 
                 state: PlayerState::Stopped,
+                preload: PlayerPreload::None,
                 sink: sink_builder(),
                 sink_running: false,
                 audio_filter: audio_filter,
-                event_sender: event_sender,
+                event_senders: [event_sender].to_vec(),
             };
 
-            internal.run();
+            // While PlayerInternal is written as a future, it still contains blocking code.
+            // It must be run by using wait() in a dedicated thread.
+            let _ = internal.wait();
+            debug!("PlayerInternal thread finished.");
         });
 
         (
             Player {
                 commands: Some(cmd_tx),
                 thread_handle: Some(handle),
+                play_request_id_generator: SeqGenerator::new(0),
             },
             event_receiver,
         )
     }
 
     fn command(&self, cmd: PlayerCommand) {
-        self.commands.as_ref().unwrap().send(cmd).unwrap();
+        self.commands.as_ref().unwrap().unbounded_send(cmd).unwrap();
     }
 
-    pub fn load(
-        &self,
-        track: SpotifyId,
-        start_playing: bool,
-        position_ms: u32,
-    ) -> oneshot::Receiver<()> {
-        let (tx, rx) = oneshot::channel();
-        self.command(PlayerCommand::Load(track, start_playing, position_ms, tx));
+    pub fn load(&mut self, track_id: SpotifyId, start_playing: bool, position_ms: u32) -> u64 {
+        let play_request_id = self.play_request_id_generator.get();
+        self.command(PlayerCommand::Load {
+            track_id,
+            play_request_id,
+            play: start_playing,
+            position_ms,
+        });
 
-        rx
+        play_request_id
+    }
+
+    pub fn preload(&self, track_id: SpotifyId) {
+        self.command(PlayerCommand::Preload { track_id });
     }
 
     pub fn play(&self) {
@@ -186,6 +268,29 @@ impl Player {
     pub fn seek(&self, position_ms: u32) {
         self.command(PlayerCommand::Seek(position_ms));
     }
+
+    pub fn get_player_event_channel(&self) -> PlayerEventChannel {
+        let (event_sender, event_receiver) = futures::sync::mpsc::unbounded();
+        self.command(PlayerCommand::AddEventSender(event_sender));
+        event_receiver
+    }
+
+    pub fn get_end_of_track_future(&self) -> Box<dyn Future<Item = (), Error = ()>> {
+        let result = self
+            .get_player_event_channel()
+            .filter(|event| match event {
+                PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. } => true,
+                _ => false,
+            })
+            .into_future()
+            .map_err(|_| ())
+            .map(|_| ());
+        Box::new(result)
+    }
+
+    pub fn emit_volume_set_event(&self, volume: u16) {
+        self.command(PlayerCommand::EmitVolumeSetEvent(volume));
+    }
 }
 
 impl Drop for Player {
@@ -201,27 +306,64 @@ impl Drop for Player {
     }
 }
 
+struct PlayerLoadedTrackData {
+    decoder: Decoder,
+    normalisation_factor: f32,
+    stream_loader_controller: StreamLoaderController,
+    bytes_per_second: usize,
+    duration_ms: u32,
+    stream_position_pcm: u64,
+}
+
+enum PlayerPreload {
+    None,
+    Loading {
+        track_id: SpotifyId,
+        loader: Box<dyn Future<Item = PlayerLoadedTrackData, Error = ()>>,
+    },
+    Ready {
+        track_id: SpotifyId,
+        loaded_track: PlayerLoadedTrackData,
+    },
+}
+
 type Decoder = VorbisDecoder<Subfile<AudioDecrypt<AudioFile>>>;
+
 enum PlayerState {
     Stopped,
+    Loading {
+        track_id: SpotifyId,
+        play_request_id: u64,
+        start_playback: bool,
+        loader: Box<dyn Future<Item = PlayerLoadedTrackData, Error = ()>>,
+    },
     Paused {
         track_id: SpotifyId,
+        play_request_id: u64,
         decoder: Decoder,
-        end_of_track: oneshot::Sender<()>,
         normalisation_factor: f32,
         stream_loader_controller: StreamLoaderController,
         bytes_per_second: usize,
+        duration_ms: u32,
+        stream_position_pcm: u64,
+        suggested_to_preload_next_track: bool,
     },
     Playing {
         track_id: SpotifyId,
+        play_request_id: u64,
         decoder: Decoder,
-        end_of_track: oneshot::Sender<()>,
         normalisation_factor: f32,
         stream_loader_controller: StreamLoaderController,
         bytes_per_second: usize,
+        duration_ms: u32,
+        stream_position_pcm: u64,
+        reported_nominal_start_time: Option<Instant>,
+        suggested_to_preload_next_track: bool,
     },
     EndOfTrack {
         track_id: SpotifyId,
+        play_request_id: u64,
+        loaded_track: Option<PlayerLoadedTrackData>,
     },
     Invalid,
 }
@@ -230,16 +372,24 @@ impl PlayerState {
     fn is_playing(&self) -> bool {
         use self::PlayerState::*;
         match *self {
-            Stopped | EndOfTrack { .. } | Paused { .. } => false,
+            Stopped | EndOfTrack { .. } | Paused { .. } | Loading { .. } => false,
             Playing { .. } => true,
             Invalid => panic!("invalid state"),
         }
     }
 
+    fn is_stopped(&self) -> bool {
+        use self::PlayerState::*;
+        match *self {
+            Stopped => true,
+            _ => false,
+        }
+    }
+
     fn decoder(&mut self) -> Option<&mut Decoder> {
         use self::PlayerState::*;
         match *self {
-            Stopped | EndOfTrack { .. } => None,
+            Stopped | EndOfTrack { .. } | Loading { .. } => None,
             Paused {
                 ref mut decoder, ..
             }
@@ -253,7 +403,7 @@ impl PlayerState {
     fn stream_loader_controller(&mut self) -> Option<&mut StreamLoaderController> {
         use self::PlayerState::*;
         match *self {
-            Stopped | EndOfTrack { .. } => None,
+            Stopped | EndOfTrack { .. } | Loading { .. } => None,
             Paused {
                 ref mut stream_loader_controller,
                 ..
@@ -271,11 +421,27 @@ impl PlayerState {
         match mem::replace(self, Invalid) {
             Playing {
                 track_id,
-                end_of_track,
+                play_request_id,
+                decoder,
+                duration_ms,
+                bytes_per_second,
+                normalisation_factor,
+                stream_loader_controller,
+                stream_position_pcm,
                 ..
             } => {
-                let _ = end_of_track.send(());
-                *self = EndOfTrack { track_id };
+                *self = EndOfTrack {
+                    track_id,
+                    play_request_id,
+                    loaded_track: Some(PlayerLoadedTrackData {
+                        decoder,
+                        duration_ms,
+                        bytes_per_second,
+                        normalisation_factor,
+                        stream_loader_controller,
+                        stream_position_pcm,
+                    }),
+                };
             }
             _ => panic!("Called playing_to_end_of_track in non-playing state."),
         }
@@ -286,19 +452,26 @@ impl PlayerState {
         match ::std::mem::replace(self, Invalid) {
             Paused {
                 track_id,
+                play_request_id,
                 decoder,
-                end_of_track,
                 normalisation_factor,
                 stream_loader_controller,
+                duration_ms,
                 bytes_per_second,
+                stream_position_pcm,
+                suggested_to_preload_next_track,
             } => {
                 *self = Playing {
-                    track_id: track_id,
-                    decoder: decoder,
-                    end_of_track: end_of_track,
-                    normalisation_factor: normalisation_factor,
-                    stream_loader_controller: stream_loader_controller,
-                    bytes_per_second: bytes_per_second,
+                    track_id,
+                    play_request_id,
+                    decoder,
+                    normalisation_factor,
+                    stream_loader_controller,
+                    duration_ms,
+                    bytes_per_second,
+                    stream_position_pcm,
+                    reported_nominal_start_time: None,
+                    suggested_to_preload_next_track,
                 };
             }
             _ => panic!("invalid state"),
@@ -310,19 +483,26 @@ impl PlayerState {
         match ::std::mem::replace(self, Invalid) {
             Playing {
                 track_id,
+                play_request_id,
                 decoder,
-                end_of_track,
                 normalisation_factor,
                 stream_loader_controller,
+                duration_ms,
                 bytes_per_second,
+                stream_position_pcm,
+                reported_nominal_start_time: _,
+                suggested_to_preload_next_track,
             } => {
                 *self = Paused {
-                    track_id: track_id,
-                    decoder: decoder,
-                    end_of_track: end_of_track,
-                    normalisation_factor: normalisation_factor,
-                    stream_loader_controller: stream_loader_controller,
-                    bytes_per_second: bytes_per_second,
+                    track_id,
+                    play_request_id,
+                    decoder,
+                    normalisation_factor,
+                    stream_loader_controller,
+                    duration_ms,
+                    bytes_per_second,
+                    stream_position_pcm,
+                    suggested_to_preload_next_track,
                 };
             }
             _ => panic!("invalid state"),
@@ -330,269 +510,12 @@ impl PlayerState {
     }
 }
 
-impl PlayerInternal {
-    fn run(mut self) {
-        loop {
-            let cmd = if self.state.is_playing() {
-                if self.sink_running {
-                    match self.commands.try_recv() {
-                        Ok(cmd) => Some(cmd),
-                        Err(TryRecvError::Empty) => None,
-                        Err(TryRecvError::Disconnected) => return,
-                    }
-                } else {
-                    match self.commands.recv_timeout(Duration::from_secs(5)) {
-                        Ok(cmd) => Some(cmd),
-                        Err(RecvTimeoutError::Timeout) => None,
-                        Err(RecvTimeoutError::Disconnected) => return,
-                    }
-                }
-            } else {
-                match self.commands.recv() {
-                    Ok(cmd) => Some(cmd),
-                    Err(RecvError) => return,
-                }
-            };
-
-            if let Some(cmd) = cmd {
-                self.handle_command(cmd);
-            }
-
-            if self.state.is_playing() && !self.sink_running {
-                self.start_sink();
-            }
-
-            if self.sink_running {
-                let mut current_normalisation_factor: f32 = 1.0;
-
-                let packet = if let PlayerState::Playing {
-                    ref mut decoder,
-                    normalisation_factor,
-                    ..
-                } = self.state
-                {
-                    current_normalisation_factor = normalisation_factor;
-                    Some(decoder.next_packet().expect("Vorbis error"))
-                } else {
-                    None
-                };
-
-                if let Some(packet) = packet {
-                    self.handle_packet(packet, current_normalisation_factor);
-                }
-            }
-
-            if self.session.is_invalid() {
-                return;
-            }
-        }
-    }
-
-    fn start_sink(&mut self) {
-        match self.sink.start() {
-            Ok(()) => self.sink_running = true,
-            Err(err) => error!("Could not start audio: {}", err),
-        }
-    }
-
-    fn stop_sink_if_running(&mut self) {
-        if self.sink_running {
-            self.stop_sink();
-        }
-    }
-
-    fn stop_sink(&mut self) {
-        self.sink.stop().unwrap();
-        self.sink_running = false;
-    }
-
-    fn handle_packet(&mut self, packet: Option<VorbisPacket>, normalisation_factor: f32) {
-        match packet {
-            Some(mut packet) => {
-                if packet.data().len() > 0 {
-                    if let Some(ref editor) = self.audio_filter {
-                        editor.modify_stream(&mut packet.data_mut())
-                    };
-
-                    if self.config.normalisation && normalisation_factor != 1.0 {
-                        for x in packet.data_mut().iter_mut() {
-                            *x = (*x as f32 * normalisation_factor) as i16;
-                        }
-                    }
-
-                    if let Err(err) = self.sink.write(&packet.data()) {
-                        error!("Could not write audio: {}", err);
-                        self.stop_sink();
-                    }
-                }
-            }
-
-            None => {
-                self.stop_sink();
-                self.state.playing_to_end_of_track();
-            }
-        }
-    }
-
-    fn handle_command(&mut self, cmd: PlayerCommand) {
-        debug!("command={:?}", cmd);
-        match cmd {
-            PlayerCommand::Load(track_id, play, position, end_of_track) => {
-                if self.state.is_playing() {
-                    self.stop_sink_if_running();
-                }
-
-                match self.load_track(track_id, position as i64) {
-                    Some((
-                        decoder,
-                        normalisation_factor,
-                        stream_loader_controller,
-                        bytes_per_second,
-                    )) => {
-                        if play {
-                            match self.state {
-                                PlayerState::Playing {
-                                    track_id: old_track_id,
-                                    ..
-                                }
-                                | PlayerState::EndOfTrack {
-                                    track_id: old_track_id,
-                                    ..
-                                } => self.send_event(PlayerEvent::Changed {
-                                    old_track_id: old_track_id,
-                                    new_track_id: track_id,
-                                }),
-                                _ => self.send_event(PlayerEvent::Started { track_id }),
-                            }
-
-                            self.start_sink();
-
-                            self.state = PlayerState::Playing {
-                                track_id: track_id,
-                                decoder: decoder,
-                                end_of_track: end_of_track,
-                                normalisation_factor: normalisation_factor,
-                                stream_loader_controller: stream_loader_controller,
-                                bytes_per_second: bytes_per_second,
-                            };
-                        } else {
-                            self.state = PlayerState::Paused {
-                                track_id: track_id,
-                                decoder: decoder,
-                                end_of_track: end_of_track,
-                                normalisation_factor: normalisation_factor,
-                                stream_loader_controller: stream_loader_controller,
-                                bytes_per_second: bytes_per_second,
-                            };
-                            match self.state {
-                                PlayerState::Playing {
-                                    track_id: old_track_id,
-                                    ..
-                                }
-                                | PlayerState::EndOfTrack {
-                                    track_id: old_track_id,
-                                    ..
-                                } => self.send_event(PlayerEvent::Changed {
-                                    old_track_id: old_track_id,
-                                    new_track_id: track_id,
-                                }),
-                                _ => (),
-                            }
-                            self.send_event(PlayerEvent::Stopped { track_id });
-                        }
-                    }
-
-                    None => {
-                        let _ = end_of_track.send(());
-                    }
-                }
-            }
-
-            PlayerCommand::Seek(position) => {
-                if let Some(stream_loader_controller) = self.state.stream_loader_controller() {
-                    stream_loader_controller.set_random_access_mode();
-                }
-                if let Some(decoder) = self.state.decoder() {
-                    match decoder.seek(position as i64) {
-                        Ok(_) => (),
-                        Err(err) => error!("Vorbis error: {:?}", err),
-                    }
-                } else {
-                    warn!("Player::seek called from invalid state");
-                }
-
-                // If we're playing, ensure, that we have enough data leaded to avoid a buffer underrun.
-                if let Some(stream_loader_controller) = self.state.stream_loader_controller() {
-                    stream_loader_controller.set_stream_mode();
-                }
-                if let PlayerState::Playing {
-                    bytes_per_second, ..
-                } = self.state
-                {
-                    if let Some(stream_loader_controller) = self.state.stream_loader_controller() {
-                        // Request our read ahead range
-                        let request_data_length = max(
-                            (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
-                                * (0.001 * stream_loader_controller.ping_time_ms() as f64)
-                                * bytes_per_second as f64) as usize,
-                            (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
-                        );
-                        stream_loader_controller.fetch_next(request_data_length);
-
-                        // Request the part we want to wait for blocking. This effecively means we wait for the previous request to partially complete.
-                        let wait_for_data_length = max(
-                            (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS
-                                * (0.001 * stream_loader_controller.ping_time_ms() as f64)
-                                * bytes_per_second as f64) as usize,
-                            (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
-                        );
-                        stream_loader_controller.fetch_next_blocking(wait_for_data_length);
-                    }
-                }
-            }
-
-            PlayerCommand::Play => {
-                if let PlayerState::Paused { track_id, .. } = self.state {
-                    self.state.paused_to_playing();
-
-                    self.send_event(PlayerEvent::Started { track_id });
-                    self.start_sink();
-                } else {
-                    warn!("Player::play called from invalid state");
-                }
-            }
-
-            PlayerCommand::Pause => {
-                if let PlayerState::Playing { track_id, .. } = self.state {
-                    self.state.playing_to_paused();
-
-                    self.stop_sink_if_running();
-                    self.send_event(PlayerEvent::Stopped { track_id });
-                } else {
-                    warn!("Player::pause called from invalid state");
-                }
-            }
-
-            PlayerCommand::Stop => match self.state {
-                PlayerState::Playing { track_id, .. }
-                | PlayerState::Paused { track_id, .. }
-                | PlayerState::EndOfTrack { track_id } => {
-                    self.stop_sink_if_running();
-                    self.send_event(PlayerEvent::Stopped { track_id });
-                    self.state = PlayerState::Stopped;
-                }
-                PlayerState::Stopped => {
-                    warn!("Player::stop called from invalid state");
-                }
-                PlayerState::Invalid => panic!("invalid state"),
-            },
-        }
-    }
-
-    fn send_event(&mut self, event: PlayerEvent) {
-        let _ = self.event_sender.unbounded_send(event.clone());
-    }
+struct PlayerTrackLoader {
+    session: Session,
+    config: PlayerConfig,
+}
 
+impl PlayerTrackLoader {
     fn find_available_alternative<'a>(&self, audio: &'a AudioItem) -> Option<Cow<'a, AudioItem>> {
         if audio.available {
             Some(Cow::Borrowed(audio))
@@ -631,11 +554,7 @@ impl PlayerInternal {
         }
     }
 
-    fn load_track(
-        &self,
-        spotify_id: SpotifyId,
-        position: i64,
-    ) -> Option<(Decoder, f32, StreamLoaderController, usize)> {
+    fn load_track(&self, spotify_id: SpotifyId, position_ms: u32) -> Option<PlayerLoadedTrackData> {
         let audio = match AudioItem::get_audio_item(&self.session, spotify_id).wait() {
             Ok(audio) => audio,
             Err(_) => {
@@ -653,6 +572,10 @@ impl PlayerInternal {
                 return None;
             }
         };
+
+        assert!(audio.duration >= 0);
+        let duration_ms = audio.duration as u32;
+
         // (Most) podcasts seem to support only 96 bit Vorbis, so fall back to it
         let formats = match self.config.bitrate {
             Bitrate::Bitrate96 => [
@@ -685,7 +608,7 @@ impl PlayerInternal {
         };
 
         let bytes_per_second = self.stream_data_rate(*format);
-        let play_from_beginning = position == 0;
+        let play_from_beginning = position_ms == 0;
 
         let key = self.session.audio_key().request(spotify_id, file_id);
         let encrypted_file = AudioFile::open(
@@ -737,42 +660,862 @@ impl PlayerInternal {
 
         let mut decoder = VorbisDecoder::new(audio_file).unwrap();
 
-        if position != 0 {
-            match decoder.seek(position) {
+        if position_ms != 0 {
+            match decoder.seek(position_ms as i64) {
                 Ok(_) => (),
                 Err(err) => error!("Vorbis error: {:?}", err),
             }
             stream_loader_controller.set_stream_mode();
         }
-        info!("<{}> loaded", audio.name);
-        Some((
+        let stream_position_pcm = PlayerInternal::position_ms_to_pcm(position_ms);
+        info!("<{}> ({} ms) loaded", audio.name, audio.duration);
+        Some(PlayerLoadedTrackData {
             decoder,
             normalisation_factor,
             stream_loader_controller,
             bytes_per_second,
-        ))
+            duration_ms,
+            stream_position_pcm,
+        })
+    }
+}
+
+impl Future for PlayerInternal {
+    type Item = ();
+    type Error = ();
+
+    fn poll(&mut self) -> Poll<(), ()> {
+        // While this is written as a future, it still contains blocking code.
+        // It must be run on its own thread.
+
+        loop {
+            let mut all_futures_completed_or_not_ready = true;
+
+            // process commands that were sent to us
+            let cmd = match self.commands.poll() {
+                Ok(Async::Ready(None)) => return Ok(Async::Ready(())), // client has disconnected - shut down.
+                Ok(Async::Ready(Some(cmd))) => {
+                    all_futures_completed_or_not_ready = false;
+                    Some(cmd)
+                }
+                Ok(Async::NotReady) => None,
+                Err(_) => None,
+            };
+
+            if let Some(cmd) = cmd {
+                self.handle_command(cmd);
+            }
+
+            // Handle loading of a new track to play
+            if let PlayerState::Loading {
+                ref mut loader,
+                track_id,
+                start_playback,
+                play_request_id,
+            } = self.state
+            {
+                match loader.poll() {
+                    Ok(Async::Ready(loaded_track)) => {
+                        self.start_playback(
+                            track_id,
+                            play_request_id,
+                            loaded_track,
+                            start_playback,
+                        );
+                        if let PlayerState::Loading { .. } = self.state {
+                            panic!("The state wasn't changed by start_playback()");
+                        }
+                    }
+                    Ok(Async::NotReady) => (),
+                    Err(_) => {
+                        self.handle_player_stop();
+                        assert!(self.state.is_stopped());
+                    }
+                }
+            }
+
+            // handle pending preload requests.
+            if let PlayerPreload::Loading {
+                ref mut loader,
+                track_id,
+            } = self.preload
+            {
+                match loader.poll() {
+                    Ok(Async::Ready(loaded_track)) => {
+                        self.preload = PlayerPreload::Ready {
+                            track_id,
+                            loaded_track,
+                        };
+                    }
+                    Ok(Async::NotReady) => (),
+                    Err(_) => {
+                        self.preload = PlayerPreload::None;
+                    }
+                }
+            }
+
+            if self.state.is_playing() {
+                self.ensure_sink_running();
+
+                if let PlayerState::Playing {
+                    track_id,
+                    play_request_id,
+                    ref mut decoder,
+                    normalisation_factor,
+                    ref mut stream_position_pcm,
+                    ref mut reported_nominal_start_time,
+                    duration_ms,
+                    ..
+                } = self.state
+                {
+                    let packet = decoder.next_packet().expect("Vorbis error");
+
+                    if let Some(ref packet) = packet {
+                        *stream_position_pcm =
+                            *stream_position_pcm + (packet.data().len() / 2) as u64;
+                        let stream_position_millis = Self::position_pcm_to_ms(*stream_position_pcm);
+
+                        let notify_about_position = match *reported_nominal_start_time {
+                            None => true,
+                            Some(reported_nominal_start_time) => {
+                                // only notify if we're behind. If we're ahead it's probably due to a buffer of the backend and we;re actually in time.
+                                let lag = (Instant::now() - reported_nominal_start_time).as_millis()
+                                    as i64
+                                    - stream_position_millis as i64;
+                                if lag > 1000 {
+                                    true
+                                } else {
+                                    false
+                                }
+                            }
+                        };
+                        if notify_about_position {
+                            *reported_nominal_start_time = Some(
+                                Instant::now()
+                                    - Duration::from_millis(stream_position_millis as u64),
+                            );
+                            self.send_event(PlayerEvent::Playing {
+                                track_id,
+                                play_request_id,
+                                position_ms: stream_position_millis as u32,
+                                duration_ms,
+                            });
+                        }
+                    }
+
+                    self.handle_packet(packet, normalisation_factor);
+                } else {
+                    unreachable!();
+                };
+            }
+
+            if let PlayerState::Playing {
+                track_id,
+                play_request_id,
+                duration_ms,
+                stream_position_pcm,
+                ref mut stream_loader_controller,
+                ref mut suggested_to_preload_next_track,
+                ..
+            }
+            | PlayerState::Paused {
+                track_id,
+                play_request_id,
+                duration_ms,
+                stream_position_pcm,
+                ref mut stream_loader_controller,
+                ref mut suggested_to_preload_next_track,
+                ..
+            } = self.state
+            {
+                if (!*suggested_to_preload_next_track)
+                    && ((duration_ms as i64 - Self::position_pcm_to_ms(stream_position_pcm) as i64)
+                        < PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS as i64)
+                    && stream_loader_controller.range_to_end_available()
+                {
+                    *suggested_to_preload_next_track = true;
+                    self.send_event(PlayerEvent::TimeToPreloadNextTrack {
+                        track_id,
+                        play_request_id,
+                    });
+                }
+            }
+
+            if self.session.is_invalid() {
+                return Ok(Async::Ready(()));
+            }
+
+            if (!self.state.is_playing()) && all_futures_completed_or_not_ready {
+                return Ok(Async::NotReady);
+            }
+        }
+    }
+}
+
+impl PlayerInternal {
+    fn position_pcm_to_ms(position_pcm: u64) -> u32 {
+        (position_pcm * 10 / 441) as u32
+    }
+
+    fn position_ms_to_pcm(position_ms: u32) -> u64 {
+        position_ms as u64 * 441 / 10
+    }
+
+    fn ensure_sink_running(&mut self) {
+        if !self.sink_running {
+            trace!("== Starting sink ==");
+            match self.sink.start() {
+                Ok(()) => self.sink_running = true,
+                Err(err) => error!("Could not start audio: {}", err),
+            }
+        }
+    }
+
+    fn ensure_sink_stopped(&mut self) {
+        if self.sink_running {
+            trace!("== Stopping sink ==");
+            self.sink.stop().unwrap();
+            self.sink_running = false;
+        }
+    }
+
+    fn handle_player_stop(&mut self) {
+        match self.state {
+            PlayerState::Playing {
+                track_id,
+                play_request_id,
+                ..
+            }
+            | PlayerState::Paused {
+                track_id,
+                play_request_id,
+                ..
+            }
+            | PlayerState::EndOfTrack {
+                track_id,
+                play_request_id,
+                ..
+            }
+            | PlayerState::Loading {
+                track_id,
+                play_request_id,
+                ..
+            } => {
+                self.ensure_sink_stopped();
+                self.send_event(PlayerEvent::Stopped {
+                    track_id,
+                    play_request_id,
+                });
+                self.state = PlayerState::Stopped;
+            }
+            PlayerState::Stopped => (),
+            PlayerState::Invalid => panic!("invalid state"),
+        }
+    }
+
+    fn handle_play(&mut self) {
+        if let PlayerState::Paused {
+            track_id,
+            play_request_id,
+            stream_position_pcm,
+            duration_ms,
+            ..
+        } = self.state
+        {
+            self.state.paused_to_playing();
+
+            let position_ms = Self::position_pcm_to_ms(stream_position_pcm);
+            self.send_event(PlayerEvent::Playing {
+                track_id,
+                play_request_id,
+                position_ms,
+                duration_ms,
+            });
+            self.ensure_sink_running();
+        } else {
+            warn!("Player::play called from invalid state");
+        }
+    }
+
+    fn handle_pause(&mut self) {
+        if let PlayerState::Playing {
+            track_id,
+            play_request_id,
+            stream_position_pcm,
+            duration_ms,
+            ..
+        } = self.state
+        {
+            self.state.playing_to_paused();
+
+            self.ensure_sink_stopped();
+            let position_ms = Self::position_pcm_to_ms(stream_position_pcm);
+            self.send_event(PlayerEvent::Paused {
+                track_id,
+                play_request_id,
+                position_ms,
+                duration_ms,
+            });
+        } else {
+            warn!("Player::pause called from invalid state");
+        }
+    }
+
+    fn handle_packet(&mut self, packet: Option<VorbisPacket>, normalisation_factor: f32) {
+        match packet {
+            Some(mut packet) => {
+                if packet.data().len() > 0 {
+                    if let Some(ref editor) = self.audio_filter {
+                        editor.modify_stream(&mut packet.data_mut())
+                    };
+
+                    if self.config.normalisation && normalisation_factor != 1.0 {
+                        for x in packet.data_mut().iter_mut() {
+                            *x = (*x as f32 * normalisation_factor) as i16;
+                        }
+                    }
+
+                    if let Err(err) = self.sink.write(&packet.data()) {
+                        error!("Could not write audio: {}", err);
+                        self.ensure_sink_stopped();
+                    }
+                }
+            }
+
+            None => {
+                self.state.playing_to_end_of_track();
+                if let PlayerState::EndOfTrack {
+                    track_id,
+                    play_request_id,
+                    ..
+                } = self.state
+                {
+                    self.send_event(PlayerEvent::EndOfTrack {
+                        track_id,
+                        play_request_id,
+                    })
+                } else {
+                    unreachable!();
+                }
+            }
+        }
+    }
+
+    fn start_playback(
+        &mut self,
+        track_id: SpotifyId,
+        play_request_id: u64,
+        loaded_track: PlayerLoadedTrackData,
+        start_playback: bool,
+    ) {
+        let position_ms = Self::position_pcm_to_ms(loaded_track.stream_position_pcm);
+
+        if start_playback {
+            self.ensure_sink_running();
+
+            self.send_event(PlayerEvent::Playing {
+                track_id,
+                play_request_id,
+                position_ms,
+                duration_ms: loaded_track.duration_ms,
+            });
+
+            self.state = PlayerState::Playing {
+                track_id: track_id,
+                play_request_id: play_request_id,
+                decoder: loaded_track.decoder,
+                normalisation_factor: loaded_track.normalisation_factor,
+                stream_loader_controller: loaded_track.stream_loader_controller,
+                duration_ms: loaded_track.duration_ms,
+                bytes_per_second: loaded_track.bytes_per_second,
+                stream_position_pcm: loaded_track.stream_position_pcm,
+                reported_nominal_start_time: Some(
+                    Instant::now() - Duration::from_millis(position_ms as u64),
+                ),
+                suggested_to_preload_next_track: false,
+            };
+        } else {
+            self.ensure_sink_stopped();
+
+            self.state = PlayerState::Paused {
+                track_id: track_id,
+                play_request_id: play_request_id,
+                decoder: loaded_track.decoder,
+                normalisation_factor: loaded_track.normalisation_factor,
+                stream_loader_controller: loaded_track.stream_loader_controller,
+                duration_ms: loaded_track.duration_ms,
+                bytes_per_second: loaded_track.bytes_per_second,
+                stream_position_pcm: loaded_track.stream_position_pcm,
+                suggested_to_preload_next_track: false,
+            };
+
+            self.send_event(PlayerEvent::Paused {
+                track_id,
+                play_request_id,
+                position_ms,
+                duration_ms: loaded_track.duration_ms,
+            });
+        }
+    }
+
+    fn handle_command_load(
+        &mut self,
+        track_id: SpotifyId,
+        play_request_id: u64,
+        play: bool,
+        position_ms: u32,
+    ) {
+        // emit the correct player event
+        match self.state {
+            PlayerState::Playing {
+                track_id: old_track_id,
+                ..
+            }
+            | PlayerState::Paused {
+                track_id: old_track_id,
+                ..
+            }
+            | PlayerState::EndOfTrack {
+                track_id: old_track_id,
+                ..
+            }
+            | PlayerState::Loading {
+                track_id: old_track_id,
+                ..
+            } => self.send_event(PlayerEvent::Changed {
+                old_track_id: old_track_id,
+                new_track_id: track_id,
+            }),
+            PlayerState::Stopped => self.send_event(PlayerEvent::Started {
+                track_id,
+                play_request_id,
+                position_ms,
+            }),
+            PlayerState::Invalid { .. } => panic!("Player is in an invalid state."),
+        }
+
+        // Now we check at different positions whether we already have a pre-loaded version
+        // of this track somewhere. If so, use it and return.
+
+        // Check if there's a matching loaded track in the EndOfTrack player state.
+        // This is the case if we're repeating the same track again.
+        if let PlayerState::EndOfTrack {
+            track_id: previous_track_id,
+            ref mut loaded_track,
+            ..
+        } = self.state
+        {
+            if previous_track_id == track_id {
+                let loaded_track = mem::replace(&mut *loaded_track, None);
+                if let Some(mut loaded_track) = loaded_track {
+                    if Self::position_ms_to_pcm(position_ms) != loaded_track.stream_position_pcm {
+                        loaded_track
+                            .stream_loader_controller
+                            .set_random_access_mode();
+                        let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking.
+                                                                               // But most likely the track is fully
+                                                                               // loaded already because we played
+                                                                               // to the end of it.
+                        loaded_track.stream_loader_controller.set_stream_mode();
+                        loaded_track.stream_position_pcm = Self::position_ms_to_pcm(position_ms);
+                    }
+                    self.preload = PlayerPreload::None;
+                    self.start_playback(track_id, play_request_id, loaded_track, play);
+                    return;
+                }
+            }
+        }
+
+        // Check if we are already playing the track. If so, just do a seek and update our info.
+        if let PlayerState::Playing {
+            track_id: current_track_id,
+            ref mut stream_position_pcm,
+            ref mut decoder,
+            ref mut stream_loader_controller,
+            ..
+        }
+        | PlayerState::Paused {
+            track_id: current_track_id,
+            ref mut stream_position_pcm,
+            ref mut decoder,
+            ref mut stream_loader_controller,
+            ..
+        } = self.state
+        {
+            if current_track_id == track_id {
+                // we can use the current decoder. Ensure it's at the correct position.
+                if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm {
+                    stream_loader_controller.set_random_access_mode();
+                    let _ = decoder.seek(position_ms as i64); // This may be blocking.
+                    stream_loader_controller.set_stream_mode();
+                    *stream_position_pcm = Self::position_ms_to_pcm(position_ms);
+                }
+
+                // Move the info from the current state into a PlayerLoadedTrackData so we can use
+                // the usual code path to start playback.
+                let old_state = mem::replace(&mut self.state, PlayerState::Invalid);
+
+                if let PlayerState::Playing {
+                    stream_position_pcm,
+                    decoder,
+                    stream_loader_controller,
+                    bytes_per_second,
+                    duration_ms,
+                    normalisation_factor,
+                    ..
+                }
+                | PlayerState::Paused {
+                    stream_position_pcm,
+                    decoder,
+                    stream_loader_controller,
+                    bytes_per_second,
+                    duration_ms,
+                    normalisation_factor,
+                    ..
+                } = old_state
+                {
+                    let loaded_track = PlayerLoadedTrackData {
+                        decoder,
+                        normalisation_factor,
+                        stream_loader_controller,
+                        bytes_per_second,
+                        duration_ms,
+                        stream_position_pcm,
+                    };
+
+                    self.preload = PlayerPreload::None;
+                    self.start_playback(track_id, play_request_id, loaded_track, play);
+
+                    if let PlayerState::Invalid = self.state {
+                        panic!("start_playback() hasn't set a valid player state.");
+                    }
+
+                    return;
+                } else {
+                    unreachable!();
+                }
+            }
+        }
+
+        // Check if the requested track has been preloaded already. If so use the preloaded data.
+        if let PlayerPreload::Ready {
+            track_id: loaded_track_id,
+            ..
+        } = self.preload
+        {
+            if track_id == loaded_track_id {
+                let preload = std::mem::replace(&mut self.preload, PlayerPreload::None);
+                if let PlayerPreload::Ready {
+                    track_id,
+                    mut loaded_track,
+                } = preload
+                {
+                    if Self::position_ms_to_pcm(position_ms) != loaded_track.stream_position_pcm {
+                        loaded_track
+                            .stream_loader_controller
+                            .set_random_access_mode();
+                        let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking
+                        loaded_track.stream_loader_controller.set_stream_mode();
+                    }
+                    self.start_playback(track_id, play_request_id, loaded_track, play);
+                    return;
+                } else {
+                    unreachable!();
+                }
+            }
+        }
+
+        // We need to load the track - either from scratch or by completing a preload.
+        // In any case we go into a Loading state to load the track.
+        self.ensure_sink_stopped();
+
+        self.send_event(PlayerEvent::Loading {
+            track_id,
+            play_request_id,
+            position_ms,
+        });
+
+        // Try to extract a pending loader from the preloading mechanism
+        let loader = if let PlayerPreload::Loading {
+            track_id: loaded_track_id,
+            ..
+        } = self.preload
+        {
+            if (track_id == loaded_track_id) && (position_ms == 0) {
+                let mut preload = PlayerPreload::None;
+                std::mem::swap(&mut preload, &mut self.preload);
+                if let PlayerPreload::Loading { loader, .. } = preload {
+                    Some(loader)
+                } else {
+                    None
+                }
+            } else {
+                None
+            }
+        } else {
+            None
+        };
+
+        self.preload = PlayerPreload::None;
+
+        // If we don't have a loader yet, create one from scratch.
+        let loader = loader
+            .or_else(|| Some(self.load_track(track_id, position_ms)))
+            .unwrap();
+
+        // Set ourselves to a loading state.
+        self.state = PlayerState::Loading {
+            track_id,
+            play_request_id,
+            start_playback: play,
+            loader,
+        };
+    }
+
+    fn handle_command_preload(&mut self, track_id: SpotifyId) {
+        debug!("Preloading track");
+        let mut preload_track = true;
+
+        // check whether the track is already loaded somewhere or being loaded.
+        if let PlayerPreload::Loading {
+            track_id: currently_loading,
+            ..
+        }
+        | PlayerPreload::Ready {
+            track_id: currently_loading,
+            ..
+        } = self.preload
+        {
+            if currently_loading == track_id {
+                // we're already preloading the requested track.
+                preload_track = false;
+            } else {
+                // we're preloading something else - cancel it.
+                self.preload = PlayerPreload::None;
+            }
+        }
+
+        if let PlayerState::Playing {
+            track_id: current_track_id,
+            ..
+        }
+        | PlayerState::Paused {
+            track_id: current_track_id,
+            ..
+        }
+        | PlayerState::EndOfTrack {
+            track_id: current_track_id,
+            ..
+        } = self.state
+        {
+            if current_track_id == track_id {
+                // we already have the requested track loaded.
+                preload_track = false;
+            }
+        }
+
+        // schedule the preload if the current track if desired.
+        if preload_track {
+            let loader = self.load_track(track_id, 0);
+            self.preload = PlayerPreload::Loading { track_id, loader }
+        }
+    }
+
+    fn handle_command_seek(&mut self, position_ms: u32) {
+        if let Some(stream_loader_controller) = self.state.stream_loader_controller() {
+            stream_loader_controller.set_random_access_mode();
+        }
+        if let Some(decoder) = self.state.decoder() {
+            match decoder.seek(position_ms as i64) {
+                Ok(_) => {
+                    if let PlayerState::Playing {
+                        ref mut stream_position_pcm,
+                        ..
+                    }
+                    | PlayerState::Paused {
+                        ref mut stream_position_pcm,
+                        ..
+                    } = self.state
+                    {
+                        *stream_position_pcm = Self::position_ms_to_pcm(position_ms);
+                    }
+                }
+                Err(err) => error!("Vorbis error: {:?}", err),
+            }
+        } else {
+            warn!("Player::seek called from invalid state");
+        }
+
+        // If we're playing, ensure, that we have enough data leaded to avoid a buffer underrun.
+        if let Some(stream_loader_controller) = self.state.stream_loader_controller() {
+            stream_loader_controller.set_stream_mode();
+        }
+
+        // ensure we have a bit of a buffer of downloaded data
+        self.preload_data_before_playback();
+
+        if let PlayerState::Playing {
+            track_id,
+            play_request_id,
+            ref mut reported_nominal_start_time,
+            duration_ms,
+            ..
+        } = self.state
+        {
+            *reported_nominal_start_time =
+                Some(Instant::now() - Duration::from_millis(position_ms as u64));
+            self.send_event(PlayerEvent::Playing {
+                track_id,
+                play_request_id,
+                position_ms,
+                duration_ms,
+            });
+        }
+        if let PlayerState::Paused {
+            track_id,
+            play_request_id,
+            duration_ms,
+            ..
+        } = self.state
+        {
+            self.send_event(PlayerEvent::Paused {
+                track_id,
+                play_request_id,
+                position_ms,
+                duration_ms,
+            });
+        }
+    }
+
+    fn handle_command(&mut self, cmd: PlayerCommand) {
+        debug!("command={:?}", cmd);
+        match cmd {
+            PlayerCommand::Load {
+                track_id,
+                play_request_id,
+                play,
+                position_ms,
+            } => self.handle_command_load(track_id, play_request_id, play, position_ms),
+
+            PlayerCommand::Preload { track_id } => self.handle_command_preload(track_id),
+
+            PlayerCommand::Seek(position_ms) => self.handle_command_seek(position_ms),
+
+            PlayerCommand::Play => self.handle_play(),
+
+            PlayerCommand::Pause => self.handle_pause(),
+
+            PlayerCommand::Stop => self.handle_player_stop(),
+
+            PlayerCommand::AddEventSender(sender) => self.event_senders.push(sender),
+
+            PlayerCommand::EmitVolumeSetEvent(volume) => {
+                self.send_event(PlayerEvent::VolumeSet { volume })
+            }
+        }
+    }
+
+    fn send_event(&mut self, event: PlayerEvent) {
+        let mut index = 0;
+        while index < self.event_senders.len() {
+            match self.event_senders[index].unbounded_send(event.clone()) {
+                Ok(_) => index += 1,
+                Err(_) => {
+                    self.event_senders.remove(index);
+                }
+            }
+        }
+    }
+
+    fn load_track(
+        &self,
+        spotify_id: SpotifyId,
+        position_ms: u32,
+    ) -> Box<dyn Future<Item = PlayerLoadedTrackData, Error = ()>> {
+        // This method creates a future that returns the loaded stream and associated info.
+        // Ideally all work should be done using asynchronous code. However, seek() on the
+        // audio stream is implemented in a blocking fashion. Thus, we can't turn it into future
+        // easily. Instead we spawn a thread to do the work and return a one-shot channel as the
+        // future to work with.
+
+        let loader = PlayerTrackLoader {
+            session: self.session.clone(),
+            config: self.config.clone(),
+        };
+
+        let (result_tx, result_rx) = futures::sync::oneshot::channel();
+
+        std::thread::spawn(move || {
+            loader
+                .load_track(spotify_id, position_ms)
+                .and_then(move |data| {
+                    let _ = result_tx.send(data);
+                    Some(())
+                });
+        });
+
+        Box::new(result_rx.map_err(|_| ()))
+    }
+
+    fn preload_data_before_playback(&mut self) {
+        if let PlayerState::Playing {
+            bytes_per_second,
+            ref mut stream_loader_controller,
+            ..
+        } = self.state
+        {
+            // Request our read ahead range
+            let request_data_length = max(
+                (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
+                    * (0.001 * stream_loader_controller.ping_time_ms() as f64)
+                    * bytes_per_second as f64) as usize,
+                (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
+            );
+            stream_loader_controller.fetch_next(request_data_length);
+
+            // Request the part we want to wait for blocking. This effecively means we wait for the previous request to partially complete.
+            let wait_for_data_length = max(
+                (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS
+                    * (0.001 * stream_loader_controller.ping_time_ms() as f64)
+                    * bytes_per_second as f64) as usize,
+                (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
+            );
+            stream_loader_controller.fetch_next_blocking(wait_for_data_length);
+        }
     }
 }
 
 impl Drop for PlayerInternal {
     fn drop(&mut self) {
-        debug!("drop Player[{}]", self.session.session_id());
+        debug!("drop PlayerInternal[{}]", self.session.session_id());
     }
 }
 
 impl ::std::fmt::Debug for PlayerCommand {
     fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
         match *self {
-            PlayerCommand::Load(track, play, position, _) => f
+            PlayerCommand::Load {
+                track_id,
+                play,
+                position_ms,
+                ..
+            } => f
                 .debug_tuple("Load")
-                .field(&track)
+                .field(&track_id)
                 .field(&play)
-                .field(&position)
+                .field(&position_ms)
                 .finish(),
+            PlayerCommand::Preload { track_id } => {
+                f.debug_tuple("Preload").field(&track_id).finish()
+            }
             PlayerCommand::Play => f.debug_tuple("Play").finish(),
             PlayerCommand::Pause => f.debug_tuple("Pause").finish(),
             PlayerCommand::Stop => f.debug_tuple("Stop").finish(),
             PlayerCommand::Seek(position) => f.debug_tuple("Seek").field(&position).finish(),
+            PlayerCommand::AddEventSender(_) => f.debug_tuple("AddEventSender").finish(),
+            PlayerCommand::EmitVolumeSetEvent(volume) => {
+                f.debug_tuple("VolumeSet").field(&volume).finish()
+            }
         }
     }
 }

+ 12 - 10
src/main.rs

@@ -539,16 +539,18 @@ impl Future for Main {
             if let Some(ref mut player_event_channel) = self.player_event_channel {
                 if let Async::Ready(Some(event)) = player_event_channel.poll().unwrap() {
                     if let Some(ref program) = self.player_event_program {
-                        let child = run_program_on_events(event, program)
-                            .expect("program failed to start")
-                            .map(|status| {
-                                if !status.success() {
-                                    error!("child exited with status {:?}", status.code());
-                                }
-                            })
-                            .map_err(|e| error!("failed to wait on child process: {}", e));
-
-                        self.handle.spawn(child);
+                        if let Some(child) = run_program_on_events(event, program) {
+                            let child = child
+                                .expect("program failed to start")
+                                .map(|status| {
+                                    if !status.success() {
+                                        error!("child exited with status {:?}", status.code());
+                                    }
+                                })
+                                .map_err(|e| error!("failed to wait on child process: {}", e));
+
+                            self.handle.spawn(child);
+                        }
                     }
                 }
             }

+ 5 - 4
src/player_event_handler.rs

@@ -14,7 +14,7 @@ fn run_program(program: &str, env_vars: HashMap<&str, String>) -> io::Result<Chi
         .spawn_async()
 }
 
-pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> io::Result<Child> {
+pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> Option<io::Result<Child>> {
     let mut env_vars = HashMap::new();
     match event {
         PlayerEvent::Changed {
@@ -25,14 +25,15 @@ pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> io::Result<Ch
             env_vars.insert("OLD_TRACK_ID", old_track_id.to_base62());
             env_vars.insert("TRACK_ID", new_track_id.to_base62());
         }
-        PlayerEvent::Started { track_id } => {
+        PlayerEvent::Started { track_id, .. } => {
             env_vars.insert("PLAYER_EVENT", "start".to_string());
             env_vars.insert("TRACK_ID", track_id.to_base62());
         }
-        PlayerEvent::Stopped { track_id } => {
+        PlayerEvent::Stopped { track_id, .. } => {
             env_vars.insert("PLAYER_EVENT", "stop".to_string());
             env_vars.insert("TRACK_ID", track_id.to_base62());
         }
+        _ => return None,
     }
-    run_program(onevent, env_vars)
+    Some(run_program(onevent, env_vars))
 }