spirc.rs 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910
  1. use futures::future;
  2. use futures::sync::{mpsc, oneshot};
  3. use futures::{Async, Future, Poll, Sink, Stream};
  4. use protobuf::{self, Message};
  5. use core::config::ConnectConfig;
  6. use core::mercury::MercuryError;
  7. use core::session::Session;
  8. use core::spotify_id::SpotifyId;
  9. use core::util::SeqGenerator;
  10. use core::version;
  11. use core::volume::Volume;
  12. use protocol;
  13. use protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State};
  14. use playback::mixer::Mixer;
  15. use playback::player::Player;
  16. use serde_json;
  17. use context::StationContext;
  18. use rand;
  19. use rand::seq::SliceRandom;
  20. use std;
  21. use std::time::{SystemTime, UNIX_EPOCH};
  22. pub struct SpircTask {
  23. player: Player,
  24. mixer: Box<Mixer>,
  25. linear_volume: bool,
  26. sequence: SeqGenerator<u32>,
  27. ident: String,
  28. device: DeviceState,
  29. state: State,
  30. subscription: Box<Stream<Item = Frame, Error = MercuryError>>,
  31. sender: Box<Sink<SinkItem = Frame, SinkError = MercuryError>>,
  32. commands: mpsc::UnboundedReceiver<SpircCommand>,
  33. end_of_track: Box<Future<Item = (), Error = oneshot::Canceled>>,
  34. shutdown: bool,
  35. session: Session,
  36. context_fut: Box<Future<Item = serde_json::Value, Error = MercuryError>>,
  37. context: Option<StationContext>,
  38. }
  /// Commands that can be queued for a `SpircTask` through a `Spirc` handle.
  ///
  /// Every command except `Shutdown` is applied locally when this device is the
  /// active one and forwarded as a remote frame otherwise.
  pub enum SpircCommand {
      /// Resume playback.
      Play,
      /// Toggle between playing and paused.
      PlayPause,
      /// Pause playback.
      Pause,
      /// Go to the previous track (or restart the current one).
      Prev,
      /// Go to the next track.
      Next,
      /// Raise the volume by one step.
      VolumeUp,
      /// Lower the volume by one step.
      VolumeDown,
      /// Send a goodbye frame and stop the task once it has been flushed.
      Shutdown,
  }
  /// Maximum number of already-known tracks kept in the track list when tracks
  /// from a resolved context are appended.
  const CONTEXT_TRACKS_HISTORY: usize = 10;

  /// For station / daily-mix contexts: once fewer than this many tracks remain
  /// after the next index, more tracks are requested.
  const CONTEXT_FETCH_THRESHOLD: u32 = 5;
  51. pub struct Spirc {
  52. commands: mpsc::UnboundedSender<SpircCommand>,
  53. }
  /// Returns the wall-clock distance to the Unix epoch in whole milliseconds.
  ///
  /// Should the system clock read earlier than the epoch, the (positive)
  /// distance reported by the error is used instead of failing.
  fn now_ms() -> i64 {
      let since_epoch = SystemTime::now()
          .duration_since(UNIX_EPOCH)
          .unwrap_or_else(|err| err.duration());

      let whole_seconds_ms = since_epoch.as_secs() * 1000;
      let fraction_ms = (since_epoch.subsec_nanos() / 1_000_000) as u64;

      (whole_seconds_ms + fraction_ms) as i64
  }
  61. fn initial_state() -> State {
  62. let mut frame = protocol::spirc::State::new();
  63. frame.set_repeat(false);
  64. frame.set_shuffle(false);
  65. frame.set_status(PlayStatus::kPlayStatusStop);
  66. frame.set_position_ms(0);
  67. frame.set_position_measured_at(0);
  68. frame
  69. }
  70. fn initial_device_state(config: ConnectConfig) -> DeviceState {
  71. {
  72. let mut msg = DeviceState::new();
  73. msg.set_sw_version(version::version_string());
  74. msg.set_is_active(false);
  75. msg.set_can_play(true);
  76. msg.set_volume(0);
  77. msg.set_name(config.name);
  78. {
  79. let repeated = msg.mut_capabilities();
  80. {
  81. let msg = repeated.push_default();
  82. msg.set_typ(protocol::spirc::CapabilityType::kCanBePlayer);
  83. {
  84. let repeated = msg.mut_intValue();
  85. repeated.push(1)
  86. };
  87. msg
  88. };
  89. {
  90. let msg = repeated.push_default();
  91. msg.set_typ(protocol::spirc::CapabilityType::kDeviceType);
  92. {
  93. let repeated = msg.mut_intValue();
  94. repeated.push(config.device_type as i64)
  95. };
  96. msg
  97. };
  98. {
  99. let msg = repeated.push_default();
  100. msg.set_typ(protocol::spirc::CapabilityType::kGaiaEqConnectId);
  101. {
  102. let repeated = msg.mut_intValue();
  103. repeated.push(1)
  104. };
  105. msg
  106. };
  107. {
  108. let msg = repeated.push_default();
  109. msg.set_typ(protocol::spirc::CapabilityType::kSupportsLogout);
  110. {
  111. let repeated = msg.mut_intValue();
  112. repeated.push(0)
  113. };
  114. msg
  115. };
  116. {
  117. let msg = repeated.push_default();
  118. msg.set_typ(protocol::spirc::CapabilityType::kIsObservable);
  119. {
  120. let repeated = msg.mut_intValue();
  121. repeated.push(1)
  122. };
  123. msg
  124. };
  125. {
  126. let msg = repeated.push_default();
  127. msg.set_typ(protocol::spirc::CapabilityType::kVolumeSteps);
  128. {
  129. let repeated = msg.mut_intValue();
  130. repeated.push(64)
  131. };
  132. msg
  133. };
  134. {
  135. let msg = repeated.push_default();
  136. msg.set_typ(protocol::spirc::CapabilityType::kSupportsPlaylistV2);
  137. {
  138. let repeated = msg.mut_intValue();
  139. repeated.push(64)
  140. };
  141. msg
  142. };
  143. {
  144. let msg = repeated.push_default();
  145. msg.set_typ(protocol::spirc::CapabilityType::kSupportedContexts);
  146. {
  147. let repeated = msg.mut_stringValue();
  148. repeated.push(::std::convert::Into::into("album"));
  149. repeated.push(::std::convert::Into::into("playlist"));
  150. repeated.push(::std::convert::Into::into("search"));
  151. repeated.push(::std::convert::Into::into("inbox"));
  152. repeated.push(::std::convert::Into::into("toplist"));
  153. repeated.push(::std::convert::Into::into("starred"));
  154. repeated.push(::std::convert::Into::into("publishedstarred"));
  155. repeated.push(::std::convert::Into::into("track"))
  156. };
  157. msg
  158. };
  159. {
  160. let msg = repeated.push_default();
  161. msg.set_typ(protocol::spirc::CapabilityType::kSupportedTypes);
  162. {
  163. let repeated = msg.mut_stringValue();
  164. repeated.push(::std::convert::Into::into("audio/local"));
  165. repeated.push(::std::convert::Into::into("audio/track"));
  166. repeated.push(::std::convert::Into::into("local"));
  167. repeated.push(::std::convert::Into::into("track"))
  168. };
  169. msg
  170. };
  171. };
  172. msg
  173. }
  174. }
  175. fn calc_logarithmic_volume(volume: u16) -> u16 {
  176. // Volume conversion taken from https://www.dr-lex.be/info-stuff/volumecontrols.html#ideal2
  177. // Convert the given volume [0..0xffff] to a dB gain
  178. // We assume a dB range of 60dB.
  179. // Use the equation: a * exp(b * x)
  180. // in which a = IDEAL_FACTOR, b = 1/1000
  181. const IDEAL_FACTOR: f64 = 6.908;
  182. let normalized_volume = volume as f64 / std::u16::MAX as f64; // To get a value between 0 and 1
  183. let mut val = std::u16::MAX;
  184. // Prevent val > std::u16::MAX due to rounding errors
  185. if normalized_volume < 0.999 {
  186. let new_volume = (normalized_volume * IDEAL_FACTOR).exp() / 1000.0;
  187. val = (new_volume * std::u16::MAX as f64) as u16;
  188. }
  189. debug!("input volume:{} to mixer: {}", volume, val);
  190. // return the scale factor (0..0xffff) (equivalent to a voltage multiplier).
  191. val
  192. }
  193. fn volume_to_mixer(volume: u16, linear_volume: bool) -> u16 {
  194. if linear_volume {
  195. debug!("linear volume: {}", volume);
  196. volume
  197. } else {
  198. calc_logarithmic_volume(volume)
  199. }
  200. }
  201. impl Spirc {
  202. pub fn new(
  203. config: ConnectConfig,
  204. session: Session,
  205. player: Player,
  206. mixer: Box<Mixer>,
  207. ) -> (Spirc, SpircTask) {
  208. debug!("new Spirc[{}]", session.session_id());
  209. let ident = session.device_id().to_owned();
  210. // Uri updated in response to issue #288
  211. let uri = format!("hm://remote/user/{}/", session.username());
  212. let subscription = session.mercury().subscribe(&uri as &str);
  213. let subscription = subscription
  214. .map(|stream| stream.map_err(|_| MercuryError))
  215. .flatten_stream();
  216. let subscription = Box::new(subscription.map(|response| -> Frame {
  217. let data = response.payload.first().unwrap();
  218. protobuf::parse_from_bytes(data).unwrap()
  219. }));
  220. let sender = Box::new(
  221. session
  222. .mercury()
  223. .sender(uri)
  224. .with(|frame: Frame| Ok(frame.write_to_bytes().unwrap())),
  225. );
  226. let (cmd_tx, cmd_rx) = mpsc::unbounded();
  227. let volume = config.volume;
  228. let linear_volume = config.linear_volume;
  229. let device = initial_device_state(config);
  230. let mut task = SpircTask {
  231. player: player,
  232. mixer: mixer,
  233. linear_volume: linear_volume,
  234. sequence: SeqGenerator::new(1),
  235. ident: ident,
  236. device: device,
  237. state: initial_state(),
  238. subscription: subscription,
  239. sender: sender,
  240. commands: cmd_rx,
  241. end_of_track: Box::new(future::empty()),
  242. shutdown: false,
  243. session: session.clone(),
  244. context_fut: Box::new(future::empty()),
  245. context: None,
  246. };
  247. task.set_volume(volume);
  248. let spirc = Spirc { commands: cmd_tx };
  249. task.hello();
  250. (spirc, task)
  251. }
  252. pub fn play(&self) {
  253. let _ = self.commands.unbounded_send(SpircCommand::Play);
  254. }
  255. pub fn play_pause(&self) {
  256. let _ = self.commands.unbounded_send(SpircCommand::PlayPause);
  257. }
  258. pub fn pause(&self) {
  259. let _ = self.commands.unbounded_send(SpircCommand::Pause);
  260. }
  261. pub fn prev(&self) {
  262. let _ = self.commands.unbounded_send(SpircCommand::Prev);
  263. }
  264. pub fn next(&self) {
  265. let _ = self.commands.unbounded_send(SpircCommand::Next);
  266. }
  267. pub fn volume_up(&self) {
  268. let _ = self.commands.unbounded_send(SpircCommand::VolumeUp);
  269. }
  270. pub fn volume_down(&self) {
  271. let _ = self.commands.unbounded_send(SpircCommand::VolumeDown);
  272. }
  273. pub fn shutdown(&self) {
  274. let _ = self.commands.unbounded_send(SpircCommand::Shutdown);
  275. }
  276. }
  277. impl Future for SpircTask {
  278. type Item = ();
  279. type Error = ();
  280. fn poll(&mut self) -> Poll<(), ()> {
  281. loop {
  282. let mut progress = false;
  283. if self.session.is_invalid() {
  284. return Ok(Async::Ready(()));
  285. }
  286. if !self.shutdown {
  287. match self.subscription.poll().unwrap() {
  288. Async::Ready(Some(frame)) => {
  289. progress = true;
  290. self.handle_frame(frame);
  291. }
  292. Async::Ready(None) => panic!("subscription terminated"),
  293. Async::NotReady => (),
  294. }
  295. match self.commands.poll().unwrap() {
  296. Async::Ready(Some(command)) => {
  297. progress = true;
  298. self.handle_command(command);
  299. }
  300. Async::Ready(None) => (),
  301. Async::NotReady => (),
  302. }
  303. match self.end_of_track.poll() {
  304. Ok(Async::Ready(())) => {
  305. progress = true;
  306. self.handle_end_of_track();
  307. }
  308. Ok(Async::NotReady) => (),
  309. Err(oneshot::Canceled) => self.end_of_track = Box::new(future::empty()),
  310. }
  311. match self.context_fut.poll() {
  312. Ok(Async::Ready(value)) => {
  313. let r_context = serde_json::from_value::<StationContext>(value.clone());
  314. self.context = match r_context {
  315. Ok(context) => {
  316. info!(
  317. "Resolved {:?} tracks from <{:?}>",
  318. context.tracks.len(),
  319. self.state.get_context_uri(),
  320. );
  321. Some(context)
  322. }
  323. Err(e) => {
  324. error!("Unable to parse JSONContext {:?}\n{:?}", e, value);
  325. None
  326. }
  327. };
  328. // It needn't be so verbose - can be as simple as
  329. // if let Some(ref context) = r_context {
  330. // info!("Got {:?} tracks from <{}>", context.tracks.len(), context.uri);
  331. // }
  332. // self.context = r_context;
  333. progress = true;
  334. self.context_fut = Box::new(future::empty());
  335. }
  336. Ok(Async::NotReady) => (),
  337. Err(err) => {
  338. self.context_fut = Box::new(future::empty());
  339. error!("ContextError: {:?}", err)
  340. }
  341. }
  342. }
  343. let poll_sender = self.sender.poll_complete().unwrap();
  344. // Only shutdown once we've flushed out all our messages
  345. if self.shutdown && poll_sender.is_ready() {
  346. return Ok(Async::Ready(()));
  347. }
  348. if !progress {
  349. return Ok(Async::NotReady);
  350. }
  351. }
  352. }
  353. }
  354. impl SpircTask {
  355. fn handle_command(&mut self, cmd: SpircCommand) {
  356. let active = self.device.get_is_active();
  357. match cmd {
  358. SpircCommand::Play => {
  359. if active {
  360. self.handle_play();
  361. self.notify(None);
  362. } else {
  363. CommandSender::new(self, MessageType::kMessageTypePlay).send();
  364. }
  365. }
  366. SpircCommand::PlayPause => {
  367. if active {
  368. self.handle_play_pause();
  369. self.notify(None);
  370. } else {
  371. CommandSender::new(self, MessageType::kMessageTypePlayPause).send();
  372. }
  373. }
  374. SpircCommand::Pause => {
  375. if active {
  376. self.handle_pause();
  377. self.notify(None);
  378. } else {
  379. CommandSender::new(self, MessageType::kMessageTypePause).send();
  380. }
  381. }
  382. SpircCommand::Prev => {
  383. if active {
  384. self.handle_prev();
  385. self.notify(None);
  386. } else {
  387. CommandSender::new(self, MessageType::kMessageTypePrev).send();
  388. }
  389. }
  390. SpircCommand::Next => {
  391. if active {
  392. self.handle_next();
  393. self.notify(None);
  394. } else {
  395. CommandSender::new(self, MessageType::kMessageTypeNext).send();
  396. }
  397. }
  398. SpircCommand::VolumeUp => {
  399. if active {
  400. self.handle_volume_up();
  401. self.notify(None);
  402. } else {
  403. CommandSender::new(self, MessageType::kMessageTypeVolumeUp).send();
  404. }
  405. }
  406. SpircCommand::VolumeDown => {
  407. if active {
  408. self.handle_volume_down();
  409. self.notify(None);
  410. } else {
  411. CommandSender::new(self, MessageType::kMessageTypeVolumeDown).send();
  412. }
  413. }
  414. SpircCommand::Shutdown => {
  415. CommandSender::new(self, MessageType::kMessageTypeGoodbye).send();
  416. self.shutdown = true;
  417. self.commands.close();
  418. }
  419. }
  420. }
  421. fn handle_frame(&mut self, frame: Frame) {
  422. debug!(
  423. "{:?} {:?} {} {} {}",
  424. frame.get_typ(),
  425. frame.get_device_state().get_name(),
  426. frame.get_ident(),
  427. frame.get_seq_nr(),
  428. frame.get_state_update_id()
  429. );
  430. if frame.get_ident() == self.ident
  431. || (frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident))
  432. {
  433. return;
  434. }
  435. match frame.get_typ() {
  436. MessageType::kMessageTypeHello => {
  437. self.notify(Some(frame.get_ident()));
  438. }
  439. MessageType::kMessageTypeLoad => {
  440. if !self.device.get_is_active() {
  441. self.device.set_is_active(true);
  442. self.device.set_became_active_at(now_ms());
  443. }
  444. self.update_tracks(&frame);
  445. if self.state.get_track().len() > 0 {
  446. self.state.set_position_ms(frame.get_state().get_position_ms());
  447. self.state.set_position_measured_at(now_ms() as u64);
  448. let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
  449. self.load_track(play);
  450. } else {
  451. info!("No more tracks left in queue");
  452. self.state.set_status(PlayStatus::kPlayStatusStop);
  453. }
  454. self.notify(None);
  455. }
  456. MessageType::kMessageTypePlay => {
  457. self.handle_play();
  458. self.notify(None);
  459. }
  460. MessageType::kMessageTypePlayPause => {
  461. self.handle_play_pause();
  462. self.notify(None);
  463. }
  464. MessageType::kMessageTypePause => {
  465. self.handle_pause();
  466. self.notify(None);
  467. }
  468. MessageType::kMessageTypeNext => {
  469. self.handle_next();
  470. self.notify(None);
  471. }
  472. MessageType::kMessageTypePrev => {
  473. self.handle_prev();
  474. self.notify(None);
  475. }
  476. MessageType::kMessageTypeVolumeUp => {
  477. self.handle_volume_up();
  478. self.notify(None);
  479. }
  480. MessageType::kMessageTypeVolumeDown => {
  481. self.handle_volume_down();
  482. self.notify(None);
  483. }
  484. MessageType::kMessageTypeRepeat => {
  485. self.state.set_repeat(frame.get_state().get_repeat());
  486. self.notify(None);
  487. }
  488. MessageType::kMessageTypeShuffle => {
  489. self.state.set_shuffle(frame.get_state().get_shuffle());
  490. if self.state.get_shuffle() {
  491. let current_index = self.state.get_playing_track_index();
  492. {
  493. let tracks = self.state.mut_track();
  494. tracks.swap(0, current_index as usize);
  495. if let Some((_, rest)) = tracks.split_first_mut() {
  496. let mut rng = rand::thread_rng();
  497. rest.shuffle(&mut rng);
  498. }
  499. }
  500. self.state.set_playing_track_index(0);
  501. } else {
  502. let context = self.state.get_context_uri();
  503. debug!("{:?}", context);
  504. }
  505. self.notify(None);
  506. }
  507. MessageType::kMessageTypeSeek => {
  508. let position = frame.get_position();
  509. self.state.set_position_ms(position);
  510. self.state.set_position_measured_at(now_ms() as u64);
  511. self.player.seek(position);
  512. self.notify(None);
  513. }
  514. MessageType::kMessageTypeReplace => {
  515. self.update_tracks(&frame);
  516. self.notify(None);
  517. }
  518. MessageType::kMessageTypeVolume => {
  519. self.set_volume(frame.get_volume() as u16);
  520. self.notify(None);
  521. }
  522. MessageType::kMessageTypeNotify => {
  523. if self.device.get_is_active() && frame.get_device_state().get_is_active() {
  524. self.device.set_is_active(false);
  525. self.state.set_status(PlayStatus::kPlayStatusStop);
  526. self.player.stop();
  527. self.mixer.stop();
  528. }
  529. }
  530. _ => (),
  531. }
  532. }
  533. fn handle_play(&mut self) {
  534. if self.state.get_status() == PlayStatus::kPlayStatusPause {
  535. self.mixer.start();
  536. self.player.play();
  537. self.state.set_status(PlayStatus::kPlayStatusPlay);
  538. self.state.set_position_measured_at(now_ms() as u64);
  539. }
  540. }
  541. fn handle_play_pause(&mut self) {
  542. match self.state.get_status() {
  543. PlayStatus::kPlayStatusPlay => self.handle_pause(),
  544. PlayStatus::kPlayStatusPause => self.handle_play(),
  545. _ => (),
  546. }
  547. }
  548. fn handle_pause(&mut self) {
  549. if self.state.get_status() == PlayStatus::kPlayStatusPlay {
  550. self.player.pause();
  551. self.mixer.stop();
  552. self.state.set_status(PlayStatus::kPlayStatusPause);
  553. let now = now_ms() as u64;
  554. let position = self.state.get_position_ms();
  555. let diff = now - self.state.get_position_measured_at();
  556. self.state.set_position_ms(position + diff as u32);
  557. self.state.set_position_measured_at(now);
  558. }
  559. }
  560. fn consume_queued_track(&mut self) -> usize {
  561. // Removes current track if it is queued
  562. // Returns the index of the next track
  563. let current_index = self.state.get_playing_track_index() as usize;
  564. if self.state.get_track()[current_index].get_queued() {
  565. self.state.mut_track().remove(current_index);
  566. return current_index;
  567. }
  568. current_index + 1
  569. }
  570. fn handle_next(&mut self) {
  571. let mut new_index = self.consume_queued_track() as u32;
  572. let mut continue_playing = true;
  573. debug!(
  574. "At track {:?} of {:?} <{:?}> update [{}]",
  575. new_index,
  576. self.state.get_track().len(),
  577. self.state.get_context_uri(),
  578. self.state.get_track().len() as u32 - new_index < CONTEXT_FETCH_THRESHOLD
  579. );
  580. let context_uri = self.state.get_context_uri().to_owned();
  581. if (context_uri.starts_with("spotify:station:") || context_uri.starts_with("spotify:dailymix:"))
  582. && ((self.state.get_track().len() as u32) - new_index) < CONTEXT_FETCH_THRESHOLD
  583. {
  584. self.context_fut = self.resolve_station(&context_uri);
  585. self.update_tracks_from_context();
  586. }
  587. if new_index >= self.state.get_track().len() as u32 {
  588. new_index = 0; // Loop around back to start
  589. continue_playing = self.state.get_repeat();
  590. }
  591. self.state.set_playing_track_index(new_index);
  592. self.state.set_position_ms(0);
  593. self.state.set_position_measured_at(now_ms() as u64);
  594. self.load_track(continue_playing);
  595. }
  596. fn handle_prev(&mut self) {
  597. // Previous behaves differently based on the position
  598. // Under 3s it goes to the previous song (starts playing)
  599. // Over 3s it seeks to zero (retains previous play status)
  600. if self.position() < 3000 {
  601. // Queued tracks always follow the currently playing track.
  602. // They should not be considered when calculating the previous
  603. // track so extract them beforehand and reinsert them after it.
  604. let mut queue_tracks = Vec::new();
  605. {
  606. let queue_index = self.consume_queued_track();
  607. let tracks = self.state.mut_track();
  608. while queue_index < tracks.len() && tracks[queue_index].get_queued() {
  609. queue_tracks.push(tracks.remove(queue_index));
  610. }
  611. }
  612. let current_index = self.state.get_playing_track_index();
  613. let new_index = if current_index > 0 {
  614. current_index - 1
  615. } else if self.state.get_repeat() {
  616. self.state.get_track().len() as u32 - 1
  617. } else {
  618. 0
  619. };
  620. // Reinsert queued tracks after the new playing track.
  621. let mut pos = (new_index + 1) as usize;
  622. for track in queue_tracks.into_iter() {
  623. self.state.mut_track().insert(pos, track);
  624. pos += 1;
  625. }
  626. self.state.set_playing_track_index(new_index);
  627. self.state.set_position_ms(0);
  628. self.state.set_position_measured_at(now_ms() as u64);
  629. self.load_track(true);
  630. } else {
  631. self.state.set_position_ms(0);
  632. self.state.set_position_measured_at(now_ms() as u64);
  633. self.player.seek(0);
  634. }
  635. }
  636. fn handle_volume_up(&mut self) {
  637. let mut volume: u32 = self.device.get_volume() as u32 + 4096;
  638. if volume > 0xFFFF {
  639. volume = 0xFFFF;
  640. }
  641. self.set_volume(volume as u16);
  642. }
  643. fn handle_volume_down(&mut self) {
  644. let mut volume: i32 = self.device.get_volume() as i32 - 4096;
  645. if volume < 0 {
  646. volume = 0;
  647. }
  648. self.set_volume(volume as u16);
  649. }
  650. fn handle_end_of_track(&mut self) {
  651. self.handle_next();
  652. self.notify(None);
  653. }
  654. fn position(&mut self) -> u32 {
  655. let diff = now_ms() as u64 - self.state.get_position_measured_at();
  656. self.state.get_position_ms() + diff as u32
  657. }
  658. fn resolve_station(&self, uri: &str) -> Box<Future<Item = serde_json::Value, Error = MercuryError>> {
  659. let radio_uri = format!("hm://radio-apollo/v3/stations/{}", uri);
  660. self.resolve_uri(&radio_uri)
  661. }
  662. fn resolve_uri(&self, uri: &str) -> Box<Future<Item = serde_json::Value, Error = MercuryError>> {
  663. let request = self.session.mercury().get(uri);
  664. Box::new(request.and_then(move |response| {
  665. let data = response.payload.first().expect("Empty payload on context uri");
  666. let response: serde_json::Value = serde_json::from_slice(&data).unwrap();
  667. Ok(response)
  668. }))
  669. }
  670. fn update_tracks_from_context(&mut self) {
  671. if let Some(ref context) = self.context {
  672. self.context_fut = self.resolve_uri(&context.next_page_url);
  673. let new_tracks = &context.tracks;
  674. debug!("Adding {:?} tracks from context to playlist", new_tracks.len());
  675. let current_index = self.state.get_playing_track_index();
  676. let mut new_index = 0;
  677. {
  678. let mut tracks = self.state.mut_track();
  679. // Does this need to be optimised - we don't need to actually traverse the len of tracks
  680. let tracks_len = tracks.len();
  681. if tracks_len > CONTEXT_TRACKS_HISTORY {
  682. tracks.rotate_right(tracks_len - CONTEXT_TRACKS_HISTORY);
  683. tracks.truncate(CONTEXT_TRACKS_HISTORY);
  684. }
  685. // tracks.extend_from_slice(&mut new_tracks); // method doesn't exist for protobuf::RepeatedField
  686. for t in new_tracks {
  687. tracks.push(t.to_owned());
  688. }
  689. if current_index > CONTEXT_TRACKS_HISTORY as u32 {
  690. new_index = current_index - CONTEXT_TRACKS_HISTORY as u32;
  691. }
  692. }
  693. self.state.set_playing_track_index(new_index);
  694. }
  695. }
  696. fn update_tracks(&mut self, frame: &protocol::spirc::Frame) {
  697. let index = frame.get_state().get_playing_track_index();
  698. let context_uri = frame.get_state().get_context_uri().to_owned();
  699. let tracks = frame.get_state().get_track();
  700. debug!("Frame has {:?} tracks", tracks.len());
  701. if context_uri.starts_with("spotify:station:") || context_uri.starts_with("spotify:dailymix:") {
  702. self.context_fut = self.resolve_station(&context_uri);
  703. }
  704. self.state.set_playing_track_index(index);
  705. self.state.set_track(tracks.into_iter().cloned().collect());
  706. self.state.set_context_uri(context_uri);
  707. self.state.set_repeat(frame.get_state().get_repeat());
  708. self.state.set_shuffle(frame.get_state().get_shuffle());
  709. }
  710. fn load_track(&mut self, play: bool) {
  711. let track = {
  712. let mut index = self.state.get_playing_track_index();
  713. // Check for malformed gid
  714. let tracks_len = self.state.get_track().len() as u32;
  715. let mut track_ref = &self.state.get_track()[index as usize];
  716. while track_ref.get_gid().len() != 16 {
  717. warn!(
  718. "Skipping track {:?} at position [{}] of {}",
  719. track_ref.get_uri(),
  720. index,
  721. tracks_len
  722. );
  723. index = if index + 1 < tracks_len { index + 1 } else { 0 };
  724. track_ref = &self.state.get_track()[index as usize];
  725. }
  726. SpotifyId::from_raw(track_ref.get_gid()).unwrap()
  727. };
  728. let position = self.state.get_position_ms();
  729. let end_of_track = self.player.load(track, play, position);
  730. if play {
  731. self.state.set_status(PlayStatus::kPlayStatusPlay);
  732. } else {
  733. self.state.set_status(PlayStatus::kPlayStatusPause);
  734. }
  735. self.end_of_track = Box::new(end_of_track);
  736. }
  737. fn hello(&mut self) {
  738. CommandSender::new(self, MessageType::kMessageTypeHello).send();
  739. }
  740. fn notify(&mut self, recipient: Option<&str>) {
  741. let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify);
  742. if let Some(s) = recipient {
  743. cs = cs.recipient(&s);
  744. }
  745. cs.send();
  746. }
  747. fn set_volume(&mut self, volume: u16) {
  748. self.device.set_volume(volume as u32);
  749. self.mixer.set_volume(volume_to_mixer(volume, self.linear_volume));
  750. if let Some(cache) = self.session.cache() {
  751. cache.save_volume(Volume { volume })
  752. }
  753. }
  754. }
  755. impl Drop for SpircTask {
  756. fn drop(&mut self) {
  757. debug!("drop Spirc[{}]", self.session.session_id());
  758. }
  759. }
  760. struct CommandSender<'a> {
  761. spirc: &'a mut SpircTask,
  762. frame: protocol::spirc::Frame,
  763. }
  764. impl<'a> CommandSender<'a> {
  765. fn new(spirc: &'a mut SpircTask, cmd: MessageType) -> CommandSender {
  766. let mut frame = protocol::spirc::Frame::new();
  767. frame.set_version(1);
  768. frame.set_protocol_version(::std::convert::Into::into("2.0.0"));
  769. frame.set_ident(spirc.ident.clone());
  770. frame.set_seq_nr(spirc.sequence.get());
  771. frame.set_typ(cmd);
  772. frame.set_device_state(spirc.device.clone());
  773. frame.set_state_update_id(now_ms());
  774. CommandSender {
  775. spirc: spirc,
  776. frame: frame,
  777. }
  778. }
  779. fn recipient(mut self, recipient: &'a str) -> CommandSender {
  780. self.frame.mut_recipient().push(recipient.to_owned());
  781. self
  782. }
  783. #[allow(dead_code)]
  784. fn state(mut self, state: protocol::spirc::State) -> CommandSender<'a> {
  785. self.frame.set_state(state);
  786. self
  787. }
  788. fn send(mut self) {
  789. if !self.frame.has_state() && self.spirc.device.get_is_active() {
  790. self.frame.set_state(self.spirc.state.clone());
  791. }
  792. let send = self.spirc.sender.start_send(self.frame).unwrap();
  793. assert!(send.is_ready());
  794. }
  795. }