spirc.rs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. use futures::future;
  2. use futures::sink::BoxSink;
  3. use futures::stream::BoxStream;
  4. use futures::sync::{oneshot, mpsc};
  5. use futures::{Future, Stream, Sink, Async, Poll, BoxFuture};
  6. use protobuf::{self, Message};
  7. use mercury::MercuryError;
  8. use player::Player;
  9. use session::Session;
  10. use util::{now_ms, SpotifyId, SeqGenerator};
  11. use version;
  12. use protocol;
  13. use protocol::spirc::{PlayStatus, State, MessageType, Frame, DeviceState};
  14. pub struct SpircTask {
  15. player: Player,
  16. sequence: SeqGenerator<u32>,
  17. ident: String,
  18. device: DeviceState,
  19. state: State,
  20. subscription: BoxStream<Frame, MercuryError>,
  21. sender: BoxSink<Frame, MercuryError>,
  22. commands: mpsc::UnboundedReceiver<SpircCommand>,
  23. end_of_track: BoxFuture<(), oneshot::Canceled>,
  24. shutdown: bool,
  25. }
  26. pub enum SpircCommand {
  27. Shutdown
  28. }
  29. pub struct Spirc {
  30. commands: mpsc::UnboundedSender<SpircCommand>,
  31. }
  32. fn initial_state() -> State {
  33. protobuf_init!(protocol::spirc::State::new(), {
  34. repeat: false,
  35. shuffle: false,
  36. status: PlayStatus::kPlayStatusStop,
  37. position_ms: 0,
  38. position_measured_at: 0,
  39. })
  40. }
  41. fn initial_device_state(name: String, volume: u16) -> DeviceState {
  42. protobuf_init!(DeviceState::new(), {
  43. sw_version: version::version_string(),
  44. is_active: false,
  45. can_play: true,
  46. volume: volume as u32,
  47. name: name,
  48. capabilities => [
  49. @{
  50. typ: protocol::spirc::CapabilityType::kCanBePlayer,
  51. intValue => [1]
  52. },
  53. @{
  54. typ: protocol::spirc::CapabilityType::kDeviceType,
  55. intValue => [5]
  56. },
  57. @{
  58. typ: protocol::spirc::CapabilityType::kGaiaEqConnectId,
  59. intValue => [1]
  60. },
  61. @{
  62. typ: protocol::spirc::CapabilityType::kSupportsLogout,
  63. intValue => [0]
  64. },
  65. @{
  66. typ: protocol::spirc::CapabilityType::kIsObservable,
  67. intValue => [1]
  68. },
  69. @{
  70. typ: protocol::spirc::CapabilityType::kVolumeSteps,
  71. intValue => [10]
  72. },
  73. @{
  74. typ: protocol::spirc::CapabilityType::kSupportedContexts,
  75. stringValue => [
  76. "album",
  77. "playlist",
  78. "search",
  79. "inbox",
  80. "toplist",
  81. "starred",
  82. "publishedstarred",
  83. "track",
  84. ]
  85. },
  86. @{
  87. typ: protocol::spirc::CapabilityType::kSupportedTypes,
  88. stringValue => [
  89. "audio/local",
  90. "audio/track",
  91. "local",
  92. "track",
  93. ]
  94. }
  95. ],
  96. })
  97. }
  98. impl Spirc {
  99. pub fn new(session: Session, player: Player) -> (Spirc, SpircTask) {
  100. let ident = session.device_id().to_owned();
  101. let name = session.config().name.clone();
  102. let uri = format!("hm://remote/user/{}", session.username());
  103. let subscription = session.mercury().subscribe(&uri as &str);
  104. let subscription = subscription.map(|stream| stream.map_err(|_| MercuryError)).flatten_stream();
  105. let subscription = subscription.map(|response| -> Frame {
  106. let data = response.payload.first().unwrap();
  107. protobuf::parse_from_bytes(data).unwrap()
  108. }).boxed();
  109. let sender = Box::new(session.mercury().sender(uri).with(|frame: Frame| {
  110. Ok(frame.write_to_bytes().unwrap())
  111. }));
  112. let (cmd_tx, cmd_rx) = mpsc::unbounded();
  113. let volume = 0xFFFF;
  114. let device = initial_device_state(name, volume);
  115. player.volume(volume);
  116. let mut task = SpircTask {
  117. player: player,
  118. sequence: SeqGenerator::new(1),
  119. ident: ident,
  120. device: device,
  121. state: initial_state(),
  122. subscription: subscription,
  123. sender: sender,
  124. commands: cmd_rx,
  125. end_of_track: future::empty().boxed(),
  126. shutdown: false,
  127. };
  128. let spirc = Spirc {
  129. commands: cmd_tx,
  130. };
  131. task.hello();
  132. (spirc, task)
  133. }
  134. pub fn shutdown(&self) {
  135. let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::Shutdown);
  136. }
  137. }
  138. impl Future for SpircTask {
  139. type Item = ();
  140. type Error = ();
  141. fn poll(&mut self) -> Poll<(), ()> {
  142. loop {
  143. let mut progress = false;
  144. if !self.shutdown {
  145. match self.subscription.poll().unwrap() {
  146. Async::Ready(Some(frame)) => {
  147. progress = true;
  148. self.handle_frame(frame);
  149. }
  150. Async::Ready(None) => panic!("subscription terminated"),
  151. Async::NotReady => (),
  152. }
  153. match self.commands.poll().unwrap() {
  154. Async::Ready(Some(command)) => {
  155. progress = true;
  156. self.handle_command(command);
  157. }
  158. Async::Ready(None) => (),
  159. Async::NotReady => (),
  160. }
  161. match self.end_of_track.poll() {
  162. Ok(Async::Ready(())) => {
  163. progress = true;
  164. self.handle_end_of_track();
  165. }
  166. Ok(Async::NotReady) => (),
  167. Err(oneshot::Canceled) => {
  168. self.end_of_track = future::empty().boxed()
  169. }
  170. }
  171. }
  172. let poll_sender = self.sender.poll_complete().unwrap();
  173. // Only shutdown once we've flushed out all our messages
  174. if self.shutdown && poll_sender.is_ready() {
  175. return Ok(Async::Ready(()));
  176. }
  177. if !progress {
  178. return Ok(Async::NotReady);
  179. }
  180. }
  181. }
  182. }
  183. impl SpircTask {
  184. fn handle_command(&mut self, cmd: SpircCommand) {
  185. match cmd {
  186. SpircCommand::Shutdown => {
  187. CommandSender::new(self, MessageType::kMessageTypeGoodbye).send();
  188. self.shutdown = true;
  189. self.commands.close();
  190. }
  191. }
  192. }
  193. fn handle_frame(&mut self, frame: Frame) {
  194. debug!("{:?} {:?} {} {} {}",
  195. frame.get_typ(),
  196. frame.get_device_state().get_name(),
  197. frame.get_ident(),
  198. frame.get_seq_nr(),
  199. frame.get_state_update_id());
  200. if frame.get_ident() == self.ident ||
  201. (frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident)) {
  202. return;
  203. }
  204. match frame.get_typ() {
  205. MessageType::kMessageTypeHello => {
  206. self.notify(Some(frame.get_ident()));
  207. }
  208. MessageType::kMessageTypeLoad => {
  209. if !self.device.get_is_active() {
  210. self.device.set_is_active(true);
  211. self.device.set_became_active_at(now_ms());
  212. }
  213. self.update_tracks(&frame);
  214. if self.state.get_track().len() > 0 {
  215. self.state.set_position_ms(frame.get_state().get_position_ms());
  216. self.state.set_position_measured_at(now_ms() as u64);
  217. let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
  218. self.load_track(play);
  219. } else {
  220. self.state.set_status(PlayStatus::kPlayStatusStop);
  221. }
  222. self.notify(None);
  223. }
  224. MessageType::kMessageTypePlay => {
  225. if self.state.get_status() == PlayStatus::kPlayStatusPause {
  226. self.player.play();
  227. self.state.set_status(PlayStatus::kPlayStatusPlay);
  228. self.state.set_position_measured_at(now_ms() as u64);
  229. }
  230. self.notify(None);
  231. }
  232. MessageType::kMessageTypePause => {
  233. if self.state.get_status() == PlayStatus::kPlayStatusPlay {
  234. self.player.pause();
  235. self.state.set_status(PlayStatus::kPlayStatusPause);
  236. let now = now_ms() as u64;
  237. let position = self.state.get_position_ms();
  238. let diff = now - self.state.get_position_measured_at();
  239. self.state.set_position_ms(position + diff as u32);
  240. self.state.set_position_measured_at(now);
  241. }
  242. self.notify(None);
  243. }
  244. MessageType::kMessageTypeNext => {
  245. let current_index = self.state.get_playing_track_index();
  246. let new_index = (current_index + 1) % (self.state.get_track().len() as u32);
  247. self.state.set_playing_track_index(new_index);
  248. self.state.set_position_ms(0);
  249. self.state.set_position_measured_at(now_ms() as u64);
  250. self.load_track(true);
  251. self.notify(None);
  252. }
  253. MessageType::kMessageTypePrev => {
  254. // Previous behaves differently based on the position
  255. // Under 3s it goes to the previous song
  256. // Over 3s it seeks to zero
  257. if self.position() < 3000 {
  258. let current_index = self.state.get_playing_track_index();
  259. let new_index = if current_index == 0 {
  260. self.state.get_track().len() as u32 - 1
  261. } else {
  262. current_index - 1
  263. };
  264. self.state.set_playing_track_index(new_index);
  265. self.state.set_position_ms(0);
  266. self.state.set_position_measured_at(now_ms() as u64);
  267. self.load_track(true);
  268. } else {
  269. self.state.set_position_ms(0);
  270. self.state.set_position_measured_at(now_ms() as u64);
  271. self.player.seek(0);
  272. }
  273. self.notify(None);
  274. }
  275. MessageType::kMessageTypeSeek => {
  276. let position = frame.get_position();
  277. self.state.set_position_ms(position);
  278. self.state.set_position_measured_at(now_ms() as u64);
  279. self.player.seek(position);
  280. self.notify(None);
  281. }
  282. MessageType::kMessageTypeReplace => {
  283. self.update_tracks(&frame);
  284. self.notify(None);
  285. }
  286. MessageType::kMessageTypeVolume => {
  287. let volume = frame.get_volume();
  288. self.device.set_volume(volume);
  289. self.player.volume(volume as u16);
  290. self.notify(None);
  291. }
  292. MessageType::kMessageTypeNotify => {
  293. if self.device.get_is_active() &&
  294. frame.get_device_state().get_is_active()
  295. {
  296. self.device.set_is_active(false);
  297. self.state.set_status(PlayStatus::kPlayStatusStop);
  298. self.player.stop();
  299. }
  300. }
  301. _ => (),
  302. }
  303. }
  304. fn handle_end_of_track(&mut self) {
  305. let current_index = self.state.get_playing_track_index();
  306. let new_index = (current_index + 1) % (self.state.get_track().len() as u32);
  307. self.state.set_playing_track_index(new_index);
  308. self.state.set_position_ms(0);
  309. self.state.set_position_measured_at(now_ms() as u64);
  310. self.load_track(true);
  311. self.notify(None);
  312. }
  313. fn position(&mut self) -> u32 {
  314. let diff = now_ms() as u64 - self.state.get_position_measured_at();
  315. self.state.get_position_ms() + diff as u32
  316. }
  317. fn update_tracks(&mut self, frame: &protocol::spirc::Frame) {
  318. let index = frame.get_state().get_playing_track_index();
  319. let tracks = frame.get_state().get_track();
  320. self.state.set_playing_track_index(index);
  321. self.state.set_track(tracks.into_iter().cloned().collect());
  322. }
  323. fn load_track(&mut self, play: bool) {
  324. let index = self.state.get_playing_track_index();
  325. let track = {
  326. let gid = self.state.get_track()[index as usize].get_gid();
  327. SpotifyId::from_raw(gid)
  328. };
  329. let position = self.state.get_position_ms();
  330. let end_of_track = self.player.load(track, play, position);
  331. if play {
  332. self.state.set_status(PlayStatus::kPlayStatusPlay);
  333. } else {
  334. self.state.set_status(PlayStatus::kPlayStatusPause);
  335. }
  336. self.end_of_track = end_of_track.boxed();
  337. }
  338. fn hello(&mut self) {
  339. CommandSender::new(self, MessageType::kMessageTypeHello).send();
  340. }
  341. fn notify(&mut self, recipient: Option<&str>) {
  342. let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify);
  343. if let Some(s) = recipient {
  344. cs = cs.recipient(&s);
  345. }
  346. cs.send();
  347. }
  348. fn spirc_state(&self) -> protocol::spirc::State {
  349. self.state.clone()
  350. }
  351. }
  352. struct CommandSender<'a> {
  353. spirc: &'a mut SpircTask,
  354. cmd: MessageType,
  355. recipient: Option<String>,
  356. }
  357. impl<'a> CommandSender<'a> {
  358. fn new(spirc: &'a mut SpircTask, cmd: MessageType) -> CommandSender {
  359. CommandSender {
  360. spirc: spirc,
  361. cmd: cmd,
  362. recipient: None,
  363. }
  364. }
  365. fn recipient(mut self, r: &str) -> CommandSender<'a> {
  366. self.recipient = Some(r.to_owned());
  367. self
  368. }
  369. fn send(self) {
  370. let mut frame = protobuf_init!(Frame::new(), {
  371. version: 1,
  372. ident: self.spirc.ident.clone(),
  373. protocol_version: "2.0.0",
  374. seq_nr: self.spirc.sequence.get(),
  375. typ: self.cmd,
  376. device_state: self.spirc.device.clone(),
  377. state_update_id: now_ms(),
  378. });
  379. if let Some(recipient) = self.recipient {
  380. frame.mut_recipient().push(recipient.to_owned());
  381. }
  382. if self.spirc.device.get_is_active() {
  383. frame.set_state(self.spirc.spirc_state());
  384. }
  385. let ready = self.spirc.sender.start_send(frame).unwrap().is_ready();
  386. assert!(ready);
  387. }
  388. }