spirc.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469
  1. use eventual::Async;
  2. use protobuf::{self, Message, RepeatedField};
  3. use std::borrow::Cow;
  4. use std::sync::{Mutex, Arc};
  5. use std::collections::HashMap;
  6. use mercury::{MercuryRequest, MercuryMethod};
  7. use player::{Player, PlayerState};
  8. use mixer::Mixer;
  9. use session::Session;
  10. use util;
  11. use util::SpotifyId;
  12. use version;
  13. use protocol;
  14. pub use protocol::spirc::{PlayStatus, MessageType};
  15. #[derive(Clone)]
  16. pub struct SpircManager(Arc<Mutex<SpircInternal>>);
  17. struct SpircInternal {
  18. player: Player,
  19. session: Session,
  20. mixer: Box<Mixer + Send>,
  21. seq_nr: u32,
  22. name: String,
  23. ident: String,
  24. device_type: u8,
  25. can_play: bool,
  26. repeat: bool,
  27. shuffle: bool,
  28. is_active: bool,
  29. became_active_at: i64,
  30. last_command_ident: String,
  31. last_command_msgid: u32,
  32. tracks: Vec<SpotifyId>,
  33. index: u32,
  34. devices: HashMap<String, String>,
  35. }
  36. #[derive(Clone)]
  37. pub struct State {
  38. pub status: PlayStatus,
  39. pub position_ms: u32,
  40. pub position_measured_at: i64,
  41. pub update_time: i64,
  42. pub volume: u16,
  43. pub track: Option<SpotifyId>,
  44. pub end_of_track: bool,
  45. }
  46. impl SpircManager {
  47. pub fn new(session: Session, player: Player, mixer: Box<Mixer + Send>) -> SpircManager {
  48. let ident = session.device_id().to_owned();
  49. let name = session.config().device_name.clone();
  50. SpircManager(Arc::new(Mutex::new(SpircInternal {
  51. player: player,
  52. session: session,
  53. mixer: mixer,
  54. seq_nr: 0,
  55. name: name,
  56. ident: ident,
  57. device_type: 5,
  58. can_play: true,
  59. repeat: false,
  60. shuffle: false,
  61. is_active: false,
  62. became_active_at: 0,
  63. last_command_ident: String::new(),
  64. last_command_msgid: 0,
  65. tracks: Vec::new(),
  66. index: 0,
  67. devices: HashMap::new(),
  68. })))
  69. }
  70. pub fn run(&self) {
  71. let rx = {
  72. let mut internal = self.0.lock().unwrap();
  73. let rx = internal.session.mercury_sub(internal.uri());
  74. internal.notify(true, None);
  75. // Use a weak pointer to avoid creating an Rc cycle between the player and the
  76. // SpircManager
  77. let _self = Arc::downgrade(&self.0);
  78. internal.player.add_observer(Box::new(move |state| {
  79. if let Some(_self) = _self.upgrade() {
  80. let mut internal = _self.lock().unwrap();
  81. internal.on_update(state);
  82. }
  83. }));
  84. rx
  85. };
  86. for pkt in rx {
  87. let data = pkt.payload.first().unwrap();
  88. let frame = protobuf::parse_from_bytes::<protocol::spirc::Frame>(data).unwrap();
  89. debug!("{:?} {:?} {} {} {}",
  90. frame.get_typ(),
  91. frame.get_device_state().get_name(),
  92. frame.get_ident(),
  93. frame.get_seq_nr(),
  94. frame.get_state_update_id());
  95. self.0.lock().unwrap().handle(frame);
  96. }
  97. }
  98. pub fn devices(&self) -> HashMap<String, String> {
  99. self.0.lock().unwrap().devices.clone()
  100. }
  101. pub fn send_play(&self, recipient: &str) {
  102. let mut internal = self.0.lock().unwrap();
  103. CommandSender::new(&mut *internal, MessageType::kMessageTypePlay)
  104. .recipient(recipient)
  105. .send();
  106. }
  107. pub fn send_pause(&self, recipient: &str) {
  108. let mut internal = self.0.lock().unwrap();
  109. CommandSender::new(&mut *internal, MessageType::kMessageTypePause)
  110. .recipient(recipient)
  111. .send();
  112. }
  113. pub fn send_prev(&self, recipient: &str) {
  114. let mut internal = self.0.lock().unwrap();
  115. CommandSender::new(&mut *internal, MessageType::kMessageTypePrev)
  116. .recipient(recipient)
  117. .send();
  118. }
  119. pub fn send_next(&self, recipient: &str) {
  120. let mut internal = self.0.lock().unwrap();
  121. CommandSender::new(&mut *internal, MessageType::kMessageTypeNext)
  122. .recipient(recipient)
  123. .send();
  124. }
  125. pub fn send_replace_tracks<I: Iterator<Item = SpotifyId>>(&mut self,
  126. recipient: &str,
  127. track_ids: I) {
  128. let state = track_ids_to_state(track_ids);
  129. let mut internal = self.0.lock().unwrap();
  130. CommandSender::new(&mut *internal, MessageType::kMessageTypeReplace)
  131. .recipient(recipient)
  132. .state(state)
  133. .send();
  134. }
  135. pub fn send_load_tracks<I: Iterator<Item = SpotifyId>>(&mut self,
  136. recipient: &str,
  137. track_ids: I) {
  138. let state = track_ids_to_state(track_ids);
  139. let mut internal = self.0.lock().unwrap();
  140. CommandSender::new(&mut *internal, MessageType::kMessageTypeLoad)
  141. .recipient(recipient)
  142. .state(state)
  143. .send();
  144. }
  145. pub fn send_goodbye(&self) {
  146. let mut internal = self.0.lock().unwrap();
  147. CommandSender::new(&mut *internal, MessageType::kMessageTypeGoodbye)
  148. .send();
  149. }
  150. pub fn get_queue(&self) -> Vec<SpotifyId> {
  151. self.0.lock().unwrap().tracks.clone()
  152. }
  153. }
  154. impl SpircInternal {
  155. fn on_update(&mut self, player_state: &PlayerState) {
  156. let end_of_track = player_state.end_of_track();
  157. if end_of_track {
  158. self.index = (self.index + 1) % self.tracks.len() as u32;
  159. let track = self.tracks[self.index as usize];
  160. self.player.load(track, true, 0);
  161. } else {
  162. self.notify(false, None);
  163. }
  164. }
  165. fn handle(&mut self, frame: protocol::spirc::Frame) {
  166. if frame.get_ident() == self.ident ||
  167. (frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident)) {
  168. return;
  169. }
  170. if frame.get_recipient().len() > 0 {
  171. self.last_command_ident = frame.get_ident().to_owned();
  172. self.last_command_msgid = frame.get_seq_nr();
  173. }
  174. if frame.has_ident() && !frame.has_goodbye() && frame.has_device_state() {
  175. self.devices.insert(frame.get_ident().into(),
  176. frame.get_device_state().get_name().into());
  177. }
  178. match frame.get_typ() {
  179. MessageType::kMessageTypeHello => {
  180. self.notify(false, Some(frame.get_ident()));
  181. }
  182. MessageType::kMessageTypeLoad => {
  183. if !self.is_active {
  184. self.is_active = true;
  185. self.became_active_at = util::now_ms();
  186. }
  187. self.reload_tracks(&frame);
  188. if self.tracks.len() > 0 {
  189. let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
  190. let track = self.tracks[self.index as usize];
  191. let position = frame.get_state().get_position_ms();
  192. self.player.load(track, play, position);
  193. } else {
  194. self.notify(false, Some(frame.get_ident()));
  195. }
  196. }
  197. MessageType::kMessageTypePlay => {
  198. self.player.play();
  199. }
  200. MessageType::kMessageTypePause => {
  201. self.player.pause();
  202. }
  203. MessageType::kMessageTypeNext => {
  204. self.index = (self.index + 1) % self.tracks.len() as u32;
  205. let track = self.tracks[self.index as usize];
  206. self.player.load(track, true, 0);
  207. }
  208. MessageType::kMessageTypePrev => {
  209. self.index = (self.index - 1) % self.tracks.len() as u32;
  210. let track = self.tracks[self.index as usize];
  211. self.player.load(track, true, 0);
  212. }
  213. MessageType::kMessageTypeSeek => {
  214. self.player.seek(frame.get_position());
  215. }
  216. MessageType::kMessageTypeReplace => {
  217. self.reload_tracks(&frame);
  218. }
  219. MessageType::kMessageTypeNotify => {
  220. if self.is_active && frame.get_device_state().get_is_active() {
  221. self.is_active = false;
  222. self.player.stop();
  223. }
  224. }
  225. MessageType::kMessageTypeVolume => {
  226. self.mixer.set_volume(frame.get_volume() as u16);
  227. }
  228. MessageType::kMessageTypeGoodbye => {
  229. if frame.has_ident() {
  230. self.devices.remove(frame.get_ident());
  231. }
  232. }
  233. _ => (),
  234. }
  235. }
  236. fn reload_tracks(&mut self, ref frame: &protocol::spirc::Frame) {
  237. self.index = frame.get_state().get_playing_track_index();
  238. self.tracks = frame.get_state()
  239. .get_track()
  240. .iter()
  241. .filter(|track| track.has_gid())
  242. .map(|track| SpotifyId::from_raw(track.get_gid()))
  243. .collect();
  244. }
  245. fn notify(&mut self, hello: bool, recipient: Option<&str>) {
  246. let mut cs = CommandSender::new(self,
  247. if hello {
  248. MessageType::kMessageTypeHello
  249. } else {
  250. MessageType::kMessageTypeNotify
  251. });
  252. if let Some(s) = recipient {
  253. cs = cs.recipient(&s);
  254. }
  255. cs.send();
  256. }
  257. fn spirc_state(&self, state: &State) -> protocol::spirc::State {
  258. protobuf_init!(protocol::spirc::State::new(), {
  259. status: state.status,
  260. position_ms: state.position_ms,
  261. position_measured_at: state.position_measured_at as u64,
  262. playing_track_index: self.index,
  263. track: self.tracks.iter().map(|track| {
  264. protobuf_init!(protocol::spirc::TrackRef::new(), {
  265. gid: track.to_raw().to_vec()
  266. })
  267. }).collect(),
  268. shuffle: self.shuffle,
  269. repeat: self.repeat,
  270. playing_from_fallback: true,
  271. last_command_ident: self.last_command_ident.clone(),
  272. last_command_msgid: self.last_command_msgid
  273. })
  274. }
  275. fn device_state(&self, state: &State) -> protocol::spirc::DeviceState {
  276. protobuf_init!(protocol::spirc::DeviceState::new(), {
  277. sw_version: version::version_string(),
  278. is_active: self.is_active,
  279. can_play: self.can_play,
  280. volume: state.volume as u32,
  281. name: self.name.clone(),
  282. error_code: 0,
  283. became_active_at: if self.is_active { self.became_active_at as i64 } else { 0 },
  284. capabilities => [
  285. @{
  286. typ: protocol::spirc::CapabilityType::kCanBePlayer,
  287. intValue => [0]
  288. },
  289. @{
  290. typ: protocol::spirc::CapabilityType::kDeviceType,
  291. intValue => [ self.device_type as i64 ]
  292. },
  293. @{
  294. typ: protocol::spirc::CapabilityType::kGaiaEqConnectId,
  295. intValue => [1]
  296. },
  297. @{
  298. typ: protocol::spirc::CapabilityType::kSupportsLogout,
  299. intValue => [0]
  300. },
  301. @{
  302. typ: protocol::spirc::CapabilityType::kIsObservable,
  303. intValue => [1]
  304. },
  305. @{
  306. typ: protocol::spirc::CapabilityType::kVolumeSteps,
  307. intValue => [64]
  308. },
  309. @{
  310. typ: protocol::spirc::CapabilityType::kSupportedContexts,
  311. stringValue => [
  312. "album",
  313. "playlist",
  314. "search",
  315. "inbox",
  316. "toplist",
  317. "starred",
  318. "publishedstarred",
  319. "track",
  320. ]
  321. },
  322. @{
  323. typ: protocol::spirc::CapabilityType::kSupportedTypes,
  324. stringValue => [
  325. "audio/local",
  326. "audio/track",
  327. "local",
  328. "track",
  329. ]
  330. }
  331. ],
  332. })
  333. }
  334. fn uri(&self) -> String {
  335. format!("hm://remote/user/{}", self.session.username())
  336. }
  337. }
  338. struct CommandSender<'a> {
  339. spirc_internal: &'a mut SpircInternal,
  340. cmd: MessageType,
  341. recipient: Option<&'a str>,
  342. state: Option<protocol::spirc::State>,
  343. }
  344. impl<'a> CommandSender<'a> {
  345. fn new(spirc_internal: &'a mut SpircInternal, cmd: MessageType) -> CommandSender {
  346. CommandSender {
  347. spirc_internal: spirc_internal,
  348. cmd: cmd,
  349. recipient: None,
  350. state: None,
  351. }
  352. }
  353. fn recipient(mut self, r: &'a str) -> CommandSender {
  354. self.recipient = Some(r);
  355. self
  356. }
  357. fn state(mut self, s: protocol::spirc::State) -> CommandSender<'a> {
  358. self.state = Some(s);
  359. self
  360. }
  361. fn send(self) {
  362. //TODO: get data
  363. let state = Cow::Owned(State {
  364. status: PlayStatus::kPlayStatusStop,
  365. position_ms: 0,
  366. position_measured_at: 0,
  367. update_time: util::now_ms(),
  368. volume: 0,
  369. track: None,
  370. end_of_track: false,
  371. });
  372. let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), {
  373. version: 1,
  374. ident: self.spirc_internal.ident.clone(),
  375. protocol_version: "2.0.0",
  376. seq_nr: { self.spirc_internal.seq_nr += 1; self.spirc_internal.seq_nr },
  377. typ: self.cmd,
  378. recipient: RepeatedField::from_vec(
  379. self.recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![])
  380. ),
  381. device_state: self.spirc_internal.device_state(&state),
  382. state_update_id: state.update_time
  383. });
  384. if self.spirc_internal.is_active {
  385. pkt.set_state(self.spirc_internal.spirc_state(&state));
  386. }
  387. self.spirc_internal
  388. .session
  389. .mercury(MercuryRequest {
  390. method: MercuryMethod::SEND,
  391. uri: self.spirc_internal.uri(),
  392. content_type: None,
  393. payload: vec![pkt.write_to_bytes().unwrap()],
  394. })
  395. .fire();
  396. }
  397. }
  398. fn track_ids_to_state<I: Iterator<Item = SpotifyId>>(track_ids: I) -> protocol::spirc::State {
  399. let tracks: Vec<protocol::spirc::TrackRef> =
  400. track_ids.map(|i| {
  401. protobuf_init!(protocol::spirc::TrackRef::new(), { gid: i.to_raw().to_vec()})
  402. })
  403. .collect();
  404. protobuf_init!(protocol::spirc::State::new(), {
  405. track: RepeatedField::from_vec(tracks)
  406. })
  407. }