spirc.rs 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775
  1. use futures::future;
  2. use futures::sync::{mpsc, oneshot};
  3. use futures::{Async, Future, Poll, Sink, Stream};
  4. use protobuf::{self, Message};
  5. use core::config::ConnectConfig;
  6. use core::mercury::MercuryError;
  7. use core::session::Session;
  8. use core::spotify_id::SpotifyId;
  9. use core::util::SeqGenerator;
  10. use core::version;
  11. use protocol;
  12. use protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State};
  13. use playback::mixer::Mixer;
  14. use playback::player::Player;
  15. use rand;
  16. use rand::Rng;
  17. use std;
  18. use std::time::{SystemTime, UNIX_EPOCH};
  19. pub struct SpircTask {
  20. player: Player,
  21. mixer: Box<Mixer>,
  22. linear_volume: bool,
  23. sequence: SeqGenerator<u32>,
  24. ident: String,
  25. device: DeviceState,
  26. state: State,
  27. subscription: Box<Stream<Item = Frame, Error = MercuryError>>,
  28. sender: Box<Sink<SinkItem = Frame, SinkError = MercuryError>>,
  29. commands: mpsc::UnboundedReceiver<SpircCommand>,
  30. end_of_track: Box<Future<Item = (), Error = oneshot::Canceled>>,
  31. shutdown: bool,
  32. session: Session,
  33. }
  34. pub enum SpircCommand {
  35. Play,
  36. PlayPause,
  37. Pause,
  38. Prev,
  39. Next,
  40. VolumeUp,
  41. VolumeDown,
  42. Shutdown,
  43. }
  44. pub struct Spirc {
  45. commands: mpsc::UnboundedSender<SpircCommand>,
  46. }
  47. fn now_ms() -> i64 {
  48. let dur = match SystemTime::now().duration_since(UNIX_EPOCH) {
  49. Ok(dur) => dur,
  50. Err(err) => err.duration(),
  51. };
  52. (dur.as_secs() * 1000 + (dur.subsec_nanos() / 1000_000) as u64) as i64
  53. }
  54. fn initial_state() -> State {
  55. let mut frame = protocol::spirc::State::new();
  56. frame.set_repeat(false);
  57. frame.set_shuffle(false);
  58. frame.set_status(PlayStatus::kPlayStatusStop);
  59. frame.set_position_ms(0);
  60. frame.set_position_measured_at(0);
  61. frame
  62. }
  63. fn initial_device_state(config: ConnectConfig, volume: u16) -> DeviceState {
  64. {
  65. let mut msg = DeviceState::new();
  66. msg.set_sw_version(version::version_string());
  67. msg.set_is_active(false);
  68. msg.set_can_play(true);
  69. msg.set_volume(volume as u32);
  70. msg.set_name(config.name);
  71. {
  72. let repeated = msg.mut_capabilities();
  73. {
  74. let msg = repeated.push_default();
  75. msg.set_typ(protocol::spirc::CapabilityType::kCanBePlayer);
  76. {
  77. let repeated = msg.mut_intValue();
  78. repeated.push(1)
  79. };
  80. msg
  81. };
  82. {
  83. let msg = repeated.push_default();
  84. msg.set_typ(protocol::spirc::CapabilityType::kDeviceType);
  85. {
  86. let repeated = msg.mut_intValue();
  87. repeated.push(config.device_type as i64)
  88. };
  89. msg
  90. };
  91. {
  92. let msg = repeated.push_default();
  93. msg.set_typ(protocol::spirc::CapabilityType::kGaiaEqConnectId);
  94. {
  95. let repeated = msg.mut_intValue();
  96. repeated.push(1)
  97. };
  98. msg
  99. };
  100. {
  101. let msg = repeated.push_default();
  102. msg.set_typ(protocol::spirc::CapabilityType::kSupportsLogout);
  103. {
  104. let repeated = msg.mut_intValue();
  105. repeated.push(0)
  106. };
  107. msg
  108. };
  109. {
  110. let msg = repeated.push_default();
  111. msg.set_typ(protocol::spirc::CapabilityType::kIsObservable);
  112. {
  113. let repeated = msg.mut_intValue();
  114. repeated.push(1)
  115. };
  116. msg
  117. };
  118. {
  119. let msg = repeated.push_default();
  120. msg.set_typ(protocol::spirc::CapabilityType::kVolumeSteps);
  121. {
  122. let repeated = msg.mut_intValue();
  123. repeated.push(64)
  124. };
  125. msg
  126. };
  127. {
  128. let msg = repeated.push_default();
  129. msg.set_typ(protocol::spirc::CapabilityType::kSupportedContexts);
  130. {
  131. let repeated = msg.mut_stringValue();
  132. repeated.push(::std::convert::Into::into("album"));
  133. repeated.push(::std::convert::Into::into("playlist"));
  134. repeated.push(::std::convert::Into::into("search"));
  135. repeated.push(::std::convert::Into::into("inbox"));
  136. repeated.push(::std::convert::Into::into("toplist"));
  137. repeated.push(::std::convert::Into::into("starred"));
  138. repeated.push(::std::convert::Into::into("publishedstarred"));
  139. repeated.push(::std::convert::Into::into("track"))
  140. };
  141. msg
  142. };
  143. {
  144. let msg = repeated.push_default();
  145. msg.set_typ(protocol::spirc::CapabilityType::kSupportedTypes);
  146. {
  147. let repeated = msg.mut_stringValue();
  148. repeated.push(::std::convert::Into::into("audio/local"));
  149. repeated.push(::std::convert::Into::into("audio/track"));
  150. repeated.push(::std::convert::Into::into("local"));
  151. repeated.push(::std::convert::Into::into("track"))
  152. };
  153. msg
  154. };
  155. };
  156. msg
  157. }
  158. }
  159. fn calc_logarithmic_volume(volume: u16) -> u16 {
  160. // Volume conversion taken from https://www.dr-lex.be/info-stuff/volumecontrols.html#ideal2
  161. // Convert the given volume [0..0xffff] to a dB gain
  162. // We assume a dB range of 60dB.
  163. // Use the equatation: a * exp(b * x)
  164. // in which a = IDEAL_FACTOR, b = 1/1000
  165. const IDEAL_FACTOR: f64 = 6.908;
  166. let normalized_volume = volume as f64 / std::u16::MAX as f64; // To get a value between 0 and 1
  167. let mut val = std::u16::MAX;
  168. // Prevent val > std::u16::MAX due to rounding errors
  169. if normalized_volume < 0.999 {
  170. let new_volume = (normalized_volume * IDEAL_FACTOR).exp() / 1000.0;
  171. val = (new_volume * std::u16::MAX as f64) as u16;
  172. }
  173. debug!("input volume:{} to mixer: {}", volume, val);
  174. // return the scale factor (0..0xffff) (equivalent to a voltage multiplier).
  175. val
  176. }
  177. fn volume_to_mixer(volume: u16, linear_volume: bool) -> u16 {
  178. if linear_volume {
  179. debug!("linear volume: {}", volume);
  180. volume
  181. } else {
  182. calc_logarithmic_volume(volume)
  183. }
  184. }
  185. impl Spirc {
  186. pub fn new(
  187. config: ConnectConfig,
  188. session: Session,
  189. player: Player,
  190. mixer: Box<Mixer>,
  191. ) -> (Spirc, SpircTask) {
  192. debug!("new Spirc[{}]", session.session_id());
  193. let ident = session.device_id().to_owned();
  194. let uri = format!("hm://remote/3/user/{}/", session.username());
  195. let subscription = session.mercury().subscribe(&uri as &str);
  196. let subscription = subscription
  197. .map(|stream| stream.map_err(|_| MercuryError))
  198. .flatten_stream();
  199. let subscription = Box::new(subscription.map(|response| -> Frame {
  200. let data = response.payload.first().unwrap();
  201. protobuf::parse_from_bytes(data).unwrap()
  202. }));
  203. let sender = Box::new(
  204. session
  205. .mercury()
  206. .sender(uri)
  207. .with(|frame: Frame| Ok(frame.write_to_bytes().unwrap())),
  208. );
  209. let (cmd_tx, cmd_rx) = mpsc::unbounded();
  210. let volume = config.volume as u16;
  211. let linear_volume = config.linear_volume;
  212. let device = initial_device_state(config, volume);
  213. mixer.set_volume(volume_to_mixer(volume as u16, linear_volume));
  214. let mut task = SpircTask {
  215. player: player,
  216. mixer: mixer,
  217. linear_volume: linear_volume,
  218. sequence: SeqGenerator::new(1),
  219. ident: ident,
  220. device: device,
  221. state: initial_state(),
  222. subscription: subscription,
  223. sender: sender,
  224. commands: cmd_rx,
  225. end_of_track: Box::new(future::empty()),
  226. shutdown: false,
  227. session: session.clone(),
  228. };
  229. let spirc = Spirc { commands: cmd_tx };
  230. task.hello();
  231. (spirc, task)
  232. }
  233. pub fn play(&self) {
  234. let _ = self.commands.unbounded_send(SpircCommand::Play);
  235. }
  236. pub fn play_pause(&self) {
  237. let _ = self.commands.unbounded_send(SpircCommand::PlayPause);
  238. }
  239. pub fn pause(&self) {
  240. let _ = self.commands.unbounded_send(SpircCommand::Pause);
  241. }
  242. pub fn prev(&self) {
  243. let _ = self.commands.unbounded_send(SpircCommand::Prev);
  244. }
  245. pub fn next(&self) {
  246. let _ = self.commands.unbounded_send(SpircCommand::Next);
  247. }
  248. pub fn volume_up(&self) {
  249. let _ = self.commands.unbounded_send(SpircCommand::VolumeUp);
  250. }
  251. pub fn volume_down(&self) {
  252. let _ = self.commands.unbounded_send(SpircCommand::VolumeDown);
  253. }
  254. pub fn shutdown(&self) {
  255. let _ = self.commands.unbounded_send(SpircCommand::Shutdown);
  256. }
  257. }
  258. impl Future for SpircTask {
  259. type Item = ();
  260. type Error = ();
  261. fn poll(&mut self) -> Poll<(), ()> {
  262. loop {
  263. let mut progress = false;
  264. if self.session.is_invalid() {
  265. return Ok(Async::Ready(()));
  266. }
  267. if !self.shutdown {
  268. match self.subscription.poll().unwrap() {
  269. Async::Ready(Some(frame)) => {
  270. progress = true;
  271. self.handle_frame(frame);
  272. }
  273. Async::Ready(None) => panic!("subscription terminated"),
  274. Async::NotReady => (),
  275. }
  276. match self.commands.poll().unwrap() {
  277. Async::Ready(Some(command)) => {
  278. progress = true;
  279. self.handle_command(command);
  280. }
  281. Async::Ready(None) => (),
  282. Async::NotReady => (),
  283. }
  284. match self.end_of_track.poll() {
  285. Ok(Async::Ready(())) => {
  286. progress = true;
  287. self.handle_end_of_track();
  288. }
  289. Ok(Async::NotReady) => (),
  290. Err(oneshot::Canceled) => self.end_of_track = Box::new(future::empty()),
  291. }
  292. }
  293. let poll_sender = self.sender.poll_complete().unwrap();
  294. // Only shutdown once we've flushed out all our messages
  295. if self.shutdown && poll_sender.is_ready() {
  296. return Ok(Async::Ready(()));
  297. }
  298. if !progress {
  299. return Ok(Async::NotReady);
  300. }
  301. }
  302. }
  303. }
  304. impl SpircTask {
  305. fn handle_command(&mut self, cmd: SpircCommand) {
  306. let active = self.device.get_is_active();
  307. match cmd {
  308. SpircCommand::Play => {
  309. if active {
  310. self.handle_play();
  311. self.notify(None);
  312. } else {
  313. CommandSender::new(self, MessageType::kMessageTypePlay).send();
  314. }
  315. }
  316. SpircCommand::PlayPause => {
  317. if active {
  318. self.handle_play_pause();
  319. self.notify(None);
  320. } else {
  321. CommandSender::new(self, MessageType::kMessageTypePlayPause).send();
  322. }
  323. }
  324. SpircCommand::Pause => {
  325. if active {
  326. self.handle_pause();
  327. self.notify(None);
  328. } else {
  329. CommandSender::new(self, MessageType::kMessageTypePause).send();
  330. }
  331. }
  332. SpircCommand::Prev => {
  333. if active {
  334. self.handle_prev();
  335. self.notify(None);
  336. } else {
  337. CommandSender::new(self, MessageType::kMessageTypePrev).send();
  338. }
  339. }
  340. SpircCommand::Next => {
  341. if active {
  342. self.handle_next();
  343. self.notify(None);
  344. } else {
  345. CommandSender::new(self, MessageType::kMessageTypeNext).send();
  346. }
  347. }
  348. SpircCommand::VolumeUp => {
  349. if active {
  350. self.handle_volume_up();
  351. self.notify(None);
  352. } else {
  353. CommandSender::new(self, MessageType::kMessageTypeVolumeUp).send();
  354. }
  355. }
  356. SpircCommand::VolumeDown => {
  357. if active {
  358. self.handle_volume_down();
  359. self.notify(None);
  360. } else {
  361. CommandSender::new(self, MessageType::kMessageTypeVolumeDown).send();
  362. }
  363. }
  364. SpircCommand::Shutdown => {
  365. CommandSender::new(self, MessageType::kMessageTypeGoodbye).send();
  366. self.shutdown = true;
  367. self.commands.close();
  368. }
  369. }
  370. }
  371. fn handle_frame(&mut self, frame: Frame) {
  372. debug!(
  373. "{:?} {:?} {} {} {}",
  374. frame.get_typ(),
  375. frame.get_device_state().get_name(),
  376. frame.get_ident(),
  377. frame.get_seq_nr(),
  378. frame.get_state_update_id()
  379. );
  380. if frame.get_ident() == self.ident
  381. || (frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident))
  382. {
  383. return;
  384. }
  385. match frame.get_typ() {
  386. MessageType::kMessageTypeHello => {
  387. self.notify(Some(frame.get_ident()));
  388. }
  389. MessageType::kMessageTypeLoad => {
  390. if !self.device.get_is_active() {
  391. self.device.set_is_active(true);
  392. self.device.set_became_active_at(now_ms());
  393. }
  394. self.update_tracks(&frame);
  395. if self.state.get_track().len() > 0 {
  396. self.state.set_position_ms(frame.get_state().get_position_ms());
  397. self.state.set_position_measured_at(now_ms() as u64);
  398. let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
  399. self.load_track(play);
  400. } else {
  401. self.state.set_status(PlayStatus::kPlayStatusStop);
  402. }
  403. self.notify(None);
  404. }
  405. MessageType::kMessageTypePlay => {
  406. self.handle_play();
  407. self.notify(None);
  408. }
  409. MessageType::kMessageTypePlayPause => {
  410. self.handle_play_pause();
  411. self.notify(None);
  412. }
  413. MessageType::kMessageTypePause => {
  414. self.handle_pause();
  415. self.notify(None);
  416. }
  417. MessageType::kMessageTypeNext => {
  418. self.handle_next();
  419. self.notify(None);
  420. }
  421. MessageType::kMessageTypePrev => {
  422. self.handle_prev();
  423. self.notify(None);
  424. }
  425. MessageType::kMessageTypeVolumeUp => {
  426. self.handle_volume_up();
  427. self.notify(None);
  428. }
  429. MessageType::kMessageTypeVolumeDown => {
  430. self.handle_volume_down();
  431. self.notify(None);
  432. }
  433. MessageType::kMessageTypeRepeat => {
  434. self.state.set_repeat(frame.get_state().get_repeat());
  435. self.notify(None);
  436. }
  437. MessageType::kMessageTypeShuffle => {
  438. self.state.set_shuffle(frame.get_state().get_shuffle());
  439. if self.state.get_shuffle() {
  440. let current_index = self.state.get_playing_track_index();
  441. {
  442. let tracks = self.state.mut_track();
  443. tracks.swap(0, current_index as usize);
  444. if let Some((_, rest)) = tracks.split_first_mut() {
  445. rand::thread_rng().shuffle(rest);
  446. }
  447. }
  448. self.state.set_playing_track_index(0);
  449. } else {
  450. let context = self.state.get_context_uri();
  451. debug!("{:?}", context);
  452. }
  453. self.notify(None);
  454. }
  455. MessageType::kMessageTypeSeek => {
  456. let position = frame.get_position();
  457. self.state.set_position_ms(position);
  458. self.state.set_position_measured_at(now_ms() as u64);
  459. self.player.seek(position);
  460. self.notify(None);
  461. }
  462. MessageType::kMessageTypeReplace => {
  463. self.update_tracks(&frame);
  464. self.notify(None);
  465. }
  466. MessageType::kMessageTypeVolume => {
  467. self.device.set_volume(frame.get_volume());
  468. self.mixer
  469. .set_volume(volume_to_mixer(frame.get_volume() as u16, self.linear_volume));
  470. self.notify(None);
  471. }
  472. MessageType::kMessageTypeNotify => {
  473. if self.device.get_is_active() && frame.get_device_state().get_is_active() {
  474. self.device.set_is_active(false);
  475. self.state.set_status(PlayStatus::kPlayStatusStop);
  476. self.player.stop();
  477. self.mixer.stop();
  478. }
  479. }
  480. _ => (),
  481. }
  482. }
  483. fn handle_play(&mut self) {
  484. if self.state.get_status() == PlayStatus::kPlayStatusPause {
  485. self.mixer.start();
  486. self.player.play();
  487. self.state.set_status(PlayStatus::kPlayStatusPlay);
  488. self.state.set_position_measured_at(now_ms() as u64);
  489. }
  490. }
  491. fn handle_play_pause(&mut self) {
  492. match self.state.get_status() {
  493. PlayStatus::kPlayStatusPlay => self.handle_pause(),
  494. PlayStatus::kPlayStatusPause => self.handle_play(),
  495. _ => (),
  496. }
  497. }
  498. fn handle_pause(&mut self) {
  499. if self.state.get_status() == PlayStatus::kPlayStatusPlay {
  500. self.player.pause();
  501. self.mixer.stop();
  502. self.state.set_status(PlayStatus::kPlayStatusPause);
  503. let now = now_ms() as u64;
  504. let position = self.state.get_position_ms();
  505. let diff = now - self.state.get_position_measured_at();
  506. self.state.set_position_ms(position + diff as u32);
  507. self.state.set_position_measured_at(now);
  508. }
  509. }
  510. fn consume_queued_track(&mut self) -> usize {
  511. // Removes current track if it is queued
  512. // Returns the index of the next track
  513. let current_index = self.state.get_playing_track_index() as usize;
  514. if self.state.get_track()[current_index].get_queued() {
  515. self.state.mut_track().remove(current_index);
  516. return current_index;
  517. }
  518. current_index + 1
  519. }
  520. fn handle_next(&mut self) {
  521. let mut new_index = self.consume_queued_track() as u32;
  522. let mut continue_playing = true;
  523. if new_index >= self.state.get_track().len() as u32 {
  524. new_index = 0; // Loop around back to start
  525. continue_playing = self.state.get_repeat();
  526. }
  527. self.state.set_playing_track_index(new_index);
  528. self.state.set_position_ms(0);
  529. self.state.set_position_measured_at(now_ms() as u64);
  530. self.load_track(continue_playing);
  531. }
  532. fn handle_prev(&mut self) {
  533. // Previous behaves differently based on the position
  534. // Under 3s it goes to the previous song (starts playing)
  535. // Over 3s it seeks to zero (retains previous play status)
  536. if self.position() < 3000 {
  537. // Queued tracks always follow the currently playing track.
  538. // They should not be considered when calculating the previous
  539. // track so extract them beforehand and reinsert them after it.
  540. let mut queue_tracks = Vec::new();
  541. {
  542. let queue_index = self.consume_queued_track();
  543. let tracks = self.state.mut_track();
  544. while queue_index < tracks.len() && tracks[queue_index].get_queued() {
  545. queue_tracks.push(tracks.remove(queue_index));
  546. }
  547. }
  548. let current_index = self.state.get_playing_track_index();
  549. let new_index = if current_index > 0 {
  550. current_index - 1
  551. } else if self.state.get_repeat() {
  552. self.state.get_track().len() as u32 - 1
  553. } else {
  554. 0
  555. };
  556. // Reinsert queued tracks after the new playing track.
  557. let mut pos = (new_index + 1) as usize;
  558. for track in queue_tracks.into_iter() {
  559. self.state.mut_track().insert(pos, track);
  560. pos += 1;
  561. }
  562. self.state.set_playing_track_index(new_index);
  563. self.state.set_position_ms(0);
  564. self.state.set_position_measured_at(now_ms() as u64);
  565. self.load_track(true);
  566. } else {
  567. self.state.set_position_ms(0);
  568. self.state.set_position_measured_at(now_ms() as u64);
  569. self.player.seek(0);
  570. }
  571. }
  572. fn handle_volume_up(&mut self) {
  573. let mut volume: u32 = self.device.get_volume() as u32 + 4096;
  574. if volume > 0xFFFF {
  575. volume = 0xFFFF;
  576. }
  577. self.device.set_volume(volume);
  578. self.mixer
  579. .set_volume(volume_to_mixer(volume as u16, self.linear_volume));
  580. }
  581. fn handle_volume_down(&mut self) {
  582. let mut volume: i32 = self.device.get_volume() as i32 - 4096;
  583. if volume < 0 {
  584. volume = 0;
  585. }
  586. self.device.set_volume(volume as u32);
  587. self.mixer
  588. .set_volume(volume_to_mixer(volume as u16, self.linear_volume));
  589. }
  590. fn handle_end_of_track(&mut self) {
  591. self.handle_next();
  592. self.notify(None);
  593. }
  594. fn position(&mut self) -> u32 {
  595. let diff = now_ms() as u64 - self.state.get_position_measured_at();
  596. self.state.get_position_ms() + diff as u32
  597. }
  598. fn update_tracks(&mut self, frame: &protocol::spirc::Frame) {
  599. let index = frame.get_state().get_playing_track_index();
  600. let tracks = frame.get_state().get_track();
  601. let context_uri = frame.get_state().get_context_uri().to_owned();
  602. self.state.set_playing_track_index(index);
  603. self.state.set_track(tracks.into_iter().cloned().collect());
  604. self.state.set_context_uri(context_uri);
  605. self.state.set_repeat(frame.get_state().get_repeat());
  606. self.state.set_shuffle(frame.get_state().get_shuffle());
  607. }
  608. fn load_track(&mut self, play: bool) {
  609. let index = self.state.get_playing_track_index();
  610. let track = {
  611. let gid = self.state.get_track()[index as usize].get_gid();
  612. SpotifyId::from_raw(gid).unwrap()
  613. };
  614. let position = self.state.get_position_ms();
  615. let end_of_track = self.player.load(track, play, position);
  616. if play {
  617. self.state.set_status(PlayStatus::kPlayStatusPlay);
  618. } else {
  619. self.state.set_status(PlayStatus::kPlayStatusPause);
  620. }
  621. self.end_of_track = Box::new(end_of_track);
  622. }
  623. fn hello(&mut self) {
  624. CommandSender::new(self, MessageType::kMessageTypeHello).send();
  625. }
  626. fn notify(&mut self, recipient: Option<&str>) {
  627. let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify);
  628. if let Some(s) = recipient {
  629. cs = cs.recipient(&s);
  630. }
  631. cs.send();
  632. }
  633. }
  634. impl Drop for SpircTask {
  635. fn drop(&mut self) {
  636. debug!("drop Spirc[{}]", self.session.session_id());
  637. }
  638. }
  639. struct CommandSender<'a> {
  640. spirc: &'a mut SpircTask,
  641. frame: protocol::spirc::Frame,
  642. }
  643. impl<'a> CommandSender<'a> {
  644. fn new(spirc: &'a mut SpircTask, cmd: MessageType) -> CommandSender {
  645. let mut frame = protocol::spirc::Frame::new();
  646. frame.set_version(1);
  647. frame.set_protocol_version(::std::convert::Into::into("2.0.0"));
  648. frame.set_ident(spirc.ident.clone());
  649. frame.set_seq_nr(spirc.sequence.get());
  650. frame.set_typ(cmd);
  651. frame.set_device_state(spirc.device.clone());
  652. frame.set_state_update_id(now_ms());
  653. CommandSender {
  654. spirc: spirc,
  655. frame: frame,
  656. }
  657. }
  658. fn recipient(mut self, recipient: &'a str) -> CommandSender {
  659. self.frame.mut_recipient().push(recipient.to_owned());
  660. self
  661. }
  662. #[allow(dead_code)]
  663. fn state(mut self, state: protocol::spirc::State) -> CommandSender<'a> {
  664. self.frame.set_state(state);
  665. self
  666. }
  667. fn send(mut self) {
  668. if !self.frame.has_state() && self.spirc.device.get_is_active() {
  669. self.frame.set_state(self.spirc.state.clone());
  670. }
  671. let send = self.spirc.sender.start_send(self.frame).unwrap();
  672. assert!(send.is_ready());
  673. }
  674. }