Browse Source

Initial Spotify Connect receiver.

Supports basic play/pause. Only a single track for now.
Paul Lietar 9 years ago
parent
commit
4fd0b37e2b
4 changed files with 258 additions and 172 deletions
  1. 1 1
      src/audio_file.rs
  2. 96 147
      src/main.rs
  3. 154 24
      src/player.rs
  4. 7 0
      src/util/mod.rs

+ 1 - 1
src/audio_file.rs

@@ -11,7 +11,7 @@ use stream::StreamEvent;
 use util::FileId;
 use session::Session;
 
-const CHUNK_SIZE : usize = 0x40000;
+const CHUNK_SIZE : usize = 0x10000;
 
 pub struct AudioFile<'s> {
     position: usize,

+ 96 - 147
src/main.rs

@@ -2,7 +2,7 @@
 
 #![feature(plugin,scoped,zero_one,iter_arith,slice_position_elem,slice_bytes,bitset,arc_weak,append,future)]
 #![allow(deprecated)]
-//#![allow(unused_imports,dead_code)]
+#![allow(unused_imports,dead_code)]
 
 #![plugin(protobuf_macros)]
 #[macro_use] extern crate lazy_static;
@@ -46,7 +46,7 @@ use metadata::{AlbumRef, ArtistRef, TrackRef};
 use session::{Config, Session};
 use util::SpotifyId;
 use util::version::version_string;
-use player::Player;
+use player::{Player, PlayerCommand};
 use mercury::{MercuryRequest, MercuryMethod};
 use librespot_protocol as protocol;
 use librespot_protocol::spirc::PlayStatus;
@@ -76,22 +76,32 @@ fn main() {
         }
     });
 
+    let player = Player::new(&session);
+
     SpircManager {
         session: &session,
+        player: &player,
+
         username: username.clone(),
+        state_update_id: 0,
+        seq_nr: 0,
+
         name: name.clone(),
         ident: session.config.device_id.clone(),
         device_type: 5,
+        can_play: true,
 
-        state_update_id: 0,
-        seq_nr: 0,
-
+        repeat: false,
+        shuffle: false,
         volume: 0x8000,
-        can_play: true,
+
         is_active: false,
         became_active_at: 0,
 
-        state: PlayerState::new()
+        last_command_ident: String::new(),
+        last_command_msgid: 0,
+
+        track: None
     }.run();
 
     poll_thread.join();
@@ -123,89 +133,10 @@ fn print_track(session: &Session, track_id: SpotifyId) {
     }
 }
 
-struct PlayerState {
-    status: PlayStatus,
-
-    context_uri: String,
-    index: u32,
-    queue: Vec<SpotifyId>,
-
-    repeat: bool,
-    shuffle: bool,
-
-    position_ms: u32,
-    position_measured_at: i64,
-
-    last_command_ident: String,
-    last_command_msgid: u32,
-}
-
-impl PlayerState {
-    fn new() -> PlayerState {
-        PlayerState {
-            status: PlayStatus::kPlayStatusPause,
-
-            context_uri: String::new(),
-            index: 0,
-            queue: Vec::new(),
-
-            repeat: false,
-            shuffle: false,
-
-            position_ms: 0,
-            position_measured_at: 0,
-
-            last_command_ident: String::new(),
-            last_command_msgid: 0
-        }
-    }
-
-    fn import(&mut self, state: &protocol::spirc::State) {
-        self.status = state.get_status();
-
-        self.context_uri = state.get_context_uri().to_string();
-        self.index = state.get_playing_track_index();
-        self.queue = state.get_track().iter().filter(|t| {
-            t.has_gid()
-        }).map(|t| {
-            SpotifyId::from_raw(t.get_gid())
-        }).collect();
-
-        self.repeat = state.get_repeat();
-        self.shuffle = state.get_shuffle();
-
-        self.position_ms = state.get_position_ms();
-        self.position_measured_at = SpircManager::now();
-    }
-
-    fn export(&self) -> protocol::spirc::State {
-        protobuf_init!(protocol::spirc::State::new(), {
-            status: self.status,
-
-            context_uri: self.context_uri.to_string(),
-            playing_track_index: self.index,
-            track: self.queue.iter().map(|t| {
-                protobuf_init!(protocol::spirc::TrackRef::new(), {
-                    gid: t.to_raw().to_vec()
-                })
-            }).collect(),
-
-            shuffle: self.shuffle,
-            repeat: self.repeat,
-
-            position_ms: self.position_ms,
-            position_measured_at: self.position_measured_at as u64,
-
-            playing_from_fallback: true,
-
-            last_command_ident: self.last_command_ident.clone(),
-            last_command_msgid: self.last_command_msgid
-        })
-    }
-}
-
 struct SpircManager<'s> {
