spirc.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. use eventual::Async;
  2. use protobuf::{self, Message};
  3. use std::sync::{mpsc, MutexGuard};
  4. use util;
  5. use session::Session;
  6. use util::SpotifyId;
  7. use util::version::version_string;
  8. use mercury::{MercuryRequest, MercuryMethod};
  9. use librespot_protocol as protocol;
  10. pub use librespot_protocol::spirc::PlayStatus;
  11. pub struct SpircManager<D: SpircDelegate> {
  12. delegate: D,
  13. session: Session,
  14. state_update_id: i64,
  15. seq_nr: u32,
  16. name: String,
  17. ident: String,
  18. device_type: u8,
  19. can_play: bool,
  20. repeat: bool,
  21. shuffle: bool,
  22. is_active: bool,
  23. became_active_at: i64,
  24. last_command_ident: String,
  25. last_command_msgid: u32,
  26. tracks: Vec<SpotifyId>,
  27. index: u32,
  28. }
  29. pub trait SpircDelegate {
  30. type State : SpircState;
  31. fn load(&self, track: SpotifyId, start_playing: bool, position_ms: u32);
  32. fn play(&self);
  33. fn pause(&self);
  34. fn seek(&self, position_ms: u32);
  35. fn volume(&self, vol:i32);
  36. fn stop(&self);
  37. fn state(&self) -> MutexGuard<Self::State>;
  38. fn updates(&self) -> mpsc::Receiver<i64>;
  39. }
  40. pub trait SpircState {
  41. fn status(&self) -> PlayStatus;
  42. fn position(&self) -> (u32, i64);
  43. fn update_time(&self) -> i64;
  44. fn end_of_track(&self) -> bool;
  45. fn volume(&self) -> u32;
  46. }
  47. impl<D: SpircDelegate> SpircManager<D> {
  48. pub fn new(session: Session, delegate: D) -> SpircManager<D> {
  49. let ident = session.0.data.read().unwrap().device_id.clone();
  50. let name = session.0.config.device_name.clone();
  51. SpircManager {
  52. delegate: delegate,
  53. session: session,
  54. state_update_id: 0,
  55. seq_nr: 0,
  56. name: name,
  57. ident: ident,
  58. device_type: 5,
  59. can_play: true,
  60. repeat: false,
  61. shuffle: false,
  62. is_active: false,
  63. became_active_at: 0,
  64. last_command_ident: String::new(),
  65. last_command_msgid: 0,
  66. tracks: Vec::new(),
  67. index: 0,
  68. }
  69. }
  70. pub fn run(&mut self) {
  71. let rx = self.session.mercury_sub(format!("hm://remote/user/{}/",
  72. self.session
  73. .0
  74. .data
  75. .read()
  76. .unwrap()
  77. .canonical_username
  78. .clone()));
  79. let updates = self.delegate.updates();
  80. self.notify(true, None);
  81. loop {
  82. select! {
  83. pkt = rx.recv() => {
  84. let frame = protobuf::parse_from_bytes::<protocol::spirc::Frame>(
  85. pkt.unwrap().payload.first().unwrap()).unwrap();
  86. println!("{:?} {} {} {} {}",
  87. frame.get_typ(),
  88. frame.get_device_state().get_name(),
  89. frame.get_ident(),
  90. frame.get_seq_nr(),
  91. frame.get_state_update_id());
  92. if frame.get_ident() != self.ident &&
  93. (frame.get_recipient().len() == 0 ||
  94. frame.get_recipient().contains(&self.ident)) {
  95. self.handle(frame);
  96. }
  97. },
  98. update_time = updates.recv() => {
  99. let end_of_track = self.delegate.state().end_of_track();
  100. if end_of_track {
  101. self.index = (self.index + 1) % self.tracks.len() as u32;
  102. let track = self.tracks[self.index as usize];
  103. self.delegate.load(track, true, 0);
  104. } else {
  105. self.state_update_id = update_time.unwrap();
  106. self.notify(false, None);
  107. }
  108. }
  109. }
  110. }
  111. }
  112. fn handle(&mut self, frame: protocol::spirc::Frame) {
  113. if frame.get_recipient().len() > 0 {
  114. self.last_command_ident = frame.get_ident().to_owned();
  115. self.last_command_msgid = frame.get_seq_nr();
  116. }
  117. match frame.get_typ() {
  118. protocol::spirc::MessageType::kMessageTypeHello => {
  119. self.notify(false, Some(frame.get_ident()));
  120. }
  121. protocol::spirc::MessageType::kMessageTypeLoad => {
  122. if !self.is_active {
  123. self.is_active = true;
  124. self.became_active_at = util::now_ms();
  125. }
  126. self.index = frame.get_state().get_playing_track_index();
  127. self.tracks = frame.get_state()
  128. .get_track()
  129. .iter()
  130. .filter(|track| track.has_gid())
  131. .map(|track| SpotifyId::from_raw(track.get_gid()))
  132. .collect();
  133. let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
  134. let track = self.tracks[self.index as usize];
  135. let position = frame.get_state().get_position_ms();
  136. self.delegate.load(track, play, position);
  137. }
  138. protocol::spirc::MessageType::kMessageTypePlay => {
  139. self.delegate.play();
  140. }
  141. protocol::spirc::MessageType::kMessageTypePause => {
  142. self.delegate.pause();
  143. }
  144. protocol::spirc::MessageType::kMessageTypeNext => {
  145. self.index = (self.index + 1) % self.tracks.len() as u32;
  146. let track = self.tracks[self.index as usize];
  147. self.delegate.load(track, true, 0);
  148. }
  149. protocol::spirc::MessageType::kMessageTypePrev => {
  150. self.index = (self.index - 1) % self.tracks.len() as u32;
  151. let track = self.tracks[self.index as usize];
  152. self.delegate.load(track, true, 0);
  153. }
  154. protocol::spirc::MessageType::kMessageTypeSeek => {
  155. self.delegate.seek(frame.get_position());
  156. }
  157. protocol::spirc::MessageType::kMessageTypeNotify => {
  158. if self.is_active && frame.get_device_state().get_is_active() {
  159. self.is_active = false;
  160. self.delegate.stop();
  161. }
  162. }
  163. protocol::spirc::MessageType::kMessageTypeVolume =>{
  164. self.delegate.volume(frame.get_volume() as i32);
  165. }
  166. _ => (),
  167. }
  168. }
  169. fn notify(&mut self, hello: bool, recipient: Option<&str>) {
  170. let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), {
  171. version: 1,
  172. ident: self.ident.clone(),
  173. protocol_version: "2.0.0".to_owned(),
  174. seq_nr: { self.seq_nr += 1; self.seq_nr },
  175. typ: if hello {
  176. protocol::spirc::MessageType::kMessageTypeHello
  177. } else {
  178. protocol::spirc::MessageType::kMessageTypeNotify
  179. },
  180. device_state: self.device_state(),
  181. recipient: protobuf::RepeatedField::from_vec(
  182. recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![])
  183. ),
  184. state_update_id: self.state_update_id as i64
  185. });
  186. if self.is_active {
  187. pkt.set_state(self.spirc_state());
  188. }
  189. self.session
  190. .mercury(MercuryRequest {
  191. method: MercuryMethod::SEND,
  192. uri: format!("hm://remote/user/{}",
  193. self.session.0.data.read().unwrap().canonical_username.clone()),
  194. content_type: None,
  195. payload: vec![pkt.write_to_bytes().unwrap()],
  196. })
  197. .await()
  198. .unwrap();
  199. }
  200. fn spirc_state(&self) -> protocol::spirc::State {
  201. let state = self.delegate.state();
  202. let (position_ms, position_measured_at) = state.position();
  203. protobuf_init!(protocol::spirc::State::new(), {
  204. status: state.status(),
  205. position_ms: position_ms,
  206. position_measured_at: position_measured_at as u64,
  207. playing_track_index: self.index,
  208. track: self.tracks.iter().map(|track| {
  209. protobuf_init!(protocol::spirc::TrackRef::new(), {
  210. gid: track.to_raw().to_vec()
  211. })
  212. }).collect(),
  213. shuffle: self.shuffle,
  214. repeat: self.repeat,
  215. playing_from_fallback: true,
  216. last_command_ident: self.last_command_ident.clone(),
  217. last_command_msgid: self.last_command_msgid
  218. })
  219. }
  220. fn device_state(&self) -> protocol::spirc::DeviceState {
  221. protobuf_init!(protocol::spirc::DeviceState::new(), {
  222. sw_version: version_string(),
  223. is_active: self.is_active,
  224. can_play: self.can_play,
  225. volume: self.delegate.state().volume(),
  226. name: self.name.clone(),
  227. error_code: 0,
  228. became_active_at: if self.is_active { self.became_active_at as i64 } else { 0 },
  229. capabilities => [
  230. @{
  231. typ: protocol::spirc::CapabilityType::kCanBePlayer,
  232. intValue => [0]
  233. },
  234. @{
  235. typ: protocol::spirc::CapabilityType::kDeviceType,
  236. intValue => [ self.device_type as i64 ]
  237. },
  238. @{
  239. typ: protocol::spirc::CapabilityType::kGaiaEqConnectId,
  240. intValue => [1]
  241. },
  242. @{
  243. typ: protocol::spirc::CapabilityType::kSupportsLogout,
  244. intValue => [0]
  245. },
  246. @{
  247. typ: protocol::spirc::CapabilityType::kIsObservable,
  248. intValue => [1]
  249. },
  250. @{
  251. typ: protocol::spirc::CapabilityType::kVolumeSteps,
  252. intValue => [10]
  253. },
  254. @{
  255. typ: protocol::spirc::CapabilityType::kSupportedContexts,
  256. stringValue => [
  257. "album".to_owned(),
  258. "playlist".to_owned(),
  259. "search".to_owned(),
  260. "inbox".to_owned(),
  261. "toplist".to_owned(),
  262. "starred".to_owned(),
  263. "publishedstarred".to_owned(),
  264. "track".to_owned(),
  265. ]
  266. },
  267. @{
  268. typ: protocol::spirc::CapabilityType::kSupportedTypes,
  269. stringValue => [
  270. "audio/local".to_owned(),
  271. "audio/track".to_owned(),
  272. "local".to_owned(),
  273. "track".to_owned(),
  274. ]
  275. }
  276. ],
  277. })
  278. }
  279. }