mercury.rs 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt};
  2. use protobuf::{self, Message};
  3. use readall::ReadAllExt;
  4. use std::collections::{HashMap, LinkedList};
  5. use std::io::{Cursor, Read, Write};
  6. use std::fmt;
  7. use std::mem::replace;
  8. use std::sync::mpsc;
  9. use connection::Packet;
  10. use librespot_protocol as protocol;
  11. use subsystem::Subsystem;
  12. use util::Either::{Left, Right};
  /// The request verb carried by a `MercuryRequest`.
  ///
  /// The `Display` implementation renders each variant as its own name,
  /// which is the string placed in the request header's `method` field.
  #[derive(PartialEq, Eq, Debug)]
  pub enum MercuryMethod {
      GET,
      SUB,
      UNSUB,
      SEND,
  }
  20. pub struct MercuryRequest {
  21. pub method: MercuryMethod,
  22. pub uri: String,
  23. pub content_type: Option<String>,
  24. pub callback: Option<MercuryCallback>,
  25. pub payload: Vec<Vec<u8>>
  26. }
  /// A fully reassembled response handed to a `MercuryCallback`.
  #[derive(Debug)]
  pub struct MercuryResponse {
      /// URI taken from the response header.
      pub uri: String,
      /// The parts that followed the header part, in arrival order.
      pub payload: LinkedList<Vec<u8>>,
  }
  32. pub type MercuryCallback = mpsc::Sender<MercuryResponse>;
  33. pub struct MercuryPending {
  34. parts: LinkedList<Vec<u8>>,
  35. partial: Option<Vec<u8>>,
  36. callback: Option<MercuryCallback>
  37. }
  38. pub struct MercuryManager {
  39. next_seq: u32,
  40. pending: HashMap<Vec<u8>, MercuryPending>,
  41. subscriptions: HashMap<String, MercuryCallback>,
  42. requests: mpsc::Receiver<MercuryRequest>,
  43. packet_tx: mpsc::Sender<Packet>,
  44. packet_rx: mpsc::Receiver<Packet>,
  45. }
  46. impl fmt::Display for MercuryMethod {
  47. fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
  48. formatter.write_str(match *self {
  49. MercuryMethod::GET => "GET",
  50. MercuryMethod::SUB => "SUB",
  51. MercuryMethod::UNSUB => "UNSUB",
  52. MercuryMethod::SEND => "SEND"
  53. })
  54. }
  55. }
  56. impl MercuryManager {
  57. pub fn new(tx: mpsc::Sender<Packet>) -> (MercuryManager,
  58. mpsc::Sender<MercuryRequest>,
  59. mpsc::Sender<Packet>) {
  60. let (req_tx, req_rx) = mpsc::channel();
  61. let (pkt_tx, pkt_rx) = mpsc::channel();
  62. (MercuryManager {
  63. next_seq: 0,
  64. pending: HashMap::new(),
  65. subscriptions: HashMap::new(),
  66. requests: req_rx,
  67. packet_rx: pkt_rx,
  68. packet_tx: tx,
  69. }, req_tx, pkt_tx)
  70. }
  71. fn request(&mut self, req: MercuryRequest) {
  72. let mut seq = [0u8; 4];
  73. BigEndian::write_u32(&mut seq, self.next_seq);
  74. self.next_seq += 1;
  75. let data = self.encode_request(&seq, &req);
  76. let cmd = match req.method {
  77. MercuryMethod::SUB => 0xb3,
  78. MercuryMethod::UNSUB => 0xb4,
  79. _ => 0xb2,
  80. };
  81. self.packet_tx.send(Packet {
  82. cmd: cmd,
  83. data: data
  84. }).unwrap();
  85. if req.method != MercuryMethod::SUB {
  86. self.pending.insert(seq.to_vec(), MercuryPending{
  87. parts: LinkedList::new(),
  88. partial: None,
  89. callback: req.callback,
  90. });
  91. } else if let Some(cb) = req.callback {
  92. self.subscriptions.insert(req.uri, cb);
  93. }
  94. }
  95. fn parse_part(mut s: &mut Read) -> Vec<u8> {
  96. let size = s.read_u16::<BigEndian>().unwrap() as usize;
  97. let mut buffer = vec![0; size];
  98. s.read_all(&mut buffer).unwrap();
  99. buffer
  100. }
  101. fn complete_request(&mut self, cmd: u8, mut pending: MercuryPending) {
  102. let header_data = match pending.parts.pop_front() {
  103. Some(data) => data,
  104. None => panic!("No header part !")
  105. };
  106. let header : protocol::mercury::Header =
  107. protobuf::parse_from_bytes(&header_data).unwrap();
  108. let callback = if cmd == 0xb5 {
  109. self.subscriptions.get(header.get_uri())
  110. } else {
  111. pending.callback.as_ref()
  112. };
  113. if let Some(ref ch) = callback {
  114. ch.send(MercuryResponse{
  115. uri: header.get_uri().to_string(),
  116. payload: pending.parts
  117. }).unwrap();
  118. }
  119. }
  120. fn handle_packet(&mut self, cmd: u8, data: Vec<u8>) {
  121. let mut packet = Cursor::new(data);
  122. let seq = {
  123. let seq_length = packet.read_u16::<BigEndian>().unwrap() as usize;
  124. let mut seq = vec![0; seq_length];
  125. packet.read_all(&mut seq).unwrap();
  126. seq
  127. };
  128. let flags = packet.read_u8().unwrap();
  129. let count = packet.read_u16::<BigEndian>().unwrap() as usize;
  130. let mut pending = if let Some(pending) = self.pending.remove(&seq) {
  131. pending
  132. } else if cmd == 0xb5 {
  133. MercuryPending {
  134. parts: LinkedList::new(),
  135. partial: None,
  136. callback: None,
  137. }
  138. } else {
  139. println!("Ignore seq {:?} cmd {}", seq, cmd);
  140. return
  141. };
  142. for i in 0..count {
  143. let mut part = Self::parse_part(&mut packet);
  144. if let Some(mut data) = replace(&mut pending.partial, None) {
  145. data.append(&mut part);
  146. part = data;
  147. }
  148. if i == count - 1 && (flags == 2) {
  149. pending.partial = Some(part)
  150. } else {
  151. pending.parts.push_back(part);
  152. }
  153. }
  154. if flags == 0x1 {
  155. self.complete_request(cmd, pending);
  156. } else {
  157. self.pending.insert(seq, pending);
  158. }
  159. }
  160. fn encode_request(&self, seq: &[u8], req: &MercuryRequest) -> Vec<u8> {
  161. let mut packet = Vec::new();
  162. packet.write_u16::<BigEndian>(seq.len() as u16).unwrap();
  163. packet.write_all(seq).unwrap();
  164. packet.write_u8(1).unwrap(); // Flags: FINAL
  165. packet.write_u16::<BigEndian>(1 + req.payload.len() as u16).unwrap(); // Part count
  166. let mut header = protobuf_init!(protocol::mercury::Header::new(), {
  167. uri: req.uri.clone(),
  168. method: req.method.to_string(),
  169. });
  170. if let Some(ref content_type) = req.content_type {
  171. header.set_content_type(content_type.clone());
  172. }
  173. packet.write_u16::<BigEndian>(header.compute_size() as u16).unwrap();
  174. header.write_to_writer(&mut packet).unwrap();
  175. for p in &req.payload {
  176. packet.write_u16::<BigEndian>(p.len() as u16).unwrap();
  177. packet.write(&p).unwrap();
  178. }
  179. packet
  180. }
  181. }
  182. impl Subsystem for MercuryManager {
  183. fn run(mut self) {
  184. loop {
  185. match {
  186. let requests = &self.requests;
  187. let packets = &self.packet_rx;
  188. select!{
  189. r = requests.recv() => {
  190. Left(r.unwrap())
  191. },
  192. p = packets.recv() => {
  193. Right(p.unwrap())
  194. }
  195. }
  196. } {
  197. Left(req) => {
  198. self.request(req);
  199. }
  200. Right(pkt) => {
  201. self.handle_packet(pkt.cmd, pkt.data);
  202. }
  203. }
  204. }
  205. }
  206. }