123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469 |
- use eventual::Async;
- use protobuf::{self, Message, RepeatedField};
- use std::borrow::Cow;
- use std::sync::{Mutex, Arc};
- use std::collections::HashMap;
- use mercury::{MercuryRequest, MercuryMethod};
- use player::{Player, PlayerState};
- use mixer::Mixer;
- use session::Session;
- use util;
- use util::SpotifyId;
- use version;
- use protocol;
- pub use protocol::spirc::{PlayStatus, MessageType};
- #[derive(Clone)]
- pub struct SpircManager(Arc<Mutex<SpircInternal>>);
- struct SpircInternal {
- player: Player,
- session: Session,
- mixer: Box<Mixer + Send>,
- seq_nr: u32,
- name: String,
- ident: String,
- device_type: u8,
- can_play: bool,
- repeat: bool,
- shuffle: bool,
- is_active: bool,
- became_active_at: i64,
- last_command_ident: String,
- last_command_msgid: u32,
- tracks: Vec<SpotifyId>,
- index: u32,
- devices: HashMap<String, String>,
- }
- #[derive(Clone)]
- pub struct State {
- pub status: PlayStatus,
- pub position_ms: u32,
- pub position_measured_at: i64,
- pub update_time: i64,
- pub volume: u16,
- pub track: Option<SpotifyId>,
- pub end_of_track: bool,
- }
- impl SpircManager {
- pub fn new(session: Session, player: Player, mixer: Box<Mixer + Send>) -> SpircManager {
- let ident = session.device_id().to_owned();
- let name = session.config().device_name.clone();
- SpircManager(Arc::new(Mutex::new(SpircInternal {
- player: player,
- session: session,
- mixer: mixer,
- seq_nr: 0,
- name: name,
- ident: ident,
- device_type: 5,
- can_play: true,
- repeat: false,
- shuffle: false,
- is_active: false,
- became_active_at: 0,
- last_command_ident: String::new(),
- last_command_msgid: 0,
- tracks: Vec::new(),
- index: 0,
- devices: HashMap::new(),
- })))
- }
- pub fn run(&self) {
- let rx = {
- let mut internal = self.0.lock().unwrap();
- let rx = internal.session.mercury_sub(internal.uri());
- internal.notify(true, None);
- // Use a weak pointer to avoid creating an Rc cycle between the player and the
- // SpircManager
- let _self = Arc::downgrade(&self.0);
- internal.player.add_observer(Box::new(move |state| {
- if let Some(_self) = _self.upgrade() {
- let mut internal = _self.lock().unwrap();
- internal.on_update(state);
- }
- }));
- rx
- };
- for pkt in rx {
- let data = pkt.payload.first().unwrap();
- let frame = protobuf::parse_from_bytes::<protocol::spirc::Frame>(data).unwrap();
- debug!("{:?} {:?} {} {} {}",
- frame.get_typ(),
- frame.get_device_state().get_name(),
- frame.get_ident(),
- frame.get_seq_nr(),
- frame.get_state_update_id());
- self.0.lock().unwrap().handle(frame);
- }
- }
- pub fn devices(&self) -> HashMap<String, String> {
- self.0.lock().unwrap().devices.clone()
- }
- pub fn send_play(&self, recipient: &str) {
- let mut internal = self.0.lock().unwrap();
- CommandSender::new(&mut *internal, MessageType::kMessageTypePlay)
- .recipient(recipient)
- .send();
- }
- pub fn send_pause(&self, recipient: &str) {
- let mut internal = self.0.lock().unwrap();
- CommandSender::new(&mut *internal, MessageType::kMessageTypePause)
- .recipient(recipient)
- .send();
- }
- pub fn send_prev(&self, recipient: &str) {
- let mut internal = self.0.lock().unwrap();
- CommandSender::new(&mut *internal, MessageType::kMessageTypePrev)
- .recipient(recipient)
- .send();
- }
- pub fn send_next(&self, recipient: &str) {
- let mut internal = self.0.lock().unwrap();
- CommandSender::new(&mut *internal, MessageType::kMessageTypeNext)
- .recipient(recipient)
- .send();
- }
- pub fn send_replace_tracks<I: Iterator<Item = SpotifyId>>(&mut self,
- recipient: &str,
- track_ids: I) {
- let state = track_ids_to_state(track_ids);
- let mut internal = self.0.lock().unwrap();
- CommandSender::new(&mut *internal, MessageType::kMessageTypeReplace)
- .recipient(recipient)
- .state(state)
- .send();
- }
- pub fn send_load_tracks<I: Iterator<Item = SpotifyId>>(&mut self,
- recipient: &str,
- track_ids: I) {
- let state = track_ids_to_state(track_ids);
- let mut internal = self.0.lock().unwrap();
- CommandSender::new(&mut *internal, MessageType::kMessageTypeLoad)
- .recipient(recipient)
- .state(state)
- .send();
- }
- pub fn send_goodbye(&self) {
- let mut internal = self.0.lock().unwrap();
- CommandSender::new(&mut *internal, MessageType::kMessageTypeGoodbye)
- .send();
- }
- pub fn get_queue(&self) -> Vec<SpotifyId> {
- self.0.lock().unwrap().tracks.clone()
- }
- }
- impl SpircInternal {
- fn on_update(&mut self, player_state: &PlayerState) {
- let end_of_track = player_state.end_of_track();
- if end_of_track {
- self.index = (self.index + 1) % self.tracks.len() as u32;
- let track = self.tracks[self.index as usize];
- self.player.load(track, true, 0);
- } else {
- self.notify(false, None);
- }
- }
- fn handle(&mut self, frame: protocol::spirc::Frame) {
- if frame.get_ident() == self.ident ||
- (frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident)) {
- return;
- }
- if frame.get_recipient().len() > 0 {
- self.last_command_ident = frame.get_ident().to_owned();
- self.last_command_msgid = frame.get_seq_nr();
- }
- if frame.has_ident() && !frame.has_goodbye() && frame.has_device_state() {
- self.devices.insert(frame.get_ident().into(),
- frame.get_device_state().get_name().into());
- }
- match frame.get_typ() {
- MessageType::kMessageTypeHello => {
- self.notify(false, Some(frame.get_ident()));
- }
- MessageType::kMessageTypeLoad => {
- if !self.is_active {
- self.is_active = true;
- self.became_active_at = util::now_ms();
- }
- self.reload_tracks(&frame);
- if self.tracks.len() > 0 {
- let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
- let track = self.tracks[self.index as usize];
- let position = frame.get_state().get_position_ms();
- self.player.load(track, play, position);
- } else {
- self.notify(false, Some(frame.get_ident()));
- }
- }
- MessageType::kMessageTypePlay => {
- self.player.play();
- }
- MessageType::kMessageTypePause => {
- self.player.pause();
- }
- MessageType::kMessageTypeNext => {
- self.index = (self.index + 1) % self.tracks.len() as u32;
- let track = self.tracks[self.index as usize];
- self.player.load(track, true, 0);
- }
- MessageType::kMessageTypePrev => {
- self.index = (self.index - 1) % self.tracks.len() as u32;
- let track = self.tracks[self.index as usize];
- self.player.load(track, true, 0);
- }
- MessageType::kMessageTypeSeek => {
- self.player.seek(frame.get_position());
- }
- MessageType::kMessageTypeReplace => {
- self.reload_tracks(&frame);
- }
- MessageType::kMessageTypeNotify => {
- if self.is_active && frame.get_device_state().get_is_active() {
- self.is_active = false;
- self.player.stop();
- }
- }
- MessageType::kMessageTypeVolume => {
- self.mixer.set_volume(frame.get_volume() as u16);
- }
- MessageType::kMessageTypeGoodbye => {
- if frame.has_ident() {
- self.devices.remove(frame.get_ident());
- }
- }
- _ => (),
- }
- }
- fn reload_tracks(&mut self, ref frame: &protocol::spirc::Frame) {
- self.index = frame.get_state().get_playing_track_index();
- self.tracks = frame.get_state()
- .get_track()
- .iter()
- .filter(|track| track.has_gid())
- .map(|track| SpotifyId::from_raw(track.get_gid()))
- .collect();
- }
- fn notify(&mut self, hello: bool, recipient: Option<&str>) {
- let mut cs = CommandSender::new(self,
- if hello {
- MessageType::kMessageTypeHello
- } else {
- MessageType::kMessageTypeNotify
- });
- if let Some(s) = recipient {
- cs = cs.recipient(&s);
- }
- cs.send();
- }
- fn spirc_state(&self, state: &State) -> protocol::spirc::State {
- protobuf_init!(protocol::spirc::State::new(), {
- status: state.status,
- position_ms: state.position_ms,
- position_measured_at: state.position_measured_at as u64,
- playing_track_index: self.index,
- track: self.tracks.iter().map(|track| {
- protobuf_init!(protocol::spirc::TrackRef::new(), {
- gid: track.to_raw().to_vec()
- })
- }).collect(),
- shuffle: self.shuffle,
- repeat: self.repeat,
- playing_from_fallback: true,
- last_command_ident: self.last_command_ident.clone(),
- last_command_msgid: self.last_command_msgid
- })
- }
- fn device_state(&self, state: &State) -> protocol::spirc::DeviceState {
- protobuf_init!(protocol::spirc::DeviceState::new(), {
- sw_version: version::version_string(),
- is_active: self.is_active,
- can_play: self.can_play,
- volume: state.volume as u32,
- name: self.name.clone(),
- error_code: 0,
- became_active_at: if self.is_active { self.became_active_at as i64 } else { 0 },
- capabilities => [
- @{
- typ: protocol::spirc::CapabilityType::kCanBePlayer,
- intValue => [0]
- },
- @{
- typ: protocol::spirc::CapabilityType::kDeviceType,
- intValue => [ self.device_type as i64 ]
- },
- @{
- typ: protocol::spirc::CapabilityType::kGaiaEqConnectId,
- intValue => [1]
- },
- @{
- typ: protocol::spirc::CapabilityType::kSupportsLogout,
- intValue => [0]
- },
- @{
- typ: protocol::spirc::CapabilityType::kIsObservable,
- intValue => [1]
- },
- @{
- typ: protocol::spirc::CapabilityType::kVolumeSteps,
- intValue => [64]
- },
- @{
- typ: protocol::spirc::CapabilityType::kSupportedContexts,
- stringValue => [
- "album",
- "playlist",
- "search",
- "inbox",
- "toplist",
- "starred",
- "publishedstarred",
- "track",
- ]
- },
- @{
- typ: protocol::spirc::CapabilityType::kSupportedTypes,
- stringValue => [
- "audio/local",
- "audio/track",
- "local",
- "track",
- ]
- }
- ],
- })
- }
- fn uri(&self) -> String {
- format!("hm://remote/user/{}", self.session.username())
- }
- }
- struct CommandSender<'a> {
- spirc_internal: &'a mut SpircInternal,
- cmd: MessageType,
- recipient: Option<&'a str>,
- state: Option<protocol::spirc::State>,
- }
- impl<'a> CommandSender<'a> {
- fn new(spirc_internal: &'a mut SpircInternal, cmd: MessageType) -> CommandSender {
- CommandSender {
- spirc_internal: spirc_internal,
- cmd: cmd,
- recipient: None,
- state: None,
- }
- }
- fn recipient(mut self, r: &'a str) -> CommandSender {
- self.recipient = Some(r);
- self
- }
- fn state(mut self, s: protocol::spirc::State) -> CommandSender<'a> {
- self.state = Some(s);
- self
- }
- fn send(self) {
- //TODO: get data
- let state = Cow::Owned(State {
- status: PlayStatus::kPlayStatusStop,
- position_ms: 0,
- position_measured_at: 0,
- update_time: util::now_ms(),
- volume: 0,
- track: None,
- end_of_track: false,
- });
- let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), {
- version: 1,
- ident: self.spirc_internal.ident.clone(),
- protocol_version: "2.0.0",
- seq_nr: { self.spirc_internal.seq_nr += 1; self.spirc_internal.seq_nr },
- typ: self.cmd,
- recipient: RepeatedField::from_vec(
- self.recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![])
- ),
- device_state: self.spirc_internal.device_state(&state),
- state_update_id: state.update_time
- });
- if self.spirc_internal.is_active {
- pkt.set_state(self.spirc_internal.spirc_state(&state));
- }
- self.spirc_internal
- .session
- .mercury(MercuryRequest {
- method: MercuryMethod::SEND,
- uri: self.spirc_internal.uri(),
- content_type: None,
- payload: vec![pkt.write_to_bytes().unwrap()],
- })
- .fire();
- }
- }
- fn track_ids_to_state<I: Iterator<Item = SpotifyId>>(track_ids: I) -> protocol::spirc::State {
- let tracks: Vec<protocol::spirc::TrackRef> =
- track_ids.map(|i| {
- protobuf_init!(protocol::spirc::TrackRef::new(), { gid: i.to_raw().to_vec()})
- })
- .collect();
- protobuf_init!(protocol::spirc::State::new(), {
- track: RepeatedField::from_vec(tracks)
- })
- }
|