mod.rs 7.8 KB


  1. use crate::protocol;
  2. use crate::util::url_encode;
  3. use byteorder::{BigEndian, ByteOrder};
  4. use bytes::Bytes;
  5. use futures::sync::{mpsc, oneshot};
  6. use futures::{Async, Future, Poll};
  7. use protobuf;
  8. use std::collections::HashMap;
  9. use std::mem;
  10. use crate::util::SeqGenerator;
  11. mod types;
  12. pub use self::types::*;
  13. mod sender;
  14. pub use self::sender::MercurySender;
  15. component! {
  16. MercuryManager : MercuryManagerInner {
  17. sequence: SeqGenerator<u64> = SeqGenerator::new(0),
  18. pending: HashMap<Vec<u8>, MercuryPending> = HashMap::new(),
  19. subscriptions: Vec<(String, mpsc::UnboundedSender<MercuryResponse>)> = Vec::new(),
  20. invalid: bool = false,
  21. }
  22. }
  23. pub struct MercuryPending {
  24. parts: Vec<Vec<u8>>,
  25. partial: Option<Vec<u8>>,
  26. callback: Option<oneshot::Sender<Result<MercuryResponse, MercuryError>>>,
  27. }
  28. pub struct MercuryFuture<T>(oneshot::Receiver<Result<T, MercuryError>>);
  29. impl<T> Future for MercuryFuture<T> {
  30. type Item = T;
  31. type Error = MercuryError;
  32. fn poll(&mut self) -> Poll<T, MercuryError> {
  33. match self.0.poll() {
  34. Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)),
  35. Ok(Async::Ready(Err(err))) => Err(err),
  36. Ok(Async::NotReady) => Ok(Async::NotReady),
  37. Err(oneshot::Canceled) => Err(MercuryError),
  38. }
  39. }
  40. }
  41. impl MercuryManager {
  42. fn next_seq(&self) -> Vec<u8> {
  43. let mut seq = vec![0u8; 8];
  44. BigEndian::write_u64(&mut seq, self.lock(|inner| inner.sequence.get()));
  45. seq
  46. }
  47. fn request(&self, req: MercuryRequest) -> MercuryFuture<MercuryResponse> {
  48. let (tx, rx) = oneshot::channel();
  49. let pending = MercuryPending {
  50. parts: Vec::new(),
  51. partial: None,
  52. callback: Some(tx),
  53. };
  54. let seq = self.next_seq();
  55. self.lock(|inner| {
  56. if !inner.invalid {
  57. inner.pending.insert(seq.clone(), pending);
  58. }
  59. });
  60. let cmd = req.method.command();
  61. let data = req.encode(&seq);
  62. self.session().send_packet(cmd, data);
  63. MercuryFuture(rx)
  64. }
  65. pub fn get<T: Into<String>>(&self, uri: T) -> MercuryFuture<MercuryResponse> {
  66. self.request(MercuryRequest {
  67. method: MercuryMethod::GET,
  68. uri: uri.into(),
  69. content_type: None,
  70. payload: Vec::new(),
  71. })
  72. }
  73. pub fn send<T: Into<String>>(&self, uri: T, data: Vec<u8>) -> MercuryFuture<MercuryResponse> {
  74. self.request(MercuryRequest {
  75. method: MercuryMethod::SEND,
  76. uri: uri.into(),
  77. content_type: None,
  78. payload: vec![data],
  79. })
  80. }
  81. pub fn sender<T: Into<String>>(&self, uri: T) -> MercurySender {
  82. MercurySender::new(self.clone(), uri.into())
  83. }
  84. pub fn subscribe<T: Into<String>>(
  85. &self,
  86. uri: T,
  87. ) -> Box<dyn Future<Item = mpsc::UnboundedReceiver<MercuryResponse>, Error = MercuryError>>
  88. {
  89. let uri = uri.into();
  90. let request = self.request(MercuryRequest {
  91. method: MercuryMethod::SUB,
  92. uri: uri.clone(),
  93. content_type: None,
  94. payload: Vec::new(),
  95. });
  96. let manager = self.clone();
  97. Box::new(request.map(move |response| {
  98. let (tx, rx) = mpsc::unbounded();
  99. manager.lock(move |inner| {
  100. if !inner.invalid {
  101. debug!("subscribed uri={} count={}", uri, response.payload.len());
  102. if response.payload.len() > 0 {
  103. // Old subscription protocol, watch the provided list of URIs
  104. for sub in response.payload {
  105. let mut sub: protocol::pubsub::Subscription =
  106. protobuf::parse_from_bytes(&sub).unwrap();
  107. let sub_uri = sub.take_uri();
  108. debug!("subscribed sub_uri={}", sub_uri);
  109. inner.subscriptions.push((sub_uri, tx.clone()));
  110. }
  111. } else {
  112. // New subscription protocol, watch the requested URI
  113. inner.subscriptions.push((uri, tx));
  114. }
  115. }
  116. });
  117. rx
  118. }))
  119. }
  120. pub(crate) fn dispatch(&self, cmd: u8, mut data: Bytes) {
  121. let seq_len = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
  122. let seq = data.split_to(seq_len).as_ref().to_owned();
  123. let flags = data.split_to(1).as_ref()[0];
  124. let count = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
  125. let pending = self.lock(|inner| inner.pending.remove(&seq));
  126. let mut pending = match pending {
  127. Some(pending) => pending,
  128. None if cmd == 0xb5 => MercuryPending {
  129. parts: Vec::new(),
  130. partial: None,
  131. callback: None,
  132. },
  133. None => {
  134. warn!("Ignore seq {:?} cmd {:x}", seq, cmd);
  135. return;
  136. }
  137. };
  138. for i in 0..count {
  139. let mut part = Self::parse_part(&mut data);
  140. if let Some(mut partial) = mem::replace(&mut pending.partial, None) {
  141. partial.extend_from_slice(&part);
  142. part = partial;
  143. }
  144. if i == count - 1 && (flags == 2) {
  145. pending.partial = Some(part)
  146. } else {
  147. pending.parts.push(part);
  148. }
  149. }
  150. if flags == 0x1 {
  151. self.complete_request(cmd, pending);
  152. } else {
  153. self.lock(move |inner| inner.pending.insert(seq, pending));
  154. }
  155. }
  156. fn parse_part(data: &mut Bytes) -> Vec<u8> {
  157. let size = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
  158. data.split_to(size).as_ref().to_owned()
  159. }
  160. fn complete_request(&self, cmd: u8, mut pending: MercuryPending) {
  161. let header_data = pending.parts.remove(0);
  162. let header: protocol::mercury::Header = protobuf::parse_from_bytes(&header_data).unwrap();
  163. let response = MercuryResponse {
  164. uri: url_encode(header.get_uri()).to_owned(),
  165. status_code: header.get_status_code(),
  166. payload: pending.parts,
  167. };
  168. if response.status_code >= 500 {
  169. panic!("Spotify servers returned an error. Restart librespot.");
  170. } else if response.status_code >= 400 {
  171. warn!("error {} for uri {}", response.status_code, &response.uri);
  172. if let Some(cb) = pending.callback {
  173. let _ = cb.send(Err(MercuryError));
  174. }
  175. } else {
  176. if cmd == 0xb5 {
  177. self.lock(|inner| {
  178. let mut found = false;
  179. inner.subscriptions.retain(|&(ref prefix, ref sub)| {
  180. if response.uri.starts_with(prefix) {
  181. found = true;
  182. // if send fails, remove from list of subs
  183. // TODO: send unsub message
  184. sub.unbounded_send(response.clone()).is_ok()
  185. } else {
  186. // URI doesn't match
  187. true
  188. }
  189. });
  190. if !found {
  191. debug!("unknown subscription uri={}", response.uri);
  192. }
  193. })
  194. } else if let Some(cb) = pending.callback {
  195. let _ = cb.send(Ok(response));
  196. }
  197. }
  198. }
  199. pub(crate) fn shutdown(&self) {
  200. self.lock(|inner| {
  201. inner.invalid = true;
  202. // destroy the sending halves of the channels to signal everyone who is waiting for something.
  203. inner.pending.clear();
  204. inner.subscriptions.clear();
  205. });
  206. }
  207. }