// spirc.rs (16 KB)
  1. use eventual::Async;
  2. use protobuf::{self, Message, RepeatedField};
  3. use std::borrow::Cow;
  4. use std::sync::{Mutex, Arc};
  5. use std::collections::HashMap;
  6. use mercury::{MercuryRequest, MercuryMethod};
  7. use player::{Player, PlayerState};
  8. use session::Session;
  9. use util;
  10. use util::SpotifyId;
  11. use version;
  12. use protocol;
  13. pub use protocol::spirc::{PlayStatus, MessageType};
  14. #[derive(Clone)]
  15. pub struct SpircManager(Arc<Mutex<SpircInternal>>);
  16. struct SpircInternal {
  17. player: Player,
  18. session: Session,
  19. seq_nr: u32,
  20. name: String,
  21. ident: String,
  22. device_type: u8,
  23. can_play: bool,
  24. repeat: bool,
  25. shuffle: bool,
  26. is_active: bool,
  27. became_active_at: i64,
  28. last_command_ident: String,
  29. last_command_msgid: u32,
  30. tracks: Vec<SpotifyId>,
  31. index: u32,
  32. devices: HashMap<String, String>,
  33. }
  34. impl SpircManager {
  35. pub fn new(session: Session, player: Player) -> SpircManager {
  36. let ident = session.device_id().to_owned();
  37. let name = session.config().device_name.clone();
  38. SpircManager(Arc::new(Mutex::new(SpircInternal {
  39. player: player,
  40. session: session,
  41. seq_nr: 0,
  42. name: name,
  43. ident: ident,
  44. device_type: 5,
  45. can_play: true,
  46. repeat: false,
  47. shuffle: false,
  48. is_active: false,
  49. became_active_at: 0,
  50. last_command_ident: String::new(),
  51. last_command_msgid: 0,
  52. tracks: Vec::new(),
  53. index: 0,
  54. devices: HashMap::new(),
  55. })))
  56. }
  57. pub fn run(&self) {
  58. let rx = {
  59. let mut internal = self.0.lock().unwrap();
  60. let rx = internal.session.mercury_sub(internal.uri());
  61. internal.notify(true, None);
  62. // Use a weak pointer to avoid creating an Rc cycle between the player and the
  63. // SpircManager
  64. let _self = Arc::downgrade(&self.0);
  65. internal.player.add_observer(Box::new(move |state| {
  66. if let Some(_self) = _self.upgrade() {
  67. let mut internal = _self.lock().unwrap();
  68. internal.on_update(state);
  69. }
  70. }));
  71. rx
  72. };
  73. for pkt in rx {
  74. let data = pkt.payload.first().unwrap();
  75. let frame = protobuf::parse_from_bytes::<protocol::spirc::Frame>(data).unwrap();
  76. debug!("{:?} {:?} {} {} {}",
  77. frame.get_typ(),
  78. frame.get_device_state().get_name(),
  79. frame.get_ident(),
  80. frame.get_seq_nr(),
  81. frame.get_state_update_id());
  82. self.0.lock().unwrap().handle(frame);
  83. }
  84. }
  85. pub fn devices(&self) -> HashMap<String, String> {
  86. self.0.lock().unwrap().devices.clone()
  87. }
  88. pub fn send_play(&self, recipient: &str) {
  89. let mut internal = self.0.lock().unwrap();
  90. CommandSender::new(&mut *internal, MessageType::kMessageTypePlay)
  91. .recipient(recipient)
  92. .send();
  93. }
  94. pub fn send_pause(&self, recipient: &str) {
  95. let mut internal = self.0.lock().unwrap();
  96. CommandSender::new(&mut *internal, MessageType::kMessageTypePause)
  97. .recipient(recipient)
  98. .send();
  99. }
  100. pub fn send_prev(&self, recipient: &str) {
  101. let mut internal = self.0.lock().unwrap();
  102. CommandSender::new(&mut *internal, MessageType::kMessageTypePrev)
  103. .recipient(recipient)
  104. .send();
  105. }
  106. pub fn send_next(&self, recipient: &str) {
  107. let mut internal = self.0.lock().unwrap();
  108. CommandSender::new(&mut *internal, MessageType::kMessageTypeNext)
  109. .recipient(recipient)
  110. .send();
  111. }
  112. pub fn send_replace_tracks<I: Iterator<Item = SpotifyId>>(&mut self,
  113. recipient: &str,
  114. track_ids: I) {
  115. let state = track_ids_to_state(track_ids);
  116. let mut internal = self.0.lock().unwrap();
  117. CommandSender::new(&mut *internal, MessageType::kMessageTypeReplace)
  118. .recipient(recipient)
  119. .state(state)
  120. .send();
  121. }
  122. pub fn send_load_tracks<I: Iterator<Item = SpotifyId>>(&mut self,
  123. recipient: &str,
  124. track_ids: I) {
  125. let state = track_ids_to_state(track_ids);
  126. let mut internal = self.0.lock().unwrap();
  127. CommandSender::new(&mut *internal, MessageType::kMessageTypeLoad)
  128. .recipient(recipient)
  129. .state(state)
  130. .send();
  131. }
  132. pub fn send_goodbye(&self) {
  133. let mut internal = self.0.lock().unwrap();
  134. CommandSender::new(&mut *internal, MessageType::kMessageTypeGoodbye)
  135. .send();
  136. }
  137. pub fn get_queue(&self) -> Vec<SpotifyId> {
  138. self.0.lock().unwrap().tracks.clone()
  139. }
  140. }
  141. impl SpircInternal {
  142. fn on_update(&mut self, player_state: &PlayerState) {
  143. let end_of_track = player_state.end_of_track();
  144. if end_of_track {
  145. self.index = (self.index + 1) % self.tracks.len() as u32;
  146. let track = self.tracks[self.index as usize];
  147. self.player.load(track, true, 0);
  148. } else {
  149. self.notify_with_player_state(false, None, player_state);
  150. }
  151. }
  152. fn handle(&mut self, frame: protocol::spirc::Frame) {
  153. if frame.get_ident() == self.ident ||
  154. (frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident)) {
  155. return;
  156. }
  157. if frame.get_recipient().len() > 0 {
  158. self.last_command_ident = frame.get_ident().to_owned();
  159. self.last_command_msgid = frame.get_seq_nr();
  160. }
  161. if frame.has_ident() && !frame.has_goodbye() && frame.has_device_state() {
  162. self.devices.insert(frame.get_ident().into(),
  163. frame.get_device_state().get_name().into());
  164. }
  165. match frame.get_typ() {
  166. MessageType::kMessageTypeHello => {
  167. self.notify(false, Some(frame.get_ident()));
  168. }
  169. MessageType::kMessageTypeLoad => {
  170. if !self.is_active {
  171. self.is_active = true;
  172. self.became_active_at = util::now_ms();
  173. }
  174. self.reload_tracks(&frame);
  175. if self.tracks.len() > 0 {
  176. let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
  177. let track = self.tracks[self.index as usize];
  178. let position = frame.get_state().get_position_ms();
  179. self.player.load(track, play, position);
  180. } else {
  181. self.notify(false, Some(frame.get_ident()));
  182. }
  183. }
  184. MessageType::kMessageTypePlay => {
  185. self.player.play();
  186. }
  187. MessageType::kMessageTypePause => {
  188. self.player.pause();
  189. }
  190. MessageType::kMessageTypeNext => {
  191. self.index = (self.index + 1) % self.tracks.len() as u32;
  192. let track = self.tracks[self.index as usize];
  193. self.player.load(track, true, 0);
  194. }
  195. MessageType::kMessageTypePrev => {
  196. self.index = (self.index - 1) % self.tracks.len() as u32;
  197. let track = self.tracks[self.index as usize];
  198. self.player.load(track, true, 0);
  199. }
  200. MessageType::kMessageTypeSeek => {
  201. self.player.seek(frame.get_position());
  202. }
  203. MessageType::kMessageTypeReplace => {
  204. self.reload_tracks(&frame);
  205. }
  206. MessageType::kMessageTypeNotify => {
  207. if self.is_active && frame.get_device_state().get_is_active() {
  208. self.is_active = false;
  209. self.player.stop();
  210. }
  211. }
  212. MessageType::kMessageTypeVolume => {
  213. self.player.volume(frame.get_volume() as u16);
  214. }
  215. MessageType::kMessageTypeGoodbye => {
  216. if frame.has_ident() {
  217. self.devices.remove(frame.get_ident());
  218. }
  219. }
  220. _ => (),
  221. }
  222. }
  223. fn reload_tracks(&mut self, ref frame: &protocol::spirc::Frame) {
  224. self.index = frame.get_state().get_playing_track_index();
  225. self.tracks = frame.get_state()
  226. .get_track()
  227. .iter()
  228. .filter(|track| track.has_gid())
  229. .map(|track| SpotifyId::from_raw(track.get_gid()))
  230. .collect();
  231. }
  232. fn notify(&mut self, hello: bool, recipient: Option<&str>) {
  233. let mut cs = CommandSender::new(self,
  234. if hello {
  235. MessageType::kMessageTypeHello
  236. } else {
  237. MessageType::kMessageTypeNotify
  238. });
  239. if let Some(s) = recipient {
  240. cs = cs.recipient(&s);
  241. }
  242. cs.send();
  243. }
  244. fn notify_with_player_state(&mut self,
  245. hello: bool,
  246. recipient: Option<&str>,
  247. player_state: &PlayerState) {
  248. let mut cs = CommandSender::new(self,
  249. if hello {
  250. MessageType::kMessageTypeHello
  251. } else {
  252. MessageType::kMessageTypeNotify
  253. })
  254. .player_state(player_state);
  255. if let Some(s) = recipient {
  256. cs = cs.recipient(&s);
  257. }
  258. cs.send();
  259. }
  260. fn spirc_state(&self, player_state: &PlayerState) -> protocol::spirc::State {
  261. let (position_ms, position_measured_at) = player_state.position();
  262. protobuf_init!(protocol::spirc::State::new(), {
  263. status: player_state.status(),
  264. position_ms: position_ms,
  265. position_measured_at: position_measured_at as u64,
  266. playing_track_index: self.index,
  267. track: self.tracks.iter().map(|track| {
  268. protobuf_init!(protocol::spirc::TrackRef::new(), {
  269. gid: track.to_raw().to_vec()
  270. })
  271. }).collect(),
  272. shuffle: self.shuffle,
  273. repeat: self.repeat,
  274. playing_from_fallback: true,
  275. last_command_ident: self.last_command_ident.clone(),
  276. last_command_msgid: self.last_command_msgid
  277. })
  278. }
  279. fn device_state(&self, player_state: &PlayerState) -> protocol::spirc::DeviceState {
  280. protobuf_init!(protocol::spirc::DeviceState::new(), {
  281. sw_version: version::version_string(),
  282. is_active: self.is_active,
  283. can_play: self.can_play,
  284. volume: player_state.volume() as u32,
  285. name: self.name.clone(),
  286. error_code: 0,
  287. became_active_at: if self.is_active { self.became_active_at as i64 } else { 0 },
  288. capabilities => [
  289. @{
  290. typ: protocol::spirc::CapabilityType::kCanBePlayer,
  291. intValue => [0]
  292. },
  293. @{
  294. typ: protocol::spirc::CapabilityType::kDeviceType,
  295. intValue => [ self.device_type as i64 ]
  296. },
  297. @{
  298. typ: protocol::spirc::CapabilityType::kGaiaEqConnectId,
  299. intValue => [1]
  300. },
  301. @{
  302. typ: protocol::spirc::CapabilityType::kSupportsLogout,
  303. intValue => [0]
  304. },
  305. @{
  306. typ: protocol::spirc::CapabilityType::kIsObservable,
  307. intValue => [1]
  308. },
  309. @{
  310. typ: protocol::spirc::CapabilityType::kVolumeSteps,
  311. intValue => [10]
  312. },
  313. @{
  314. typ: protocol::spirc::CapabilityType::kSupportedContexts,
  315. stringValue => [
  316. "album",
  317. "playlist",
  318. "search",
  319. "inbox",
  320. "toplist",
  321. "starred",
  322. "publishedstarred",
  323. "track",
  324. ]
  325. },
  326. @{
  327. typ: protocol::spirc::CapabilityType::kSupportedTypes,
  328. stringValue => [
  329. "audio/local",
  330. "audio/track",
  331. "local",
  332. "track",
  333. ]
  334. }
  335. ],
  336. })
  337. }
  338. fn uri(&self) -> String {
  339. format!("hm://remote/user/{}", self.session.username())
  340. }
  341. }
  342. struct CommandSender<'a> {
  343. spirc_internal: &'a mut SpircInternal,
  344. cmd: MessageType,
  345. recipient: Option<&'a str>,
  346. player_state: Option<&'a PlayerState>,
  347. state: Option<protocol::spirc::State>,
  348. }
  349. impl<'a> CommandSender<'a> {
  350. fn new(spirc_internal: &'a mut SpircInternal, cmd: MessageType) -> CommandSender {
  351. CommandSender {
  352. spirc_internal: spirc_internal,
  353. cmd: cmd,
  354. recipient: None,
  355. player_state: None,
  356. state: None,
  357. }
  358. }
  359. fn recipient(mut self, r: &'a str) -> CommandSender {
  360. self.recipient = Some(r);
  361. self
  362. }
  363. fn player_state(mut self, s: &'a PlayerState) -> CommandSender {
  364. self.player_state = Some(s);
  365. self
  366. }
  367. fn state(mut self, s: protocol::spirc::State) -> CommandSender<'a> {
  368. self.state = Some(s);
  369. self
  370. }
  371. fn send(self) {
  372. let state = self.player_state.map_or_else(|| {
  373. Cow::Owned(self.spirc_internal.player.state())
  374. }, |s| {
  375. Cow::Borrowed(s)
  376. });
  377. let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), {
  378. version: 1,
  379. ident: self.spirc_internal.ident.clone(),
  380. protocol_version: "2.0.0",
  381. seq_nr: { self.spirc_internal.seq_nr += 1; self.spirc_internal.seq_nr },
  382. typ: self.cmd,
  383. recipient: RepeatedField::from_vec(
  384. self.recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![])
  385. ),
  386. device_state: self.spirc_internal.device_state(&state),
  387. state_update_id: state.update_time()
  388. });
  389. if self.spirc_internal.is_active {
  390. pkt.set_state(self.spirc_internal.spirc_state(&state));
  391. }
  392. self.spirc_internal
  393. .session
  394. .mercury(MercuryRequest {
  395. method: MercuryMethod::SEND,
  396. uri: self.spirc_internal.uri(),
  397. content_type: None,
  398. payload: vec![pkt.write_to_bytes().unwrap()],
  399. })
  400. .fire();
  401. }
  402. }
  403. fn track_ids_to_state<I: Iterator<Item = SpotifyId>>(track_ids: I) -> protocol::spirc::State {
  404. let tracks: Vec<protocol::spirc::TrackRef> =
  405. track_ids.map(|i| {
  406. protobuf_init!(protocol::spirc::TrackRef::new(), { gid: i.to_raw().to_vec()})
  407. })
  408. .collect();
  409. protobuf_init!(protocol::spirc::State::new(), {
  410. track: RepeatedField::from_vec(tracks)
  411. })
  412. }