mod.rs 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
  2. use futures::sync::{oneshot, mpsc};
  3. use futures::{Async, Poll, BoxFuture, Future};
  4. use protobuf;
  5. use protocol;
  6. use std::collections::HashMap;
  7. use std::io::Read;
  8. use std::mem;
  9. use util::SeqGenerator;
  10. mod types;
  11. pub use self::types::*;
  12. mod sender;
  13. pub use self::sender::MercurySender;
  14. component! {
  15. MercuryManager : MercuryManagerInner {
  16. sequence: SeqGenerator<u64> = SeqGenerator::new(0),
  17. pending: HashMap<Vec<u8>, MercuryPending> = HashMap::new(),
  18. subscriptions: HashMap<String, mpsc::UnboundedSender<MercuryResponse>> = HashMap::new(),
  19. }
  20. }
  21. pub struct MercuryPending {
  22. parts: Vec<Vec<u8>>,
  23. partial: Option<Vec<u8>>,
  24. callback: Option<oneshot::Sender<Result<MercuryResponse, MercuryError>>>,
  25. }
  26. pub struct MercuryFuture<T>(oneshot::Receiver<Result<T, MercuryError>>);
  27. impl <T> Future for MercuryFuture<T> {
  28. type Item = T;
  29. type Error = MercuryError;
  30. fn poll(&mut self) -> Poll<T, MercuryError> {
  31. match self.0.poll() {
  32. Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)),
  33. Ok(Async::Ready(Err(err))) => Err(err),
  34. Ok(Async::NotReady) => Ok(Async::NotReady),
  35. Err(oneshot::Canceled) => Err(MercuryError),
  36. }
  37. }
  38. }
  39. impl MercuryManager {
  40. fn next_seq(&self) -> Vec<u8> {
  41. let mut seq = vec![0u8; 8];
  42. BigEndian::write_u64(&mut seq, self.lock(|inner| inner.sequence.get()));
  43. seq
  44. }
  45. pub fn request(&self, req: MercuryRequest)
  46. -> MercuryFuture<MercuryResponse>
  47. {
  48. let (tx, rx) = oneshot::channel();
  49. let pending = MercuryPending {
  50. parts: Vec::new(),
  51. partial: None,
  52. callback: Some(tx),
  53. };
  54. let seq = self.next_seq();
  55. self.lock(|inner| inner.pending.insert(seq.clone(), pending));
  56. let cmd = req.method.command();
  57. let data = req.encode(&seq);
  58. self.session().send_packet(cmd, data);
  59. MercuryFuture(rx)
  60. }
  61. pub fn get<T: Into<String>>(&self, uri: T)
  62. -> MercuryFuture<MercuryResponse>
  63. {
  64. self.request(MercuryRequest {
  65. method: MercuryMethod::GET,
  66. uri: uri.into(),
  67. content_type: None,
  68. payload: Vec::new(),
  69. })
  70. }
  71. pub fn send<T: Into<String>>(&self, uri: T, data: Vec<u8>)
  72. -> MercuryFuture<MercuryResponse>
  73. {
  74. self.request(MercuryRequest {
  75. method: MercuryMethod::SEND,
  76. uri: uri.into(),
  77. content_type: None,
  78. payload: vec![data],
  79. })
  80. }
  81. /*
  82. pub fn sender<T: Into<String>>(&self, uri: T) -> MercurySender {
  83. MercurySender::new(self.clone(), uri.into())
  84. }
  85. */
  86. pub fn subscribe<T: Into<String>>(&self, uri: T)
  87. -> BoxFuture<mpsc::UnboundedReceiver<MercuryResponse>, MercuryError>
  88. {
  89. let request = self.request(MercuryRequest {
  90. method: MercuryMethod::SUB,
  91. uri: uri.into(),
  92. content_type: None,
  93. payload: Vec::new(),
  94. });
  95. let manager = self.clone();
  96. request.map(move |response| {
  97. let (tx, rx) = mpsc::unbounded();
  98. manager.lock(move |inner| {
  99. for sub in response.payload {
  100. let mut sub : protocol::pubsub::Subscription
  101. = protobuf::parse_from_bytes(&sub).unwrap();
  102. let uri = sub.take_uri();
  103. inner.subscriptions.insert(uri, tx.clone());
  104. }
  105. });
  106. rx
  107. }).boxed()
  108. }
  109. pub fn dispatch(&self, cmd: u8, data: Vec<u8>) {
  110. let mut packet = ::std::io::Cursor::new(data);
  111. let seq = {
  112. let len = packet.read_u16::<BigEndian>().unwrap() as usize;
  113. let mut seq = vec![0; len];
  114. packet.read_exact(&mut seq).unwrap();
  115. seq
  116. };
  117. let flags = packet.read_u8().unwrap();
  118. let count = packet.read_u16::<BigEndian>().unwrap() as usize;
  119. let pending = self.lock(|inner| inner.pending.remove(&seq));
  120. let mut pending = match pending {
  121. Some(pending) => pending,
  122. None if cmd == 0xb5 => {
  123. MercuryPending {
  124. parts: Vec::new(),
  125. partial: None,
  126. callback: None,
  127. }
  128. }
  129. None => {
  130. warn!("Ignore seq {:?} cmd {:x}", seq, cmd);
  131. return;
  132. }
  133. };
  134. for i in 0..count {
  135. let mut part = Self::parse_part(&mut packet);
  136. if let Some(mut data) = mem::replace(&mut pending.partial, None) {
  137. data.append(&mut part);
  138. part = data;
  139. }
  140. if i == count - 1 && (flags == 2) {
  141. pending.partial = Some(part)
  142. } else {
  143. pending.parts.push(part);
  144. }
  145. }
  146. if flags == 0x1 {
  147. self.complete_request(cmd, pending);
  148. } else {
  149. self.lock(move |inner| inner.pending.insert(seq, pending));
  150. }
  151. }
  152. fn parse_part<T: Read>(s: &mut T) -> Vec<u8> {
  153. let size = s.read_u16::<BigEndian>().unwrap() as usize;
  154. let mut buffer = vec![0; size];
  155. s.read_exact(&mut buffer).unwrap();
  156. buffer
  157. }
  158. fn complete_request(&self, cmd: u8, mut pending: MercuryPending) {
  159. let header_data = pending.parts.remove(0);
  160. let header: protocol::mercury::Header = protobuf::parse_from_bytes(&header_data).unwrap();
  161. let response = MercuryResponse {
  162. uri: header.get_uri().to_owned(),
  163. payload: pending.parts,
  164. };
  165. if cmd == 0xb5 {
  166. self.lock(|inner| {
  167. if let Some(cb) = inner.subscriptions.get(&response.uri) {
  168. cb.send(response).unwrap();
  169. }
  170. })
  171. } else if let Some(cb) = pending.callback {
  172. cb.complete(Ok(response));
  173. }
  174. }
  175. }