channel.rs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. use byteorder::{BigEndian, ByteOrder};
  2. use bytes::Bytes;
  3. use futures::sync::{mpsc, BiLock};
  4. use futures::{Async, Poll, Stream};
  5. use std::collections::HashMap;
  6. use util::SeqGenerator;
  7. component! {
  8. ChannelManager : ChannelManagerInner {
  9. sequence: SeqGenerator<u16> = SeqGenerator::new(0),
  10. channels: HashMap<u16, mpsc::UnboundedSender<(u8, Bytes)>> = HashMap::new(),
  11. }
  12. }
  13. #[derive(Debug, Hash, PartialEq, Eq, Copy, Clone)]
  14. pub struct ChannelError;
  15. pub struct Channel {
  16. receiver: mpsc::UnboundedReceiver<(u8, Bytes)>,
  17. state: ChannelState,
  18. }
  19. pub struct ChannelHeaders(BiLock<Channel>);
  20. pub struct ChannelData(BiLock<Channel>);
  21. pub enum ChannelEvent {
  22. Header(u8, Vec<u8>),
  23. Data(Bytes),
  24. }
  25. #[derive(Clone)]
  26. enum ChannelState {
  27. Header(Bytes),
  28. Data,
  29. Closed,
  30. }
  31. impl ChannelManager {
  32. pub fn allocate(&self) -> (u16, Channel) {
  33. let (tx, rx) = mpsc::unbounded();
  34. let seq = self.lock(|inner| {
  35. let seq = inner.sequence.get();
  36. inner.channels.insert(seq, tx);
  37. seq
  38. });
  39. let channel = Channel {
  40. receiver: rx,
  41. state: ChannelState::Header(Bytes::new()),
  42. };
  43. (seq, channel)
  44. }
  45. pub(crate) fn dispatch(&self, cmd: u8, mut data: Bytes) {
  46. use std::collections::hash_map::Entry;
  47. let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref());
  48. self.lock(|inner| {
  49. if let Entry::Occupied(entry) = inner.channels.entry(id) {
  50. let _ = entry.get().unbounded_send((cmd, data));
  51. }
  52. });
  53. }
  54. }
  55. impl Channel {
  56. fn recv_packet(&mut self) -> Poll<Bytes, ChannelError> {
  57. let (cmd, packet) = match self.receiver.poll() {
  58. Ok(Async::Ready(t)) => t.expect("channel closed"),
  59. Ok(Async::NotReady) => return Ok(Async::NotReady),
  60. Err(()) => unreachable!(),
  61. };
  62. if cmd == 0xa {
  63. let code = BigEndian::read_u16(&packet.as_ref()[..2]);
  64. error!("channel error: {} {}", packet.len(), code);
  65. self.state = ChannelState::Closed;
  66. Err(ChannelError)
  67. } else {
  68. Ok(Async::Ready(packet))
  69. }
  70. }
  71. pub fn split(self) -> (ChannelHeaders, ChannelData) {
  72. let (headers, data) = BiLock::new(self);
  73. (ChannelHeaders(headers), ChannelData(data))
  74. }
  75. }
  76. impl Stream for Channel {
  77. type Item = ChannelEvent;
  78. type Error = ChannelError;
  79. fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
  80. loop {
  81. match self.state.clone() {
  82. ChannelState::Closed => panic!("Polling already terminated channel"),
  83. ChannelState::Header(mut data) => {
  84. if data.len() == 0 {
  85. data = try_ready!(self.recv_packet());
  86. }
  87. let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
  88. if length == 0 {
  89. assert_eq!(data.len(), 0);
  90. self.state = ChannelState::Data;
  91. } else {
  92. let header_id = data.split_to(1).as_ref()[0];
  93. let header_data = data.split_to(length - 1).as_ref().to_owned();
  94. self.state = ChannelState::Header(data);
  95. let event = ChannelEvent::Header(header_id, header_data);
  96. return Ok(Async::Ready(Some(event)));
  97. }
  98. }
  99. ChannelState::Data => {
  100. let data = try_ready!(self.recv_packet());
  101. if data.len() == 0 {
  102. self.receiver.close();
  103. self.state = ChannelState::Closed;
  104. return Ok(Async::Ready(None));
  105. } else {
  106. let event = ChannelEvent::Data(data);
  107. return Ok(Async::Ready(Some(event)));
  108. }
  109. }
  110. }
  111. }
  112. }
  113. }
  114. impl Stream for ChannelData {
  115. type Item = Bytes;
  116. type Error = ChannelError;
  117. fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
  118. let mut channel = match self.0.poll_lock() {
  119. Async::Ready(c) => c,
  120. Async::NotReady => return Ok(Async::NotReady),
  121. };
  122. loop {
  123. match try_ready!(channel.poll()) {
  124. Some(ChannelEvent::Header(..)) => (),
  125. Some(ChannelEvent::Data(data)) => return Ok(Async::Ready(Some(data))),
  126. None => return Ok(Async::Ready(None)),
  127. }
  128. }
  129. }
  130. }
  131. impl Stream for ChannelHeaders {
  132. type Item = (u8, Vec<u8>);
  133. type Error = ChannelError;
  134. fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
  135. let mut channel = match self.0.poll_lock() {
  136. Async::Ready(c) => c,
  137. Async::NotReady => return Ok(Async::NotReady),
  138. };
  139. match try_ready!(channel.poll()) {
  140. Some(ChannelEvent::Header(id, data)) => Ok(Async::Ready(Some((id, data)))),
  141. Some(ChannelEvent::Data(..)) | None => Ok(Async::Ready(None)),
  142. }
  143. }
  144. }