mod.rs 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. use byteorder::{BigEndian, ByteOrder};
  2. use futures::sync::{oneshot, mpsc};
  3. use futures::{Async, Poll, BoxFuture, Future};
  4. use protobuf;
  5. use protocol;
  6. use std::collections::HashMap;
  7. use std::mem;
  8. use tokio_core::io::EasyBuf;
  9. use util::SeqGenerator;
  10. mod types;
  11. pub use self::types::*;
  12. mod sender;
  13. pub use self::sender::MercurySender;
// Declares the `MercuryManager` handle and its `MercuryManagerInner` state via
// the project's `component!` macro (defined elsewhere).
// NOTE(review): the macro presumably also generates the `lock`, `session` and
// `clone` helpers used by the methods in this file -- confirm against the macro.
//
// State fields:
// - `sequence`: generator whose values are encoded as 8-byte big-endian
//   sequence ids for outgoing requests.
// - `pending`: in-flight requests / partially received responses, keyed by the
//   raw sequence bytes.
// - `subscriptions`: `(uri prefix, sender)` pairs; incoming subscription events
//   are forwarded to every sender whose prefix matches the event URI.
component! {
    MercuryManager : MercuryManagerInner {
        sequence: SeqGenerator<u64> = SeqGenerator::new(0),
        pending: HashMap<Vec<u8>, MercuryPending> = HashMap::new(),
        subscriptions: Vec<(String, mpsc::UnboundedSender<MercuryResponse>)> = Vec::new(),
    }
}
/// Accumulated state for a response that may arrive split over several packets.
pub struct MercuryPending {
    /// Fully received parts, in arrival order. On completion the first part is
    /// parsed as the protobuf header and the remainder become the payload.
    parts: Vec<Vec<u8>>,
    /// Leading fragment of a part that was cut off at the end of a packet; the
    /// first part of the next packet is appended to it.
    partial: Option<Vec<u8>>,
    /// Channel used to resolve the caller's future. `None` for entries created
    /// on the fly for `0xb5` packets that match no outstanding request.
    callback: Option<oneshot::Sender<Result<MercuryResponse, MercuryError>>>,
}
  26. pub struct MercuryFuture<T>(oneshot::Receiver<Result<T, MercuryError>>);
  27. impl <T> Future for MercuryFuture<T> {
  28. type Item = T;
  29. type Error = MercuryError;
  30. fn poll(&mut self) -> Poll<T, MercuryError> {
  31. match self.0.poll() {
  32. Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)),
  33. Ok(Async::Ready(Err(err))) => Err(err),
  34. Ok(Async::NotReady) => Ok(Async::NotReady),
  35. Err(oneshot::Canceled) => Err(MercuryError),
  36. }
  37. }
  38. }
  39. impl MercuryManager {
  40. fn next_seq(&self) -> Vec<u8> {
  41. let mut seq = vec![0u8; 8];
  42. BigEndian::write_u64(&mut seq, self.lock(|inner| inner.sequence.get()));
  43. seq
  44. }
  45. pub fn request(&self, req: MercuryRequest)
  46. -> MercuryFuture<MercuryResponse>
  47. {
  48. let (tx, rx) = oneshot::channel();
  49. let pending = MercuryPending {
  50. parts: Vec::new(),
  51. partial: None,
  52. callback: Some(tx),
  53. };
  54. let seq = self.next_seq();
  55. self.lock(|inner| inner.pending.insert(seq.clone(), pending));
  56. let cmd = req.method.command();
  57. let data = req.encode(&seq);
  58. self.session().send_packet(cmd, data);
  59. MercuryFuture(rx)
  60. }
  61. pub fn get<T: Into<String>>(&self, uri: T)
  62. -> MercuryFuture<MercuryResponse>
  63. {
  64. self.request(MercuryRequest {
  65. method: MercuryMethod::GET,
  66. uri: uri.into(),
  67. content_type: None,
  68. payload: Vec::new(),
  69. })
  70. }
  71. pub fn send<T: Into<String>>(&self, uri: T, data: Vec<u8>)
  72. -> MercuryFuture<MercuryResponse>
  73. {
  74. self.request(MercuryRequest {
  75. method: MercuryMethod::SEND,
  76. uri: uri.into(),
  77. content_type: None,
  78. payload: vec![data],
  79. })
  80. }
  81. pub fn sender<T: Into<String>>(&self, uri: T) -> MercurySender {
  82. MercurySender::new(self.clone(), uri.into())
  83. }
  84. pub fn subscribe<T: Into<String>>(&self, uri: T)
  85. -> BoxFuture<mpsc::UnboundedReceiver<MercuryResponse>, MercuryError>
  86. {
  87. let uri = uri.into();
  88. let request = self.request(MercuryRequest {
  89. method: MercuryMethod::SUB,
  90. uri: uri.clone(),
  91. content_type: None,
  92. payload: Vec::new(),
  93. });
  94. let manager = self.clone();
  95. request.map(move |response| {
  96. let (tx, rx) = mpsc::unbounded();
  97. manager.lock(move |inner| {
  98. debug!("subscribed uri={} count={}", uri, response.payload.len());
  99. if response.payload.len() > 0 {
  100. // Old subscription protocol, watch the provided list of URIs
  101. for sub in response.payload {
  102. let mut sub : protocol::pubsub::Subscription
  103. = protobuf::parse_from_bytes(&sub).unwrap();
  104. let sub_uri = sub.take_uri();
  105. debug!("subscribed sub_uri={}", sub_uri);
  106. inner.subscriptions.push((sub_uri, tx.clone()));
  107. }
  108. } else {
  109. // New subscription protocol, watch the requested URI
  110. inner.subscriptions.push((uri, tx));
  111. }
  112. });
  113. rx
  114. }).boxed()
  115. }
  116. pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) {
  117. let seq_len = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
  118. let seq = data.drain_to(seq_len).as_ref().to_owned();
  119. let flags = data.drain_to(1).as_ref()[0];
  120. let count = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
  121. let pending = self.lock(|inner| inner.pending.remove(&seq));
  122. let mut pending = match pending {
  123. Some(pending) => pending,
  124. None if cmd == 0xb5 => {
  125. MercuryPending {
  126. parts: Vec::new(),
  127. partial: None,
  128. callback: None,
  129. }
  130. }
  131. None => {
  132. warn!("Ignore seq {:?} cmd {:x}", seq, cmd);
  133. return;
  134. }
  135. };
  136. for i in 0..count {
  137. let mut part = Self::parse_part(&mut data);
  138. if let Some(mut partial) = mem::replace(&mut pending.partial, None) {
  139. partial.extend_from_slice(&part);
  140. part = partial;
  141. }
  142. if i == count - 1 && (flags == 2) {
  143. pending.partial = Some(part)
  144. } else {
  145. pending.parts.push(part);
  146. }
  147. }
  148. if flags == 0x1 {
  149. self.complete_request(cmd, pending);
  150. } else {
  151. self.lock(move |inner| inner.pending.insert(seq, pending));
  152. }
  153. }
  154. fn parse_part(data: &mut EasyBuf) -> Vec<u8> {
  155. let size = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
  156. data.drain_to(size).as_ref().to_owned()
  157. }
  158. fn complete_request(&self, cmd: u8, mut pending: MercuryPending) {
  159. let header_data = pending.parts.remove(0);
  160. let header: protocol::mercury::Header = protobuf::parse_from_bytes(&header_data).unwrap();
  161. let response = MercuryResponse {
  162. uri: header.get_uri().to_owned(),
  163. status_code: header.get_status_code(),
  164. payload: pending.parts,
  165. };
  166. if response.status_code >= 400 {
  167. warn!("error {} for uri {}", response.status_code, &response.uri);
  168. if let Some(cb) = pending.callback {
  169. let _ = cb.send(Err(MercuryError));
  170. }
  171. } else {
  172. if cmd == 0xb5 {
  173. self.lock(|inner| {
  174. let mut found = false;
  175. inner.subscriptions.retain(|&(ref prefix, ref sub)| {
  176. if response.uri.starts_with(prefix) {
  177. found = true;
  178. // if send fails, remove from list of subs
  179. // TODO: send unsub message
  180. sub.send(response.clone()).is_ok()
  181. } else {
  182. // URI doesn't match
  183. true
  184. }
  185. });
  186. if !found {
  187. debug!("unknown subscription uri={}", response.uri);
  188. }
  189. })
  190. } else if let Some(cb) = pending.callback {
  191. let _ = cb.send(Ok(response));
  192. }
  193. }
  194. }
  195. }