spirc.rs 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. use eventual::Async;
  2. use protobuf::{self, Message};
  3. use std::sync::{mpsc, MutexGuard};
  4. use util;
  5. use session::Session;
  6. use util::SpotifyId;
  7. use util::version::version_string;
  8. use mercury::{MercuryRequest, MercuryMethod};
  9. use librespot_protocol as protocol;
  10. pub use librespot_protocol::spirc::PlayStatus;
  11. pub struct SpircManager<'s, D: SpircDelegate> {
  12. delegate: D,
  13. session: &'s Session,
  14. username: String,
  15. state_update_id: i64,
  16. seq_nr: u32,
  17. name: String,
  18. ident: String,
  19. device_type: u8,
  20. can_play: bool,
  21. repeat: bool,
  22. shuffle: bool,
  23. volume: u16,
  24. is_active: bool,
  25. became_active_at: i64,
  26. last_command_ident: String,
  27. last_command_msgid: u32,
  28. tracks: Vec<SpotifyId>,
  29. index: u32
  30. }
  31. pub trait SpircDelegate {
  32. type State : SpircState;
  33. fn load(&self, track: SpotifyId,
  34. start_playing: bool, position_ms: u32);
  35. fn play(&self);
  36. fn pause(&self);
  37. fn seek(&self, position_ms: u32);
  38. fn stop(&self);
  39. fn state(&self) -> MutexGuard<Self::State>;
  40. fn updates(&self) -> mpsc::Receiver<i64>;
  41. }
  42. pub trait SpircState {
  43. fn status(&self) -> PlayStatus;
  44. fn position(&self) -> (u32, i64);
  45. fn update_time(&self) -> i64;
  46. fn end_of_track(&self) -> bool;
  47. }
  48. impl <'s, D: SpircDelegate> SpircManager<'s, D> {
  49. pub fn new(session: &'s Session, delegate: D,
  50. username: String, name: String) -> SpircManager<'s, D> {
  51. SpircManager {
  52. delegate: delegate,
  53. session: &session,
  54. username: username.clone(),
  55. state_update_id: 0,
  56. seq_nr: 0,
  57. name: name,
  58. ident: session.0.config.device_id.clone(),
  59. device_type: 5,
  60. can_play: true,
  61. repeat: false,
  62. shuffle: false,
  63. volume: 0x8000,
  64. is_active: false,
  65. became_active_at: 0,
  66. last_command_ident: String::new(),
  67. last_command_msgid: 0,
  68. tracks: Vec::new(),
  69. index: 0
  70. }
  71. }
  72. pub fn run(&mut self) {
  73. let rx = self.session.mercury_sub(format!("hm://remote/3/user/{}/", self.username));
  74. let updates = self.delegate.updates();
  75. loop {
  76. select! {
  77. pkt = rx.recv() => {
  78. let frame = protobuf::parse_from_bytes::<protocol::spirc::Frame>(
  79. pkt.unwrap().payload.first().unwrap()).unwrap();
  80. println!("{:?} {} {} {} {}",
  81. frame.get_typ(),
  82. frame.get_device_state().get_name(),
  83. frame.get_ident(),
  84. frame.get_seq_nr(),
  85. frame.get_state_update_id());
  86. if frame.get_ident() != self.ident &&
  87. (frame.get_recipient().len() == 0 ||
  88. frame.get_recipient().contains(&self.ident)) {
  89. self.handle(frame);
  90. }
  91. },
  92. update_time = updates.recv() => {
  93. let end_of_track = self.delegate.state().end_of_track();
  94. if end_of_track {
  95. self.index = (self.index + 1) % self.tracks.len() as u32;
  96. let track = self.tracks[self.index as usize];
  97. self.delegate.load(track, true, 0);
  98. } else {
  99. self.state_update_id = update_time.unwrap();
  100. self.notify(None);
  101. }
  102. }
  103. }
  104. }
  105. }
  106. fn handle(&mut self, frame: protocol::spirc::Frame) {
  107. if frame.get_recipient().len() > 0 {
  108. self.last_command_ident = frame.get_ident().to_owned();
  109. self.last_command_msgid = frame.get_seq_nr();
  110. }
  111. match frame.get_typ() {
  112. protocol::spirc::MessageType::kMessageTypeHello => {
  113. self.notify(Some(frame.get_ident()));
  114. }
  115. protocol::spirc::MessageType::kMessageTypeLoad => {
  116. if !self.is_active {
  117. self.is_active = true;
  118. self.became_active_at = util::now_ms();
  119. }
  120. self.index = frame.get_state().get_playing_track_index();
  121. self.tracks = frame.get_state().get_track().iter()
  122. .map(|track| SpotifyId::from_raw(track.get_gid()))
  123. .collect();
  124. let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
  125. let track = self.tracks[self.index as usize];
  126. let position = frame.get_state().get_position_ms();
  127. self.delegate.load(track, play, position);
  128. }
  129. protocol::spirc::MessageType::kMessageTypePlay => {
  130. self.delegate.play();
  131. }
  132. protocol::spirc::MessageType::kMessageTypePause => {
  133. self.delegate.pause();
  134. }
  135. protocol::spirc::MessageType::kMessageTypeSeek => {
  136. self.delegate.seek(frame.get_position());
  137. }
  138. protocol::spirc::MessageType::kMessageTypeNotify => {
  139. if self.is_active && frame.get_device_state().get_is_active() {
  140. self.is_active = false;
  141. self.delegate.stop();
  142. }
  143. }
  144. _ => ()
  145. }
  146. }
  147. fn notify(&mut self, recipient: Option<&str>) {
  148. let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), {
  149. version: 1,
  150. ident: self.ident.clone(),
  151. protocol_version: "2.0.0".to_owned(),
  152. seq_nr: { self.seq_nr += 1; self.seq_nr },
  153. typ: protocol::spirc::MessageType::kMessageTypeNotify,
  154. device_state: self.device_state(),
  155. recipient: protobuf::RepeatedField::from_vec(
  156. recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![])
  157. ),
  158. state_update_id: self.state_update_id as i64
  159. });
  160. if self.is_active {
  161. pkt.set_state(self.spirc_state());
  162. }
  163. self.session.mercury(MercuryRequest{
  164. method: MercuryMethod::SEND,
  165. uri: format!("hm://remote/user/{}", self.username),
  166. content_type: None,
  167. payload: vec![ pkt.write_to_bytes().unwrap() ]
  168. }).await().unwrap();
  169. }
  170. fn spirc_state(&self) -> protocol::spirc::State {
  171. let state = self.delegate.state();
  172. let (position_ms, position_measured_at) = state.position();
  173. protobuf_init!(protocol::spirc::State::new(), {
  174. status: state.status(),
  175. position_ms: position_ms,
  176. position_measured_at: position_measured_at as u64,
  177. playing_track_index: self.index,
  178. track: self.tracks.iter().map(|track| {
  179. protobuf_init!(protocol::spirc::TrackRef::new(), {
  180. gid: track.to_raw().to_vec()
  181. })
  182. }).collect(),
  183. shuffle: self.shuffle,
  184. repeat: self.repeat,
  185. playing_from_fallback: true,
  186. last_command_ident: self.last_command_ident.clone(),
  187. last_command_msgid: self.last_command_msgid
  188. })
  189. }
  190. fn device_state(&self) -> protocol::spirc::DeviceState {
  191. protobuf_init!(protocol::spirc::DeviceState::new(), {
  192. sw_version: version_string(),
  193. is_active: self.is_active,
  194. can_play: self.can_play,
  195. volume: self.volume as u32,
  196. name: self.name.clone(),
  197. error_code: 0,
  198. became_active_at: if self.is_active { self.became_active_at as i64 } else { 0 },
  199. capabilities => [
  200. @{
  201. typ: protocol::spirc::CapabilityType::kCanBePlayer,
  202. intValue => [0]
  203. },
  204. @{
  205. typ: protocol::spirc::CapabilityType::kDeviceType,
  206. intValue => [ self.device_type as i64 ]
  207. },
  208. @{
  209. typ: protocol::spirc::CapabilityType::kGaiaEqConnectId,
  210. intValue => [1]
  211. },
  212. @{
  213. typ: protocol::spirc::CapabilityType::kSupportsLogout,
  214. intValue => [0]
  215. },
  216. @{
  217. typ: protocol::spirc::CapabilityType::kIsObservable,
  218. intValue => [1]
  219. },
  220. @{
  221. typ: protocol::spirc::CapabilityType::kVolumeSteps,
  222. intValue => [10]
  223. },
  224. @{
  225. typ: protocol::spirc::CapabilityType::kSupportedContexts,
  226. stringValue => [
  227. "album".to_owned(),
  228. "playlist".to_owned(),
  229. "search".to_owned(),
  230. "inbox".to_owned(),
  231. "toplist".to_owned(),
  232. "starred".to_owned(),
  233. "publishedstarred".to_owned(),
  234. "track".to_owned(),
  235. ]
  236. },
  237. @{
  238. typ: protocol::spirc::CapabilityType::kSupportedTypes,
  239. stringValue => [
  240. "audio/local".to_owned(),
  241. "audio/track".to_owned(),
  242. "local".to_owned(),
  243. "track".to_owned(),
  244. ]
  245. }
  246. ],
  247. })
  248. }
  249. }