spirc.rs 48 KB


  1. use std;
  2. use std::time::{SystemTime, UNIX_EPOCH};
  3. use futures::future;
  4. use futures::sync::mpsc;
  5. use futures::{Async, Future, Poll, Sink, Stream};
  6. use protobuf::{self, Message};
  7. use rand;
  8. use rand::seq::SliceRandom;
  9. use serde_json;
  10. use crate::context::StationContext;
  11. use crate::playback::mixer::Mixer;
  12. use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel};
  13. use crate::protocol;
  14. use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef};
  15. use librespot_core::config::ConnectConfig;
  16. use librespot_core::mercury::MercuryError;
  17. use librespot_core::session::Session;
  18. use librespot_core::spotify_id::{SpotifyAudioType, SpotifyId, SpotifyIdError};
  19. use librespot_core::util::url_encode;
  20. use librespot_core::util::SeqGenerator;
  21. use librespot_core::version;
  22. use librespot_core::volume::Volume;
  23. enum SpircPlayStatus {
  24. Stopped,
  25. LoadingPlay {
  26. position_ms: u32,
  27. },
  28. LoadingPause {
  29. position_ms: u32,
  30. },
  31. Playing {
  32. nominal_start_time: i64,
  33. preloading_of_next_track_triggered: bool,
  34. },
  35. Paused {
  36. position_ms: u32,
  37. preloading_of_next_track_triggered: bool,
  38. },
  39. }
  40. pub struct SpircTask {
  41. player: Player,
  42. mixer: Box<dyn Mixer>,
  43. config: SpircTaskConfig,
  44. sequence: SeqGenerator<u32>,
  45. ident: String,
  46. device: DeviceState,
  47. state: State,
  48. play_request_id: Option<u64>,
  49. mixer_started: bool,
  50. play_status: SpircPlayStatus,
  51. subscription: Box<dyn Stream<Item = Frame, Error = MercuryError>>,
  52. sender: Box<dyn Sink<SinkItem = Frame, SinkError = MercuryError>>,
  53. commands: mpsc::UnboundedReceiver<SpircCommand>,
  54. player_events: PlayerEventChannel,
  55. shutdown: bool,
  56. session: Session,
  57. context_fut: Box<dyn Future<Item = serde_json::Value, Error = MercuryError>>,
  58. autoplay_fut: Box<dyn Future<Item = String, Error = MercuryError>>,
  59. context: Option<StationContext>,
  60. }
  61. pub enum SpircCommand {
  62. Play,
  63. PlayPause,
  64. Pause,
  65. Prev,
  66. Next,
  67. VolumeUp,
  68. VolumeDown,
  69. Shutdown,
  70. }
  71. struct SpircTaskConfig {
  72. linear_volume: bool,
  73. autoplay: bool,
  74. }
  75. const CONTEXT_TRACKS_HISTORY: usize = 10;
  76. const CONTEXT_FETCH_THRESHOLD: u32 = 5;
  77. pub struct Spirc {
  78. commands: mpsc::UnboundedSender<SpircCommand>,
  79. }
  80. fn initial_state() -> State {
  81. let mut frame = protocol::spirc::State::new();
  82. frame.set_repeat(false);
  83. frame.set_shuffle(false);
  84. frame.set_status(PlayStatus::kPlayStatusStop);
  85. frame.set_position_ms(0);
  86. frame.set_position_measured_at(0);
  87. frame
  88. }
  89. fn initial_device_state(config: ConnectConfig) -> DeviceState {
  90. {
  91. let mut msg = DeviceState::new();
  92. msg.set_sw_version(version::version_string());
  93. msg.set_is_active(false);
  94. msg.set_can_play(true);
  95. msg.set_volume(0);
  96. msg.set_name(config.name);
  97. {
  98. let repeated = msg.mut_capabilities();
  99. {
  100. let msg = repeated.push_default();
  101. msg.set_typ(protocol::spirc::CapabilityType::kCanBePlayer);
  102. {
  103. let repeated = msg.mut_intValue();
  104. repeated.push(1)
  105. };
  106. msg
  107. };
  108. {
  109. let msg = repeated.push_default();
  110. msg.set_typ(protocol::spirc::CapabilityType::kDeviceType);
  111. {
  112. let repeated = msg.mut_intValue();
  113. repeated.push(config.device_type as i64)
  114. };
  115. msg
  116. };
  117. {
  118. let msg = repeated.push_default();
  119. msg.set_typ(protocol::spirc::CapabilityType::kGaiaEqConnectId);
  120. {
  121. let repeated = msg.mut_intValue();
  122. repeated.push(1)
  123. };
  124. msg
  125. };
  126. {
  127. let msg = repeated.push_default();
  128. msg.set_typ(protocol::spirc::CapabilityType::kSupportsLogout);
  129. {
  130. let repeated = msg.mut_intValue();
  131. repeated.push(0)
  132. };
  133. msg
  134. };
  135. {
  136. let msg = repeated.push_default();
  137. msg.set_typ(protocol::spirc::CapabilityType::kIsObservable);
  138. {
  139. let repeated = msg.mut_intValue();
  140. repeated.push(1)
  141. };
  142. msg
  143. };
  144. {
  145. let msg = repeated.push_default();
  146. msg.set_typ(protocol::spirc::CapabilityType::kVolumeSteps);
  147. {
  148. let repeated = msg.mut_intValue();
  149. repeated.push(64)
  150. };
  151. msg
  152. };
  153. {
  154. let msg = repeated.push_default();
  155. msg.set_typ(protocol::spirc::CapabilityType::kSupportsPlaylistV2);
  156. {
  157. let repeated = msg.mut_intValue();
  158. repeated.push(64)
  159. };
  160. msg
  161. };
  162. {
  163. let msg = repeated.push_default();
  164. msg.set_typ(protocol::spirc::CapabilityType::kSupportedContexts);
  165. {
  166. let repeated = msg.mut_stringValue();
  167. repeated.push(::std::convert::Into::into("album"));
  168. repeated.push(::std::convert::Into::into("playlist"));
  169. repeated.push(::std::convert::Into::into("search"));
  170. repeated.push(::std::convert::Into::into("inbox"));
  171. repeated.push(::std::convert::Into::into("toplist"));
  172. repeated.push(::std::convert::Into::into("starred"));
  173. repeated.push(::std::convert::Into::into("publishedstarred"));
  174. repeated.push(::std::convert::Into::into("track"))
  175. };
  176. msg
  177. };
  178. {
  179. let msg = repeated.push_default();
  180. msg.set_typ(protocol::spirc::CapabilityType::kSupportedTypes);
  181. {
  182. let repeated = msg.mut_stringValue();
  183. repeated.push(::std::convert::Into::into("audio/local"));
  184. repeated.push(::std::convert::Into::into("audio/track"));
  185. repeated.push(::std::convert::Into::into("audio/episode"));
  186. repeated.push(::std::convert::Into::into("local"));
  187. repeated.push(::std::convert::Into::into("track"))
  188. };
  189. msg
  190. };
  191. };
  192. msg
  193. }
  194. }
  195. fn calc_logarithmic_volume(volume: u16) -> u16 {
  196. // Volume conversion taken from https://www.dr-lex.be/info-stuff/volumecontrols.html#ideal2
  197. // Convert the given volume [0..0xffff] to a dB gain
  198. // We assume a dB range of 60dB.
  199. // Use the equation: a * exp(b * x)
  200. // in which a = IDEAL_FACTOR, b = 1/1000
  201. const IDEAL_FACTOR: f64 = 6.908;
  202. let normalized_volume = volume as f64 / std::u16::MAX as f64; // To get a value between 0 and 1
  203. let mut val = std::u16::MAX;
  204. // Prevent val > std::u16::MAX due to rounding errors
  205. if normalized_volume < 0.999 {
  206. let new_volume = (normalized_volume * IDEAL_FACTOR).exp() / 1000.0;
  207. val = (new_volume * std::u16::MAX as f64) as u16;
  208. }
  209. debug!("input volume:{} to mixer: {}", volume, val);
  210. // return the scale factor (0..0xffff) (equivalent to a voltage multiplier).
  211. val
  212. }
  213. fn volume_to_mixer(volume: u16, linear_volume: bool) -> u16 {
  214. if linear_volume {
  215. debug!("linear volume: {}", volume);
  216. volume
  217. } else {
  218. calc_logarithmic_volume(volume)
  219. }
  220. }
  221. impl Spirc {
  222. pub fn new(
  223. config: ConnectConfig,
  224. session: Session,
  225. player: Player,
  226. mixer: Box<dyn Mixer>,
  227. ) -> (Spirc, SpircTask) {
  228. debug!("new Spirc[{}]", session.session_id());
  229. let ident = session.device_id().to_owned();
  230. // Uri updated in response to issue #288
  231. debug!("canonical_username: {}", url_encode(&session.username()));
  232. let uri = format!("hm://remote/user/{}/", url_encode(&session.username()));
  233. let subscription = session.mercury().subscribe(&uri as &str);
  234. let subscription = subscription
  235. .map(|stream| stream.map_err(|_| MercuryError))
  236. .flatten_stream();
  237. let subscription = Box::new(subscription.map(|response| -> Frame {
  238. let data = response.payload.first().unwrap();
  239. protobuf::parse_from_bytes(data).unwrap()
  240. }));
  241. let sender = Box::new(
  242. session
  243. .mercury()
  244. .sender(uri)
  245. .with(|frame: Frame| Ok(frame.write_to_bytes().unwrap())),
  246. );
  247. let (cmd_tx, cmd_rx) = mpsc::unbounded();
  248. let volume = config.volume;
  249. let task_config = SpircTaskConfig {
  250. linear_volume: config.linear_volume,
  251. autoplay: config.autoplay,
  252. };
  253. let device = initial_device_state(config);
  254. let player_events = player.get_player_event_channel();
  255. let mut task = SpircTask {
  256. player: player,
  257. mixer: mixer,
  258. config: task_config,
  259. sequence: SeqGenerator::new(1),
  260. ident: ident,
  261. device: device,
  262. state: initial_state(),
  263. play_request_id: None,
  264. mixer_started: false,
  265. play_status: SpircPlayStatus::Stopped,
  266. subscription: subscription,
  267. sender: sender,
  268. commands: cmd_rx,
  269. player_events: player_events,
  270. shutdown: false,
  271. session: session.clone(),
  272. context_fut: Box::new(future::empty()),
  273. autoplay_fut: Box::new(future::empty()),
  274. context: None,
  275. };
  276. task.set_volume(volume);
  277. let spirc = Spirc { commands: cmd_tx };
  278. task.hello();
  279. (spirc, task)
  280. }
  281. pub fn play(&self) {
  282. let _ = self.commands.unbounded_send(SpircCommand::Play);
  283. }
  284. pub fn play_pause(&self) {
  285. let _ = self.commands.unbounded_send(SpircCommand::PlayPause);
  286. }
  287. pub fn pause(&self) {
  288. let _ = self.commands.unbounded_send(SpircCommand::Pause);
  289. }
  290. pub fn prev(&self) {
  291. let _ = self.commands.unbounded_send(SpircCommand::Prev);
  292. }
  293. pub fn next(&self) {
  294. let _ = self.commands.unbounded_send(SpircCommand::Next);
  295. }
  296. pub fn volume_up(&self) {
  297. let _ = self.commands.unbounded_send(SpircCommand::VolumeUp);
  298. }
  299. pub fn volume_down(&self) {
  300. let _ = self.commands.unbounded_send(SpircCommand::VolumeDown);
  301. }
  302. pub fn shutdown(&self) {
  303. let _ = self.commands.unbounded_send(SpircCommand::Shutdown);
  304. }
  305. }
  306. impl Future for SpircTask {
  307. type Item = ();
  308. type Error = ();
  309. fn poll(&mut self) -> Poll<(), ()> {
  310. loop {
  311. let mut progress = false;
  312. if self.session.is_invalid() {
  313. return Ok(Async::Ready(()));
  314. }
  315. if !self.shutdown {
  316. match self.subscription.poll().unwrap() {
  317. Async::Ready(Some(frame)) => {
  318. progress = true;
  319. self.handle_frame(frame);
  320. }
  321. Async::Ready(None) => {
  322. error!("subscription terminated");
  323. self.shutdown = true;
  324. self.commands.close();
  325. }
  326. Async::NotReady => (),
  327. }
  328. match self.commands.poll().unwrap() {
  329. Async::Ready(Some(command)) => {
  330. progress = true;
  331. self.handle_command(command);
  332. }
  333. Async::Ready(None) => (),
  334. Async::NotReady => (),
  335. }
  336. match self.player_events.poll() {
  337. Ok(Async::NotReady) => (),
  338. Ok(Async::Ready(None)) => (),
  339. Err(_) => (),
  340. Ok(Async::Ready(Some(event))) => {
  341. progress = true;
  342. self.handle_player_event(event);
  343. }
  344. }
  345. // TODO: Refactor
  346. match self.context_fut.poll() {
  347. Ok(Async::Ready(value)) => {
  348. let r_context = serde_json::from_value::<StationContext>(value.clone());
  349. self.context = match r_context {
  350. Ok(context) => {
  351. info!(
  352. "Resolved {:?} tracks from <{:?}>",
  353. context.tracks.len(),
  354. self.state.get_context_uri(),
  355. );
  356. Some(context)
  357. }
  358. Err(e) => {
  359. error!("Unable to parse JSONContext {:?}\n{:?}", e, value);
  360. None
  361. }
  362. };
  363. // It needn't be so verbose - can be as simple as
  364. // if let Some(ref context) = r_context {
  365. // info!("Got {:?} tracks from <{}>", context.tracks.len(), context.uri);
  366. // }
  367. // self.context = r_context;
  368. progress = true;
  369. self.context_fut = Box::new(future::empty());
  370. }
  371. Ok(Async::NotReady) => (),
  372. Err(err) => {
  373. self.context_fut = Box::new(future::empty());
  374. error!("ContextError: {:?}", err)
  375. }
  376. }
  377. match self.autoplay_fut.poll() {
  378. Ok(Async::Ready(autoplay_station_uri)) => {
  379. info!("Autoplay uri resolved to <{:?}>", autoplay_station_uri);
  380. self.context_fut = self.resolve_station(&autoplay_station_uri);
  381. progress = true;
  382. self.autoplay_fut = Box::new(future::empty());
  383. }
  384. Ok(Async::NotReady) => (),
  385. Err(err) => {
  386. self.autoplay_fut = Box::new(future::empty());
  387. error!("AutoplayError: {:?}", err)
  388. }
  389. }
  390. }
  391. let poll_sender = self.sender.poll_complete().unwrap();
  392. // Only shutdown once we've flushed out all our messages
  393. if self.shutdown && poll_sender.is_ready() {
  394. return Ok(Async::Ready(()));
  395. }
  396. if !progress {
  397. return Ok(Async::NotReady);
  398. }
  399. }
  400. }
  401. }
  402. impl SpircTask {
  403. fn now_ms(&mut self) -> i64 {
  404. let dur = match SystemTime::now().duration_since(UNIX_EPOCH) {
  405. Ok(dur) => dur,
  406. Err(err) => err.duration(),
  407. };
  408. (dur.as_secs() as i64 + self.session.time_delta()) * 1000
  409. + (dur.subsec_nanos() / 1000_000) as i64
  410. }
  411. fn ensure_mixer_started(&mut self) {
  412. if !self.mixer_started {
  413. self.mixer.start();
  414. self.mixer_started = true;
  415. }
  416. }
  417. fn ensure_mixer_stopped(&mut self) {
  418. if self.mixer_started {
  419. self.mixer.stop();
  420. self.mixer_started = false;
  421. }
  422. }
  423. fn update_state_position(&mut self, position_ms: u32) {
  424. let now = self.now_ms();
  425. self.state.set_position_measured_at(now as u64);
  426. self.state.set_position_ms(position_ms);
  427. }
  428. fn handle_command(&mut self, cmd: SpircCommand) {
  429. let active = self.device.get_is_active();
  430. match cmd {
  431. SpircCommand::Play => {
  432. if active {
  433. self.handle_play();
  434. self.notify(None, true);
  435. } else {
  436. CommandSender::new(self, MessageType::kMessageTypePlay).send();
  437. }
  438. }
  439. SpircCommand::PlayPause => {
  440. if active {
  441. self.handle_play_pause();
  442. self.notify(None, true);
  443. } else {
  444. CommandSender::new(self, MessageType::kMessageTypePlayPause).send();
  445. }
  446. }
  447. SpircCommand::Pause => {
  448. if active {
  449. self.handle_pause();
  450. self.notify(None, true);
  451. } else {
  452. CommandSender::new(self, MessageType::kMessageTypePause).send();
  453. }
  454. }
  455. SpircCommand::Prev => {
  456. if active {
  457. self.handle_prev();
  458. self.notify(None, true);
  459. } else {
  460. CommandSender::new(self, MessageType::kMessageTypePrev).send();
  461. }
  462. }
  463. SpircCommand::Next => {
  464. if active {
  465. self.handle_next();
  466. self.notify(None, true);
  467. } else {
  468. CommandSender::new(self, MessageType::kMessageTypeNext).send();
  469. }
  470. }
  471. SpircCommand::VolumeUp => {
  472. if active {
  473. self.handle_volume_up();
  474. self.notify(None, true);
  475. } else {
  476. CommandSender::new(self, MessageType::kMessageTypeVolumeUp).send();
  477. }
  478. }
  479. SpircCommand::VolumeDown => {
  480. if active {
  481. self.handle_volume_down();
  482. self.notify(None, true);
  483. } else {
  484. CommandSender::new(self, MessageType::kMessageTypeVolumeDown).send();
  485. }
  486. }
  487. SpircCommand::Shutdown => {
  488. CommandSender::new(self, MessageType::kMessageTypeGoodbye).send();
  489. self.shutdown = true;
  490. self.commands.close();
  491. }
  492. }
  493. }
  494. fn handle_player_event(&mut self, event: PlayerEvent) {
  495. // we only process events if the play_request_id matches. If it doesn't, it is
  496. // an event that belongs to a previous track and only arrives now due to a race
  497. // condition. In this case we have updated the state already and don't want to
  498. // mess with it.
  499. if let Some(play_request_id) = event.get_play_request_id() {
  500. if Some(play_request_id) == self.play_request_id {
  501. match event {
  502. PlayerEvent::EndOfTrack { .. } => self.handle_end_of_track(),
  503. PlayerEvent::Loading { .. } => self.notify(None, false),
  504. PlayerEvent::Playing { position_ms, .. } => {
  505. let new_nominal_start_time = self.now_ms() - position_ms as i64;
  506. match self.play_status {
  507. SpircPlayStatus::Playing {
  508. ref mut nominal_start_time,
  509. ..
  510. } => {
  511. if (*nominal_start_time - new_nominal_start_time).abs() > 100 {
  512. *nominal_start_time = new_nominal_start_time;
  513. self.update_state_position(position_ms);
  514. self.notify(None, true);
  515. }
  516. }
  517. SpircPlayStatus::LoadingPlay { .. }
  518. | SpircPlayStatus::LoadingPause { .. } => {
  519. self.state.set_status(PlayStatus::kPlayStatusPlay);
  520. self.update_state_position(position_ms);
  521. self.notify(None, true);
  522. self.play_status = SpircPlayStatus::Playing {
  523. nominal_start_time: new_nominal_start_time,
  524. preloading_of_next_track_triggered: false,
  525. };
  526. }
  527. _ => (),
  528. };
  529. trace!("==> kPlayStatusPlay");
  530. }
  531. PlayerEvent::Paused {
  532. position_ms: new_position_ms,
  533. ..
  534. } => {
  535. match self.play_status {
  536. SpircPlayStatus::Paused {
  537. ref mut position_ms,
  538. ..
  539. } => {
  540. if *position_ms != new_position_ms {
  541. *position_ms = new_position_ms;
  542. self.update_state_position(new_position_ms);
  543. self.notify(None, true);
  544. }
  545. }
  546. SpircPlayStatus::LoadingPlay { .. }
  547. | SpircPlayStatus::LoadingPause { .. } => {
  548. self.state.set_status(PlayStatus::kPlayStatusPause);
  549. self.update_state_position(new_position_ms);
  550. self.notify(None, true);
  551. self.play_status = SpircPlayStatus::Paused {
  552. position_ms: new_position_ms,
  553. preloading_of_next_track_triggered: false,
  554. };
  555. }
  556. _ => (),
  557. }
  558. trace!("==> kPlayStatusPause");
  559. }
  560. PlayerEvent::Stopped { .. } => match self.play_status {
  561. SpircPlayStatus::Stopped => (),
  562. _ => {
  563. warn!("The player has stopped unexpectedly.");
  564. self.state.set_status(PlayStatus::kPlayStatusStop);
  565. self.ensure_mixer_stopped();
  566. self.notify(None, true);
  567. self.play_status = SpircPlayStatus::Stopped;
  568. }
  569. },
  570. PlayerEvent::TimeToPreloadNextTrack { .. } => self.handle_preload_next_track(),
  571. PlayerEvent::Unavailable { track_id, .. } => self.handle_unavailable(track_id),
  572. _ => (),
  573. }
  574. }
  575. }
  576. }
  577. fn handle_frame(&mut self, frame: Frame) {
  578. let state_string = match frame.get_state().get_status() {
  579. PlayStatus::kPlayStatusLoading => "kPlayStatusLoading",
  580. PlayStatus::kPlayStatusPause => "kPlayStatusPause",
  581. PlayStatus::kPlayStatusStop => "kPlayStatusStop",
  582. PlayStatus::kPlayStatusPlay => "kPlayStatusPlay",
  583. };
  584. debug!(
  585. "{:?} {:?} {} {} {} {}",
  586. frame.get_typ(),
  587. frame.get_device_state().get_name(),
  588. frame.get_ident(),
  589. frame.get_seq_nr(),
  590. frame.get_state_update_id(),
  591. state_string,
  592. );
  593. if frame.get_ident() == self.ident
  594. || (frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident))
  595. {
  596. return;
  597. }
  598. match frame.get_typ() {
  599. MessageType::kMessageTypeHello => {
  600. self.notify(Some(frame.get_ident()), true);
  601. }
  602. MessageType::kMessageTypeLoad => {
  603. if !self.device.get_is_active() {
  604. let now = self.now_ms();
  605. self.device.set_is_active(true);
  606. self.device.set_became_active_at(now);
  607. }
  608. self.update_tracks(&frame);
  609. if self.state.get_track().len() > 0 {
  610. let start_playing =
  611. frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
  612. self.load_track(start_playing, frame.get_state().get_position_ms());
  613. } else {
  614. info!("No more tracks left in queue");
  615. self.state.set_status(PlayStatus::kPlayStatusStop);
  616. self.player.stop();
  617. self.mixer.stop();
  618. self.play_status = SpircPlayStatus::Stopped;
  619. }
  620. self.notify(None, true);
  621. }
  622. MessageType::kMessageTypePlay => {
  623. self.handle_play();
  624. self.notify(None, true);
  625. }
  626. MessageType::kMessageTypePlayPause => {
  627. self.handle_play_pause();
  628. self.notify(None, true);
  629. }
  630. MessageType::kMessageTypePause => {
  631. self.handle_pause();
  632. self.notify(None, true);
  633. }
  634. MessageType::kMessageTypeNext => {
  635. self.handle_next();
  636. self.notify(None, true);
  637. }
  638. MessageType::kMessageTypePrev => {
  639. self.handle_prev();
  640. self.notify(None, true);
  641. }
  642. MessageType::kMessageTypeVolumeUp => {
  643. self.handle_volume_up();
  644. self.notify(None, true);
  645. }
  646. MessageType::kMessageTypeVolumeDown => {
  647. self.handle_volume_down();
  648. self.notify(None, true);
  649. }
  650. MessageType::kMessageTypeRepeat => {
  651. self.state.set_repeat(frame.get_state().get_repeat());
  652. self.notify(None, true);
  653. }
  654. MessageType::kMessageTypeShuffle => {
  655. self.state.set_shuffle(frame.get_state().get_shuffle());
  656. if self.state.get_shuffle() {
  657. let current_index = self.state.get_playing_track_index();
  658. {
  659. let tracks = self.state.mut_track();
  660. tracks.swap(0, current_index as usize);
  661. if let Some((_, rest)) = tracks.split_first_mut() {
  662. let mut rng = rand::thread_rng();
  663. rest.shuffle(&mut rng);
  664. }
  665. }
  666. self.state.set_playing_track_index(0);
  667. } else {
  668. let context = self.state.get_context_uri();
  669. debug!("{:?}", context);
  670. }
  671. self.notify(None, true);
  672. }
  673. MessageType::kMessageTypeSeek => {
  674. self.handle_seek(frame.get_position());
  675. self.notify(None, true);
  676. }
  677. MessageType::kMessageTypeReplace => {
  678. self.update_tracks(&frame);
  679. self.notify(None, true);
  680. if let SpircPlayStatus::Playing {
  681. preloading_of_next_track_triggered,
  682. ..
  683. }
  684. | SpircPlayStatus::Paused {
  685. preloading_of_next_track_triggered,
  686. ..
  687. } = self.play_status
  688. {
  689. if preloading_of_next_track_triggered {
  690. // Get the next track_id in the playlist
  691. if let Some(track_id) = self.preview_next_track() {
  692. self.player.preload(track_id);
  693. }
  694. }
  695. }
  696. }
  697. MessageType::kMessageTypeVolume => {
  698. self.set_volume(frame.get_volume() as u16);
  699. self.notify(None, true);
  700. }
  701. MessageType::kMessageTypeNotify => {
  702. if self.device.get_is_active()
  703. && frame.get_device_state().get_is_active()
  704. && self.device.get_became_active_at()
  705. <= frame.get_device_state().get_became_active_at()
  706. {
  707. self.device.set_is_active(false);
  708. self.state.set_status(PlayStatus::kPlayStatusStop);
  709. self.player.stop();
  710. self.ensure_mixer_stopped();
  711. self.play_status = SpircPlayStatus::Stopped;
  712. }
  713. }
  714. _ => (),
  715. }
  716. }
  717. fn handle_play(&mut self) {
  718. match self.play_status {
  719. SpircPlayStatus::Paused {
  720. position_ms,
  721. preloading_of_next_track_triggered,
  722. } => {
  723. self.ensure_mixer_started();
  724. self.player.play();
  725. self.state.set_status(PlayStatus::kPlayStatusPlay);
  726. self.update_state_position(position_ms);
  727. self.play_status = SpircPlayStatus::Playing {
  728. nominal_start_time: self.now_ms() as i64 - position_ms as i64,
  729. preloading_of_next_track_triggered,
  730. };
  731. }
  732. SpircPlayStatus::LoadingPause { position_ms } => {
  733. self.ensure_mixer_started();
  734. self.player.play();
  735. self.play_status = SpircPlayStatus::LoadingPlay { position_ms };
  736. }
  737. _ => (),
  738. }
  739. }
  740. fn handle_play_pause(&mut self) {
  741. match self.play_status {
  742. SpircPlayStatus::Paused { .. } | SpircPlayStatus::LoadingPause { .. } => {
  743. self.handle_play()
  744. }
  745. SpircPlayStatus::Playing { .. } | SpircPlayStatus::LoadingPlay { .. } => {
  746. self.handle_play()
  747. }
  748. _ => (),
  749. }
  750. }
  751. fn handle_pause(&mut self) {
  752. match self.play_status {
  753. SpircPlayStatus::Playing {
  754. nominal_start_time,
  755. preloading_of_next_track_triggered,
  756. } => {
  757. self.player.pause();
  758. self.state.set_status(PlayStatus::kPlayStatusPause);
  759. let position_ms = (self.now_ms() - nominal_start_time) as u32;
  760. self.update_state_position(position_ms);
  761. self.play_status = SpircPlayStatus::Paused {
  762. position_ms,
  763. preloading_of_next_track_triggered,
  764. };
  765. }
  766. SpircPlayStatus::LoadingPlay { position_ms } => {
  767. self.player.pause();
  768. self.play_status = SpircPlayStatus::LoadingPause { position_ms };
  769. }
  770. _ => (),
  771. }
  772. }
  773. fn handle_seek(&mut self, position_ms: u32) {
  774. self.update_state_position(position_ms);
  775. self.player.seek(position_ms);
  776. let now = self.now_ms();
  777. match self.play_status {
  778. SpircPlayStatus::Stopped => (),
  779. SpircPlayStatus::LoadingPause {
  780. position_ms: ref mut position,
  781. }
  782. | SpircPlayStatus::LoadingPlay {
  783. position_ms: ref mut position,
  784. }
  785. | SpircPlayStatus::Paused {
  786. position_ms: ref mut position,
  787. ..
  788. } => *position = position_ms,
  789. SpircPlayStatus::Playing {
  790. ref mut nominal_start_time,
  791. ..
  792. } => *nominal_start_time = now - position_ms as i64,
  793. };
  794. }
  795. fn consume_queued_track(&mut self) -> usize {
  796. // Removes current track if it is queued
  797. // Returns the index of the next track
  798. let current_index = self.state.get_playing_track_index() as usize;
  799. if (current_index < self.state.get_track().len())
  800. && self.state.get_track()[current_index].get_queued()
  801. {
  802. self.state.mut_track().remove(current_index);
  803. current_index
  804. } else {
  805. current_index + 1
  806. }
  807. }
  808. fn preview_next_track(&mut self) -> Option<SpotifyId> {
  809. self.get_track_id_to_play_from_playlist(self.state.get_playing_track_index() + 1)
  810. .and_then(|(track_id, _)| Some(track_id))
  811. }
  812. fn handle_preload_next_track(&mut self) {
  813. // Requests the player thread to preload the next track
  814. match self.play_status {
  815. SpircPlayStatus::Paused {
  816. ref mut preloading_of_next_track_triggered,
  817. ..
  818. }
  819. | SpircPlayStatus::Playing {
  820. ref mut preloading_of_next_track_triggered,
  821. ..
  822. } => {
  823. *preloading_of_next_track_triggered = true;
  824. if let Some(track_id) = self.preview_next_track() {
  825. self.player.preload(track_id);
  826. }
  827. }
  828. SpircPlayStatus::LoadingPause { .. }
  829. | SpircPlayStatus::LoadingPlay { .. }
  830. | SpircPlayStatus::Stopped => (),
  831. }
  832. }
  833. // Mark unavailable tracks so we can skip them later
  834. fn handle_unavailable(&mut self, track_id: SpotifyId) {
  835. let unavailables = self.get_track_index_for_spotify_id(&track_id, 0);
  836. for &index in unavailables.iter() {
  837. debug_assert_eq!(self.state.get_track()[index].get_gid(), track_id.to_raw());
  838. let mut unplayable_track_ref = TrackRef::new();
  839. unplayable_track_ref.set_gid(self.state.get_track()[index].get_gid().to_vec());
  840. // Misuse context field to flag the track
  841. unplayable_track_ref.set_context(String::from("NonPlayable"));
  842. std::mem::swap(
  843. &mut self.state.mut_track()[index],
  844. &mut unplayable_track_ref,
  845. );
  846. debug!(
  847. "Marked <{:?}> at {:?} as NonPlayable",
  848. self.state.get_track()[index],
  849. index,
  850. );
  851. }
  852. self.handle_preload_next_track();
  853. }
  854. fn handle_next(&mut self) {
  855. let mut new_index = self.consume_queued_track() as u32;
  856. let mut continue_playing = true;
  857. let tracks_len = self.state.get_track().len() as u32;
  858. debug!(
  859. "At track {:?} of {:?} <{:?}> update [{}]",
  860. new_index,
  861. self.state.get_track().len(),
  862. self.state.get_context_uri(),
  863. tracks_len - new_index < CONTEXT_FETCH_THRESHOLD
  864. );
  865. let context_uri = self.state.get_context_uri().to_owned();
  866. if (context_uri.starts_with("spotify:station:")
  867. || context_uri.starts_with("spotify:dailymix:")
  868. // spotify:user:xxx:collection
  869. || context_uri.starts_with(&format!("spotify:user:{}:collection",url_encode(&self.session.username()))))
  870. && ((self.state.get_track().len() as u32) - new_index) < CONTEXT_FETCH_THRESHOLD
  871. {
  872. self.context_fut = self.resolve_station(&context_uri);
  873. self.update_tracks_from_context();
  874. }
  875. if self.config.autoplay && new_index == tracks_len - 1 {
  876. // Extend the playlist
  877. // Note: This doesn't seem to reflect in the UI
  878. // the additional tracks in the frame don't show up as with station view
  879. debug!("Extending playlist <{}>", context_uri);
  880. self.update_tracks_from_context();
  881. }
  882. if new_index >= tracks_len {
  883. new_index = 0; // Loop around back to start
  884. continue_playing = self.state.get_repeat();
  885. }
  886. if tracks_len > 0 {
  887. self.state.set_playing_track_index(new_index);
  888. self.load_track(continue_playing, 0);
  889. } else {
  890. info!("Not playing next track because there are no more tracks left in queue.");
  891. self.state.set_playing_track_index(0);
  892. self.state.set_status(PlayStatus::kPlayStatusStop);
  893. self.player.stop();
  894. self.ensure_mixer_stopped();
  895. self.play_status = SpircPlayStatus::Stopped;
  896. }
  897. }
  898. fn handle_prev(&mut self) {
  899. // Previous behaves differently based on the position
  900. // Under 3s it goes to the previous song (starts playing)
  901. // Over 3s it seeks to zero (retains previous play status)
  902. if self.position() < 3000 {
  903. // Queued tracks always follow the currently playing track.
  904. // They should not be considered when calculating the previous
  905. // track so extract them beforehand and reinsert them after it.
  906. let mut queue_tracks = Vec::new();
  907. {
  908. let queue_index = self.consume_queued_track();
  909. let tracks = self.state.mut_track();
  910. while queue_index < tracks.len() && tracks[queue_index].get_queued() {
  911. queue_tracks.push(tracks.remove(queue_index));
  912. }
  913. }
  914. let current_index = self.state.get_playing_track_index();
  915. let new_index = if current_index > 0 {
  916. current_index - 1
  917. } else if self.state.get_repeat() {
  918. self.state.get_track().len() as u32 - 1
  919. } else {
  920. 0
  921. };
  922. // Reinsert queued tracks after the new playing track.
  923. let mut pos = (new_index + 1) as usize;
  924. for track in queue_tracks.into_iter() {
  925. self.state.mut_track().insert(pos, track);
  926. pos += 1;
  927. }
  928. self.state.set_playing_track_index(new_index);
  929. self.load_track(true, 0);
  930. } else {
  931. self.handle_seek(0);
  932. }
  933. }
  934. fn handle_volume_up(&mut self) {
  935. let mut volume: u32 = self.device.get_volume() as u32 + 4096;
  936. if volume > 0xFFFF {
  937. volume = 0xFFFF;
  938. }
  939. self.set_volume(volume as u16);
  940. }
  941. fn handle_volume_down(&mut self) {
  942. let mut volume: i32 = self.device.get_volume() as i32 - 4096;
  943. if volume < 0 {
  944. volume = 0;
  945. }
  946. self.set_volume(volume as u16);
  947. }
  948. fn handle_end_of_track(&mut self) {
  949. self.handle_next();
  950. self.notify(None, true);
  951. }
  952. fn position(&mut self) -> u32 {
  953. match self.play_status {
  954. SpircPlayStatus::Stopped => 0,
  955. SpircPlayStatus::LoadingPlay { position_ms }
  956. | SpircPlayStatus::LoadingPause { position_ms }
  957. | SpircPlayStatus::Paused { position_ms, .. } => position_ms,
  958. SpircPlayStatus::Playing {
  959. nominal_start_time, ..
  960. } => (self.now_ms() - nominal_start_time) as u32,
  961. }
  962. }
  963. fn resolve_station(
  964. &self,
  965. uri: &str,
  966. ) -> Box<dyn Future<Item = serde_json::Value, Error = MercuryError>> {
  967. let radio_uri = format!("hm://radio-apollo/v3/stations/{}", uri);
  968. self.resolve_uri(&radio_uri)
  969. }
  970. fn resolve_autoplay_uri(
  971. &self,
  972. uri: &str,
  973. ) -> Box<dyn Future<Item = String, Error = MercuryError>> {
  974. let query_uri = format!("hm://autoplay-enabled/query?uri={}", uri);
  975. let request = self.session.mercury().get(query_uri);
  976. Box::new(request.and_then(move |response| {
  977. if response.status_code == 200 {
  978. let data = response
  979. .payload
  980. .first()
  981. .expect("Empty autoplay uri")
  982. .to_vec();
  983. let autoplay_uri = String::from_utf8(data).unwrap();
  984. Ok(autoplay_uri)
  985. } else {
  986. warn!("No autoplay_uri found");
  987. Err(MercuryError)
  988. }
  989. }))
  990. }
  991. fn resolve_uri(
  992. &self,
  993. uri: &str,
  994. ) -> Box<dyn Future<Item = serde_json::Value, Error = MercuryError>> {
  995. let request = self.session.mercury().get(uri);
  996. Box::new(request.and_then(move |response| {
  997. let data = response
  998. .payload
  999. .first()
  1000. .expect("Empty payload on context uri");
  1001. let response: serde_json::Value = serde_json::from_slice(&data).unwrap();
  1002. Ok(response)
  1003. }))
  1004. }
  1005. fn update_tracks_from_context(&mut self) {
  1006. if let Some(ref context) = self.context {
  1007. self.context_fut = self.resolve_uri(&context.next_page_url);
  1008. let new_tracks = &context.tracks;
  1009. debug!("Adding {:?} tracks from context to frame", new_tracks.len());
  1010. let mut track_vec = self.state.take_track().into_vec();
  1011. if let Some(head) = track_vec.len().checked_sub(CONTEXT_TRACKS_HISTORY) {
  1012. track_vec.drain(0..head);
  1013. }
  1014. track_vec.extend_from_slice(&new_tracks);
  1015. self.state
  1016. .set_track(protobuf::RepeatedField::from_vec(track_vec));
  1017. // Update playing index
  1018. if let Some(new_index) = self
  1019. .state
  1020. .get_playing_track_index()
  1021. .checked_sub(CONTEXT_TRACKS_HISTORY as u32)
  1022. {
  1023. self.state.set_playing_track_index(new_index);
  1024. }
  1025. } else {
  1026. warn!("No context to update from!");
  1027. }
  1028. }
  1029. fn update_tracks(&mut self, frame: &protocol::spirc::Frame) {
  1030. debug!("State: {:?}", frame.get_state());
  1031. let index = frame.get_state().get_playing_track_index();
  1032. let context_uri = frame.get_state().get_context_uri().to_owned();
  1033. let tracks = frame.get_state().get_track();
  1034. debug!("Frame has {:?} tracks", tracks.len());
  1035. if context_uri.starts_with("spotify:station:")
  1036. || context_uri.starts_with("spotify:dailymix:")
  1037. {
  1038. self.context_fut = self.resolve_station(&context_uri);
  1039. } else if self.config.autoplay {
  1040. info!("Fetching autoplay context uri");
  1041. // Get autoplay_station_uri for regular playlists
  1042. self.autoplay_fut = self.resolve_autoplay_uri(&context_uri);
  1043. }
  1044. self.state.set_playing_track_index(index);
  1045. self.state.set_track(tracks.into_iter().cloned().collect());
  1046. self.state.set_context_uri(context_uri);
  1047. // has_shuffle/repeat seem to always be true in these replace msgs,
  1048. // but to replicate the behaviour of the Android client we have to
  1049. // ignore false values.
  1050. let state = frame.get_state();
  1051. if state.get_repeat() {
  1052. self.state.set_repeat(true);
  1053. }
  1054. if state.get_shuffle() {
  1055. self.state.set_shuffle(true);
  1056. }
  1057. }
  1058. // should this be a method of SpotifyId directly?
  1059. fn get_spotify_id_for_track(&self, track_ref: &TrackRef) -> Result<SpotifyId, SpotifyIdError> {
  1060. SpotifyId::from_raw(track_ref.get_gid()).or_else(|_| {
  1061. let uri = track_ref.get_uri();
  1062. debug!("Malformed or no gid, attempting to parse URI <{}>", uri);
  1063. SpotifyId::from_uri(uri)
  1064. })
  1065. }
  1066. // Helper to find corresponding index(s) for track_id
  1067. fn get_track_index_for_spotify_id(
  1068. &self,
  1069. track_id: &SpotifyId,
  1070. start_index: usize,
  1071. ) -> Vec<usize> {
  1072. let index: Vec<usize> = self.state.get_track()[start_index..]
  1073. .iter()
  1074. .enumerate()
  1075. .filter(|&(_, track_ref)| track_ref.get_gid() == track_id.to_raw())
  1076. .map(|(idx, _)| start_index + idx)
  1077. .collect();
  1078. // Sanity check
  1079. debug_assert!(!index.is_empty());
  1080. index
  1081. }
  1082. // Broken out here so we can refactor this later when we move to SpotifyObjectID or similar
  1083. fn track_ref_is_unavailable(&self, track_ref: &TrackRef) -> bool {
  1084. track_ref.get_context() == "NonPlayable"
  1085. }
  1086. fn get_track_id_to_play_from_playlist(&self, index: u32) -> Option<(SpotifyId, u32)> {
  1087. let tracks_len = self.state.get_track().len();
  1088. let mut new_playlist_index = index as usize;
  1089. if new_playlist_index >= tracks_len {
  1090. new_playlist_index = 0;
  1091. }
  1092. let start_index = new_playlist_index;
  1093. // Cycle through all tracks, break if we don't find any playable tracks
  1094. // tracks in each frame either have a gid or uri (that may or may not be a valid track)
  1095. // E.g - context based frames sometimes contain tracks with <spotify:meta:page:>
  1096. let mut track_ref = self.state.get_track()[new_playlist_index].clone();
  1097. let mut track_id = self.get_spotify_id_for_track(&track_ref);
  1098. while self.track_ref_is_unavailable(&track_ref)
  1099. || track_id.is_err()
  1100. || track_id.unwrap().audio_type == SpotifyAudioType::NonPlayable
  1101. {
  1102. warn!(
  1103. "Skipping track <{:?}> at position [{}] of {}",
  1104. track_ref, new_playlist_index, tracks_len
  1105. );
  1106. new_playlist_index += 1;
  1107. if new_playlist_index >= tracks_len {
  1108. new_playlist_index = 0;
  1109. }
  1110. if new_playlist_index == start_index {
  1111. warn!("No playable track found in state: {:?}", self.state);
  1112. return None;
  1113. }
  1114. track_ref = self.state.get_track()[new_playlist_index].clone();
  1115. track_id = self.get_spotify_id_for_track(&track_ref);
  1116. }
  1117. match track_id {
  1118. Ok(track_id) => Some((track_id, new_playlist_index as u32)),
  1119. Err(_) => None,
  1120. }
  1121. }
  1122. fn load_track(&mut self, start_playing: bool, position_ms: u32) {
  1123. let index = self.state.get_playing_track_index();
  1124. match self.get_track_id_to_play_from_playlist(index) {
  1125. Some((track, index)) => {
  1126. self.state.set_playing_track_index(index);
  1127. self.play_request_id = Some(self.player.load(track, start_playing, position_ms));
  1128. self.update_state_position(position_ms);
  1129. if start_playing {
  1130. self.state.set_status(PlayStatus::kPlayStatusPlay);
  1131. self.play_status = SpircPlayStatus::LoadingPlay { position_ms };
  1132. } else {
  1133. self.state.set_status(PlayStatus::kPlayStatusPause);
  1134. self.play_status = SpircPlayStatus::LoadingPause { position_ms };
  1135. }
  1136. }
  1137. None => {
  1138. self.state.set_status(PlayStatus::kPlayStatusStop);
  1139. self.player.stop();
  1140. self.ensure_mixer_stopped();
  1141. self.play_status = SpircPlayStatus::Stopped;
  1142. }
  1143. }
  1144. }
  1145. fn hello(&mut self) {
  1146. CommandSender::new(self, MessageType::kMessageTypeHello).send();
  1147. }
  1148. fn notify(&mut self, recipient: Option<&str>, suppress_loading_status: bool) {
  1149. if suppress_loading_status && (self.state.get_status() == PlayStatus::kPlayStatusLoading) {
  1150. return;
  1151. };
  1152. let status_string = match self.state.get_status() {
  1153. PlayStatus::kPlayStatusLoading => "kPlayStatusLoading",
  1154. PlayStatus::kPlayStatusPause => "kPlayStatusPause",
  1155. PlayStatus::kPlayStatusStop => "kPlayStatusStop",
  1156. PlayStatus::kPlayStatusPlay => "kPlayStatusPlay",
  1157. };
  1158. trace!("Sending status to server: [{}]", status_string);
  1159. let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify);
  1160. if let Some(s) = recipient {
  1161. cs = cs.recipient(&s);
  1162. }
  1163. cs.send();
  1164. }
  1165. fn set_volume(&mut self, volume: u16) {
  1166. self.device.set_volume(volume as u32);
  1167. self.mixer
  1168. .set_volume(volume_to_mixer(volume, self.config.linear_volume));
  1169. if let Some(cache) = self.session.cache() {
  1170. cache.save_volume(Volume { volume })
  1171. }
  1172. self.player.emit_volume_set_event(volume);
  1173. }
  1174. }
  1175. impl Drop for SpircTask {
  1176. fn drop(&mut self) {
  1177. debug!("drop Spirc[{}]", self.session.session_id());
  1178. }
  1179. }
  1180. struct CommandSender<'a> {
  1181. spirc: &'a mut SpircTask,
  1182. frame: protocol::spirc::Frame,
  1183. }
  1184. impl<'a> CommandSender<'a> {
  1185. fn new(spirc: &'a mut SpircTask, cmd: MessageType) -> CommandSender {
  1186. let mut frame = protocol::spirc::Frame::new();
  1187. frame.set_version(1);
  1188. frame.set_protocol_version(::std::convert::Into::into("2.0.0"));
  1189. frame.set_ident(spirc.ident.clone());
  1190. frame.set_seq_nr(spirc.sequence.get());
  1191. frame.set_typ(cmd);
  1192. frame.set_device_state(spirc.device.clone());
  1193. frame.set_state_update_id(spirc.now_ms());
  1194. CommandSender {
  1195. spirc: spirc,
  1196. frame: frame,
  1197. }
  1198. }
  1199. fn recipient(mut self, recipient: &'a str) -> CommandSender {
  1200. self.frame.mut_recipient().push(recipient.to_owned());
  1201. self
  1202. }
  1203. #[allow(dead_code)]
  1204. fn state(mut self, state: protocol::spirc::State) -> CommandSender<'a> {
  1205. self.frame.set_state(state);
  1206. self
  1207. }
  1208. fn send(mut self) {
  1209. if !self.frame.has_state() && self.spirc.device.get_is_active() {
  1210. self.frame.set_state(self.spirc.state.clone());
  1211. }
  1212. let send = self.spirc.sender.start_send(self.frame).unwrap();
  1213. assert!(send.is_ready());
  1214. }
  1215. }