mod.rs 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. use crate::protocol;
  2. use byteorder::{BigEndian, ByteOrder};
  3. use bytes::Bytes;
  4. use futures::sync::{mpsc, oneshot};
  5. use futures::{Async, Future, Poll};
  6. use protobuf;
  7. use std::collections::HashMap;
  8. use std::mem;
  9. use crate::util::SeqGenerator;
  10. mod types;
  11. pub use self::types::*;
  12. mod sender;
  13. pub use self::sender::MercurySender;
// Declares `MercuryManager` and its inner state through the `component!` macro, which is
// defined outside this file.
// NOTE(review): the `lock`, `session` and `clone` calls used on `MercuryManager` in this file
// are presumably generated by this macro — confirm against the macro definition.
component! {
    MercuryManager : MercuryManagerInner {
        // Generator for the numeric ids that `next_seq` turns into request sequence bytes.
        sequence: SeqGenerator<u64> = SeqGenerator::new(0),
        // Responses still being assembled, keyed by the sequence bytes of their request.
        pending: HashMap<Vec<u8>, MercuryPending> = HashMap::new(),
        // (URI prefix, channel) pairs; an event whose URI starts with the prefix is sent
        // down the channel.
        subscriptions: Vec<(String, mpsc::UnboundedSender<MercuryResponse>)> = Vec::new(),
    }
}
/// Assembly state for one response that may arrive split across several packets.
pub struct MercuryPending {
    /// Parts received in full so far; on completion the first one is parsed as the header
    /// and the rest become the response payload.
    parts: Vec<Vec<u8>>,
    /// Trailing part of the previous packet that was flagged as incomplete; it is prepended
    /// to the first part of the next packet.
    partial: Option<Vec<u8>>,
    /// Channel used to resolve the caller's future, or `None` when the packet did not
    /// belong to a request issued through this manager.
    callback: Option<oneshot::Sender<Result<MercuryResponse, MercuryError>>>,
}
/// Future returned for a request; wraps the receiving half of the oneshot channel on which
/// the outcome (`Ok(T)` or `Err(MercuryError)`) is delivered.
pub struct MercuryFuture<T>(oneshot::Receiver<Result<T, MercuryError>>);
  27. impl<T> Future for MercuryFuture<T> {
  28. type Item = T;
  29. type Error = MercuryError;
  30. fn poll(&mut self) -> Poll<T, MercuryError> {
  31. match self.0.poll() {
  32. Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)),
  33. Ok(Async::Ready(Err(err))) => Err(err),
  34. Ok(Async::NotReady) => Ok(Async::NotReady),
  35. Err(oneshot::Canceled) => Err(MercuryError),
  36. }
  37. }
  38. }
  39. impl MercuryManager {
  40. fn next_seq(&self) -> Vec<u8> {
  41. let mut seq = vec![0u8; 8];
  42. BigEndian::write_u64(&mut seq, self.lock(|inner| inner.sequence.get()));
  43. seq
  44. }
  45. fn request(&self, req: MercuryRequest) -> MercuryFuture<MercuryResponse> {
  46. let (tx, rx) = oneshot::channel();
  47. let pending = MercuryPending {
  48. parts: Vec::new(),
  49. partial: None,
  50. callback: Some(tx),
  51. };
  52. let seq = self.next_seq();
  53. self.lock(|inner| inner.pending.insert(seq.clone(), pending));
  54. let cmd = req.method.command();
  55. let data = req.encode(&seq);
  56. self.session().send_packet(cmd, data);
  57. MercuryFuture(rx)
  58. }
  59. pub fn get<T: Into<String>>(&self, uri: T) -> MercuryFuture<MercuryResponse> {
  60. self.request(MercuryRequest {
  61. method: MercuryMethod::GET,
  62. uri: uri.into(),
  63. content_type: None,
  64. payload: Vec::new(),
  65. })
  66. }
  67. pub fn send<T: Into<String>>(&self, uri: T, data: Vec<u8>) -> MercuryFuture<MercuryResponse> {
  68. self.request(MercuryRequest {
  69. method: MercuryMethod::SEND,
  70. uri: uri.into(),
  71. content_type: None,
  72. payload: vec![data],
  73. })
  74. }
  75. pub fn sender<T: Into<String>>(&self, uri: T) -> MercurySender {
  76. MercurySender::new(self.clone(), uri.into())
  77. }
  78. pub fn subscribe<T: Into<String>>(
  79. &self,
  80. uri: T,
  81. ) -> Box<dyn Future<Item = mpsc::UnboundedReceiver<MercuryResponse>, Error = MercuryError>>
  82. {
  83. let uri = uri.into();
  84. let request = self.request(MercuryRequest {
  85. method: MercuryMethod::SUB,
  86. uri: uri.clone(),
  87. content_type: None,
  88. payload: Vec::new(),
  89. });
  90. let manager = self.clone();
  91. Box::new(request.map(move |response| {
  92. let (tx, rx) = mpsc::unbounded();
  93. manager.lock(move |inner| {
  94. debug!("subscribed uri={} count={}", uri, response.payload.len());
  95. if response.payload.len() > 0 {
  96. // Old subscription protocol, watch the provided list of URIs
  97. for sub in response.payload {
  98. let mut sub: protocol::pubsub::Subscription =
  99. protobuf::parse_from_bytes(&sub).unwrap();
  100. let sub_uri = sub.take_uri();
  101. debug!("subscribed sub_uri={}", sub_uri);
  102. inner.subscriptions.push((sub_uri, tx.clone()));
  103. }
  104. } else {
  105. // New subscription protocol, watch the requested URI
  106. inner.subscriptions.push((uri, tx));
  107. }
  108. });
  109. rx
  110. }))
  111. }
  112. pub(crate) fn dispatch(&self, cmd: u8, mut data: Bytes) {
  113. let seq_len = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
  114. let seq = data.split_to(seq_len).as_ref().to_owned();
  115. let flags = data.split_to(1).as_ref()[0];
  116. let count = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
  117. let pending = self.lock(|inner| inner.pending.remove(&seq));
  118. let mut pending = match pending {
  119. Some(pending) => pending,
  120. None if cmd == 0xb5 => MercuryPending {
  121. parts: Vec::new(),
  122. partial: None,
  123. callback: None,
  124. },
  125. None => {
  126. warn!("Ignore seq {:?} cmd {:x}", seq, cmd);
  127. return;
  128. }
  129. };
  130. for i in 0..count {
  131. let mut part = Self::parse_part(&mut data);
  132. if let Some(mut partial) = mem::replace(&mut pending.partial, None) {
  133. partial.extend_from_slice(&part);
  134. part = partial;
  135. }
  136. if i == count - 1 && (flags == 2) {
  137. pending.partial = Some(part)
  138. } else {
  139. pending.parts.push(part);
  140. }
  141. }
  142. if flags == 0x1 {
  143. self.complete_request(cmd, pending);
  144. } else {
  145. self.lock(move |inner| inner.pending.insert(seq, pending));
  146. }
  147. }
  148. fn parse_part(data: &mut Bytes) -> Vec<u8> {
  149. let size = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
  150. data.split_to(size).as_ref().to_owned()
  151. }
  152. fn complete_request(&self, cmd: u8, mut pending: MercuryPending) {
  153. let header_data = pending.parts.remove(0);
  154. let header: protocol::mercury::Header = protobuf::parse_from_bytes(&header_data).unwrap();
  155. let response = MercuryResponse {
  156. uri: header.get_uri().to_owned(),
  157. status_code: header.get_status_code(),
  158. payload: pending.parts,
  159. };
  160. if response.status_code >= 500 {
  161. panic!("Spotify servers returned an error. Restart librespot.");
  162. } else if response.status_code >= 400 {
  163. warn!("error {} for uri {}", response.status_code, &response.uri);
  164. if let Some(cb) = pending.callback {
  165. let _ = cb.send(Err(MercuryError));
  166. }
  167. } else {
  168. if cmd == 0xb5 {
  169. self.lock(|inner| {
  170. let mut found = false;
  171. inner.subscriptions.retain(|&(ref prefix, ref sub)| {
  172. if response.uri.starts_with(prefix) {
  173. found = true;
  174. // if send fails, remove from list of subs
  175. // TODO: send unsub message
  176. sub.unbounded_send(response.clone()).is_ok()
  177. } else {
  178. // URI doesn't match
  179. true
  180. }
  181. });
  182. if !found {
  183. debug!("unknown subscription uri={}", response.uri);
  184. }
  185. })
  186. } else if let Some(cb) = pending.callback {
  187. let _ = cb.send(Ok(response));
  188. }
  189. }
  190. }
  191. }