// spirc.rs (21 KB)


  1. use futures::future;
  2. use futures::sync::{oneshot, mpsc};
  3. use futures::{Future, Stream, Sink, Async, Poll};
  4. use protobuf::{self, Message};
  5. use core::config::ConnectConfig;
  6. use core::mercury::MercuryError;
  7. use core::session::Session;
  8. use core::util::{now_ms, SpotifyId, SeqGenerator};
  9. use core::version;
  10. use protocol;
  11. use protocol::spirc::{PlayStatus, State, MessageType, Frame, DeviceState};
  12. use playback::mixer::Mixer;
  13. use playback::player::Player;
  14. use std;
  15. use rand;
  16. use rand::Rng;
  17. pub struct SpircTask {
  18. player: Player,
  19. mixer: Box<Mixer>,
  20. sequence: SeqGenerator<u32>,
  21. ident: String,
  22. device: DeviceState,
  23. state: State,
  24. subscription: Box<Stream<Item = Frame, Error = MercuryError>>,
  25. sender: Box<Sink<SinkItem = Frame, SinkError = MercuryError>>,
  26. commands: mpsc::UnboundedReceiver<SpircCommand>,
  27. end_of_track: Box<Future<Item = (), Error = oneshot::Canceled>>,
  28. shutdown: bool,
  29. session: Session,
  30. }
  /// Commands that a `Spirc` handle can send to its `SpircTask`.
  pub enum SpircCommand {
      /// Resume playback.
      Play,
      /// Toggle between playing and paused.
      PlayPause,
      /// Pause playback.
      Pause,
      /// Go to the previous track (or restart the current one).
      Prev,
      /// Skip to the next track.
      Next,
      /// Raise the volume by one step.
      VolumeUp,
      /// Lower the volume by one step.
      VolumeDown,
      /// Say goodbye to the other devices and stop the task.
      Shutdown,
  }
  41. pub struct Spirc {
  42. commands: mpsc::UnboundedSender<SpircCommand>,
  43. }
  44. fn initial_state() -> State {
  45. protobuf_init!(protocol::spirc::State::new(), {
  46. repeat: false,
  47. shuffle: false,
  48. status: PlayStatus::kPlayStatusStop,
  49. position_ms: 0,
  50. position_measured_at: 0,
  51. })
  52. }
  53. fn initial_device_state(config: ConnectConfig, volume: u16) -> DeviceState {
  54. protobuf_init!(DeviceState::new(), {
  55. sw_version: version::version_string(),
  56. is_active: false,
  57. can_play: true,
  58. volume: volume as u32,
  59. name: config.name,
  60. capabilities => [
  61. @{
  62. typ: protocol::spirc::CapabilityType::kCanBePlayer,
  63. intValue => [1]
  64. },
  65. @{
  66. typ: protocol::spirc::CapabilityType::kDeviceType,
  67. intValue => [config.device_type as i64]
  68. },
  69. @{
  70. typ: protocol::spirc::CapabilityType::kGaiaEqConnectId,
  71. intValue => [1]
  72. },
  73. @{
  74. typ: protocol::spirc::CapabilityType::kSupportsLogout,
  75. intValue => [0]
  76. },
  77. @{
  78. typ: protocol::spirc::CapabilityType::kIsObservable,
  79. intValue => [1]
  80. },
  81. @{
  82. typ: protocol::spirc::CapabilityType::kVolumeSteps,
  83. intValue => [64]
  84. },
  85. @{
  86. typ: protocol::spirc::CapabilityType::kSupportedContexts,
  87. stringValue => [
  88. "album",
  89. "playlist",
  90. "search",
  91. "inbox",
  92. "toplist",
  93. "starred",
  94. "publishedstarred",
  95. "track",
  96. ]
  97. },
  98. @{
  99. typ: protocol::spirc::CapabilityType::kSupportedTypes,
  100. stringValue => [
  101. "audio/local",
  102. "audio/track",
  103. "local",
  104. "track",
  105. ]
  106. }
  107. ],
  108. })
  109. }
  110. fn volume_to_mixer(volume: u16) -> u16 {
  111. // Volume conversion taken from https://www.dr-lex.be/info-stuff/volumecontrols.html#ideal2
  112. // Convert the given volume [0..0xffff] to a dB gain
  113. // We assume a dB range of 60dB.
  114. // Use the equatation: a * exp(b * x)
  115. // in which a = IDEAL_FACTOR, b = 1/1000
  116. const IDEAL_FACTOR: f64 = 6.908;
  117. let normalized_volume = volume as f64 / std::u16::MAX as f64; // To get a value between 0 and 1
  118. let mut val = std::u16::MAX;
  119. // Prevent val > std::u16::MAX due to rounding errors
  120. if normalized_volume < 0.999 {
  121. let new_volume = (normalized_volume * IDEAL_FACTOR).exp() / 1000.0;
  122. val = (new_volume * std::u16::MAX as f64) as u16;
  123. }
  124. debug!("input volume:{} to mixer: {}", volume, val);
  125. // return the scale factor (0..0xffff) (equivalent to a voltage multiplier).
  126. val
  127. }
  128. impl Spirc {
  129. pub fn new(config: ConnectConfig, session: Session, player: Player, mixer: Box<Mixer>)
  130. -> (Spirc, SpircTask)
  131. {
  132. debug!("new Spirc[{}]", session.session_id());
  133. let ident = session.device_id().to_owned();
  134. let uri = format!("hm://remote/3/user/{}/", session.username());
  135. let subscription = session.mercury().subscribe(&uri as &str);
  136. let subscription = subscription.map(|stream| stream.map_err(|_| MercuryError)).flatten_stream();
  137. let subscription = Box::new(subscription.map(|response| -> Frame {
  138. let data = response.payload.first().unwrap();
  139. protobuf::parse_from_bytes(data).unwrap()
  140. }));
  141. let sender = Box::new(session.mercury().sender(uri).with(|frame: Frame| {
  142. Ok(frame.write_to_bytes().unwrap())
  143. }));
  144. let (cmd_tx, cmd_rx) = mpsc::unbounded();
  145. let volume = config.volume as u16;
  146. let device = initial_device_state(config, volume);
  147. mixer.set_volume(volume_to_mixer(volume as u16));
  148. let mut task = SpircTask {
  149. player: player,
  150. mixer: mixer,
  151. sequence: SeqGenerator::new(1),
  152. ident: ident,
  153. device: device,
  154. state: initial_state(),
  155. subscription: subscription,
  156. sender: sender,
  157. commands: cmd_rx,
  158. end_of_track: Box::new(future::empty()),
  159. shutdown: false,
  160. session: session.clone(),
  161. };
  162. let spirc = Spirc {
  163. commands: cmd_tx,
  164. };
  165. task.hello();
  166. (spirc, task)
  167. }
  168. pub fn play(&self) {
  169. let _ = self.commands.unbounded_send(SpircCommand::Play);
  170. }
  171. pub fn play_pause(&self) {
  172. let _ = self.commands.unbounded_send(SpircCommand::PlayPause);
  173. }
  174. pub fn pause(&self) {
  175. let _ = self.commands.unbounded_send(SpircCommand::Pause);
  176. }
  177. pub fn prev(&self) {
  178. let _ = self.commands.unbounded_send(SpircCommand::Prev);
  179. }
  180. pub fn next(&self) {
  181. let _ = self.commands.unbounded_send(SpircCommand::Next);
  182. }
  183. pub fn volume_up(&self) {
  184. let _ = self.commands.unbounded_send(SpircCommand::VolumeUp);
  185. }
  186. pub fn volume_down(&self) {
  187. let _ = self.commands.unbounded_send(SpircCommand::VolumeDown);
  188. }
  189. pub fn shutdown(&self) {
  190. let _ = self.commands.unbounded_send(SpircCommand::Shutdown);
  191. }
  192. }
  193. impl Future for SpircTask {
  194. type Item = ();
  195. type Error = ();
  196. fn poll(&mut self) -> Poll<(), ()> {
  197. loop {
  198. let mut progress = false;
  199. if !self.shutdown {
  200. match self.subscription.poll().unwrap() {
  201. Async::Ready(Some(frame)) => {
  202. progress = true;
  203. self.handle_frame(frame);
  204. }
  205. Async::Ready(None) => panic!("subscription terminated"),
  206. Async::NotReady => (),
  207. }
  208. match self.commands.poll().unwrap() {
  209. Async::Ready(Some(command)) => {
  210. progress = true;
  211. self.handle_command(command);
  212. }
  213. Async::Ready(None) => (),
  214. Async::NotReady => (),
  215. }
  216. match self.end_of_track.poll() {
  217. Ok(Async::Ready(())) => {
  218. progress = true;
  219. self.handle_end_of_track();
  220. }
  221. Ok(Async::NotReady) => (),
  222. Err(oneshot::Canceled) => {
  223. self.end_of_track = Box::new(future::empty())
  224. }
  225. }
  226. }
  227. let poll_sender = self.sender.poll_complete().unwrap();
  228. // Only shutdown once we've flushed out all our messages
  229. if self.shutdown && poll_sender.is_ready() {
  230. return Ok(Async::Ready(()));
  231. }
  232. if !progress {
  233. return Ok(Async::NotReady);
  234. }
  235. }
  236. }
  237. }
  238. impl SpircTask {
  239. fn handle_command(&mut self, cmd: SpircCommand) {
  240. let active = self.device.get_is_active();
  241. match cmd {
  242. SpircCommand::Play => {
  243. if active {
  244. self.handle_play();
  245. self.notify(None);
  246. } else {
  247. CommandSender::new(self, MessageType::kMessageTypePlay).send();
  248. }
  249. }
  250. SpircCommand::PlayPause => {
  251. if active {
  252. self.handle_play_pause();
  253. self.notify(None);
  254. } else {
  255. CommandSender::new(self, MessageType::kMessageTypePlayPause).send();
  256. }
  257. }
  258. SpircCommand::Pause => {
  259. if active {
  260. self.handle_pause();
  261. self.notify(None);
  262. } else {
  263. CommandSender::new(self, MessageType::kMessageTypePause).send();
  264. }
  265. }
  266. SpircCommand::Prev => {
  267. if active {
  268. self.handle_prev();
  269. self.notify(None);
  270. } else {
  271. CommandSender::new(self, MessageType::kMessageTypePrev).send();
  272. }
  273. }
  274. SpircCommand::Next => {
  275. if active {
  276. self.handle_next();
  277. self.notify(None);
  278. } else {
  279. CommandSender::new(self, MessageType::kMessageTypeNext).send();
  280. }
  281. }
  282. SpircCommand::VolumeUp => {
  283. if active {
  284. self.handle_volume_up();
  285. self.notify(None);
  286. } else {
  287. CommandSender::new(self, MessageType::kMessageTypeVolumeUp).send();
  288. }
  289. }
  290. SpircCommand::VolumeDown => {
  291. if active {
  292. self.handle_volume_down();
  293. self.notify(None);
  294. } else {
  295. CommandSender::new(self, MessageType::kMessageTypeVolumeDown).send();
  296. }
  297. }
  298. SpircCommand::Shutdown => {
  299. CommandSender::new(self, MessageType::kMessageTypeGoodbye).send();
  300. self.shutdown = true;
  301. self.commands.close();
  302. }
  303. }
  304. }
  305. fn handle_frame(&mut self, frame: Frame) {
  306. debug!("{:?} {:?} {} {} {}",
  307. frame.get_typ(),
  308. frame.get_device_state().get_name(),
  309. frame.get_ident(),
  310. frame.get_seq_nr(),
  311. frame.get_state_update_id());
  312. if frame.get_ident() == self.ident ||
  313. (frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident)) {
  314. return;
  315. }
  316. match frame.get_typ() {
  317. MessageType::kMessageTypeHello => {
  318. self.notify(Some(frame.get_ident()));
  319. }
  320. MessageType::kMessageTypeLoad => {
  321. if !self.device.get_is_active() {
  322. self.device.set_is_active(true);
  323. self.device.set_became_active_at(now_ms());
  324. }
  325. self.update_tracks(&frame);
  326. if self.state.get_track().len() > 0 {
  327. self.state.set_position_ms(frame.get_state().get_position_ms());
  328. self.state.set_position_measured_at(now_ms() as u64);
  329. let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
  330. self.load_track(play);
  331. } else {
  332. self.state.set_status(PlayStatus::kPlayStatusStop);
  333. }
  334. self.notify(None);
  335. }
  336. MessageType::kMessageTypePlay => {
  337. self.handle_play();
  338. self.notify(None);
  339. }
  340. MessageType::kMessageTypePlayPause => {
  341. self.handle_play_pause();
  342. self.notify(None);
  343. }
  344. MessageType::kMessageTypePause => {
  345. self.handle_pause();
  346. self.notify(None);
  347. }
  348. MessageType::kMessageTypeNext => {
  349. self.handle_next();
  350. self.notify(None);
  351. }
  352. MessageType::kMessageTypePrev => {
  353. self.handle_prev();
  354. self.notify(None);
  355. }
  356. MessageType::kMessageTypeVolumeUp => {
  357. self.handle_volume_up();
  358. self.notify(None);
  359. }
  360. MessageType::kMessageTypeVolumeDown => {
  361. self.handle_volume_down();
  362. self.notify(None);
  363. }
  364. MessageType::kMessageTypeRepeat => {
  365. self.state.set_repeat(frame.get_state().get_repeat());
  366. self.notify(None);
  367. }
  368. MessageType::kMessageTypeShuffle => {
  369. self.state.set_shuffle(frame.get_state().get_shuffle());
  370. if self.state.get_shuffle()
  371. {
  372. let current_index = self.state.get_playing_track_index();
  373. {
  374. let tracks = self.state.mut_track();
  375. tracks.swap(0, current_index as usize);
  376. if let Some((_, rest)) = tracks.split_first_mut() {
  377. rand::thread_rng().shuffle(rest);
  378. }
  379. }
  380. self.state.set_playing_track_index(0);
  381. } else {
  382. let context = self.state.get_context_uri();
  383. debug!("{:?}", context);
  384. }
  385. self.notify(None);
  386. }
  387. MessageType::kMessageTypeSeek => {
  388. let position = frame.get_position();
  389. self.state.set_position_ms(position);
  390. self.state.set_position_measured_at(now_ms() as u64);
  391. self.player.seek(position);
  392. self.notify(None);
  393. }
  394. MessageType::kMessageTypeReplace => {
  395. self.update_tracks(&frame);
  396. self.notify(None);
  397. }
  398. MessageType::kMessageTypeVolume => {
  399. self.device.set_volume(frame.get_volume());
  400. self.mixer.set_volume(volume_to_mixer(frame.get_volume() as u16));
  401. self.notify(None);
  402. }
  403. MessageType::kMessageTypeNotify => {
  404. if self.device.get_is_active() &&
  405. frame.get_device_state().get_is_active()
  406. {
  407. self.device.set_is_active(false);
  408. self.state.set_status(PlayStatus::kPlayStatusStop);
  409. self.player.stop();
  410. self.mixer.stop();
  411. }
  412. }
  413. _ => (),
  414. }
  415. }
  416. fn handle_play(&mut self) {
  417. if self.state.get_status() == PlayStatus::kPlayStatusPause {
  418. self.mixer.start();
  419. self.player.play();
  420. self.state.set_status(PlayStatus::kPlayStatusPlay);
  421. self.state.set_position_measured_at(now_ms() as u64);
  422. }
  423. }
  424. fn handle_play_pause(&mut self) {
  425. match self.state.get_status() {
  426. PlayStatus::kPlayStatusPlay => self.handle_pause(),
  427. PlayStatus::kPlayStatusPause => self.handle_play(),
  428. _ => (),
  429. }
  430. }
  431. fn handle_pause(&mut self) {
  432. if self.state.get_status() == PlayStatus::kPlayStatusPlay {
  433. self.player.pause();
  434. self.mixer.stop();
  435. self.state.set_status(PlayStatus::kPlayStatusPause);
  436. let now = now_ms() as u64;
  437. let position = self.state.get_position_ms();
  438. let diff = now - self.state.get_position_measured_at();
  439. self.state.set_position_ms(position + diff as u32);
  440. self.state.set_position_measured_at(now);
  441. }
  442. }
  443. fn handle_next(&mut self) {
  444. let current_index = self.state.get_playing_track_index();
  445. let num_tracks = self.state.get_track().len() as u32;
  446. let new_index = (current_index + 1) % num_tracks;
  447. let mut was_last_track = (current_index + 1) >= num_tracks;
  448. if self.state.get_repeat() {
  449. was_last_track = false;
  450. }
  451. self.state.set_playing_track_index(new_index);
  452. self.state.set_position_ms(0);
  453. self.state.set_position_measured_at(now_ms() as u64);
  454. self.load_track(!was_last_track);
  455. }
  456. fn handle_prev(&mut self) {
  457. // Previous behaves differently based on the position
  458. // Under 3s it goes to the previous song
  459. // Over 3s it seeks to zero
  460. if self.position() < 3000 {
  461. let current_index = self.state.get_playing_track_index();
  462. let new_index = if current_index == 0 {
  463. self.state.get_track().len() as u32 - 1
  464. } else {
  465. current_index - 1
  466. };
  467. self.state.set_playing_track_index(new_index);
  468. self.state.set_position_ms(0);
  469. self.state.set_position_measured_at(now_ms() as u64);
  470. self.load_track(true);
  471. } else {
  472. self.state.set_position_ms(0);
  473. self.state.set_position_measured_at(now_ms() as u64);
  474. self.player.seek(0);
  475. }
  476. }
  477. fn handle_volume_up(&mut self) {
  478. let mut volume: u32 = self.device.get_volume() as u32 + 4096;
  479. if volume > 0xFFFF {
  480. volume = 0xFFFF;
  481. }
  482. self.device.set_volume(volume);
  483. self.mixer.set_volume(volume_to_mixer(volume as u16));
  484. }
  485. fn handle_volume_down(&mut self) {
  486. let mut volume: i32 = self.device.get_volume() as i32 - 4096;
  487. if volume < 0 {
  488. volume = 0;
  489. }
  490. self.device.set_volume(volume as u32);
  491. self.mixer.set_volume(volume_to_mixer(volume as u16));
  492. }
  493. fn handle_end_of_track(&mut self) {
  494. self.handle_next();
  495. self.notify(None);
  496. }
  497. fn position(&mut self) -> u32 {
  498. let diff = now_ms() as u64 - self.state.get_position_measured_at();
  499. self.state.get_position_ms() + diff as u32
  500. }
  501. fn update_tracks(&mut self, frame: &protocol::spirc::Frame) {
  502. let index = frame.get_state().get_playing_track_index();
  503. let tracks = frame.get_state().get_track();
  504. self.state.set_playing_track_index(index);
  505. self.state.set_track(tracks.into_iter().cloned().collect());
  506. }
  507. fn load_track(&mut self, play: bool) {
  508. let index = self.state.get_playing_track_index();
  509. let track = {
  510. let gid = self.state.get_track()[index as usize].get_gid();
  511. SpotifyId::from_raw(gid)
  512. };
  513. let position = self.state.get_position_ms();
  514. let end_of_track = self.player.load(track, play, position);
  515. if play {
  516. self.state.set_status(PlayStatus::kPlayStatusPlay);
  517. } else {
  518. self.state.set_status(PlayStatus::kPlayStatusPause);
  519. }
  520. self.end_of_track = Box::new(end_of_track);
  521. }
  522. fn hello(&mut self) {
  523. CommandSender::new(self, MessageType::kMessageTypeHello).send();
  524. }
  525. fn notify(&mut self, recipient: Option<&str>) {
  526. let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify);
  527. if let Some(s) = recipient {
  528. cs = cs.recipient(&s);
  529. }
  530. cs.send();
  531. }
  532. }
  533. impl Drop for SpircTask {
  534. fn drop(&mut self) {
  535. debug!("drop Spirc[{}]", self.session.session_id());
  536. }
  537. }
  538. struct CommandSender<'a> {
  539. spirc: &'a mut SpircTask,
  540. frame: protocol::spirc::Frame,
  541. }
  542. impl<'a> CommandSender<'a> {
  543. fn new(spirc: &'a mut SpircTask, cmd: MessageType) -> CommandSender {
  544. let frame = protobuf_init!(protocol::spirc::Frame::new(), {
  545. version: 1,
  546. protocol_version: "2.0.0",
  547. ident: spirc.ident.clone(),
  548. seq_nr: spirc.sequence.get(),
  549. typ: cmd,
  550. device_state: spirc.device.clone(),
  551. state_update_id: now_ms(),
  552. });
  553. CommandSender {
  554. spirc: spirc,
  555. frame: frame,
  556. }
  557. }
  558. fn recipient(mut self, recipient: &'a str) -> CommandSender {
  559. self.frame.mut_recipient().push(recipient.to_owned());
  560. self
  561. }
  562. #[allow(dead_code)]
  563. fn state(mut self, state: protocol::spirc::State) -> CommandSender<'a> {
  564. self.frame.set_state(state);
  565. self
  566. }
  567. fn send(mut self) {
  568. if !self.frame.has_state() && self.spirc.device.get_is_active() {
  569. self.frame.set_state(self.spirc.state.clone());
  570. }
  571. let send = self.spirc.sender.start_send(self.frame).unwrap();
  572. assert!(send.is_ready());
  573. }
  574. }