Pārlūkot izejas kodu

Merge pull request #321 from devgianlu/time-alignment-fix

Fix for time alignment issue
Sasha Hilton 6 gadi atpakaļ
vecāks
revīzija
e9b159e9d9
2 mainītis faili ar 59 papildinājumiem un 38 dzēšanām
  1. 35 30
      connect/src/spirc.rs
  2. 24 8
      core/src/session.rs

+ 35 - 30
connect/src/spirc.rs

@@ -1,8 +1,15 @@
+use std;
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use futures::{Async, Future, Poll, Sink, Stream};
 use futures::future;
 use futures::sync::{mpsc, oneshot};
-use futures::{Async, Future, Poll, Sink, Stream};
 use protobuf::{self, Message};
+use rand;
+use rand::seq::SliceRandom;
+use serde_json;
 
+use context::StationContext;
 use core::config::ConnectConfig;
 use core::mercury::MercuryError;
 use core::session::Session;
@@ -10,19 +17,10 @@ use core::spotify_id::SpotifyId;
 use core::util::SeqGenerator;
 use core::version;
 use core::volume::Volume;
-
-use protocol;
-use protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State};
-
 use playback::mixer::Mixer;
 use playback::player::Player;
-use serde_json;
-
-use context::StationContext;
-use rand;
-use rand::seq::SliceRandom;
-use std;
-use std::time::{SystemTime, UNIX_EPOCH};
+use protocol;
+use protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State};
 
 pub struct SpircTask {
     player: Player,
@@ -64,14 +62,6 @@ pub struct Spirc {
     commands: mpsc::UnboundedSender<SpircCommand>,
 }
 
-fn now_ms() -> i64 {
-    let dur = match SystemTime::now().duration_since(UNIX_EPOCH) {
-        Ok(dur) => dur,
-        Err(err) => err.duration(),
-    };
-    (dur.as_secs() * 1000 + (dur.subsec_nanos() / 1000_000) as u64) as i64
-}
-
 fn initial_state() -> State {
     let mut frame = protocol::spirc::State::new();
     frame.set_repeat(false);
@@ -404,6 +394,14 @@ impl Future for SpircTask {
 }
 
 impl SpircTask {
+    fn now_ms(&mut self) -> i64 {
+        let dur = match SystemTime::now().duration_since(UNIX_EPOCH) {
+            Ok(dur) => dur,
+            Err(err) => err.duration(),
+        };
+        ((dur.as_secs() + self.session.time_delta()) * 1000 + (dur.subsec_nanos() / 1000_000) as u64) as i64
+    }
+
     fn handle_command(&mut self, cmd: SpircCommand) {
         let active = self.device.get_is_active();
         match cmd {
@@ -494,15 +492,17 @@ impl SpircTask {
 
             MessageType::kMessageTypeLoad => {
                 if !self.device.get_is_active() {
+                    let now = self.now_ms();
                     self.device.set_is_active(true);
-                    self.device.set_became_active_at(now_ms());
+                    self.device.set_became_active_at(now);
                 }
 
                 self.update_tracks(&frame);
 
                 if self.state.get_track().len() > 0 {
+                    let now = self.now_ms();
                     self.state.set_position_ms(frame.get_state().get_position_ms());
-                    self.state.set_position_measured_at(now_ms() as u64);
+                    self.state.set_position_measured_at(now as u64);
 
                     let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
                     self.load_track(play);
@@ -577,8 +577,9 @@ impl SpircTask {
             MessageType::kMessageTypeSeek => {
                 let position = frame.get_position();
 
+                let now = self.now_ms();
                 self.state.set_position_ms(position);
-                self.state.set_position_measured_at(now_ms() as u64);
+                self.state.set_position_measured_at(now as u64);
                 self.player.seek(position);
                 self.notify(None);
             }
@@ -611,7 +612,8 @@ impl SpircTask {
             self.mixer.start();
             self.player.play();
             self.state.set_status(PlayStatus::kPlayStatusPlay);
-            self.state.set_position_measured_at(now_ms() as u64);
+            let now = self.now_ms();
+            self.state.set_position_measured_at(now as u64);
         }
     }
 
@@ -629,7 +631,7 @@ impl SpircTask {
             self.mixer.stop();
             self.state.set_status(PlayStatus::kPlayStatusPause);
 
-            let now = now_ms() as u64;
+            let now = self.now_ms() as u64;
             let position = self.state.get_position_ms();
 
             let diff = now - self.state.get_position_measured_at();
@@ -674,7 +676,8 @@ impl SpircTask {
         }
         self.state.set_playing_track_index(new_index);
         self.state.set_position_ms(0);
-        self.state.set_position_measured_at(now_ms() as u64);
+        let now = self.now_ms();
+        self.state.set_position_measured_at(now as u64);
 
         self.load_track(continue_playing);
     }
@@ -710,14 +713,16 @@ impl SpircTask {
                 pos += 1;
             }
 
+            let now = self.now_ms();
             self.state.set_playing_track_index(new_index);
             self.state.set_position_ms(0);
-            self.state.set_position_measured_at(now_ms() as u64);
+            self.state.set_position_measured_at(now as u64);
 
             self.load_track(true);
         } else {
+            let now = self.now_ms();
             self.state.set_position_ms(0);
-            self.state.set_position_measured_at(now_ms() as u64);
+            self.state.set_position_measured_at(now as u64);
             self.player.seek(0);
         }
     }
@@ -744,7 +749,7 @@ impl SpircTask {
     }
 
     fn position(&mut self) -> u32 {
-        let diff = now_ms() as u64 - self.state.get_position_measured_at();
+        let diff = self.now_ms() as u64 - self.state.get_position_measured_at();
         self.state.get_position_ms() + diff as u32
     }
 
@@ -881,7 +886,7 @@ impl<'a> CommandSender<'a> {
         frame.set_seq_nr(spirc.sequence.get());
         frame.set_typ(cmd);
         frame.set_device_state(spirc.device.clone());
-        frame.set_state_update_id(now_ms());
+        frame.set_state_update_id(spirc.now_ms());
         CommandSender {
             spirc: spirc,
             frame: frame,

+ 24 - 8
core/src/session.rs

@@ -1,24 +1,27 @@
-use bytes::Bytes;
-use futures::sync::mpsc;
-use futures::{Async, Future, IntoFuture, Poll, Stream};
 use std::io;
-use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
 use std::sync::{Arc, RwLock, Weak};
+use std::sync::atomic::{ATOMIC_USIZE_INIT, AtomicUsize, Ordering};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use byteorder::{BigEndian, ByteOrder};
+use bytes::Bytes;
+use futures::{Async, Future, IntoFuture, Poll, Stream};
+use futures::sync::mpsc;
 use tokio_core::reactor::{Handle, Remote};
 
 use apresolve::apresolve_or_fallback;
+use audio_key::AudioKeyManager;
 use authentication::Credentials;
 use cache::Cache;
+use channel::ChannelManager;
 use component::Lazy;
 use config::SessionConfig;
 use connection;
-
-use audio_key::AudioKeyManager;
-use channel::ChannelManager;
 use mercury::MercuryManager;
 
 struct SessionData {
     country: String,
+    time_delta: u64,
     canonical_username: String,
     invalid: bool,
 }
@@ -108,6 +111,7 @@ impl Session {
                 country: String::new(),
                 canonical_username: username,
                 invalid: false,
+                time_delta: 0,
             }),
 
             tx_connection: sender_tx,
@@ -146,6 +150,10 @@ impl Session {
         self.0.mercury.get(|| MercuryManager::new(self.weak()))
     }
 
+    pub fn time_delta(&self) -> u64 {
+        self.0.data.read().unwrap().time_delta
+    }
+
     pub fn spawn<F, R>(&self, f: F)
     where
         F: FnOnce(&Handle) -> R + Send + 'static,
@@ -168,8 +176,16 @@ impl Session {
     fn dispatch(&self, cmd: u8, data: Bytes) {
         match cmd {
             0x4 => {
+                let server_timestamp = BigEndian::read_u32(data.as_ref()) as u64;
+                let timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
+                    Ok(dur) => dur,
+                    Err(err) => err.duration(),
+                }.as_secs() as u64;
+
+                self.0.data.write().unwrap().time_delta = server_timestamp - timestamp;
+
                 self.debug_info();
-                self.send_packet(0x49, data.as_ref().to_owned());
+                self.send_packet(0x49, vec![0, 0, 0, 0]);
             }
             0x4a => (),
             0x1b => {