spirc.rs 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780
  1. use futures::future;
  2. use futures::sync::{mpsc, oneshot};
  3. use futures::{Async, Future, Poll, Sink, Stream};
  4. use protobuf::{self, Message};
  5. use core::config::ConnectConfig;
  6. use core::mercury::MercuryError;
  7. use core::session::Session;
  8. use core::spotify_id::SpotifyId;
  9. use core::util::SeqGenerator;
  10. use core::version;
  11. use core::volume::Volume;
  12. use protocol;
  13. use protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State};
  14. use playback::mixer::Mixer;
  15. use playback::player::Player;
  16. use rand;
  17. use rand::Rng;
  18. use std;
  19. use std::time::{SystemTime, UNIX_EPOCH};
  20. pub struct SpircTask {
  21. player: Player,
  22. mixer: Box<Mixer>,
  23. linear_volume: bool,
  24. sequence: SeqGenerator<u32>,
  25. ident: String,
  26. device: DeviceState,
  27. state: State,
  28. subscription: Box<Stream<Item = Frame, Error = MercuryError>>,
  29. sender: Box<Sink<SinkItem = Frame, SinkError = MercuryError>>,
  30. commands: mpsc::UnboundedReceiver<SpircCommand>,
  31. end_of_track: Box<Future<Item = (), Error = oneshot::Canceled>>,
  32. shutdown: bool,
  33. session: Session,
  34. }
  35. pub enum SpircCommand {
  36. Play,
  37. PlayPause,
  38. Pause,
  39. Prev,
  40. Next,
  41. VolumeUp,
  42. VolumeDown,
  43. Shutdown,
  44. }
  45. pub struct Spirc {
  46. commands: mpsc::UnboundedSender<SpircCommand>,
  47. }
  48. fn now_ms() -> i64 {
  49. let dur = match SystemTime::now().duration_since(UNIX_EPOCH) {
  50. Ok(dur) => dur,
  51. Err(err) => err.duration(),
  52. };
  53. (dur.as_secs() * 1000 + (dur.subsec_nanos() / 1000_000) as u64) as i64
  54. }
  55. fn initial_state() -> State {
  56. let mut frame = protocol::spirc::State::new();
  57. frame.set_repeat(false);
  58. frame.set_shuffle(false);
  59. frame.set_status(PlayStatus::kPlayStatusStop);
  60. frame.set_position_ms(0);
  61. frame.set_position_measured_at(0);
  62. frame
  63. }
  64. fn initial_device_state(config: ConnectConfig) -> DeviceState {
  65. {
  66. let mut msg = DeviceState::new();
  67. msg.set_sw_version(version::version_string());
  68. msg.set_is_active(false);
  69. msg.set_can_play(true);
  70. msg.set_volume(0);
  71. msg.set_name(config.name);
  72. {
  73. let repeated = msg.mut_capabilities();
  74. {
  75. let msg = repeated.push_default();
  76. msg.set_typ(protocol::spirc::CapabilityType::kCanBePlayer);
  77. {
  78. let repeated = msg.mut_intValue();
  79. repeated.push(1)
  80. };
  81. msg
  82. };
  83. {
  84. let msg = repeated.push_default();
  85. msg.set_typ(protocol::spirc::CapabilityType::kDeviceType);
  86. {
  87. let repeated = msg.mut_intValue();
  88. repeated.push(config.device_type as i64)
  89. };
  90. msg
  91. };
  92. {
  93. let msg = repeated.push_default();
  94. msg.set_typ(protocol::spirc::CapabilityType::kGaiaEqConnectId);
  95. {
  96. let repeated = msg.mut_intValue();
  97. repeated.push(1)
  98. };
  99. msg
  100. };
  101. {
  102. let msg = repeated.push_default();
  103. msg.set_typ(protocol::spirc::CapabilityType::kSupportsLogout);
  104. {
  105. let repeated = msg.mut_intValue();
  106. repeated.push(0)
  107. };
  108. msg
  109. };
  110. {
  111. let msg = repeated.push_default();
  112. msg.set_typ(protocol::spirc::CapabilityType::kIsObservable);
  113. {
  114. let repeated = msg.mut_intValue();
  115. repeated.push(1)
  116. };
  117. msg
  118. };
  119. {
  120. let msg = repeated.push_default();
  121. msg.set_typ(protocol::spirc::CapabilityType::kVolumeSteps);
  122. {
  123. let repeated = msg.mut_intValue();
  124. repeated.push(64)
  125. };
  126. msg
  127. };
  128. {
  129. let msg = repeated.push_default();
  130. msg.set_typ(protocol::spirc::CapabilityType::kSupportedContexts);
  131. {
  132. let repeated = msg.mut_stringValue();
  133. repeated.push(::std::convert::Into::into("album"));
  134. repeated.push(::std::convert::Into::into("playlist"));
  135. repeated.push(::std::convert::Into::into("search"));
  136. repeated.push(::std::convert::Into::into("inbox"));
  137. repeated.push(::std::convert::Into::into("toplist"));
  138. repeated.push(::std::convert::Into::into("starred"));
  139. repeated.push(::std::convert::Into::into("publishedstarred"));
  140. repeated.push(::std::convert::Into::into("track"))
  141. };
  142. msg
  143. };
  144. {
  145. let msg = repeated.push_default();
  146. msg.set_typ(protocol::spirc::CapabilityType::kSupportedTypes);
  147. {
  148. let repeated = msg.mut_stringValue();
  149. repeated.push(::std::convert::Into::into("audio/local"));
  150. repeated.push(::std::convert::Into::into("audio/track"));
  151. repeated.push(::std::convert::Into::into("local"));
  152. repeated.push(::std::convert::Into::into("track"))
  153. };
  154. msg
  155. };
  156. };
  157. msg
  158. }
  159. }
  160. fn calc_logarithmic_volume(volume: u16) -> u16 {
  161. // Volume conversion taken from https://www.dr-lex.be/info-stuff/volumecontrols.html#ideal2
  162. // Convert the given volume [0..0xffff] to a dB gain
  163. // We assume a dB range of 60dB.
  164. // Use the equatation: a * exp(b * x)
  165. // in which a = IDEAL_FACTOR, b = 1/1000
  166. const IDEAL_FACTOR: f64 = 6.908;
  167. let normalized_volume = volume as f64 / std::u16::MAX as f64; // To get a value between 0 and 1
  168. let mut val = std::u16::MAX;
  169. // Prevent val > std::u16::MAX due to rounding errors
  170. if normalized_volume < 0.999 {
  171. let new_volume = (normalized_volume * IDEAL_FACTOR).exp() / 1000.0;
  172. val = (new_volume * std::u16::MAX as f64) as u16;
  173. }
  174. debug!("input volume:{} to mixer: {}", volume, val);
  175. // return the scale factor (0..0xffff) (equivalent to a voltage multiplier).
  176. val
  177. }
  178. fn volume_to_mixer(volume: u16, linear_volume: bool) -> u16 {
  179. if linear_volume {
  180. debug!("linear volume: {}", volume);
  181. volume
  182. } else {
  183. calc_logarithmic_volume(volume)
  184. }
  185. }
  186. impl Spirc {
  187. pub fn new(
  188. config: ConnectConfig,
  189. session: Session,
  190. player: Player,
  191. mixer: Box<Mixer>,
  192. ) -> (Spirc, SpircTask) {
  193. debug!("new Spirc[{}]", session.session_id());
  194. let ident = session.device_id().to_owned();
  195. // Uri updated in response to issue #288
  196. let uri = format!("hm://remote/user/{}/", session.username());
  197. let subscription = session.mercury().subscribe(&uri as &str);
  198. let subscription = subscription
  199. .map(|stream| stream.map_err(|_| MercuryError))
  200. .flatten_stream();
  201. let subscription = Box::new(subscription.map(|response| -> Frame {
  202. let data = response.payload.first().unwrap();
  203. protobuf::parse_from_bytes(data).unwrap()
  204. }));
  205. let sender = Box::new(
  206. session
  207. .mercury()
  208. .sender(uri)
  209. .with(|frame: Frame| Ok(frame.write_to_bytes().unwrap())),
  210. );
  211. let (cmd_tx, cmd_rx) = mpsc::unbounded();
  212. let volume = config.volume;
  213. let linear_volume = config.linear_volume;
  214. let device = initial_device_state(config);
  215. let mut task = SpircTask {
  216. player: player,
  217. mixer: mixer,
  218. linear_volume: linear_volume,
  219. sequence: SeqGenerator::new(1),
  220. ident: ident,
  221. device: device,
  222. state: initial_state(),
  223. subscription: subscription,
  224. sender: sender,
  225. commands: cmd_rx,
  226. end_of_track: Box::new(future::empty()),
  227. shutdown: false,
  228. session: session.clone(),
  229. };
  230. task.set_volume(volume);
  231. let spirc = Spirc { commands: cmd_tx };
  232. task.hello();
  233. (spirc, task)
  234. }
  235. pub fn play(&self) {
  236. let _ = self.commands.unbounded_send(SpircCommand::Play);
  237. }
  238. pub fn play_pause(&self) {
  239. let _ = self.commands.unbounded_send(SpircCommand::PlayPause);
  240. }
  241. pub fn pause(&self) {
  242. let _ = self.commands.unbounded_send(SpircCommand::Pause);
  243. }
  244. pub fn prev(&self) {
  245. let _ = self.commands.unbounded_send(SpircCommand::Prev);
  246. }
  247. pub fn next(&self) {
  248. let _ = self.commands.unbounded_send(SpircCommand::Next);
  249. }
  250. pub fn volume_up(&self) {
  251. let _ = self.commands.unbounded_send(SpircCommand::VolumeUp);
  252. }
  253. pub fn volume_down(&self) {
  254. let _ = self.commands.unbounded_send(SpircCommand::VolumeDown);
  255. }
  256. pub fn shutdown(&self) {
  257. let _ = self.commands.unbounded_send(SpircCommand::Shutdown);
  258. }
  259. }
  260. impl Future for SpircTask {
  261. type Item = ();
  262. type Error = ();
  263. fn poll(&mut self) -> Poll<(), ()> {
  264. loop {
  265. let mut progress = false;
  266. if self.session.is_invalid() {
  267. return Ok(Async::Ready(()));
  268. }
  269. if !self.shutdown {
  270. match self.subscription.poll().unwrap() {
  271. Async::Ready(Some(frame)) => {
  272. progress = true;
  273. self.handle_frame(frame);
  274. }
  275. Async::Ready(None) => panic!("subscription terminated"),
  276. Async::NotReady => (),
  277. }
  278. match self.commands.poll().unwrap() {
  279. Async::Ready(Some(command)) => {
  280. progress = true;
  281. self.handle_command(command);
  282. }
  283. Async::Ready(None) => (),
  284. Async::NotReady => (),
  285. }
  286. match self.end_of_track.poll() {
  287. Ok(Async::Ready(())) => {
  288. progress = true;
  289. self.handle_end_of_track();
  290. }
  291. Ok(Async::NotReady) => (),
  292. Err(oneshot::Canceled) => self.end_of_track = Box::new(future::empty()),
  293. }
  294. }
  295. let poll_sender = self.sender.poll_complete().unwrap();
  296. // Only shutdown once we've flushed out all our messages
  297. if self.shutdown && poll_sender.is_ready() {
  298. return Ok(Async::Ready(()));
  299. }
  300. if !progress {
  301. return Ok(Async::NotReady);
  302. }
  303. }
  304. }
  305. }
  306. impl SpircTask {
  307. fn handle_command(&mut self, cmd: SpircCommand) {
  308. let active = self.device.get_is_active();
  309. match cmd {
  310. SpircCommand::Play => {
  311. if active {
  312. self.handle_play();
  313. self.notify(None);
  314. } else {
  315. CommandSender::new(self, MessageType::kMessageTypePlay).send();
  316. }
  317. }
  318. SpircCommand::PlayPause => {
  319. if active {
  320. self.handle_play_pause();
  321. self.notify(None);
  322. } else {
  323. CommandSender::new(self, MessageType::kMessageTypePlayPause).send();
  324. }
  325. }
  326. SpircCommand::Pause => {
  327. if active {
  328. self.handle_pause();
  329. self.notify(None);
  330. } else {
  331. CommandSender::new(self, MessageType::kMessageTypePause).send();
  332. }
  333. }
  334. SpircCommand::Prev => {
  335. if active {
  336. self.handle_prev();
  337. self.notify(None);
  338. } else {
  339. CommandSender::new(self, MessageType::kMessageTypePrev).send();
  340. }
  341. }
  342. SpircCommand::Next => {
  343. if active {
  344. self.handle_next();
  345. self.notify(None);
  346. } else {
  347. CommandSender::new(self, MessageType::kMessageTypeNext).send();
  348. }
  349. }
  350. SpircCommand::VolumeUp => {
  351. if active {
  352. self.handle_volume_up();
  353. self.notify(None);
  354. } else {
  355. CommandSender::new(self, MessageType::kMessageTypeVolumeUp).send();
  356. }
  357. }
  358. SpircCommand::VolumeDown => {
  359. if active {
  360. self.handle_volume_down();
  361. self.notify(None);
  362. } else {
  363. CommandSender::new(self, MessageType::kMessageTypeVolumeDown).send();
  364. }
  365. }
  366. SpircCommand::Shutdown => {
  367. CommandSender::new(self, MessageType::kMessageTypeGoodbye).send();
  368. self.shutdown = true;
  369. self.commands.close();
  370. }
  371. }
  372. }
  373. fn handle_frame(&mut self, frame: Frame) {
  374. debug!(
  375. "{:?} {:?} {} {} {}",
  376. frame.get_typ(),
  377. frame.get_device_state().get_name(),
  378. frame.get_ident(),
  379. frame.get_seq_nr(),
  380. frame.get_state_update_id()
  381. );
  382. if frame.get_ident() == self.ident
  383. || (frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident))
  384. {
  385. return;
  386. }
  387. match frame.get_typ() {
  388. MessageType::kMessageTypeHello => {
  389. self.notify(Some(frame.get_ident()));
  390. }
  391. MessageType::kMessageTypeLoad => {
  392. if !self.device.get_is_active() {
  393. self.device.set_is_active(true);
  394. self.device.set_became_active_at(now_ms());
  395. }
  396. self.update_tracks(&frame);
  397. if self.state.get_track().len() > 0 {
  398. self.state.set_position_ms(frame.get_state().get_position_ms());
  399. self.state.set_position_measured_at(now_ms() as u64);
  400. let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
  401. self.load_track(play);
  402. } else {
  403. self.state.set_status(PlayStatus::kPlayStatusStop);
  404. }
  405. self.notify(None);
  406. }
  407. MessageType::kMessageTypePlay => {
  408. self.handle_play();
  409. self.notify(None);
  410. }
  411. MessageType::kMessageTypePlayPause => {
  412. self.handle_play_pause();
  413. self.notify(None);
  414. }
  415. MessageType::kMessageTypePause => {
  416. self.handle_pause();
  417. self.notify(None);
  418. }
  419. MessageType::kMessageTypeNext => {
  420. self.handle_next();
  421. self.notify(None);
  422. }
  423. MessageType::kMessageTypePrev => {
  424. self.handle_prev();
  425. self.notify(None);
  426. }
  427. MessageType::kMessageTypeVolumeUp => {
  428. self.handle_volume_up();
  429. self.notify(None);
  430. }
  431. MessageType::kMessageTypeVolumeDown => {
  432. self.handle_volume_down();
  433. self.notify(None);
  434. }
  435. MessageType::kMessageTypeRepeat => {
  436. self.state.set_repeat(frame.get_state().get_repeat());
  437. self.notify(None);
  438. }
  439. MessageType::kMessageTypeShuffle => {
  440. self.state.set_shuffle(frame.get_state().get_shuffle());
  441. if self.state.get_shuffle() {
  442. let current_index = self.state.get_playing_track_index();
  443. {
  444. let tracks = self.state.mut_track();
  445. tracks.swap(0, current_index as usize);
  446. if let Some((_, rest)) = tracks.split_first_mut() {
  447. rand::thread_rng().shuffle(rest);
  448. }
  449. }
  450. self.state.set_playing_track_index(0);
  451. } else {
  452. let context = self.state.get_context_uri();
  453. debug!("{:?}", context);
  454. }
  455. self.notify(None);
  456. }
  457. MessageType::kMessageTypeSeek => {
  458. let position = frame.get_position();
  459. self.state.set_position_ms(position);
  460. self.state.set_position_measured_at(now_ms() as u64);
  461. self.player.seek(position);
  462. self.notify(None);
  463. }
  464. MessageType::kMessageTypeReplace => {
  465. self.update_tracks(&frame);
  466. self.notify(None);
  467. }
  468. MessageType::kMessageTypeVolume => {
  469. self.set_volume(frame.get_volume() as u16);
  470. self.notify(None);
  471. }
  472. MessageType::kMessageTypeNotify => {
  473. if self.device.get_is_active() && frame.get_device_state().get_is_active() {
  474. self.device.set_is_active(false);
  475. self.state.set_status(PlayStatus::kPlayStatusStop);
  476. self.player.stop();
  477. self.mixer.stop();
  478. }
  479. }
  480. _ => (),
  481. }
  482. }
  483. fn handle_play(&mut self) {
  484. if self.state.get_status() == PlayStatus::kPlayStatusPause {
  485. self.mixer.start();
  486. self.player.play();
  487. self.state.set_status(PlayStatus::kPlayStatusPlay);
  488. self.state.set_position_measured_at(now_ms() as u64);
  489. }
  490. }
  491. fn handle_play_pause(&mut self) {
  492. match self.state.get_status() {
  493. PlayStatus::kPlayStatusPlay => self.handle_pause(),
  494. PlayStatus::kPlayStatusPause => self.handle_play(),
  495. _ => (),
  496. }
  497. }
  498. fn handle_pause(&mut self) {
  499. if self.state.get_status() == PlayStatus::kPlayStatusPlay {
  500. self.player.pause();
  501. self.mixer.stop();
  502. self.state.set_status(PlayStatus::kPlayStatusPause);
  503. let now = now_ms() as u64;
  504. let position = self.state.get_position_ms();
  505. let diff = now - self.state.get_position_measured_at();
  506. self.state.set_position_ms(position + diff as u32);
  507. self.state.set_position_measured_at(now);
  508. }
  509. }
  510. fn consume_queued_track(&mut self) -> usize {
  511. // Removes current track if it is queued
  512. // Returns the index of the next track
  513. let current_index = self.state.get_playing_track_index() as usize;
  514. if self.state.get_track()[current_index].get_queued() {
  515. self.state.mut_track().remove(current_index);
  516. return current_index;
  517. }
  518. current_index + 1
  519. }
  520. fn handle_next(&mut self) {
  521. let mut new_index = self.consume_queued_track() as u32;
  522. let mut continue_playing = true;
  523. if new_index >= self.state.get_track().len() as u32 {
  524. new_index = 0; // Loop around back to start
  525. continue_playing = self.state.get_repeat();
  526. }
  527. self.state.set_playing_track_index(new_index);
  528. self.state.set_position_ms(0);
  529. self.state.set_position_measured_at(now_ms() as u64);
  530. self.load_track(continue_playing);
  531. }
  532. fn handle_prev(&mut self) {
  533. // Previous behaves differently based on the position
  534. // Under 3s it goes to the previous song (starts playing)
  535. // Over 3s it seeks to zero (retains previous play status)
  536. if self.position() < 3000 {
  537. // Queued tracks always follow the currently playing track.
  538. // They should not be considered when calculating the previous
  539. // track so extract them beforehand and reinsert them after it.
  540. let mut queue_tracks = Vec::new();
  541. {
  542. let queue_index = self.consume_queued_track();
  543. let tracks = self.state.mut_track();
  544. while queue_index < tracks.len() && tracks[queue_index].get_queued() {
  545. queue_tracks.push(tracks.remove(queue_index));
  546. }
  547. }
  548. let current_index = self.state.get_playing_track_index();
  549. let new_index = if current_index > 0 {
  550. current_index - 1
  551. } else if self.state.get_repeat() {
  552. self.state.get_track().len() as u32 - 1
  553. } else {
  554. 0
  555. };
  556. // Reinsert queued tracks after the new playing track.
  557. let mut pos = (new_index + 1) as usize;
  558. for track in queue_tracks.into_iter() {
  559. self.state.mut_track().insert(pos, track);
  560. pos += 1;
  561. }
  562. self.state.set_playing_track_index(new_index);
  563. self.state.set_position_ms(0);
  564. self.state.set_position_measured_at(now_ms() as u64);
  565. self.load_track(true);
  566. } else {
  567. self.state.set_position_ms(0);
  568. self.state.set_position_measured_at(now_ms() as u64);
  569. self.player.seek(0);
  570. }
  571. }
  572. fn handle_volume_up(&mut self) {
  573. let mut volume: u32 = self.device.get_volume() as u32 + 4096;
  574. if volume > 0xFFFF {
  575. volume = 0xFFFF;
  576. }
  577. self.set_volume(volume as u16);
  578. }
  579. fn handle_volume_down(&mut self) {
  580. let mut volume: i32 = self.device.get_volume() as i32 - 4096;
  581. if volume < 0 {
  582. volume = 0;
  583. }
  584. self.set_volume(volume as u16);
  585. }
  586. fn handle_end_of_track(&mut self) {
  587. self.handle_next();
  588. self.notify(None);
  589. }
  590. fn position(&mut self) -> u32 {
  591. let diff = now_ms() as u64 - self.state.get_position_measured_at();
  592. self.state.get_position_ms() + diff as u32
  593. }
  594. fn update_tracks(&mut self, frame: &protocol::spirc::Frame) {
  595. let index = frame.get_state().get_playing_track_index();
  596. let tracks = frame.get_state().get_track();
  597. let context_uri = frame.get_state().get_context_uri().to_owned();
  598. self.state.set_playing_track_index(index);
  599. self.state.set_track(tracks.into_iter().cloned().collect());
  600. self.state.set_context_uri(context_uri);
  601. self.state.set_repeat(frame.get_state().get_repeat());
  602. self.state.set_shuffle(frame.get_state().get_shuffle());
  603. }
  604. fn load_track(&mut self, play: bool) {
  605. let index = self.state.get_playing_track_index();
  606. let track = {
  607. let gid = self.state.get_track()[index as usize].get_gid();
  608. SpotifyId::from_raw(gid).unwrap()
  609. };
  610. let position = self.state.get_position_ms();
  611. let end_of_track = self.player.load(track, play, position);
  612. if play {
  613. self.state.set_status(PlayStatus::kPlayStatusPlay);
  614. } else {
  615. self.state.set_status(PlayStatus::kPlayStatusPause);
  616. }
  617. self.end_of_track = Box::new(end_of_track);
  618. }
  619. fn hello(&mut self) {
  620. CommandSender::new(self, MessageType::kMessageTypeHello).send();
  621. }
  622. fn notify(&mut self, recipient: Option<&str>) {
  623. let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify);
  624. if let Some(s) = recipient {
  625. cs = cs.recipient(&s);
  626. }
  627. cs.send();
  628. }
  629. fn set_volume(&mut self, volume: u16) {
  630. self.device.set_volume(volume as u32);
  631. self.mixer.set_volume(volume_to_mixer(volume, self.linear_volume));
  632. if let Some(cache) = self.session.cache() {
  633. cache.save_volume(Volume { volume })
  634. }
  635. }
  636. }
  637. impl Drop for SpircTask {
  638. fn drop(&mut self) {
  639. debug!("drop Spirc[{}]", self.session.session_id());
  640. }
  641. }
  642. struct CommandSender<'a> {
  643. spirc: &'a mut SpircTask,
  644. frame: protocol::spirc::Frame,
  645. }
  646. impl<'a> CommandSender<'a> {
  647. fn new(spirc: &'a mut SpircTask, cmd: MessageType) -> CommandSender {
  648. let mut frame = protocol::spirc::Frame::new();
  649. frame.set_version(1);
  650. frame.set_protocol_version(::std::convert::Into::into("2.0.0"));
  651. frame.set_ident(spirc.ident.clone());
  652. frame.set_seq_nr(spirc.sequence.get());
  653. frame.set_typ(cmd);
  654. frame.set_device_state(spirc.device.clone());
  655. frame.set_state_update_id(now_ms());
  656. CommandSender {
  657. spirc: spirc,
  658. frame: frame,
  659. }
  660. }
  661. fn recipient(mut self, recipient: &'a str) -> CommandSender {
  662. self.frame.mut_recipient().push(recipient.to_owned());
  663. self
  664. }
  665. #[allow(dead_code)]
  666. fn state(mut self, state: protocol::spirc::State) -> CommandSender<'a> {
  667. self.frame.set_state(state);
  668. self
  669. }
  670. fn send(mut self) {
  671. if !self.frame.has_state() && self.spirc.device.get_is_active() {
  672. self.frame.set_state(self.spirc.state.clone());
  673. }
  674. let send = self.spirc.sender.start_send(self.frame).unwrap();
  675. assert!(send.is_ready());
  676. }
  677. }