spirc.rs 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779
  1. use futures::future;
  2. use futures::sync::{mpsc, oneshot};
  3. use futures::{Async, Future, Poll, Sink, Stream};
  4. use protobuf::{self, Message};
  5. use core::config::ConnectConfig;
  6. use core::mercury::MercuryError;
  7. use core::session::Session;
  8. use core::spotify_id::SpotifyId;
  9. use core::util::SeqGenerator;
  10. use core::version;
  11. use core::volume::Volume;
  12. use protocol;
  13. use protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State};
  14. use playback::mixer::Mixer;
  15. use playback::player::Player;
  16. use rand;
  17. use rand::Rng;
  18. use std;
  19. use std::time::{SystemTime, UNIX_EPOCH};
  20. pub struct SpircTask {
  21. player: Player,
  22. mixer: Box<Mixer>,
  23. linear_volume: bool,
  24. sequence: SeqGenerator<u32>,
  25. ident: String,
  26. device: DeviceState,
  27. state: State,
  28. subscription: Box<Stream<Item = Frame, Error = MercuryError>>,
  29. sender: Box<Sink<SinkItem = Frame, SinkError = MercuryError>>,
  30. commands: mpsc::UnboundedReceiver<SpircCommand>,
  31. end_of_track: Box<Future<Item = (), Error = oneshot::Canceled>>,
  32. shutdown: bool,
  33. session: Session,
  34. }
  35. pub enum SpircCommand {
  36. Play,
  37. PlayPause,
  38. Pause,
  39. Prev,
  40. Next,
  41. VolumeUp,
  42. VolumeDown,
  43. Shutdown,
  44. }
  45. pub struct Spirc {
  46. commands: mpsc::UnboundedSender<SpircCommand>,
  47. }
  48. fn now_ms() -> i64 {
  49. let dur = match SystemTime::now().duration_since(UNIX_EPOCH) {
  50. Ok(dur) => dur,
  51. Err(err) => err.duration(),
  52. };
  53. (dur.as_secs() * 1000 + (dur.subsec_nanos() / 1000_000) as u64) as i64
  54. }
  55. fn initial_state() -> State {
  56. let mut frame = protocol::spirc::State::new();
  57. frame.set_repeat(false);
  58. frame.set_shuffle(false);
  59. frame.set_status(PlayStatus::kPlayStatusStop);
  60. frame.set_position_ms(0);
  61. frame.set_position_measured_at(0);
  62. frame
  63. }
  64. fn initial_device_state(config: ConnectConfig) -> DeviceState {
  65. {
  66. let mut msg = DeviceState::new();
  67. msg.set_sw_version(version::version_string());
  68. msg.set_is_active(false);
  69. msg.set_can_play(true);
  70. msg.set_volume(0);
  71. msg.set_name(config.name);
  72. {
  73. let repeated = msg.mut_capabilities();
  74. {
  75. let msg = repeated.push_default();
  76. msg.set_typ(protocol::spirc::CapabilityType::kCanBePlayer);
  77. {
  78. let repeated = msg.mut_intValue();
  79. repeated.push(1)
  80. };
  81. msg
  82. };
  83. {
  84. let msg = repeated.push_default();
  85. msg.set_typ(protocol::spirc::CapabilityType::kDeviceType);
  86. {
  87. let repeated = msg.mut_intValue();
  88. repeated.push(config.device_type as i64)
  89. };
  90. msg
  91. };
  92. {
  93. let msg = repeated.push_default();
  94. msg.set_typ(protocol::spirc::CapabilityType::kGaiaEqConnectId);
  95. {
  96. let repeated = msg.mut_intValue();
  97. repeated.push(1)
  98. };
  99. msg
  100. };
  101. {
  102. let msg = repeated.push_default();
  103. msg.set_typ(protocol::spirc::CapabilityType::kSupportsLogout);
  104. {
  105. let repeated = msg.mut_intValue();
  106. repeated.push(0)
  107. };
  108. msg
  109. };
  110. {
  111. let msg = repeated.push_default();
  112. msg.set_typ(protocol::spirc::CapabilityType::kIsObservable);
  113. {
  114. let repeated = msg.mut_intValue();
  115. repeated.push(1)
  116. };
  117. msg
  118. };
  119. {
  120. let msg = repeated.push_default();
  121. msg.set_typ(protocol::spirc::CapabilityType::kVolumeSteps);
  122. {
  123. let repeated = msg.mut_intValue();
  124. repeated.push(64)
  125. };
  126. msg
  127. };
  128. {
  129. let msg = repeated.push_default();
  130. msg.set_typ(protocol::spirc::CapabilityType::kSupportedContexts);
  131. {
  132. let repeated = msg.mut_stringValue();
  133. repeated.push(::std::convert::Into::into("album"));
  134. repeated.push(::std::convert::Into::into("playlist"));
  135. repeated.push(::std::convert::Into::into("search"));
  136. repeated.push(::std::convert::Into::into("inbox"));
  137. repeated.push(::std::convert::Into::into("toplist"));
  138. repeated.push(::std::convert::Into::into("starred"));
  139. repeated.push(::std::convert::Into::into("publishedstarred"));
  140. repeated.push(::std::convert::Into::into("track"))
  141. };
  142. msg
  143. };
  144. {
  145. let msg = repeated.push_default();
  146. msg.set_typ(protocol::spirc::CapabilityType::kSupportedTypes);
  147. {
  148. let repeated = msg.mut_stringValue();
  149. repeated.push(::std::convert::Into::into("audio/local"));
  150. repeated.push(::std::convert::Into::into("audio/track"));
  151. repeated.push(::std::convert::Into::into("local"));
  152. repeated.push(::std::convert::Into::into("track"))
  153. };
  154. msg
  155. };
  156. };
  157. msg
  158. }
  159. }
  160. fn calc_logarithmic_volume(volume: u16) -> u16 {
  161. // Volume conversion taken from https://www.dr-lex.be/info-stuff/volumecontrols.html#ideal2
  162. // Convert the given volume [0..0xffff] to a dB gain
  163. // We assume a dB range of 60dB.
  164. // Use the equatation: a * exp(b * x)
  165. // in which a = IDEAL_FACTOR, b = 1/1000
  166. const IDEAL_FACTOR: f64 = 6.908;
  167. let normalized_volume = volume as f64 / std::u16::MAX as f64; // To get a value between 0 and 1
  168. let mut val = std::u16::MAX;
  169. // Prevent val > std::u16::MAX due to rounding errors
  170. if normalized_volume < 0.999 {
  171. let new_volume = (normalized_volume * IDEAL_FACTOR).exp() / 1000.0;
  172. val = (new_volume * std::u16::MAX as f64) as u16;
  173. }
  174. debug!("input volume:{} to mixer: {}", volume, val);
  175. // return the scale factor (0..0xffff) (equivalent to a voltage multiplier).
  176. val
  177. }
  178. fn volume_to_mixer(volume: u16, linear_volume: bool) -> u16 {
  179. if linear_volume {
  180. debug!("linear volume: {}", volume);
  181. volume
  182. } else {
  183. calc_logarithmic_volume(volume)
  184. }
  185. }
  186. impl Spirc {
  187. pub fn new(
  188. config: ConnectConfig,
  189. session: Session,
  190. player: Player,
  191. mixer: Box<Mixer>,
  192. ) -> (Spirc, SpircTask) {
  193. debug!("new Spirc[{}]", session.session_id());
  194. let ident = session.device_id().to_owned();
  195. let uri = format!("hm://remote/3/user/{}/", session.username());
  196. let subscription = session.mercury().subscribe(&uri as &str);
  197. let subscription = subscription
  198. .map(|stream| stream.map_err(|_| MercuryError))
  199. .flatten_stream();
  200. let subscription = Box::new(subscription.map(|response| -> Frame {
  201. let data = response.payload.first().unwrap();
  202. protobuf::parse_from_bytes(data).unwrap()
  203. }));
  204. let sender = Box::new(
  205. session
  206. .mercury()
  207. .sender(uri)
  208. .with(|frame: Frame| Ok(frame.write_to_bytes().unwrap())),
  209. );
  210. let (cmd_tx, cmd_rx) = mpsc::unbounded();
  211. let volume = config.volume;
  212. let linear_volume = config.linear_volume;
  213. let device = initial_device_state(config);
  214. let mut task = SpircTask {
  215. player: player,
  216. mixer: mixer,
  217. linear_volume: linear_volume,
  218. sequence: SeqGenerator::new(1),
  219. ident: ident,
  220. device: device,
  221. state: initial_state(),
  222. subscription: subscription,
  223. sender: sender,
  224. commands: cmd_rx,
  225. end_of_track: Box::new(future::empty()),
  226. shutdown: false,
  227. session: session.clone(),
  228. };
  229. task.set_volume(volume);
  230. let spirc = Spirc { commands: cmd_tx };
  231. task.hello();
  232. (spirc, task)
  233. }
  234. pub fn play(&self) {
  235. let _ = self.commands.unbounded_send(SpircCommand::Play);
  236. }
  237. pub fn play_pause(&self) {
  238. let _ = self.commands.unbounded_send(SpircCommand::PlayPause);
  239. }
  240. pub fn pause(&self) {
  241. let _ = self.commands.unbounded_send(SpircCommand::Pause);
  242. }
  243. pub fn prev(&self) {
  244. let _ = self.commands.unbounded_send(SpircCommand::Prev);
  245. }
  246. pub fn next(&self) {
  247. let _ = self.commands.unbounded_send(SpircCommand::Next);
  248. }
  249. pub fn volume_up(&self) {
  250. let _ = self.commands.unbounded_send(SpircCommand::VolumeUp);
  251. }
  252. pub fn volume_down(&self) {
  253. let _ = self.commands.unbounded_send(SpircCommand::VolumeDown);
  254. }
  255. pub fn shutdown(&self) {
  256. let _ = self.commands.unbounded_send(SpircCommand::Shutdown);
  257. }
  258. }
  259. impl Future for SpircTask {
  260. type Item = ();
  261. type Error = ();
  262. fn poll(&mut self) -> Poll<(), ()> {
  263. loop {
  264. let mut progress = false;
  265. if self.session.is_invalid() {
  266. return Ok(Async::Ready(()));
  267. }
  268. if !self.shutdown {
  269. match self.subscription.poll().unwrap() {
  270. Async::Ready(Some(frame)) => {
  271. progress = true;
  272. self.handle_frame(frame);
  273. }
  274. Async::Ready(None) => panic!("subscription terminated"),
  275. Async::NotReady => (),
  276. }
  277. match self.commands.poll().unwrap() {
  278. Async::Ready(Some(command)) => {
  279. progress = true;
  280. self.handle_command(command);
  281. }
  282. Async::Ready(None) => (),
  283. Async::NotReady => (),
  284. }
  285. match self.end_of_track.poll() {
  286. Ok(Async::Ready(())) => {
  287. progress = true;
  288. self.handle_end_of_track();
  289. }
  290. Ok(Async::NotReady) => (),
  291. Err(oneshot::Canceled) => self.end_of_track = Box::new(future::empty()),
  292. }
  293. }
  294. let poll_sender = self.sender.poll_complete().unwrap();
  295. // Only shutdown once we've flushed out all our messages
  296. if self.shutdown && poll_sender.is_ready() {
  297. return Ok(Async::Ready(()));
  298. }
  299. if !progress {
  300. return Ok(Async::NotReady);
  301. }
  302. }
  303. }
  304. }
  305. impl SpircTask {
  306. fn handle_command(&mut self, cmd: SpircCommand) {
  307. let active = self.device.get_is_active();
  308. match cmd {
  309. SpircCommand::Play => {
  310. if active {
  311. self.handle_play();
  312. self.notify(None);
  313. } else {
  314. CommandSender::new(self, MessageType::kMessageTypePlay).send();
  315. }
  316. }
  317. SpircCommand::PlayPause => {
  318. if active {
  319. self.handle_play_pause();
  320. self.notify(None);
  321. } else {
  322. CommandSender::new(self, MessageType::kMessageTypePlayPause).send();
  323. }
  324. }
  325. SpircCommand::Pause => {
  326. if active {
  327. self.handle_pause();
  328. self.notify(None);
  329. } else {
  330. CommandSender::new(self, MessageType::kMessageTypePause).send();
  331. }
  332. }
  333. SpircCommand::Prev => {
  334. if active {
  335. self.handle_prev();
  336. self.notify(None);
  337. } else {
  338. CommandSender::new(self, MessageType::kMessageTypePrev).send();
  339. }
  340. }
  341. SpircCommand::Next => {
  342. if active {
  343. self.handle_next();
  344. self.notify(None);
  345. } else {
  346. CommandSender::new(self, MessageType::kMessageTypeNext).send();
  347. }
  348. }
  349. SpircCommand::VolumeUp => {
  350. if active {
  351. self.handle_volume_up();
  352. self.notify(None);
  353. } else {
  354. CommandSender::new(self, MessageType::kMessageTypeVolumeUp).send();
  355. }
  356. }
  357. SpircCommand::VolumeDown => {
  358. if active {
  359. self.handle_volume_down();
  360. self.notify(None);
  361. } else {
  362. CommandSender::new(self, MessageType::kMessageTypeVolumeDown).send();
  363. }
  364. }
  365. SpircCommand::Shutdown => {
  366. CommandSender::new(self, MessageType::kMessageTypeGoodbye).send();
  367. self.shutdown = true;
  368. self.commands.close();
  369. }
  370. }
  371. }
  372. fn handle_frame(&mut self, frame: Frame) {
  373. debug!(
  374. "{:?} {:?} {} {} {}",
  375. frame.get_typ(),
  376. frame.get_device_state().get_name(),
  377. frame.get_ident(),
  378. frame.get_seq_nr(),
  379. frame.get_state_update_id()
  380. );
  381. if frame.get_ident() == self.ident
  382. || (frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident))
  383. {
  384. return;
  385. }
  386. match frame.get_typ() {
  387. MessageType::kMessageTypeHello => {
  388. self.notify(Some(frame.get_ident()));
  389. }
  390. MessageType::kMessageTypeLoad => {
  391. if !self.device.get_is_active() {
  392. self.device.set_is_active(true);
  393. self.device.set_became_active_at(now_ms());
  394. }
  395. self.update_tracks(&frame);
  396. if self.state.get_track().len() > 0 {
  397. self.state.set_position_ms(frame.get_state().get_position_ms());
  398. self.state.set_position_measured_at(now_ms() as u64);
  399. let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
  400. self.load_track(play);
  401. } else {
  402. self.state.set_status(PlayStatus::kPlayStatusStop);
  403. }
  404. self.notify(None);
  405. }
  406. MessageType::kMessageTypePlay => {
  407. self.handle_play();
  408. self.notify(None);
  409. }
  410. MessageType::kMessageTypePlayPause => {
  411. self.handle_play_pause();
  412. self.notify(None);
  413. }
  414. MessageType::kMessageTypePause => {
  415. self.handle_pause();
  416. self.notify(None);
  417. }
  418. MessageType::kMessageTypeNext => {
  419. self.handle_next();
  420. self.notify(None);
  421. }
  422. MessageType::kMessageTypePrev => {
  423. self.handle_prev();
  424. self.notify(None);
  425. }
  426. MessageType::kMessageTypeVolumeUp => {
  427. self.handle_volume_up();
  428. self.notify(None);
  429. }
  430. MessageType::kMessageTypeVolumeDown => {
  431. self.handle_volume_down();
  432. self.notify(None);
  433. }
  434. MessageType::kMessageTypeRepeat => {
  435. self.state.set_repeat(frame.get_state().get_repeat());
  436. self.notify(None);
  437. }
  438. MessageType::kMessageTypeShuffle => {
  439. self.state.set_shuffle(frame.get_state().get_shuffle());
  440. if self.state.get_shuffle() {
  441. let current_index = self.state.get_playing_track_index();
  442. {
  443. let tracks = self.state.mut_track();
  444. tracks.swap(0, current_index as usize);
  445. if let Some((_, rest)) = tracks.split_first_mut() {
  446. rand::thread_rng().shuffle(rest);
  447. }
  448. }
  449. self.state.set_playing_track_index(0);
  450. } else {
  451. let context = self.state.get_context_uri();
  452. debug!("{:?}", context);
  453. }
  454. self.notify(None);
  455. }
  456. MessageType::kMessageTypeSeek => {
  457. let position = frame.get_position();
  458. self.state.set_position_ms(position);
  459. self.state.set_position_measured_at(now_ms() as u64);
  460. self.player.seek(position);
  461. self.notify(None);
  462. }
  463. MessageType::kMessageTypeReplace => {
  464. self.update_tracks(&frame);
  465. self.notify(None);
  466. }
  467. MessageType::kMessageTypeVolume => {
  468. self.set_volume(frame.get_volume() as u16);
  469. self.notify(None);
  470. }
  471. MessageType::kMessageTypeNotify => {
  472. if self.device.get_is_active() && frame.get_device_state().get_is_active() {
  473. self.device.set_is_active(false);
  474. self.state.set_status(PlayStatus::kPlayStatusStop);
  475. self.player.stop();
  476. self.mixer.stop();
  477. }
  478. }
  479. _ => (),
  480. }
  481. }
  482. fn handle_play(&mut self) {
  483. if self.state.get_status() == PlayStatus::kPlayStatusPause {
  484. self.mixer.start();
  485. self.player.play();
  486. self.state.set_status(PlayStatus::kPlayStatusPlay);
  487. self.state.set_position_measured_at(now_ms() as u64);
  488. }
  489. }
  490. fn handle_play_pause(&mut self) {
  491. match self.state.get_status() {
  492. PlayStatus::kPlayStatusPlay => self.handle_pause(),
  493. PlayStatus::kPlayStatusPause => self.handle_play(),
  494. _ => (),
  495. }
  496. }
  497. fn handle_pause(&mut self) {
  498. if self.state.get_status() == PlayStatus::kPlayStatusPlay {
  499. self.player.pause();
  500. self.mixer.stop();
  501. self.state.set_status(PlayStatus::kPlayStatusPause);
  502. let now = now_ms() as u64;
  503. let position = self.state.get_position_ms();
  504. let diff = now - self.state.get_position_measured_at();
  505. self.state.set_position_ms(position + diff as u32);
  506. self.state.set_position_measured_at(now);
  507. }
  508. }
  509. fn consume_queued_track(&mut self) -> usize {
  510. // Removes current track if it is queued
  511. // Returns the index of the next track
  512. let current_index = self.state.get_playing_track_index() as usize;
  513. if self.state.get_track()[current_index].get_queued() {
  514. self.state.mut_track().remove(current_index);
  515. return current_index;
  516. }
  517. current_index + 1
  518. }
  519. fn handle_next(&mut self) {
  520. let mut new_index = self.consume_queued_track() as u32;
  521. let mut continue_playing = true;
  522. if new_index >= self.state.get_track().len() as u32 {
  523. new_index = 0; // Loop around back to start
  524. continue_playing = self.state.get_repeat();
  525. }
  526. self.state.set_playing_track_index(new_index);
  527. self.state.set_position_ms(0);
  528. self.state.set_position_measured_at(now_ms() as u64);
  529. self.load_track(continue_playing);
  530. }
  531. fn handle_prev(&mut self) {
  532. // Previous behaves differently based on the position
  533. // Under 3s it goes to the previous song (starts playing)
  534. // Over 3s it seeks to zero (retains previous play status)
  535. if self.position() < 3000 {
  536. // Queued tracks always follow the currently playing track.
  537. // They should not be considered when calculating the previous
  538. // track so extract them beforehand and reinsert them after it.
  539. let mut queue_tracks = Vec::new();
  540. {
  541. let queue_index = self.consume_queued_track();
  542. let tracks = self.state.mut_track();
  543. while queue_index < tracks.len() && tracks[queue_index].get_queued() {
  544. queue_tracks.push(tracks.remove(queue_index));
  545. }
  546. }
  547. let current_index = self.state.get_playing_track_index();
  548. let new_index = if current_index > 0 {
  549. current_index - 1
  550. } else if self.state.get_repeat() {
  551. self.state.get_track().len() as u32 - 1
  552. } else {
  553. 0
  554. };
  555. // Reinsert queued tracks after the new playing track.
  556. let mut pos = (new_index + 1) as usize;
  557. for track in queue_tracks.into_iter() {
  558. self.state.mut_track().insert(pos, track);
  559. pos += 1;
  560. }
  561. self.state.set_playing_track_index(new_index);
  562. self.state.set_position_ms(0);
  563. self.state.set_position_measured_at(now_ms() as u64);
  564. self.load_track(true);
  565. } else {
  566. self.state.set_position_ms(0);
  567. self.state.set_position_measured_at(now_ms() as u64);
  568. self.player.seek(0);
  569. }
  570. }
  571. fn handle_volume_up(&mut self) {
  572. let mut volume: u32 = self.device.get_volume() as u32 + 4096;
  573. if volume > 0xFFFF {
  574. volume = 0xFFFF;
  575. }
  576. self.set_volume(volume as u16);
  577. }
  578. fn handle_volume_down(&mut self) {
  579. let mut volume: i32 = self.device.get_volume() as i32 - 4096;
  580. if volume < 0 {
  581. volume = 0;
  582. }
  583. self.set_volume(volume as u16);
  584. }
  585. fn handle_end_of_track(&mut self) {
  586. self.handle_next();
  587. self.notify(None);
  588. }
  589. fn position(&mut self) -> u32 {
  590. let diff = now_ms() as u64 - self.state.get_position_measured_at();
  591. self.state.get_position_ms() + diff as u32
  592. }
  593. fn update_tracks(&mut self, frame: &protocol::spirc::Frame) {
  594. let index = frame.get_state().get_playing_track_index();
  595. let tracks = frame.get_state().get_track();
  596. let context_uri = frame.get_state().get_context_uri().to_owned();
  597. self.state.set_playing_track_index(index);
  598. self.state.set_track(tracks.into_iter().cloned().collect());
  599. self.state.set_context_uri(context_uri);
  600. self.state.set_repeat(frame.get_state().get_repeat());
  601. self.state.set_shuffle(frame.get_state().get_shuffle());
  602. }
  603. fn load_track(&mut self, play: bool) {
  604. let index = self.state.get_playing_track_index();
  605. let track = {
  606. let gid = self.state.get_track()[index as usize].get_gid();
  607. SpotifyId::from_raw(gid).unwrap()
  608. };
  609. let position = self.state.get_position_ms();
  610. let end_of_track = self.player.load(track, play, position);
  611. if play {
  612. self.state.set_status(PlayStatus::kPlayStatusPlay);
  613. } else {
  614. self.state.set_status(PlayStatus::kPlayStatusPause);
  615. }
  616. self.end_of_track = Box::new(end_of_track);
  617. }
  618. fn hello(&mut self) {
  619. CommandSender::new(self, MessageType::kMessageTypeHello).send();
  620. }
  621. fn notify(&mut self, recipient: Option<&str>) {
  622. let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify);
  623. if let Some(s) = recipient {
  624. cs = cs.recipient(&s);
  625. }
  626. cs.send();
  627. }
  628. fn set_volume(&mut self, volume: u16) {
  629. self.device.set_volume(volume as u32);
  630. self.mixer.set_volume(volume_to_mixer(volume, self.linear_volume));
  631. if let Some(cache) = self.session.cache() {
  632. cache.save_volume(Volume { volume })
  633. }
  634. }
  635. }
  636. impl Drop for SpircTask {
  637. fn drop(&mut self) {
  638. debug!("drop Spirc[{}]", self.session.session_id());
  639. }
  640. }
  641. struct CommandSender<'a> {
  642. spirc: &'a mut SpircTask,
  643. frame: protocol::spirc::Frame,
  644. }
  645. impl<'a> CommandSender<'a> {
  646. fn new(spirc: &'a mut SpircTask, cmd: MessageType) -> CommandSender {
  647. let mut frame = protocol::spirc::Frame::new();
  648. frame.set_version(1);
  649. frame.set_protocol_version(::std::convert::Into::into("2.0.0"));
  650. frame.set_ident(spirc.ident.clone());
  651. frame.set_seq_nr(spirc.sequence.get());
  652. frame.set_typ(cmd);
  653. frame.set_device_state(spirc.device.clone());
  654. frame.set_state_update_id(now_ms());
  655. CommandSender {
  656. spirc: spirc,
  657. frame: frame,
  658. }
  659. }
  660. fn recipient(mut self, recipient: &'a str) -> CommandSender {
  661. self.frame.mut_recipient().push(recipient.to_owned());
  662. self
  663. }
  664. #[allow(dead_code)]
  665. fn state(mut self, state: protocol::spirc::State) -> CommandSender<'a> {
  666. self.frame.set_state(state);
  667. self
  668. }
  669. fn send(mut self) {
  670. if !self.frame.has_state() && self.spirc.device.get_is_active() {
  671. self.frame.set_state(self.spirc.state.clone());
  672. }
  673. let send = self.spirc.sender.start_send(self.frame).unwrap();
  674. assert!(send.is_ready());
  675. }
  676. }