spirc.rs 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294
  1. use std;
  2. use std::time::{SystemTime, UNIX_EPOCH};
  3. use futures::future;
  4. use futures::sync::mpsc;
  5. use futures::{Async, Future, Poll, Sink, Stream};
  6. use protobuf::{self, Message};
  7. use rand;
  8. use rand::seq::SliceRandom;
  9. use serde_json;
  10. use crate::context::StationContext;
  11. use crate::playback::mixer::Mixer;
  12. use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel};
  13. use crate::protocol;
  14. use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef};
  15. use librespot_core::config::ConnectConfig;
  16. use librespot_core::mercury::MercuryError;
  17. use librespot_core::session::Session;
  18. use librespot_core::spotify_id::{SpotifyAudioType, SpotifyId, SpotifyIdError};
  19. use librespot_core::util::SeqGenerator;
  20. use librespot_core::version;
  21. use librespot_core::volume::Volume;
  /// Playback state of the local player as tracked by the Spirc task.
  ///
  /// The `Loading*` variants describe a track that has been requested but for
  /// which the player has not yet reported `Playing` or `Paused`.
  enum SpircPlayStatus {
      /// Nothing is playing.
      Stopped,
      /// A track is loading and playback has been requested.
      LoadingPlay {
          /// Position within the track, in milliseconds.
          position_ms: u32,
      },
      /// A track is loading while playback is paused.
      LoadingPause {
          /// Position within the track, in milliseconds.
          position_ms: u32,
      },
      /// A track is playing.
      Playing {
          /// `now_ms() - position_ms` at the moment the position was last
          /// known, i.e. the (server-adjusted) time in milliseconds at which
          /// the track would have started if played without interruption.
          nominal_start_time: i64,
          /// Set once the player has asked for the next track to be preloaded.
          preloading_of_next_track_triggered: bool,
      },
      /// A track is loaded but paused.
      Paused {
          /// Position within the track, in milliseconds.
          position_ms: u32,
          /// Set once the player has asked for the next track to be preloaded.
          preloading_of_next_track_triggered: bool,
      },
  }
  39. pub struct SpircTask {
  40. player: Player,
  41. mixer: Box<dyn Mixer>,
  42. config: SpircTaskConfig,
  43. sequence: SeqGenerator<u32>,
  44. ident: String,
  45. device: DeviceState,
  46. state: State,
  47. play_request_id: Option<u64>,
  48. mixer_started: bool,
  49. play_status: SpircPlayStatus,
  50. subscription: Box<dyn Stream<Item = Frame, Error = MercuryError>>,
  51. sender: Box<dyn Sink<SinkItem = Frame, SinkError = MercuryError>>,
  52. commands: mpsc::UnboundedReceiver<SpircCommand>,
  53. player_events: PlayerEventChannel,
  54. shutdown: bool,
  55. session: Session,
  56. context_fut: Box<dyn Future<Item = serde_json::Value, Error = MercuryError>>,
  57. autoplay_fut: Box<dyn Future<Item = String, Error = MercuryError>>,
  58. context: Option<StationContext>,
  59. }
  /// Commands that can be sent to the `SpircTask` through a `Spirc` handle.
  pub enum SpircCommand {
      /// Start or resume playback.
      Play,
      /// Toggle between playing and paused.
      PlayPause,
      /// Pause playback.
      Pause,
      /// Go to the previous track.
      Prev,
      /// Go to the next track.
      Next,
      /// Raise the volume.
      VolumeUp,
      /// Lower the volume.
      VolumeDown,
      /// Send a goodbye message and stop the task.
      Shutdown,
  }
  /// Subset of the `ConnectConfig` that the task keeps for its whole lifetime.
  struct SpircTaskConfig {
      /// Copied from `ConnectConfig::linear_volume`.
      linear_volume: bool,
      /// Copied from `ConnectConfig::autoplay`; when set, `handle_next` extends
      /// the track list from the resolved context on reaching the last track.
      autoplay: bool,
  }
  // NOTE(review): not referenced in the visible part of this file; presumably
  // the number of already-played tracks kept when the track list is refreshed
  // from a context. Confirm against its users.
  const CONTEXT_TRACKS_HISTORY: usize = 10;
  /// When fewer than this many tracks remain after the next index,
  /// `handle_next` requests more tracks for station-like contexts.
  const CONTEXT_FETCH_THRESHOLD: u32 = 5;
  76. pub struct Spirc {
  77. commands: mpsc::UnboundedSender<SpircCommand>,
  78. }
  79. fn initial_state() -> State {
  80. let mut frame = protocol::spirc::State::new();
  81. frame.set_repeat(false);
  82. frame.set_shuffle(false);
  83. frame.set_status(PlayStatus::kPlayStatusStop);
  84. frame.set_position_ms(0);
  85. frame.set_position_measured_at(0);
  86. frame
  87. }
  88. fn initial_device_state(config: ConnectConfig) -> DeviceState {
  89. {
  90. let mut msg = DeviceState::new();
  91. msg.set_sw_version(version::version_string());
  92. msg.set_is_active(false);
  93. msg.set_can_play(true);
  94. msg.set_volume(0);
  95. msg.set_name(config.name);
  96. {
  97. let repeated = msg.mut_capabilities();
  98. {
  99. let msg = repeated.push_default();
  100. msg.set_typ(protocol::spirc::CapabilityType::kCanBePlayer);
  101. {
  102. let repeated = msg.mut_intValue();
  103. repeated.push(1)
  104. };
  105. msg
  106. };
  107. {
  108. let msg = repeated.push_default();
  109. msg.set_typ(protocol::spirc::CapabilityType::kDeviceType);
  110. {
  111. let repeated = msg.mut_intValue();
  112. repeated.push(config.device_type as i64)
  113. };
  114. msg
  115. };
  116. {
  117. let msg = repeated.push_default();
  118. msg.set_typ(protocol::spirc::CapabilityType::kGaiaEqConnectId);
  119. {
  120. let repeated = msg.mut_intValue();
  121. repeated.push(1)
  122. };
  123. msg
  124. };
  125. {
  126. let msg = repeated.push_default();
  127. msg.set_typ(protocol::spirc::CapabilityType::kSupportsLogout);
  128. {
  129. let repeated = msg.mut_intValue();
  130. repeated.push(0)
  131. };
  132. msg
  133. };
  134. {
  135. let msg = repeated.push_default();
  136. msg.set_typ(protocol::spirc::CapabilityType::kIsObservable);
  137. {
  138. let repeated = msg.mut_intValue();
  139. repeated.push(1)
  140. };
  141. msg
  142. };
  143. {
  144. let msg = repeated.push_default();
  145. msg.set_typ(protocol::spirc::CapabilityType::kVolumeSteps);
  146. {
  147. let repeated = msg.mut_intValue();
  148. repeated.push(64)
  149. };
  150. msg
  151. };
  152. {
  153. let msg = repeated.push_default();
  154. msg.set_typ(protocol::spirc::CapabilityType::kSupportsPlaylistV2);
  155. {
  156. let repeated = msg.mut_intValue();
  157. repeated.push(64)
  158. };
  159. msg
  160. };
  161. {
  162. let msg = repeated.push_default();
  163. msg.set_typ(protocol::spirc::CapabilityType::kSupportedContexts);
  164. {
  165. let repeated = msg.mut_stringValue();
  166. repeated.push(::std::convert::Into::into("album"));
  167. repeated.push(::std::convert::Into::into("playlist"));
  168. repeated.push(::std::convert::Into::into("search"));
  169. repeated.push(::std::convert::Into::into("inbox"));
  170. repeated.push(::std::convert::Into::into("toplist"));
  171. repeated.push(::std::convert::Into::into("starred"));
  172. repeated.push(::std::convert::Into::into("publishedstarred"));
  173. repeated.push(::std::convert::Into::into("track"))
  174. };
  175. msg
  176. };
  177. {
  178. let msg = repeated.push_default();
  179. msg.set_typ(protocol::spirc::CapabilityType::kSupportedTypes);
  180. {
  181. let repeated = msg.mut_stringValue();
  182. repeated.push(::std::convert::Into::into("audio/local"));
  183. repeated.push(::std::convert::Into::into("audio/track"));
  184. repeated.push(::std::convert::Into::into("audio/episode"));
  185. repeated.push(::std::convert::Into::into("local"));
  186. repeated.push(::std::convert::Into::into("track"))
  187. };
  188. msg
  189. };
  190. };
  191. msg
  192. }
  193. }
  194. fn calc_logarithmic_volume(volume: u16) -> u16 {
  195. // Volume conversion taken from https://www.dr-lex.be/info-stuff/volumecontrols.html#ideal2
  196. // Convert the given volume [0..0xffff] to a dB gain
  197. // We assume a dB range of 60dB.
  198. // Use the equation: a * exp(b * x)
  199. // in which a = IDEAL_FACTOR, b = 1/1000
  200. const IDEAL_FACTOR: f64 = 6.908;
  201. let normalized_volume = volume as f64 / std::u16::MAX as f64; // To get a value between 0 and 1
  202. let mut val = std::u16::MAX;
  203. // Prevent val > std::u16::MAX due to rounding errors
  204. if normalized_volume < 0.999 {
  205. let new_volume = (normalized_volume * IDEAL_FACTOR).exp() / 1000.0;
  206. val = (new_volume * std::u16::MAX as f64) as u16;
  207. }
  208. debug!("input volume:{} to mixer: {}", volume, val);
  209. // return the scale factor (0..0xffff) (equivalent to a voltage multiplier).
  210. val
  211. }
  212. fn volume_to_mixer(volume: u16, linear_volume: bool) -> u16 {
  213. if linear_volume {
  214. debug!("linear volume: {}", volume);
  215. volume
  216. } else {
  217. calc_logarithmic_volume(volume)
  218. }
  219. }
  220. impl Spirc {
  221. pub fn new(
  222. config: ConnectConfig,
  223. session: Session,
  224. player: Player,
  225. mixer: Box<dyn Mixer>,
  226. ) -> (Spirc, SpircTask) {
  227. debug!("new Spirc[{}]", session.session_id());
  228. let ident = session.device_id().to_owned();
  229. // Uri updated in response to issue #288
  230. let uri = format!("hm://remote/user/{}/", session.username());
  231. let subscription = session.mercury().subscribe(&uri as &str);
  232. let subscription = subscription
  233. .map(|stream| stream.map_err(|_| MercuryError))
  234. .flatten_stream();
  235. let subscription = Box::new(subscription.map(|response| -> Frame {
  236. let data = response.payload.first().unwrap();
  237. protobuf::parse_from_bytes(data).unwrap()
  238. }));
  239. let sender = Box::new(
  240. session
  241. .mercury()
  242. .sender(uri)
  243. .with(|frame: Frame| Ok(frame.write_to_bytes().unwrap())),
  244. );
  245. let (cmd_tx, cmd_rx) = mpsc::unbounded();
  246. let volume = config.volume;
  247. let task_config = SpircTaskConfig {
  248. linear_volume: config.linear_volume,
  249. autoplay: config.autoplay,
  250. };
  251. let device = initial_device_state(config);
  252. let player_events = player.get_player_event_channel();
  253. let mut task = SpircTask {
  254. player: player,
  255. mixer: mixer,
  256. config: task_config,
  257. sequence: SeqGenerator::new(1),
  258. ident: ident,
  259. device: device,
  260. state: initial_state(),
  261. play_request_id: None,
  262. mixer_started: false,
  263. play_status: SpircPlayStatus::Stopped,
  264. subscription: subscription,
  265. sender: sender,
  266. commands: cmd_rx,
  267. player_events: player_events,
  268. shutdown: false,
  269. session: session.clone(),
  270. context_fut: Box::new(future::empty()),
  271. autoplay_fut: Box::new(future::empty()),
  272. context: None,
  273. };
  274. task.set_volume(volume);
  275. let spirc = Spirc { commands: cmd_tx };
  276. task.hello();
  277. (spirc, task)
  278. }
  279. pub fn play(&self) {
  280. let _ = self.commands.unbounded_send(SpircCommand::Play);
  281. }
  282. pub fn play_pause(&self) {
  283. let _ = self.commands.unbounded_send(SpircCommand::PlayPause);
  284. }
  285. pub fn pause(&self) {
  286. let _ = self.commands.unbounded_send(SpircCommand::Pause);
  287. }
  288. pub fn prev(&self) {
  289. let _ = self.commands.unbounded_send(SpircCommand::Prev);
  290. }
  291. pub fn next(&self) {
  292. let _ = self.commands.unbounded_send(SpircCommand::Next);
  293. }
  294. pub fn volume_up(&self) {
  295. let _ = self.commands.unbounded_send(SpircCommand::VolumeUp);
  296. }
  297. pub fn volume_down(&self) {
  298. let _ = self.commands.unbounded_send(SpircCommand::VolumeDown);
  299. }
  300. pub fn shutdown(&self) {
  301. let _ = self.commands.unbounded_send(SpircCommand::Shutdown);
  302. }
  303. }
  304. impl Future for SpircTask {
  305. type Item = ();
  306. type Error = ();
  307. fn poll(&mut self) -> Poll<(), ()> {
  308. loop {
  309. let mut progress = false;
  310. if self.session.is_invalid() {
  311. return Ok(Async::Ready(()));
  312. }
  313. if !self.shutdown {
  314. match self.subscription.poll().unwrap() {
  315. Async::Ready(Some(frame)) => {
  316. progress = true;
  317. self.handle_frame(frame);
  318. }
  319. Async::Ready(None) => {
  320. error!("subscription terminated");
  321. self.shutdown = true;
  322. self.commands.close();
  323. }
  324. Async::NotReady => (),
  325. }
  326. match self.commands.poll().unwrap() {
  327. Async::Ready(Some(command)) => {
  328. progress = true;
  329. self.handle_command(command);
  330. }
  331. Async::Ready(None) => (),
  332. Async::NotReady => (),
  333. }
  334. match self.player_events.poll() {
  335. Ok(Async::NotReady) => (),
  336. Ok(Async::Ready(None)) => (),
  337. Err(_) => (),
  338. Ok(Async::Ready(Some(event))) => {
  339. progress = true;
  340. self.handle_player_event(event);
  341. }
  342. }
  343. // TODO: Refactor
  344. match self.context_fut.poll() {
  345. Ok(Async::Ready(value)) => {
  346. let r_context = serde_json::from_value::<StationContext>(value.clone());
  347. self.context = match r_context {
  348. Ok(context) => {
  349. info!(
  350. "Resolved {:?} tracks from <{:?}>",
  351. context.tracks.len(),
  352. self.state.get_context_uri(),
  353. );
  354. Some(context)
  355. }
  356. Err(e) => {
  357. error!("Unable to parse JSONContext {:?}\n{:?}", e, value);
  358. None
  359. }
  360. };
  361. // It needn't be so verbose - can be as simple as
  362. // if let Some(ref context) = r_context {
  363. // info!("Got {:?} tracks from <{}>", context.tracks.len(), context.uri);
  364. // }
  365. // self.context = r_context;
  366. progress = true;
  367. self.context_fut = Box::new(future::empty());
  368. }
  369. Ok(Async::NotReady) => (),
  370. Err(err) => {
  371. self.context_fut = Box::new(future::empty());
  372. error!("ContextError: {:?}", err)
  373. }
  374. }
  375. match self.autoplay_fut.poll() {
  376. Ok(Async::Ready(autoplay_station_uri)) => {
  377. info!("Autoplay uri resolved to <{:?}>", autoplay_station_uri);
  378. self.context_fut = self.resolve_station(&autoplay_station_uri);
  379. progress = true;
  380. self.autoplay_fut = Box::new(future::empty());
  381. }
  382. Ok(Async::NotReady) => (),
  383. Err(err) => {
  384. self.autoplay_fut = Box::new(future::empty());
  385. error!("AutoplayError: {:?}", err)
  386. }
  387. }
  388. }
  389. let poll_sender = self.sender.poll_complete().unwrap();
  390. // Only shutdown once we've flushed out all our messages
  391. if self.shutdown && poll_sender.is_ready() {
  392. return Ok(Async::Ready(()));
  393. }
  394. if !progress {
  395. return Ok(Async::NotReady);
  396. }
  397. }
  398. }
  399. }
  400. impl SpircTask {
  401. fn now_ms(&mut self) -> i64 {
  402. let dur = match SystemTime::now().duration_since(UNIX_EPOCH) {
  403. Ok(dur) => dur,
  404. Err(err) => err.duration(),
  405. };
  406. (dur.as_secs() as i64 + self.session.time_delta()) * 1000
  407. + (dur.subsec_nanos() / 1000_000) as i64
  408. }
  409. fn ensure_mixer_started(&mut self) {
  410. if !self.mixer_started {
  411. self.mixer.start();
  412. self.mixer_started = true;
  413. }
  414. }
  415. fn ensure_mixer_stopped(&mut self) {
  416. if self.mixer_started {
  417. self.mixer.stop();
  418. self.mixer_started = false;
  419. }
  420. }
  421. fn update_state_position(&mut self, position_ms: u32) {
  422. let now = self.now_ms();
  423. self.state.set_position_measured_at(now as u64);
  424. self.state.set_position_ms(position_ms);
  425. }
  426. fn handle_command(&mut self, cmd: SpircCommand) {
  427. let active = self.device.get_is_active();
  428. match cmd {
  429. SpircCommand::Play => {
  430. if active {
  431. self.handle_play();
  432. self.notify(None, true);
  433. } else {
  434. CommandSender::new(self, MessageType::kMessageTypePlay).send();
  435. }
  436. }
  437. SpircCommand::PlayPause => {
  438. if active {
  439. self.handle_play_pause();
  440. self.notify(None, true);
  441. } else {
  442. CommandSender::new(self, MessageType::kMessageTypePlayPause).send();
  443. }
  444. }
  445. SpircCommand::Pause => {
  446. if active {
  447. self.handle_pause();
  448. self.notify(None, true);
  449. } else {
  450. CommandSender::new(self, MessageType::kMessageTypePause).send();
  451. }
  452. }
  453. SpircCommand::Prev => {
  454. if active {
  455. self.handle_prev();
  456. self.notify(None, true);
  457. } else {
  458. CommandSender::new(self, MessageType::kMessageTypePrev).send();
  459. }
  460. }
  461. SpircCommand::Next => {
  462. if active {
  463. self.handle_next();
  464. self.notify(None, true);
  465. } else {
  466. CommandSender::new(self, MessageType::kMessageTypeNext).send();
  467. }
  468. }
  469. SpircCommand::VolumeUp => {
  470. if active {
  471. self.handle_volume_up();
  472. self.notify(None, true);
  473. } else {
  474. CommandSender::new(self, MessageType::kMessageTypeVolumeUp).send();
  475. }
  476. }
  477. SpircCommand::VolumeDown => {
  478. if active {
  479. self.handle_volume_down();
  480. self.notify(None, true);
  481. } else {
  482. CommandSender::new(self, MessageType::kMessageTypeVolumeDown).send();
  483. }
  484. }
  485. SpircCommand::Shutdown => {
  486. CommandSender::new(self, MessageType::kMessageTypeGoodbye).send();
  487. self.shutdown = true;
  488. self.commands.close();
  489. }
  490. }
  491. }
  492. fn handle_player_event(&mut self, event: PlayerEvent) {
  493. // we only process events if the play_request_id matches. If it doesn't, it is
  494. // an event that belongs to a previous track and only arrives now due to a race
  495. // condition. In this case we have updated the state already and don't want to
  496. // mess with it.
  497. if let Some(play_request_id) = event.get_play_request_id() {
  498. if Some(play_request_id) == self.play_request_id {
  499. match event {
  500. PlayerEvent::EndOfTrack { .. } => self.handle_end_of_track(),
  501. PlayerEvent::Loading { .. } => self.notify(None, false),
  502. PlayerEvent::Playing { position_ms, .. } => {
  503. let new_nominal_start_time = self.now_ms() - position_ms as i64;
  504. match self.play_status {
  505. SpircPlayStatus::Playing {
  506. ref mut nominal_start_time,
  507. ..
  508. } => {
  509. if (*nominal_start_time - new_nominal_start_time).abs() > 100 {
  510. *nominal_start_time = new_nominal_start_time;
  511. self.update_state_position(position_ms);
  512. self.notify(None, true);
  513. }
  514. }
  515. SpircPlayStatus::LoadingPlay { .. }
  516. | SpircPlayStatus::LoadingPause { .. } => {
  517. self.state.set_status(PlayStatus::kPlayStatusPlay);
  518. self.update_state_position(position_ms);
  519. self.notify(None, true);
  520. self.play_status = SpircPlayStatus::Playing {
  521. nominal_start_time: new_nominal_start_time,
  522. preloading_of_next_track_triggered: false,
  523. };
  524. }
  525. _ => (),
  526. };
  527. trace!("==> kPlayStatusPlay");
  528. }
  529. PlayerEvent::Paused {
  530. position_ms: new_position_ms,
  531. ..
  532. } => {
  533. match self.play_status {
  534. SpircPlayStatus::Paused {
  535. ref mut position_ms,
  536. ..
  537. } => {
  538. if *position_ms != new_position_ms {
  539. *position_ms = new_position_ms;
  540. self.update_state_position(new_position_ms);
  541. self.notify(None, true);
  542. }
  543. }
  544. SpircPlayStatus::LoadingPlay { .. }
  545. | SpircPlayStatus::LoadingPause { .. } => {
  546. self.state.set_status(PlayStatus::kPlayStatusPause);
  547. self.update_state_position(new_position_ms);
  548. self.notify(None, true);
  549. self.play_status = SpircPlayStatus::Paused {
  550. position_ms: new_position_ms,
  551. preloading_of_next_track_triggered: false,
  552. };
  553. }
  554. _ => (),
  555. }
  556. trace!("==> kPlayStatusPause");
  557. }
  558. PlayerEvent::Stopped { .. } => match self.play_status {
  559. SpircPlayStatus::Stopped => (),
  560. _ => {
  561. warn!("The player has stopped unexpectedly.");
  562. self.state.set_status(PlayStatus::kPlayStatusStop);
  563. self.ensure_mixer_stopped();
  564. self.notify(None, true);
  565. self.play_status = SpircPlayStatus::Stopped;
  566. }
  567. },
  568. PlayerEvent::TimeToPreloadNextTrack { .. } => match self.play_status {
  569. SpircPlayStatus::Paused {
  570. ref mut preloading_of_next_track_triggered,
  571. ..
  572. }
  573. | SpircPlayStatus::Playing {
  574. ref mut preloading_of_next_track_triggered,
  575. ..
  576. } => {
  577. *preloading_of_next_track_triggered = true;
  578. if let Some(track_id) = self.preview_next_track() {
  579. self.player.preload(track_id);
  580. }
  581. }
  582. SpircPlayStatus::LoadingPause { .. }
  583. | SpircPlayStatus::LoadingPlay { .. }
  584. | SpircPlayStatus::Stopped => (),
  585. },
  586. _ => (),
  587. }
  588. }
  589. }
  590. }
  591. fn handle_frame(&mut self, frame: Frame) {
  592. let state_string = match frame.get_state().get_status() {
  593. PlayStatus::kPlayStatusLoading => "kPlayStatusLoading",
  594. PlayStatus::kPlayStatusPause => "kPlayStatusPause",
  595. PlayStatus::kPlayStatusStop => "kPlayStatusStop",
  596. PlayStatus::kPlayStatusPlay => "kPlayStatusPlay",
  597. };
  598. debug!(
  599. "{:?} {:?} {} {} {} {}",
  600. frame.get_typ(),
  601. frame.get_device_state().get_name(),
  602. frame.get_ident(),
  603. frame.get_seq_nr(),
  604. frame.get_state_update_id(),
  605. state_string,
  606. );
  607. if frame.get_ident() == self.ident
  608. || (frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident))
  609. {
  610. return;
  611. }
  612. match frame.get_typ() {
  613. MessageType::kMessageTypeHello => {
  614. self.notify(Some(frame.get_ident()), true);
  615. }
  616. MessageType::kMessageTypeLoad => {
  617. if !self.device.get_is_active() {
  618. let now = self.now_ms();
  619. self.device.set_is_active(true);
  620. self.device.set_became_active_at(now);
  621. }
  622. self.update_tracks(&frame);
  623. if self.state.get_track().len() > 0 {
  624. let start_playing =
  625. frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
  626. self.load_track(start_playing, frame.get_state().get_position_ms());
  627. } else {
  628. info!("No more tracks left in queue");
  629. self.state.set_status(PlayStatus::kPlayStatusStop);
  630. self.player.stop();
  631. self.mixer.stop();
  632. self.play_status = SpircPlayStatus::Stopped;
  633. }
  634. self.notify(None, true);
  635. }
  636. MessageType::kMessageTypePlay => {
  637. self.handle_play();
  638. self.notify(None, true);
  639. }
  640. MessageType::kMessageTypePlayPause => {
  641. self.handle_play_pause();
  642. self.notify(None, true);
  643. }
  644. MessageType::kMessageTypePause => {
  645. self.handle_pause();
  646. self.notify(None, true);
  647. }
  648. MessageType::kMessageTypeNext => {
  649. self.handle_next();
  650. self.notify(None, true);
  651. }
  652. MessageType::kMessageTypePrev => {
  653. self.handle_prev();
  654. self.notify(None, true);
  655. }
  656. MessageType::kMessageTypeVolumeUp => {
  657. self.handle_volume_up();
  658. self.notify(None, true);
  659. }
  660. MessageType::kMessageTypeVolumeDown => {
  661. self.handle_volume_down();
  662. self.notify(None, true);
  663. }
  664. MessageType::kMessageTypeRepeat => {
  665. self.state.set_repeat(frame.get_state().get_repeat());
  666. self.notify(None, true);
  667. }
  668. MessageType::kMessageTypeShuffle => {
  669. self.state.set_shuffle(frame.get_state().get_shuffle());
  670. if self.state.get_shuffle() {
  671. let current_index = self.state.get_playing_track_index();
  672. {
  673. let tracks = self.state.mut_track();
  674. tracks.swap(0, current_index as usize);
  675. if let Some((_, rest)) = tracks.split_first_mut() {
  676. let mut rng = rand::thread_rng();
  677. rest.shuffle(&mut rng);
  678. }
  679. }
  680. self.state.set_playing_track_index(0);
  681. } else {
  682. let context = self.state.get_context_uri();
  683. debug!("{:?}", context);
  684. }
  685. self.notify(None, true);
  686. }
  687. MessageType::kMessageTypeSeek => {
  688. self.handle_seek(frame.get_position());
  689. self.notify(None, true);
  690. }
  691. MessageType::kMessageTypeReplace => {
  692. self.update_tracks(&frame);
  693. self.notify(None, true);
  694. if let SpircPlayStatus::Playing {
  695. preloading_of_next_track_triggered,
  696. ..
  697. }
  698. | SpircPlayStatus::Paused {
  699. preloading_of_next_track_triggered,
  700. ..
  701. } = self.play_status
  702. {
  703. if preloading_of_next_track_triggered {
  704. if let Some(track_id) = self.preview_next_track() {
  705. self.player.preload(track_id);
  706. }
  707. }
  708. }
  709. }
  710. MessageType::kMessageTypeVolume => {
  711. self.set_volume(frame.get_volume() as u16);
  712. self.notify(None, true);
  713. }
  714. MessageType::kMessageTypeNotify => {
  715. if self.device.get_is_active()
  716. && frame.get_device_state().get_is_active()
  717. && self.device.get_became_active_at()
  718. <= frame.get_device_state().get_became_active_at()
  719. {
  720. self.device.set_is_active(false);
  721. self.state.set_status(PlayStatus::kPlayStatusStop);
  722. self.player.stop();
  723. self.ensure_mixer_stopped();
  724. self.play_status = SpircPlayStatus::Stopped;
  725. }
  726. }
  727. _ => (),
  728. }
  729. }
  730. fn handle_play(&mut self) {
  731. match self.play_status {
  732. SpircPlayStatus::Paused {
  733. position_ms,
  734. preloading_of_next_track_triggered,
  735. } => {
  736. self.ensure_mixer_started();
  737. self.player.play();
  738. self.state.set_status(PlayStatus::kPlayStatusPlay);
  739. self.update_state_position(position_ms);
  740. self.play_status = SpircPlayStatus::Playing {
  741. nominal_start_time: self.now_ms() as i64 - position_ms as i64,
  742. preloading_of_next_track_triggered,
  743. };
  744. }
  745. SpircPlayStatus::LoadingPause { position_ms } => {
  746. self.ensure_mixer_started();
  747. self.player.play();
  748. self.play_status = SpircPlayStatus::LoadingPlay { position_ms };
  749. }
  750. _ => (),
  751. }
  752. }
  753. fn handle_play_pause(&mut self) {
  754. match self.play_status {
  755. SpircPlayStatus::Paused { .. } | SpircPlayStatus::LoadingPause { .. } => {
  756. self.handle_play()
  757. }
  758. SpircPlayStatus::Playing { .. } | SpircPlayStatus::LoadingPlay { .. } => {
  759. self.handle_play()
  760. }
  761. _ => (),
  762. }
  763. }
  764. fn handle_pause(&mut self) {
  765. match self.play_status {
  766. SpircPlayStatus::Playing {
  767. nominal_start_time,
  768. preloading_of_next_track_triggered,
  769. } => {
  770. self.player.pause();
  771. self.state.set_status(PlayStatus::kPlayStatusPause);
  772. let position_ms = (self.now_ms() - nominal_start_time) as u32;
  773. self.update_state_position(position_ms);
  774. self.play_status = SpircPlayStatus::Paused {
  775. position_ms,
  776. preloading_of_next_track_triggered,
  777. };
  778. }
  779. SpircPlayStatus::LoadingPlay { position_ms } => {
  780. self.player.pause();
  781. self.play_status = SpircPlayStatus::LoadingPause { position_ms };
  782. }
  783. _ => (),
  784. }
  785. }
  786. fn handle_seek(&mut self, position_ms: u32) {
  787. self.update_state_position(position_ms);
  788. self.player.seek(position_ms);
  789. let now = self.now_ms();
  790. match self.play_status {
  791. SpircPlayStatus::Stopped => (),
  792. SpircPlayStatus::LoadingPause {
  793. position_ms: ref mut position,
  794. }
  795. | SpircPlayStatus::LoadingPlay {
  796. position_ms: ref mut position,
  797. }
  798. | SpircPlayStatus::Paused {
  799. position_ms: ref mut position,
  800. ..
  801. } => *position = position_ms,
  802. SpircPlayStatus::Playing {
  803. ref mut nominal_start_time,
  804. ..
  805. } => *nominal_start_time = now - position_ms as i64,
  806. };
  807. }
  808. fn consume_queued_track(&mut self) -> usize {
  809. // Removes current track if it is queued
  810. // Returns the index of the next track
  811. let current_index = self.state.get_playing_track_index() as usize;
  812. if (current_index < self.state.get_track().len())
  813. && self.state.get_track()[current_index].get_queued()
  814. {
  815. self.state.mut_track().remove(current_index);
  816. current_index
  817. } else {
  818. current_index + 1
  819. }
  820. }
  821. fn preview_next_track(&mut self) -> Option<SpotifyId> {
  822. self.get_track_id_to_play_from_playlist(self.state.get_playing_track_index() + 1)
  823. .and_then(|(track_id, _)| Some(track_id))
  824. }
  825. fn handle_next(&mut self) {
  826. let mut new_index = self.consume_queued_track() as u32;
  827. let mut continue_playing = true;
  828. let tracks_len = self.state.get_track().len() as u32;
  829. debug!(
  830. "At track {:?} of {:?} <{:?}> update [{}]",
  831. new_index,
  832. self.state.get_track().len(),
  833. self.state.get_context_uri(),
  834. tracks_len - new_index < CONTEXT_FETCH_THRESHOLD
  835. );
  836. let context_uri = self.state.get_context_uri().to_owned();
  837. if (context_uri.starts_with("spotify:station:")
  838. || context_uri.starts_with("spotify:dailymix:")
  839. // spotify:user:xxx:collection
  840. || context_uri.starts_with(&format!("spotify:user:{}:collection",self.session.username())))
  841. && ((self.state.get_track().len() as u32) - new_index) < CONTEXT_FETCH_THRESHOLD
  842. {
  843. self.context_fut = self.resolve_station(&context_uri);
  844. self.update_tracks_from_context();
  845. }
  846. if self.config.autoplay && new_index == tracks_len - 1 {
  847. // Extend the playlist
  848. // Note: This doesn't seem to reflect in the UI
  849. // the additional tracks in the frame don't show up as with station view
  850. debug!("Extending playlist <{}>", context_uri);
  851. self.update_tracks_from_context();
  852. }
  853. if new_index >= tracks_len {
  854. new_index = 0; // Loop around back to start
  855. continue_playing = self.state.get_repeat();
  856. }
  857. if tracks_len > 0 {
  858. self.state.set_playing_track_index(new_index);
  859. self.load_track(continue_playing, 0);
  860. } else {
  861. info!("Not playing next track because there are no more tracks left in queue.");
  862. self.state.set_playing_track_index(0);
  863. self.state.set_status(PlayStatus::kPlayStatusStop);
  864. self.player.stop();
  865. self.ensure_mixer_stopped();
  866. self.play_status = SpircPlayStatus::Stopped;
  867. }
  868. }
  869. fn handle_prev(&mut self) {
  870. // Previous behaves differently based on the position
  871. // Under 3s it goes to the previous song (starts playing)
  872. // Over 3s it seeks to zero (retains previous play status)
  873. if self.position() < 3000 {
  874. // Queued tracks always follow the currently playing track.
  875. // They should not be considered when calculating the previous
  876. // track so extract them beforehand and reinsert them after it.
  877. let mut queue_tracks = Vec::new();
  878. {
  879. let queue_index = self.consume_queued_track();
  880. let tracks = self.state.mut_track();
  881. while queue_index < tracks.len() && tracks[queue_index].get_queued() {
  882. queue_tracks.push(tracks.remove(queue_index));
  883. }
  884. }
  885. let current_index = self.state.get_playing_track_index();
  886. let new_index = if current_index > 0 {
  887. current_index - 1
  888. } else if self.state.get_repeat() {
  889. self.state.get_track().len() as u32 - 1
  890. } else {
  891. 0
  892. };
  893. // Reinsert queued tracks after the new playing track.
  894. let mut pos = (new_index + 1) as usize;
  895. for track in queue_tracks.into_iter() {
  896. self.state.mut_track().insert(pos, track);
  897. pos += 1;
  898. }
  899. self.state.set_playing_track_index(new_index);
  900. self.load_track(true, 0);
  901. } else {
  902. self.handle_seek(0);
  903. }
  904. }
  905. fn handle_volume_up(&mut self) {
  906. let mut volume: u32 = self.device.get_volume() as u32 + 4096;
  907. if volume > 0xFFFF {
  908. volume = 0xFFFF;
  909. }
  910. self.set_volume(volume as u16);
  911. }
  912. fn handle_volume_down(&mut self) {
  913. let mut volume: i32 = self.device.get_volume() as i32 - 4096;
  914. if volume < 0 {
  915. volume = 0;
  916. }
  917. self.set_volume(volume as u16);
  918. }
  919. fn handle_end_of_track(&mut self) {
  920. self.handle_next();
  921. self.notify(None, true);
  922. }
  923. fn position(&mut self) -> u32 {
  924. match self.play_status {
  925. SpircPlayStatus::Stopped => 0,
  926. SpircPlayStatus::LoadingPlay { position_ms }
  927. | SpircPlayStatus::LoadingPause { position_ms }
  928. | SpircPlayStatus::Paused { position_ms, .. } => position_ms,
  929. SpircPlayStatus::Playing {
  930. nominal_start_time, ..
  931. } => (self.now_ms() - nominal_start_time) as u32,
  932. }
  933. }
  934. fn resolve_station(
  935. &self,
  936. uri: &str,
  937. ) -> Box<dyn Future<Item = serde_json::Value, Error = MercuryError>> {
  938. let radio_uri = format!("hm://radio-apollo/v3/stations/{}", uri);
  939. self.resolve_uri(&radio_uri)
  940. }
  941. fn resolve_autoplay_uri(
  942. &self,
  943. uri: &str,
  944. ) -> Box<dyn Future<Item = String, Error = MercuryError>> {
  945. let query_uri = format!("hm://autoplay-enabled/query?uri={}", uri);
  946. let request = self.session.mercury().get(query_uri);
  947. Box::new(request.and_then(move |response| {
  948. if response.status_code == 200 {
  949. let data = response
  950. .payload
  951. .first()
  952. .expect("Empty autoplay uri")
  953. .to_vec();
  954. let autoplay_uri = String::from_utf8(data).unwrap();
  955. Ok(autoplay_uri)
  956. } else {
  957. warn!("No autoplay_uri found");
  958. Err(MercuryError)
  959. }
  960. }))
  961. }
  962. fn resolve_uri(
  963. &self,
  964. uri: &str,
  965. ) -> Box<dyn Future<Item = serde_json::Value, Error = MercuryError>> {
  966. let request = self.session.mercury().get(uri);
  967. Box::new(request.and_then(move |response| {
  968. let data = response
  969. .payload
  970. .first()
  971. .expect("Empty payload on context uri");
  972. let response: serde_json::Value = serde_json::from_slice(&data).unwrap();
  973. Ok(response)
  974. }))
  975. }
  976. fn update_tracks_from_context(&mut self) {
  977. if let Some(ref context) = self.context {
  978. self.context_fut = self.resolve_uri(&context.next_page_url);
  979. let new_tracks = &context.tracks;
  980. debug!("Adding {:?} tracks from context to frame", new_tracks.len());
  981. let mut track_vec = self.state.take_track().into_vec();
  982. if let Some(head) = track_vec.len().checked_sub(CONTEXT_TRACKS_HISTORY) {
  983. track_vec.drain(0..head);
  984. }
  985. track_vec.extend_from_slice(&new_tracks);
  986. self.state
  987. .set_track(protobuf::RepeatedField::from_vec(track_vec));
  988. // Update playing index
  989. if let Some(new_index) = self
  990. .state
  991. .get_playing_track_index()
  992. .checked_sub(CONTEXT_TRACKS_HISTORY as u32)
  993. {
  994. self.state.set_playing_track_index(new_index);
  995. }
  996. } else {
  997. warn!("No context to update from!");
  998. }
  999. }
  1000. fn update_tracks(&mut self, frame: &protocol::spirc::Frame) {
  1001. debug!("State: {:?}", frame.get_state());
  1002. let index = frame.get_state().get_playing_track_index();
  1003. let context_uri = frame.get_state().get_context_uri().to_owned();
  1004. let tracks = frame.get_state().get_track();
  1005. debug!("Frame has {:?} tracks", tracks.len());
  1006. if context_uri.starts_with("spotify:station:")
  1007. || context_uri.starts_with("spotify:dailymix:")
  1008. {
  1009. self.context_fut = self.resolve_station(&context_uri);
  1010. } else if self.config.autoplay {
  1011. info!("Fetching autoplay context uri");
  1012. // Get autoplay_station_uri for regular playlists
  1013. self.autoplay_fut = self.resolve_autoplay_uri(&context_uri);
  1014. }
  1015. self.state.set_playing_track_index(index);
  1016. self.state.set_track(tracks.into_iter().cloned().collect());
  1017. self.state.set_context_uri(context_uri);
  1018. // has_shuffle/repeat seem to always be true in these replace msgs,
  1019. // but to replicate the behaviour of the Android client we have to
  1020. // ignore false values.
  1021. let state = frame.get_state();
  1022. if state.get_repeat() {
  1023. self.state.set_repeat(true);
  1024. }
  1025. if state.get_shuffle() {
  1026. self.state.set_shuffle(true);
  1027. }
  1028. }
  1029. // should this be a method of SpotifyId directly?
  1030. fn get_spotify_id_for_track(&self, track_ref: &TrackRef) -> Result<SpotifyId, SpotifyIdError> {
  1031. SpotifyId::from_raw(track_ref.get_gid()).or_else(|_| {
  1032. let uri = track_ref.get_uri();
  1033. debug!("Malformed or no gid, attempting to parse URI <{}>", uri);
  1034. SpotifyId::from_uri(uri)
  1035. })
  1036. }
  1037. fn get_track_id_to_play_from_playlist(&self, index: u32) -> Option<(SpotifyId, u32)> {
  1038. let tracks_len = self.state.get_track().len() as u32;
  1039. let mut new_playlist_index = index;
  1040. if new_playlist_index >= tracks_len {
  1041. new_playlist_index = 0;
  1042. }
  1043. let start_index = new_playlist_index;
  1044. // Cycle through all tracks, break if we don't find any playable tracks
  1045. // tracks in each frame either have a gid or uri (that may or may not be a valid track)
  1046. // E.g - context based frames sometimes contain tracks with <spotify:meta:page:>
  1047. let mut track_ref = self.state.get_track()[new_playlist_index as usize].clone();
  1048. let mut track_id = self.get_spotify_id_for_track(&track_ref);
  1049. while track_id.is_err() || track_id.unwrap().audio_type == SpotifyAudioType::NonPlayable {
  1050. warn!(
  1051. "Skipping track <{:?}> at position [{}] of {}",
  1052. track_ref.get_uri(),
  1053. new_playlist_index,
  1054. tracks_len
  1055. );
  1056. new_playlist_index += 1;
  1057. if new_playlist_index >= tracks_len {
  1058. new_playlist_index = 0;
  1059. }
  1060. if new_playlist_index == start_index {
  1061. warn!("No playable track found in state: {:?}", self.state);
  1062. return None;
  1063. }
  1064. track_ref = self.state.get_track()[index as usize].clone();
  1065. track_id = self.get_spotify_id_for_track(&track_ref);
  1066. }
  1067. match track_id {
  1068. Ok(track_id) => Some((track_id, new_playlist_index)),
  1069. Err(_) => None,
  1070. }
  1071. }
  1072. fn load_track(&mut self, start_playing: bool, position_ms: u32) {
  1073. let index = self.state.get_playing_track_index();
  1074. match self.get_track_id_to_play_from_playlist(index) {
  1075. Some((track, index)) => {
  1076. self.state.set_playing_track_index(index);
  1077. self.play_request_id = Some(self.player.load(track, start_playing, position_ms));
  1078. self.update_state_position(position_ms);
  1079. if start_playing {
  1080. self.state.set_status(PlayStatus::kPlayStatusPlay);
  1081. self.play_status = SpircPlayStatus::LoadingPlay { position_ms };
  1082. } else {
  1083. self.state.set_status(PlayStatus::kPlayStatusPause);
  1084. self.play_status = SpircPlayStatus::LoadingPause { position_ms };
  1085. }
  1086. }
  1087. None => {
  1088. self.state.set_status(PlayStatus::kPlayStatusStop);
  1089. self.player.stop();
  1090. self.ensure_mixer_stopped();
  1091. self.play_status = SpircPlayStatus::Stopped;
  1092. }
  1093. }
  1094. }
  1095. fn hello(&mut self) {
  1096. CommandSender::new(self, MessageType::kMessageTypeHello).send();
  1097. }
  1098. fn notify(&mut self, recipient: Option<&str>, suppress_loading_status: bool) {
  1099. if suppress_loading_status && (self.state.get_status() == PlayStatus::kPlayStatusLoading) {
  1100. return;
  1101. };
  1102. let status_string = match self.state.get_status() {
  1103. PlayStatus::kPlayStatusLoading => "kPlayStatusLoading",
  1104. PlayStatus::kPlayStatusPause => "kPlayStatusPause",
  1105. PlayStatus::kPlayStatusStop => "kPlayStatusStop",
  1106. PlayStatus::kPlayStatusPlay => "kPlayStatusPlay",
  1107. };
  1108. trace!("Sending status to server: [{}]", status_string);
  1109. let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify);
  1110. if let Some(s) = recipient {
  1111. cs = cs.recipient(&s);
  1112. }
  1113. cs.send();
  1114. }
  1115. fn set_volume(&mut self, volume: u16) {
  1116. self.device.set_volume(volume as u32);
  1117. self.mixer
  1118. .set_volume(volume_to_mixer(volume, self.config.linear_volume));
  1119. if let Some(cache) = self.session.cache() {
  1120. cache.save_volume(Volume { volume })
  1121. }
  1122. self.player.emit_volume_set_event(volume);
  1123. }
  1124. }
  1125. impl Drop for SpircTask {
  1126. fn drop(&mut self) {
  1127. debug!("drop Spirc[{}]", self.session.session_id());
  1128. }
  1129. }
  1130. struct CommandSender<'a> {
  1131. spirc: &'a mut SpircTask,
  1132. frame: protocol::spirc::Frame,
  1133. }
  1134. impl<'a> CommandSender<'a> {
  1135. fn new(spirc: &'a mut SpircTask, cmd: MessageType) -> CommandSender {
  1136. let mut frame = protocol::spirc::Frame::new();
  1137. frame.set_version(1);
  1138. frame.set_protocol_version(::std::convert::Into::into("2.0.0"));
  1139. frame.set_ident(spirc.ident.clone());
  1140. frame.set_seq_nr(spirc.sequence.get());
  1141. frame.set_typ(cmd);
  1142. frame.set_device_state(spirc.device.clone());
  1143. frame.set_state_update_id(spirc.now_ms());
  1144. CommandSender {
  1145. spirc: spirc,
  1146. frame: frame,
  1147. }
  1148. }
  1149. fn recipient(mut self, recipient: &'a str) -> CommandSender {
  1150. self.frame.mut_recipient().push(recipient.to_owned());
  1151. self
  1152. }
  1153. #[allow(dead_code)]
  1154. fn state(mut self, state: protocol::spirc::State) -> CommandSender<'a> {
  1155. self.frame.set_state(state);
  1156. self
  1157. }
  1158. fn send(mut self) {
  1159. if !self.frame.has_state() && self.spirc.device.get_is_active() {
  1160. self.frame.set_state(self.spirc.state.clone());
  1161. }
  1162. let send = self.spirc.sender.start_send(self.frame).unwrap();
  1163. assert!(send.is_ready());
  1164. }
  1165. }