spirc.rs 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757
  1. use futures::{Async, Future, Poll, Sink, Stream};
  2. use futures::future;
  3. use futures::sync::{mpsc, oneshot};
  4. use protobuf::{self, Message};
  5. use core::config::ConnectConfig;
  6. use core::mercury::MercuryError;
  7. use core::session::Session;
  8. use core::spotify_id::SpotifyId;
  9. use core::util::SeqGenerator;
  10. use core::version;
  11. use protocol;
  12. use protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State};
  13. use playback::mixer::Mixer;
  14. use playback::player::Player;
  15. use rand;
  16. use rand::Rng;
  17. use std;
  18. use std::time::{SystemTime, UNIX_EPOCH};
  19. pub struct SpircTask {
  20. player: Player,
  21. mixer: Box<Mixer>,
  22. sequence: SeqGenerator<u32>,
  23. ident: String,
  24. device: DeviceState,
  25. state: State,
  26. subscription: Box<Stream<Item = Frame, Error = MercuryError>>,
  27. sender: Box<Sink<SinkItem = Frame, SinkError = MercuryError>>,
  28. commands: mpsc::UnboundedReceiver<SpircCommand>,
  29. end_of_track: Box<Future<Item = (), Error = oneshot::Canceled>>,
  30. shutdown: bool,
  31. session: Session,
  32. }
  33. pub enum SpircCommand {
  34. Play,
  35. PlayPause,
  36. Pause,
  37. Prev,
  38. Next,
  39. VolumeUp,
  40. VolumeDown,
  41. Shutdown,
  42. }
  43. pub struct Spirc {
  44. commands: mpsc::UnboundedSender<SpircCommand>,
  45. }
  46. fn now_ms() -> i64 {
  47. let dur = match SystemTime::now().duration_since(UNIX_EPOCH) {
  48. Ok(dur) => dur,
  49. Err(err) => err.duration(),
  50. };
  51. (dur.as_secs() * 1000 + (dur.subsec_nanos() / 1000_000) as u64) as i64
  52. }
  53. fn initial_state() -> State {
  54. let mut frame = protocol::spirc::State::new();
  55. frame.set_repeat(false);
  56. frame.set_shuffle(false);
  57. frame.set_status(PlayStatus::kPlayStatusStop);
  58. frame.set_position_ms(0);
  59. frame.set_position_measured_at(0);
  60. frame
  61. }
  62. fn initial_device_state(config: ConnectConfig, volume: u16) -> DeviceState {
  63. {
  64. let mut msg = DeviceState::new();
  65. msg.set_sw_version(version::version_string());
  66. msg.set_is_active(false);
  67. msg.set_can_play(true);
  68. msg.set_volume(volume as u32);
  69. msg.set_name(config.name);
  70. {
  71. let repeated = msg.mut_capabilities();
  72. {
  73. let msg = repeated.push_default();
  74. msg.set_typ(protocol::spirc::CapabilityType::kCanBePlayer);
  75. {
  76. let repeated = msg.mut_intValue();
  77. repeated.push(1)
  78. };
  79. msg
  80. };
  81. {
  82. let msg = repeated.push_default();
  83. msg.set_typ(protocol::spirc::CapabilityType::kDeviceType);
  84. {
  85. let repeated = msg.mut_intValue();
  86. repeated.push(config.device_type as i64)
  87. };
  88. msg
  89. };
  90. {
  91. let msg = repeated.push_default();
  92. msg.set_typ(protocol::spirc::CapabilityType::kGaiaEqConnectId);
  93. {
  94. let repeated = msg.mut_intValue();
  95. repeated.push(1)
  96. };
  97. msg
  98. };
  99. {
  100. let msg = repeated.push_default();
  101. msg.set_typ(protocol::spirc::CapabilityType::kSupportsLogout);
  102. {
  103. let repeated = msg.mut_intValue();
  104. repeated.push(0)
  105. };
  106. msg
  107. };
  108. {
  109. let msg = repeated.push_default();
  110. msg.set_typ(protocol::spirc::CapabilityType::kIsObservable);
  111. {
  112. let repeated = msg.mut_intValue();
  113. repeated.push(1)
  114. };
  115. msg
  116. };
  117. {
  118. let msg = repeated.push_default();
  119. msg.set_typ(protocol::spirc::CapabilityType::kVolumeSteps);
  120. {
  121. let repeated = msg.mut_intValue();
  122. repeated.push(64)
  123. };
  124. msg
  125. };
  126. {
  127. let msg = repeated.push_default();
  128. msg.set_typ(protocol::spirc::CapabilityType::kSupportedContexts);
  129. {
  130. let repeated = msg.mut_stringValue();
  131. repeated.push(::std::convert::Into::into("album"));
  132. repeated.push(::std::convert::Into::into("playlist"));
  133. repeated.push(::std::convert::Into::into("search"));
  134. repeated.push(::std::convert::Into::into("inbox"));
  135. repeated.push(::std::convert::Into::into("toplist"));
  136. repeated.push(::std::convert::Into::into("starred"));
  137. repeated.push(::std::convert::Into::into("publishedstarred"));
  138. repeated.push(::std::convert::Into::into("track"))
  139. };
  140. msg
  141. };
  142. {
  143. let msg = repeated.push_default();
  144. msg.set_typ(protocol::spirc::CapabilityType::kSupportedTypes);
  145. {
  146. let repeated = msg.mut_stringValue();
  147. repeated.push(::std::convert::Into::into("audio/local"));
  148. repeated.push(::std::convert::Into::into("audio/track"));
  149. repeated.push(::std::convert::Into::into("local"));
  150. repeated.push(::std::convert::Into::into("track"))
  151. };
  152. msg
  153. };
  154. };
  155. msg
  156. }
  157. }
  158. fn volume_to_mixer(volume: u16) -> u16 {
  159. // Volume conversion taken from https://www.dr-lex.be/info-stuff/volumecontrols.html#ideal2
  160. // Convert the given volume [0..0xffff] to a dB gain
  161. // We assume a dB range of 60dB.
  162. // Use the equatation: a * exp(b * x)
  163. // in which a = IDEAL_FACTOR, b = 1/1000
  164. const IDEAL_FACTOR: f64 = 6.908;
  165. let normalized_volume = volume as f64 / std::u16::MAX as f64; // To get a value between 0 and 1
  166. let mut val = std::u16::MAX;
  167. // Prevent val > std::u16::MAX due to rounding errors
  168. if normalized_volume < 0.999 {
  169. let new_volume = (normalized_volume * IDEAL_FACTOR).exp() / 1000.0;
  170. val = (new_volume * std::u16::MAX as f64) as u16;
  171. }
  172. debug!("input volume:{} to mixer: {}", volume, val);
  173. // return the scale factor (0..0xffff) (equivalent to a voltage multiplier).
  174. val
  175. }
  176. impl Spirc {
  177. pub fn new(
  178. config: ConnectConfig,
  179. session: Session,
  180. player: Player,
  181. mixer: Box<Mixer>,
  182. ) -> (Spirc, SpircTask) {
  183. debug!("new Spirc[{}]", session.session_id());
  184. let ident = session.device_id().to_owned();
  185. let uri = format!("hm://remote/3/user/{}/", session.username());
  186. let subscription = session.mercury().subscribe(&uri as &str);
  187. let subscription = subscription
  188. .map(|stream| stream.map_err(|_| MercuryError))
  189. .flatten_stream();
  190. let subscription = Box::new(subscription.map(|response| -> Frame {
  191. let data = response.payload.first().unwrap();
  192. protobuf::parse_from_bytes(data).unwrap()
  193. }));
  194. let sender = Box::new(
  195. session
  196. .mercury()
  197. .sender(uri)
  198. .with(|frame: Frame| Ok(frame.write_to_bytes().unwrap())),
  199. );
  200. let (cmd_tx, cmd_rx) = mpsc::unbounded();
  201. let volume = config.volume as u16;
  202. let device = initial_device_state(config, volume);
  203. mixer.set_volume(volume_to_mixer(volume as u16));
  204. let mut task = SpircTask {
  205. player: player,
  206. mixer: mixer,
  207. sequence: SeqGenerator::new(1),
  208. ident: ident,
  209. device: device,
  210. state: initial_state(),
  211. subscription: subscription,
  212. sender: sender,
  213. commands: cmd_rx,
  214. end_of_track: Box::new(future::empty()),
  215. shutdown: false,
  216. session: session.clone(),
  217. };
  218. let spirc = Spirc { commands: cmd_tx };
  219. task.hello();
  220. (spirc, task)
  221. }
  222. pub fn play(&self) {
  223. let _ = self.commands.unbounded_send(SpircCommand::Play);
  224. }
  225. pub fn play_pause(&self) {
  226. let _ = self.commands.unbounded_send(SpircCommand::PlayPause);
  227. }
  228. pub fn pause(&self) {
  229. let _ = self.commands.unbounded_send(SpircCommand::Pause);
  230. }
  231. pub fn prev(&self) {
  232. let _ = self.commands.unbounded_send(SpircCommand::Prev);
  233. }
  234. pub fn next(&self) {
  235. let _ = self.commands.unbounded_send(SpircCommand::Next);
  236. }
  237. pub fn volume_up(&self) {
  238. let _ = self.commands.unbounded_send(SpircCommand::VolumeUp);
  239. }
  240. pub fn volume_down(&self) {
  241. let _ = self.commands.unbounded_send(SpircCommand::VolumeDown);
  242. }
  243. pub fn shutdown(&self) {
  244. let _ = self.commands.unbounded_send(SpircCommand::Shutdown);
  245. }
  246. }
  247. impl Future for SpircTask {
  248. type Item = ();
  249. type Error = ();
  250. fn poll(&mut self) -> Poll<(), ()> {
  251. loop {
  252. let mut progress = false;
  253. if !self.shutdown {
  254. match self.subscription.poll().unwrap() {
  255. Async::Ready(Some(frame)) => {
  256. progress = true;
  257. self.handle_frame(frame);
  258. }
  259. Async::Ready(None) => panic!("subscription terminated"),
  260. Async::NotReady => (),
  261. }
  262. match self.commands.poll().unwrap() {
  263. Async::Ready(Some(command)) => {
  264. progress = true;
  265. self.handle_command(command);
  266. }
  267. Async::Ready(None) => (),
  268. Async::NotReady => (),
  269. }
  270. match self.end_of_track.poll() {
  271. Ok(Async::Ready(())) => {
  272. progress = true;
  273. self.handle_end_of_track();
  274. }
  275. Ok(Async::NotReady) => (),
  276. Err(oneshot::Canceled) => self.end_of_track = Box::new(future::empty()),
  277. }
  278. }
  279. let poll_sender = self.sender.poll_complete().unwrap();
  280. // Only shutdown once we've flushed out all our messages
  281. if self.shutdown && poll_sender.is_ready() {
  282. return Ok(Async::Ready(()));
  283. }
  284. if !progress {
  285. return Ok(Async::NotReady);
  286. }
  287. }
  288. }
  289. }
  290. impl SpircTask {
  291. fn handle_command(&mut self, cmd: SpircCommand) {
  292. let active = self.device.get_is_active();
  293. match cmd {
  294. SpircCommand::Play => {
  295. if active {
  296. self.handle_play();
  297. self.notify(None);
  298. } else {
  299. CommandSender::new(self, MessageType::kMessageTypePlay).send();
  300. }
  301. }
  302. SpircCommand::PlayPause => {
  303. if active {
  304. self.handle_play_pause();
  305. self.notify(None);
  306. } else {
  307. CommandSender::new(self, MessageType::kMessageTypePlayPause).send();
  308. }
  309. }
  310. SpircCommand::Pause => {
  311. if active {
  312. self.handle_pause();
  313. self.notify(None);
  314. } else {
  315. CommandSender::new(self, MessageType::kMessageTypePause).send();
  316. }
  317. }
  318. SpircCommand::Prev => {
  319. if active {
  320. self.handle_prev();
  321. self.notify(None);
  322. } else {
  323. CommandSender::new(self, MessageType::kMessageTypePrev).send();
  324. }
  325. }
  326. SpircCommand::Next => {
  327. if active {
  328. self.handle_next();
  329. self.notify(None);
  330. } else {
  331. CommandSender::new(self, MessageType::kMessageTypeNext).send();
  332. }
  333. }
  334. SpircCommand::VolumeUp => {
  335. if active {
  336. self.handle_volume_up();
  337. self.notify(None);
  338. } else {
  339. CommandSender::new(self, MessageType::kMessageTypeVolumeUp).send();
  340. }
  341. }
  342. SpircCommand::VolumeDown => {
  343. if active {
  344. self.handle_volume_down();
  345. self.notify(None);
  346. } else {
  347. CommandSender::new(self, MessageType::kMessageTypeVolumeDown).send();
  348. }
  349. }
  350. SpircCommand::Shutdown => {
  351. CommandSender::new(self, MessageType::kMessageTypeGoodbye).send();
  352. self.shutdown = true;
  353. self.commands.close();
  354. }
  355. }
  356. }
  357. fn handle_frame(&mut self, frame: Frame) {
  358. debug!(
  359. "{:?} {:?} {} {} {}",
  360. frame.get_typ(),
  361. frame.get_device_state().get_name(),
  362. frame.get_ident(),
  363. frame.get_seq_nr(),
  364. frame.get_state_update_id()
  365. );
  366. if frame.get_ident() == self.ident
  367. || (frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident))
  368. {
  369. return;
  370. }
  371. match frame.get_typ() {
  372. MessageType::kMessageTypeHello => {
  373. self.notify(Some(frame.get_ident()));
  374. }
  375. MessageType::kMessageTypeLoad => {
  376. if !self.device.get_is_active() {
  377. self.device.set_is_active(true);
  378. self.device.set_became_active_at(now_ms());
  379. }
  380. self.update_tracks(&frame);
  381. if self.state.get_track().len() > 0 {
  382. self.state
  383. .set_position_ms(frame.get_state().get_position_ms());
  384. self.state.set_position_measured_at(now_ms() as u64);
  385. let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
  386. self.load_track(play);
  387. } else {
  388. self.state.set_status(PlayStatus::kPlayStatusStop);
  389. }
  390. self.notify(None);
  391. }
  392. MessageType::kMessageTypePlay => {
  393. self.handle_play();
  394. self.notify(None);
  395. }
  396. MessageType::kMessageTypePlayPause => {
  397. self.handle_play_pause();
  398. self.notify(None);
  399. }
  400. MessageType::kMessageTypePause => {
  401. self.handle_pause();
  402. self.notify(None);
  403. }
  404. MessageType::kMessageTypeNext => {
  405. self.handle_next();
  406. self.notify(None);
  407. }
  408. MessageType::kMessageTypePrev => {
  409. self.handle_prev();
  410. self.notify(None);
  411. }
  412. MessageType::kMessageTypeVolumeUp => {
  413. self.handle_volume_up();
  414. self.notify(None);
  415. }
  416. MessageType::kMessageTypeVolumeDown => {
  417. self.handle_volume_down();
  418. self.notify(None);
  419. }
  420. MessageType::kMessageTypeRepeat => {
  421. self.state.set_repeat(frame.get_state().get_repeat());
  422. self.notify(None);
  423. }
  424. MessageType::kMessageTypeShuffle => {
  425. self.state.set_shuffle(frame.get_state().get_shuffle());
  426. if self.state.get_shuffle() {
  427. let current_index = self.state.get_playing_track_index();
  428. {
  429. let tracks = self.state.mut_track();
  430. tracks.swap(0, current_index as usize);
  431. if let Some((_, rest)) = tracks.split_first_mut() {
  432. rand::thread_rng().shuffle(rest);
  433. }
  434. }
  435. self.state.set_playing_track_index(0);
  436. } else {
  437. let context = self.state.get_context_uri();
  438. debug!("{:?}", context);
  439. }
  440. self.notify(None);
  441. }
  442. MessageType::kMessageTypeSeek => {
  443. let position = frame.get_position();
  444. self.state.set_position_ms(position);
  445. self.state.set_position_measured_at(now_ms() as u64);
  446. self.player.seek(position);
  447. self.notify(None);
  448. }
  449. MessageType::kMessageTypeReplace => {
  450. self.update_tracks(&frame);
  451. self.notify(None);
  452. }
  453. MessageType::kMessageTypeVolume => {
  454. self.device.set_volume(frame.get_volume());
  455. self.mixer
  456. .set_volume(volume_to_mixer(frame.get_volume() as u16));
  457. self.notify(None);
  458. }
  459. MessageType::kMessageTypeNotify => {
  460. if self.device.get_is_active() && frame.get_device_state().get_is_active() {
  461. self.device.set_is_active(false);
  462. self.state.set_status(PlayStatus::kPlayStatusStop);
  463. self.player.stop();
  464. self.mixer.stop();
  465. }
  466. }
  467. _ => (),
  468. }
  469. }
  470. fn handle_play(&mut self) {
  471. if self.state.get_status() == PlayStatus::kPlayStatusPause {
  472. self.mixer.start();
  473. self.player.play();
  474. self.state.set_status(PlayStatus::kPlayStatusPlay);
  475. self.state.set_position_measured_at(now_ms() as u64);
  476. }
  477. }
  478. fn handle_play_pause(&mut self) {
  479. match self.state.get_status() {
  480. PlayStatus::kPlayStatusPlay => self.handle_pause(),
  481. PlayStatus::kPlayStatusPause => self.handle_play(),
  482. _ => (),
  483. }
  484. }
  485. fn handle_pause(&mut self) {
  486. if self.state.get_status() == PlayStatus::kPlayStatusPlay {
  487. self.player.pause();
  488. self.mixer.stop();
  489. self.state.set_status(PlayStatus::kPlayStatusPause);
  490. let now = now_ms() as u64;
  491. let position = self.state.get_position_ms();
  492. let diff = now - self.state.get_position_measured_at();
  493. self.state.set_position_ms(position + diff as u32);
  494. self.state.set_position_measured_at(now);
  495. }
  496. }
  497. fn consume_queued_track(&mut self) -> usize {
  498. // Removes current track if it is queued
  499. // Returns the index of the next track
  500. let current_index = self.state.get_playing_track_index() as usize;
  501. if self.state.get_track()[current_index].get_queued() {
  502. self.state.mut_track().remove(current_index);
  503. return current_index;
  504. }
  505. current_index + 1
  506. }
  507. fn handle_next(&mut self) {
  508. let mut new_index = self.consume_queued_track() as u32;
  509. let mut continue_playing = true;
  510. if new_index >= self.state.get_track().len() as u32 {
  511. new_index = 0; // Loop around back to start
  512. continue_playing = self.state.get_repeat();
  513. }
  514. self.state.set_playing_track_index(new_index);
  515. self.state.set_position_ms(0);
  516. self.state.set_position_measured_at(now_ms() as u64);
  517. self.load_track(continue_playing);
  518. }
  519. fn handle_prev(&mut self) {
  520. // Previous behaves differently based on the position
  521. // Under 3s it goes to the previous song (starts playing)
  522. // Over 3s it seeks to zero (retains previous play status)
  523. if self.position() < 3000 {
  524. // Queued tracks always follow the currently playing track.
  525. // They should not be considered when calculating the previous
  526. // track so extract them beforehand and reinsert them after it.
  527. let mut queue_tracks = Vec::new();
  528. {
  529. let queue_index = self.consume_queued_track();
  530. let tracks = self.state.mut_track();
  531. while queue_index < tracks.len() && tracks[queue_index].get_queued() {
  532. queue_tracks.push(tracks.remove(queue_index));
  533. }
  534. }
  535. let current_index = self.state.get_playing_track_index();
  536. let new_index = if current_index > 0 {
  537. current_index - 1
  538. } else if self.state.get_repeat() {
  539. self.state.get_track().len() as u32 - 1
  540. } else {
  541. 0
  542. };
  543. // Reinsert queued tracks after the new playing track.
  544. let mut pos = (new_index + 1) as usize;
  545. for track in queue_tracks.into_iter() {
  546. self.state.mut_track().insert(pos, track);
  547. pos += 1;
  548. }
  549. self.state.set_playing_track_index(new_index);
  550. self.state.set_position_ms(0);
  551. self.state.set_position_measured_at(now_ms() as u64);
  552. self.load_track(true);
  553. } else {
  554. self.state.set_position_ms(0);
  555. self.state.set_position_measured_at(now_ms() as u64);
  556. self.player.seek(0);
  557. }
  558. }
  559. fn handle_volume_up(&mut self) {
  560. let mut volume: u32 = self.device.get_volume() as u32 + 4096;
  561. if volume > 0xFFFF {
  562. volume = 0xFFFF;
  563. }
  564. self.device.set_volume(volume);
  565. self.mixer.set_volume(volume_to_mixer(volume as u16));
  566. }
  567. fn handle_volume_down(&mut self) {
  568. let mut volume: i32 = self.device.get_volume() as i32 - 4096;
  569. if volume < 0 {
  570. volume = 0;
  571. }
  572. self.device.set_volume(volume as u32);
  573. self.mixer.set_volume(volume_to_mixer(volume as u16));
  574. }
  575. fn handle_end_of_track(&mut self) {
  576. self.handle_next();
  577. self.notify(None);
  578. }
  579. fn position(&mut self) -> u32 {
  580. let diff = now_ms() as u64 - self.state.get_position_measured_at();
  581. self.state.get_position_ms() + diff as u32
  582. }
  583. fn update_tracks(&mut self, frame: &protocol::spirc::Frame) {
  584. let index = frame.get_state().get_playing_track_index();
  585. let tracks = frame.get_state().get_track();
  586. let context_uri = frame.get_state().get_context_uri().to_owned();
  587. self.state.set_playing_track_index(index);
  588. self.state.set_track(tracks.into_iter().cloned().collect());
  589. self.state.set_context_uri(context_uri);
  590. self.state.set_repeat(frame.get_state().get_repeat());
  591. self.state.set_shuffle(frame.get_state().get_shuffle());
  592. }
  593. fn load_track(&mut self, play: bool) {
  594. let index = self.state.get_playing_track_index();
  595. let track = {
  596. let gid = self.state.get_track()[index as usize].get_gid();
  597. SpotifyId::from_raw(gid).unwrap()
  598. };
  599. let position = self.state.get_position_ms();
  600. let end_of_track = self.player.load(track, play, position);
  601. if play {
  602. self.state.set_status(PlayStatus::kPlayStatusPlay);
  603. } else {
  604. self.state.set_status(PlayStatus::kPlayStatusPause);
  605. }
  606. self.end_of_track = Box::new(end_of_track);
  607. }
  608. fn hello(&mut self) {
  609. CommandSender::new(self, MessageType::kMessageTypeHello).send();
  610. }
  611. fn notify(&mut self, recipient: Option<&str>) {
  612. let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify);
  613. if let Some(s) = recipient {
  614. cs = cs.recipient(&s);
  615. }
  616. cs.send();
  617. }
  618. }
  619. impl Drop for SpircTask {
  620. fn drop(&mut self) {
  621. debug!("drop Spirc[{}]", self.session.session_id());
  622. }
  623. }
  624. struct CommandSender<'a> {
  625. spirc: &'a mut SpircTask,
  626. frame: protocol::spirc::Frame,
  627. }
  628. impl<'a> CommandSender<'a> {
  629. fn new(spirc: &'a mut SpircTask, cmd: MessageType) -> CommandSender {
  630. let mut frame = protocol::spirc::Frame::new();
  631. frame.set_version(1);
  632. frame.set_protocol_version(::std::convert::Into::into("2.0.0"));
  633. frame.set_ident(spirc.ident.clone());
  634. frame.set_seq_nr(spirc.sequence.get());
  635. frame.set_typ(cmd);
  636. frame.set_device_state(spirc.device.clone());
  637. frame.set_state_update_id(now_ms());
  638. CommandSender {
  639. spirc: spirc,
  640. frame: frame,
  641. }
  642. }
  643. fn recipient(mut self, recipient: &'a str) -> CommandSender {
  644. self.frame.mut_recipient().push(recipient.to_owned());
  645. self
  646. }
  647. #[allow(dead_code)]
  648. fn state(mut self, state: protocol::spirc::State) -> CommandSender<'a> {
  649. self.frame.set_state(state);
  650. self
  651. }
  652. fn send(mut self) {
  653. if !self.frame.has_state() && self.spirc.device.get_is_active() {
  654. self.frame.set_state(self.spirc.state.clone());
  655. }
  656. let send = self.spirc.sender.start_send(self.frame).unwrap();
  657. assert!(send.is_ready());
  658. }
  659. }