Jelajahi Sumber

Implement support for dynamic playlists (Radio)

ashthespy 6 tahun lalu
induk
melakukan
96b432aa4c
2 mengubah file dengan 133 tambahan dan 3 penghapusan
  1. 3 0
      connect/src/lib.rs
  2. 130 3
      connect/src/spirc.rs

+ 3 - 0
connect/src/lib.rs

@@ -2,6 +2,9 @@
 extern crate log;
 #[macro_use]
 extern crate serde_json;
+#[macro_use]
+extern crate serde_derive;
+extern crate serde;
 
 extern crate base64;
 extern crate crypto;

+ 130 - 3
connect/src/spirc.rs

@@ -12,16 +12,60 @@ use core::version;
 use core::volume::Volume;
 
 use protocol;
-use protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State};
+use protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef};
 
 use playback::mixer::Mixer;
 use playback::player::Player;
+use serde;
+use serde_json;
 
 use rand;
 use rand::seq::SliceRandom;
 use std;
 use std::time::{SystemTime, UNIX_EPOCH};
 
+// Keep this here for now
+
+#[derive(Deserialize, Debug)]
+struct TrackContext {
+    album_uri: String,
+    artist_uri: String,
+    // metadata: String,
+    #[serde(rename = "original_gid")]
+    gid: String,
+    uid: String,
+    uri: String,
+}
+#[derive(Deserialize, Debug)]
+struct StationContext {
+    uri: String,
+    next_page_url: String,
+    seeds: Vec<String>,
+    #[serde(deserialize_with = "deserialize_protobuf_TrackRef")]
+    tracks: Vec<TrackRef>,
+}
+
+#[allow(non_snake_case)]
+fn deserialize_protobuf_TrackRef<D>(de: D) -> Result<Vec<TrackRef>, D::Error>
+where
+    D: serde::Deserializer,
+{
+    let v: Vec<TrackContext> = try!(serde::Deserialize::deserialize(de));
+    let track_vec = v
+        .iter()
+        .map(|v| {
+            let mut t = TrackRef::new();
+            //  This has got to be the most round about way of doing this.
+            t.set_gid(SpotifyId::from_base62(&v.gid).unwrap().to_raw().to_vec());
+            t.set_uri(v.uri.to_owned());
+
+            t
+        })
+        .collect::<Vec<TrackRef>>();
+
+    Ok(track_vec)
+}
+
 pub struct SpircTask {
     player: Player,
     mixer: Box<Mixer>,
@@ -40,6 +84,8 @@ pub struct SpircTask {
 
     shutdown: bool,
     session: Session,
+    context_fut: Box<Future<Item = serde_json::Value, Error = MercuryError>>,
+    context: Option<StationContext>,
 }
 
 pub enum SpircCommand {
@@ -139,6 +185,15 @@ fn initial_device_state(config: ConnectConfig) -> DeviceState {
                 };
                 msg
             };
+            {
+                let msg = repeated.push_default();
+                msg.set_typ(protocol::spirc::CapabilityType::kSupportsPlaylistV2);
+                {
+                    let repeated = msg.mut_intValue();
+                    repeated.push(64)
+                };
+                msg
+            };
             {
                 let msg = repeated.push_default();
                 msg.set_typ(protocol::spirc::CapabilityType::kSupportedContexts);
@@ -176,7 +231,7 @@ fn calc_logarithmic_volume(volume: u16) -> u16 {
     // Volume conversion taken from https://www.dr-lex.be/info-stuff/volumecontrols.html#ideal2
     // Convert the given volume [0..0xffff] to a dB gain
     // We assume a dB range of 60dB.
-    // Use the equatation: a * exp(b * x)
+    // Use the equation: a * exp(b * x)
     // in which a = IDEAL_FACTOR, b = 1/1000
     const IDEAL_FACTOR: f64 = 6.908;
     let normalized_volume = volume as f64 / std::u16::MAX as f64; // To get a value between 0 and 1
@@ -259,6 +314,9 @@ impl Spirc {
 
             shutdown: false,
             session: session.clone(),
+
+            context_fut: Box::new(future::empty()),
+            context: None,
         };
 
         task.set_volume(volume);
@@ -335,6 +393,25 @@ impl Future for SpircTask {
                     Ok(Async::NotReady) => (),
                     Err(oneshot::Canceled) => self.end_of_track = Box::new(future::empty()),
                 }
+
+                match self.context_fut.poll() {
+                    Ok(Async::Ready(value)) => {
+                        let r_context = serde_json::from_value::<StationContext>(value).ok();
+                        debug!("Radio Context: {:#?}", r_context);
+                        if let Some(ref context) = r_context {
+                            warn!("Got {:?} tracks from <{}>", context.tracks.len(), context.uri);
+                        }
+                        self.context = r_context;
+
+                        progress = true;
+                        self.context_fut = Box::new(future::empty());
+                    }
+                    Ok(Async::NotReady) => (),
+                    Err(err) => {
+                        self.context_fut = Box::new(future::empty());
+                        error!("Error: {:?}", err)
+                    }
+                }
             }
 
             let poll_sender = self.sender.poll_complete().unwrap();
@@ -455,6 +532,7 @@ impl SpircTask {
                     let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
                     self.load_track(play);
                 } else {
+                    info!("No more tracks left in queue");
                     self.state.set_status(PlayStatus::kPlayStatusStop);
                 }
 
@@ -600,6 +678,19 @@ impl SpircTask {
     fn handle_next(&mut self) {
         let mut new_index = self.consume_queued_track() as u32;
         let mut continue_playing = true;
+        debug!(
+            "At track {:?} of {:?} <{:?}> update [{}]",
+            new_index,
+            self.state.get_track().len(),
+            self.state.get_context_uri(),
+            self.state.get_track().len() as u32 - new_index < 5
+        );
+        let context_uri = self.state.get_context_uri().to_owned();
+        if context_uri.contains("station") && ((self.state.get_track().len() as u32) - new_index) < 5 {
+            self.context_fut = self.resolve_station(&context_uri);
+            self.update_tracks_from_context();
+        }
+
         if new_index >= self.state.get_track().len() as u32 {
             new_index = 0; // Loop around back to start
             continue_playing = self.state.get_repeat();
@@ -680,10 +771,46 @@ impl SpircTask {
         self.state.get_position_ms() + diff as u32
     }
 
+    fn resolve_station(&self, uri: &str) -> Box<Future<Item = serde_json::Value, Error = MercuryError>> {
+        let radio_uri = format!("hm://radio-apollo/v3/stations/{}", uri);
+
+        self.resolve_uri(&radio_uri)
+    }
+
+    fn resolve_uri(&self, uri: &str) -> Box<Future<Item = serde_json::Value, Error = MercuryError>> {
+        let request = self.session.mercury().get(uri);
+
+        Box::new(request.and_then(move |response| {
+            let data = response.payload.first().expect("Empty payload on context uri");
+            let response: serde_json::Value = serde_json::from_slice(&data).unwrap();
+
+            Ok(response)
+        }))
+    }
+
+    fn update_tracks_from_context(&mut self) {
+        if let Some(ref context) = self.context {
+            self.context_fut = self.resolve_uri(&context.next_page_url);
+
+            let new_tracks = &context.tracks;
+            debug!("Adding {:?} tracks from context to playlist", new_tracks.len());
+            // Can we just push the new tracks and forget it?
+            let tracks = self.state.mut_track();
+            // tracks.append(new_tracks.to_owned());
+            for t in new_tracks {
+                tracks.push(t.to_owned());
+            }
+        }
+    }
+
     fn update_tracks(&mut self, frame: &protocol::spirc::Frame) {
         let index = frame.get_state().get_playing_track_index();
-        let tracks = frame.get_state().get_track();
         let context_uri = frame.get_state().get_context_uri().to_owned();
+        let tracks = frame.get_state().get_track();
+        debug!("Frame has {:?} tracks", tracks.len());
+        if context_uri.contains("station") {
+            self.context_fut = self.resolve_station(&context_uri);
+        }
 
         self.state.set_playing_track_index(index);
         self.state.set_track(tracks.into_iter().cloned().collect());