+    player: &'s Player<'s>,
     session: &'s Session,
+
     username: String,
     state_update_id: i64,
     seq_nr: u32,
@@ -213,90 +144,88 @@ struct SpircManager<'s> {
     name: String,
     ident: String,
     device_type: u8,
+    can_play: bool,
 
+    repeat: bool,
+    shuffle: bool,
     volume: u16,
-    can_play: bool,
+
     is_active: bool,
     became_active_at: i64,
 
-    state: PlayerState
+    last_command_ident: String,
+    last_command_msgid: u32,
+
+    track: Option<SpotifyId>
 }
 
 impl <'s> SpircManager<'s> {
     fn run(&mut self) {
-        let rx = self.session
-            .mercury_sub(format!("hm://remote/user/{}/v23", self.username))
-            .into_iter().map(|pkt| {
-            protobuf::parse_from_bytes::<protocol::spirc::Frame>(pkt.payload.front().unwrap()).unwrap()
-        });
+        let rx = self.session.mercury_sub(format!("hm://remote/user/{}/v23", self.username));
 
         self.notify(None);
 
-        for frame in rx {
-            println!("{:?} {} {} {} {}",
-                     frame.get_typ(),
-                     frame.get_device_state().get_name(),
-                     frame.get_ident(),
-                     frame.get_seq_nr(),
-                     frame.get_state_update_id());
-            if frame.get_ident() != self.ident &&
-                (frame.get_recipient().len() == 0 ||
-                 frame.get_recipient().contains(&self.ident)) {
-                self.handle(frame);
+        loop {
+            if let Ok(pkt) = rx.try_recv() {
+                let frame = protobuf::parse_from_bytes::<protocol::spirc::Frame>(
+                    pkt.payload.front().unwrap()).unwrap();
+                println!("{:?} {} {} {} {}",
+                         frame.get_typ(),
+                         frame.get_device_state().get_name(),
+                         frame.get_ident(),
+                         frame.get_seq_nr(),
+                         frame.get_state_update_id());
+                if frame.get_ident() != self.ident &&
+                    (frame.get_recipient().len() == 0 ||
+                     frame.get_recipient().contains(&self.ident)) {
+                    self.handle(frame);
+                }
+            }
+
+            let h = self.player.state.0.lock().unwrap();
+            if h.update_time > self.state_update_id {
+                self.state_update_id = util::now_ms();
+                drop(h);
+                self.notify(None);
             }
         }
     }
 
     fn handle(&mut self, frame: protocol::spirc::Frame) {
+        if frame.get_recipient().len() > 0 {
+            self.last_command_ident = frame.get_ident().to_string();
+            self.last_command_msgid = frame.get_seq_nr();
+        }
         match frame.get_typ() {
             protocol::spirc::MessageType::kMessageTypeHello => {
                 self.notify(Some(frame.get_ident()));
             }
             protocol::spirc::MessageType::kMessageTypeLoad => {
-                self.is_active = true;
-                self.became_active_at = SpircManager::now();
-
-                self.state.import(frame.get_state());
-
-                self.state.last_command_ident = frame.get_ident().to_string();
-                self.state.last_command_msgid = frame.get_seq_nr();
+                if !self.is_active {
+                    self.is_active = true;
+                    self.became_active_at = util::now_ms();
+                }
 
-                self.state_update_id = SpircManager::now();
-                self.notify(None);
+                let index = frame.get_state().get_playing_track_index() as usize;
+                let track = SpotifyId::from_raw(frame.get_state().get_track()[index].get_gid());
+                self.track = Some(track);
+                self.player.command(PlayerCommand::Load(track,
+                                                        frame.get_state().get_status() == PlayStatus::kPlayStatusPlay,
+                                                        frame.get_state().get_position_ms()));
             }
             protocol::spirc::MessageType::kMessageTypePlay => {
-                self.state.status = PlayStatus::kPlayStatusPlay;
-                self.state.position_measured_at = SpircManager::now();
-
-                self.state.last_command_ident = frame.get_ident().to_string();
-                self.state.last_command_msgid = frame.get_seq_nr();
-
-                self.state_update_id = SpircManager::now();
-                self.notify(None);
+                self.player.command(PlayerCommand::Play);
             }
             protocol::spirc::MessageType::kMessageTypePause => {
-                self.state.status = PlayStatus::kPlayStatusPause;
-                self.state.position_measured_at = SpircManager::now();
-
-                self.state.last_command_ident = frame.get_ident().to_string();
-                self.state.last_command_msgid = frame.get_seq_nr();
-
-                self.state_update_id = SpircManager::now();
-                self.notify(None);
+                self.player.command(PlayerCommand::Pause);
             }
             protocol::spirc::MessageType::kMessageTypeSeek => {
-                self.state.position_ms = frame.get_position();
-                self.state.position_measured_at = SpircManager::now();
-
-                self.state.last_command_ident = frame.get_ident().to_string();
-                self.state.last_command_msgid = frame.get_seq_nr();
-
-                self.state_update_id = SpircManager::now();
-                self.notify(None);
+                self.player.command(PlayerCommand::Seek(frame.get_position()));
             }
             protocol::spirc::MessageType::kMessageTypeNotify => {
-                if frame.get_device_state().get_is_active() {
-                    //println!("{:?}", frame.get_state());
+                if self.is_active && frame.get_device_state().get_is_active() {
+                    self.is_active = false;
+                    self.player.command(PlayerCommand::Stop);
                 }
             }
             _ => ()
@@ -318,7 +247,7 @@ impl <'s> SpircManager<'s> {
         });
 
         if self.is_active {
-            pkt.set_state(self.state.export());
+            pkt.set_state(self.state());
         }
 
         self.session.mercury(MercuryRequest{
@@ -329,6 +258,31 @@ impl <'s> SpircManager<'s> {
         });
     }
 
+    fn state(&mut self) -> protocol::spirc::State {
+        let state = self.player.state.0.lock().unwrap();
+
+        protobuf_init!(protocol::spirc::State::new(), {
+            status: state.status,
+            position_ms: state.position_ms,
+            position_measured_at: state.position_measured_at as u64,
+
+            playing_track_index: 0,
+            track => [
+                @{
+                    gid: self.track.unwrap().to_raw().to_vec()
+                }
+            ],
+
+            shuffle: self.shuffle,
+            repeat: self.repeat,
+
+            playing_from_fallback: true,
+
+            last_command_ident: self.last_command_ident.clone(),
+            last_command_msgid: self.last_command_msgid
+        })
+    }
+
     fn device_state(&mut self) -> protocol::spirc::DeviceState {
         protobuf_init!(protocol::spirc::DeviceState::new(), {
             sw_version: version_string(),
@@ -388,10 +342,5 @@ impl <'s> SpircManager<'s> {
             ],
         })
     }
-
-    fn now() -> i64 {
-        let ts = time::now_utc().to_timespec();
-        ts.sec * 1000 + ts.nsec as i64 / 1000000
-    }
 }
 

+ 154 - 24
src/player.rs

@@ -1,27 +1,79 @@
 use portaudio;
 use vorbis;
+use std::sync::{mpsc, Mutex, Arc, Condvar};
+use std::thread;
 
 use metadata::TrackRef;
 use session::Session;
 use audio_file::AudioFile;
 use audio_decrypt::AudioDecrypt;
-use util::Subfile;
+use util::{self, SpotifyId, Subfile};
+use librespot_protocol::spirc::PlayStatus;
 
-pub struct Player;
+pub enum PlayerCommand {
+    Load(SpotifyId, bool, u32),
+    Play,
+    Pause,
+    Stop,
+    Seek(u32)
+}
 
-impl Player {
-    pub fn play(session: &Session, track: TrackRef) {
-        let file_id = *track.wait().unwrap().files.first().unwrap();
+pub struct PlayerState {
+    pub status: PlayStatus,
+    pub position_ms: u32,
+    pub position_measured_at: i64,
+    pub update_time: i64
+}
 
-        let key = session.audio_key(track.id(), file_id).into_inner();
+struct PlayerInternal<'s> {
+    state: Arc<(Mutex<PlayerState>, Condvar)>,
 
-        let mut decoder = 
-            vorbis::Decoder::new(
-                Subfile::new(
-                        AudioDecrypt::new(key,
-                            AudioFile::new(session, file_id)), 0xa7)).unwrap();
-        //decoder.time_seek(60f64).unwrap();
+    session: &'s Session,
+    commands: mpsc::Receiver<PlayerCommand>,
+}
+
+pub struct Player<'s> {
+    pub state: Arc<(Mutex<PlayerState>, Condvar)>,
+
+    commands: mpsc::Sender<PlayerCommand>,
+
+    #[allow(dead_code)]
+    thread: thread::JoinGuard<'s, ()>,
+}
+
+impl <'s> Player<'s> {
+    pub fn new(session: &Session) -> Player {
+        let (cmd_tx, cmd_rx) = mpsc::channel();
+
+        let state = Arc::new((Mutex::new(PlayerState {
+            status: PlayStatus::kPlayStatusStop,
+            position_ms: 0,
+            position_measured_at: 0,
+            update_time: util::now_ms(),
+        }), Condvar::new()));
+
+        let internal = PlayerInternal {
+            session: session,
+            commands: cmd_rx,
+            state: state.clone()
+        };
+
+        Player {
+            commands: cmd_tx,
+            state: state,
+            thread: thread::scoped(move || {
+                internal.run()
+            })
+        }
+    }
+
+    pub fn command(&self, cmd: PlayerCommand) {
+        self.commands.send(cmd).unwrap();
+    }
+}
 
+impl <'s> PlayerInternal<'s> {
+    fn run(self) {
         portaudio::initialize().unwrap();
 
         let stream = portaudio::stream::Stream::<i16>::open_default(
@@ -31,20 +83,97 @@ impl Player {
                 portaudio::stream::FRAMES_PER_BUFFER_UNSPECIFIED,
                 None
                 ).unwrap();
-        stream.start().unwrap();
-
-        for pkt in decoder.packets() {
-            match pkt {
-                Ok(packet) => {
-                    match stream.write(&packet.data) {
-                        Ok(_) => (),
-                        Err(portaudio::PaError::OutputUnderflowed)
-                            => eprintln!("Underflow"),
-                        Err(e) => panic!("PA Error {}", e)
+
+        let mut decoder = None;
+
+        loop {
+            match self.commands.try_recv() {
+                Ok(PlayerCommand::Load(id, play, position)) => {
+                    println!("Load");
+                    let mut h = self.state.0.lock().unwrap();
+                    if h.status == PlayStatus::kPlayStatusPlay {
+                        stream.stop().unwrap();
+                    }
+                    h.status = PlayStatus::kPlayStatusLoading;
+                    h.position_ms = position;
+                    h.position_measured_at = util::now_ms();
+                    h.update_time = util::now_ms();
+                    drop(h);
+
+                    let track : TrackRef = self.session.metadata(id);
+                    let file_id = *track.wait().unwrap().files.first().unwrap();
+                    let key = self.session.audio_key(track.id(), file_id).into_inner();
+                    decoder = Some(
+                        vorbis::Decoder::new(
+                        Subfile::new(
+                        AudioDecrypt::new(key,
+                        AudioFile::new(self.session, file_id)), 0xa7)).unwrap());
+                    decoder.as_mut().unwrap().time_seek(position as f64 / 1000f64).unwrap();
+
+                    let mut h = self.state.0.lock().unwrap();
+                    h.status = if play {
+                        stream.start().unwrap();
+                        PlayStatus::kPlayStatusPlay
+                    } else {
+                        PlayStatus::kPlayStatusPause
                     };
+                    h.position_ms = position;
+                    h.position_measured_at = util::now_ms();
+                    h.update_time = util::now_ms();
+                    println!("Load Done");
+                }
+                Ok(PlayerCommand::Seek(ms)) => {
+                    let mut h = self.state.0.lock().unwrap();
+                    decoder.as_mut().unwrap().time_seek(ms as f64 / 1000f64).unwrap();
+                    h.position_ms = (decoder.as_mut().unwrap().time_tell().unwrap() * 1000f64) as u32;
+                    h.position_measured_at = util::now_ms();
+                    h.update_time = util::now_ms();
+                },
+                Ok(PlayerCommand::Play) => {
+                    println!("Play");
+                    let mut h = self.state.0.lock().unwrap();
+                    h.status = PlayStatus::kPlayStatusPlay;
+                    h.update_time = util::now_ms();
+
+                    stream.start().unwrap();
                 },
-                Err(vorbis::VorbisError::Hole) => (),
-                Err(e) => panic!("Vorbis error {:?}", e)
+                Ok(PlayerCommand::Pause) => {
+                    let mut h = self.state.0.lock().unwrap();
+                    h.status = PlayStatus::kPlayStatusPause;
+                    h.update_time = util::now_ms();
+
+                    stream.stop().unwrap();
+                },
+                Ok(PlayerCommand::Stop) => {
+                    let mut h = self.state.0.lock().unwrap();
+                    if h.status == PlayStatus::kPlayStatusPlay {
+                        stream.stop().unwrap();
+                    }
+
+                    h.status = PlayStatus::kPlayStatusPause;
+                    h.update_time = util::now_ms();
+                    decoder = None;
+                },
+                Err(..) => (),
+            }
+
+            if self.state.0.lock().unwrap().status == PlayStatus::kPlayStatusPlay {
+                match decoder.as_mut().unwrap().packets().next().unwrap() {
+                    Ok(packet) => {
+                        match stream.write(&packet.data) {
+                            Ok(_) => (),
+                            Err(portaudio::PaError::OutputUnderflowed)
+                                => eprintln!("Underflow"),
+                            Err(e) => panic!("PA Error {}", e)
+                        };
+                    },
+                    Err(vorbis::VorbisError::Hole) => (),
+                    Err(e) => panic!("Vorbis error {:?}", e)
+                }
+
+                let mut h = self.state.0.lock().unwrap();
+                h.position_ms = (decoder.as_mut().unwrap().time_tell().unwrap() * 1000f64) as u32;
+                h.position_measured_at = util::now_ms();
             }
         }
 
@@ -54,3 +183,4 @@ impl Player {
     }
 }
 
+

+ 7 - 0
src/util/mod.rs

@@ -1,4 +1,5 @@
 use rand::{Rng,Rand};
+use time;
 
 mod int128;
 mod spotify_id;
@@ -66,3 +67,9 @@ impl <T, E> IgnoreExt for Result<T, E> {
         }
     }
 }
+
+pub fn now_ms() -> i64 {
+    let ts = time::now_utc().to_timespec();
+    ts.sec * 1000 + ts.nsec as i64 / 1000000
+}
+