spirc.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. use eventual::Async;
  2. use protobuf::{self, Message};
  3. use std::sync::{mpsc, MutexGuard};
  4. use util;
  5. use session::Session;
  6. use util::SpotifyId;
  7. use util::version::version_string;
  8. use mercury::{MercuryRequest, MercuryMethod};
  9. use librespot_protocol as protocol;
  10. pub use librespot_protocol::spirc::PlayStatus;
  11. pub struct SpircManager<D: SpircDelegate> {
  12. delegate: D,
  13. session: Session,
  14. state_update_id: i64,
  15. seq_nr: u32,
  16. name: String,
  17. ident: String,
  18. device_type: u8,
  19. can_play: bool,
  20. repeat: bool,
  21. shuffle: bool,
  22. volume: u16,
  23. is_active: bool,
  24. became_active_at: i64,
  25. last_command_ident: String,
  26. last_command_msgid: u32,
  27. tracks: Vec<SpotifyId>,
  28. index: u32,
  29. }
  30. pub trait SpircDelegate {
  31. type State : SpircState;
  32. fn load(&self, track: SpotifyId, start_playing: bool, position_ms: u32);
  33. fn play(&self);
  34. fn pause(&self);
  35. fn seek(&self, position_ms: u32);
  36. fn stop(&self);
  37. fn state(&self) -> MutexGuard<Self::State>;
  38. fn updates(&self) -> mpsc::Receiver<i64>;
  39. }
  40. pub trait SpircState {
  41. fn status(&self) -> PlayStatus;
  42. fn position(&self) -> (u32, i64);
  43. fn update_time(&self) -> i64;
  44. fn end_of_track(&self) -> bool;
  45. }
  46. impl<D: SpircDelegate> SpircManager<D> {
  47. pub fn new(session: Session, delegate: D) -> SpircManager<D> {
  48. let ident = session.0.data.read().unwrap().device_id.clone();
  49. let name = session.0.config.device_name.clone();
  50. SpircManager {
  51. delegate: delegate,
  52. session: session,
  53. state_update_id: 0,
  54. seq_nr: 0,
  55. name: name,
  56. ident: ident,
  57. device_type: 5,
  58. can_play: true,
  59. repeat: false,
  60. shuffle: false,
  61. volume: 0x8000,
  62. is_active: false,
  63. became_active_at: 0,
  64. last_command_ident: String::new(),
  65. last_command_msgid: 0,
  66. tracks: Vec::new(),
  67. index: 0,
  68. }
  69. }
  70. pub fn run(&mut self) {
  71. let rx = self.session.mercury_sub(format!("hm://remote/user/{}/",
  72. self.session
  73. .0
  74. .data
  75. .read()
  76. .unwrap()
  77. .canonical_username
  78. .clone()));
  79. let updates = self.delegate.updates();
  80. self.notify(true, None);
  81. loop {
  82. select! {
  83. pkt = rx.recv() => {
  84. let frame = protobuf::parse_from_bytes::<protocol::spirc::Frame>(
  85. pkt.unwrap().payload.first().unwrap()).unwrap();
  86. println!("{:?} {} {} {} {}",
  87. frame.get_typ(),
  88. frame.get_device_state().get_name(),
  89. frame.get_ident(),
  90. frame.get_seq_nr(),
  91. frame.get_state_update_id());
  92. if frame.get_ident() != self.ident &&
  93. (frame.get_recipient().len() == 0 ||
  94. frame.get_recipient().contains(&self.ident)) {
  95. self.handle(frame);
  96. }
  97. },
  98. update_time = updates.recv() => {
  99. let end_of_track = self.delegate.state().end_of_track();
  100. if end_of_track {
  101. self.index = (self.index + 1) % self.tracks.len() as u32;
  102. let track = self.tracks[self.index as usize];
  103. self.delegate.load(track, true, 0);
  104. } else {
  105. self.state_update_id = update_time.unwrap();
  106. self.notify(false, None);
  107. }
  108. }
  109. }
  110. }
  111. }
  112. fn handle(&mut self, frame: protocol::spirc::Frame) {
  113. if frame.get_recipient().len() > 0 {
  114. self.last_command_ident = frame.get_ident().to_owned();
  115. self.last_command_msgid = frame.get_seq_nr();
  116. }
  117. match frame.get_typ() {
  118. protocol::spirc::MessageType::kMessageTypeHello => {
  119. self.notify(false, Some(frame.get_ident()));
  120. }
  121. protocol::spirc::MessageType::kMessageTypeLoad => {
  122. if !self.is_active {
  123. self.is_active = true;
  124. self.became_active_at = util::now_ms();
  125. }
  126. self.index = frame.get_state().get_playing_track_index();
  127. self.tracks = frame.get_state()
  128. .get_track()
  129. .iter()
  130. .filter(|track| track.get_gid().len()==16)
  131. .map(|track| SpotifyId::from_raw(track.get_gid()))
  132. .collect();
  133. let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
  134. let track = self.tracks[self.index as usize];
  135. let position = frame.get_state().get_position_ms();
  136. self.delegate.load(track, play, position);
  137. }
  138. protocol::spirc::MessageType::kMessageTypePlay => {
  139. self.delegate.play();
  140. }
  141. protocol::spirc::MessageType::kMessageTypePause => {
  142. self.delegate.pause();
  143. }
  144. protocol::spirc::MessageType::kMessageTypeNext => {
  145. self.index = (self.index + 1) % self.tracks.len() as u32;
  146. let track = self.tracks[self.index as usize];
  147. self.delegate.load(track, true, 0);
  148. }
  149. protocol::spirc::MessageType::kMessageTypePrev => {
  150. self.index = (self.index - 1) % self.tracks.len() as u32;
  151. let track = self.tracks[self.index as usize];
  152. self.delegate.load(track, true, 0);
  153. }
  154. protocol::spirc::MessageType::kMessageTypeSeek => {
  155. self.delegate.seek(frame.get_position());
  156. }
  157. protocol::spirc::MessageType::kMessageTypeNotify => {
  158. if self.is_active && frame.get_device_state().get_is_active() {
  159. self.is_active = false;
  160. self.delegate.stop();
  161. }
  162. }
  163. _ => (),
  164. }
  165. }
  166. fn notify(&mut self, hello: bool, recipient: Option<&str>) {
  167. let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), {
  168. version: 1,
  169. ident: self.ident.clone(),
  170. protocol_version: "2.0.0".to_owned(),
  171. seq_nr: { self.seq_nr += 1; self.seq_nr },
  172. typ: if hello {
  173. protocol::spirc::MessageType::kMessageTypeHello
  174. } else {
  175. protocol::spirc::MessageType::kMessageTypeNotify
  176. },
  177. device_state: self.device_state(),
  178. recipient: protobuf::RepeatedField::from_vec(
  179. recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![])
  180. ),
  181. state_update_id: self.state_update_id as i64
  182. });
  183. if self.is_active {
  184. pkt.set_state(self.spirc_state());
  185. }
  186. self.session
  187. .mercury(MercuryRequest {
  188. method: MercuryMethod::SEND,
  189. uri: format!("hm://remote/user/{}",
  190. self.session.0.data.read().unwrap().canonical_username.clone()),
  191. content_type: None,
  192. payload: vec![pkt.write_to_bytes().unwrap()],
  193. })
  194. .await()
  195. .unwrap();
  196. }
  197. fn spirc_state(&self) -> protocol::spirc::State {
  198. let state = self.delegate.state();
  199. let (position_ms, position_measured_at) = state.position();
  200. protobuf_init!(protocol::spirc::State::new(), {
  201. status: state.status(),
  202. position_ms: position_ms,
  203. position_measured_at: position_measured_at as u64,
  204. playing_track_index: self.index,
  205. track: self.tracks.iter().map(|track| {
  206. protobuf_init!(protocol::spirc::TrackRef::new(), {
  207. gid: track.to_raw().to_vec()
  208. })
  209. }).collect(),
  210. shuffle: self.shuffle,
  211. repeat: self.repeat,
  212. playing_from_fallback: true,
  213. last_command_ident: self.last_command_ident.clone(),
  214. last_command_msgid: self.last_command_msgid
  215. })
  216. }
  217. fn device_state(&self) -> protocol::spirc::DeviceState {
  218. protobuf_init!(protocol::spirc::DeviceState::new(), {
  219. sw_version: version_string(),
  220. is_active: self.is_active,
  221. can_play: self.can_play,
  222. volume: self.volume as u32,
  223. name: self.name.clone(),
  224. error_code: 0,
  225. became_active_at: if self.is_active { self.became_active_at as i64 } else { 0 },
  226. capabilities => [
  227. @{
  228. typ: protocol::spirc::CapabilityType::kCanBePlayer,
  229. intValue => [0]
  230. },
  231. @{
  232. typ: protocol::spirc::CapabilityType::kDeviceType,
  233. intValue => [ self.device_type as i64 ]
  234. },
  235. @{
  236. typ: protocol::spirc::CapabilityType::kGaiaEqConnectId,
  237. intValue => [1]
  238. },
  239. @{
  240. typ: protocol::spirc::CapabilityType::kSupportsLogout,
  241. intValue => [0]
  242. },
  243. @{
  244. typ: protocol::spirc::CapabilityType::kIsObservable,
  245. intValue => [1]
  246. },
  247. @{
  248. typ: protocol::spirc::CapabilityType::kVolumeSteps,
  249. intValue => [10]
  250. },
  251. @{
  252. typ: protocol::spirc::CapabilityType::kSupportedContexts,
  253. stringValue => [
  254. "album".to_owned(),
  255. "playlist".to_owned(),
  256. "search".to_owned(),
  257. "inbox".to_owned(),
  258. "toplist".to_owned(),
  259. "starred".to_owned(),
  260. "publishedstarred".to_owned(),
  261. "track".to_owned(),
  262. ]
  263. },
  264. @{
  265. typ: protocol::spirc::CapabilityType::kSupportedTypes,
  266. stringValue => [
  267. "audio/local".to_owned(),
  268. "audio/track".to_owned(),
  269. "local".to_owned(),
  270. "track".to_owned(),
  271. ]
  272. }
  273. ],
  274. })
  275. }
  276. }