channel.rs 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. use byteorder::{BigEndian, ByteOrder};
  2. use bytes::Bytes;
  3. use futures::sync::{mpsc, BiLock};
  4. use futures::{Async, Poll, Stream};
  5. use std::collections::HashMap;
  6. use std::time::Instant;
  7. use util::SeqGenerator;
  // Declares `ChannelManager` and its state `ChannelManagerInner` through the
  // project's `component!` macro; each entry is `field: Type = initial value`.
  // NOTE(review): the macro is defined outside this file; the methods below
  // reach this state through `self.lock(|inner| ...)` — confirm its locking
  // semantics against the macro definition.
  component! {
      ChannelManager : ChannelManagerInner {
          // Source of the u16 channel ids handed out by `allocate`.
          sequence: SeqGenerator<u16> = SeqGenerator::new(0),
          // Sending half of each allocated channel, keyed by channel id.
          channels: HashMap<u16, mpsc::UnboundedSender<(u8, Bytes)>> = HashMap::new(),
          // Latest download rate estimate (bytes per second, as computed in `dispatch`).
          download_rate_estimate: usize = 0,
          // Start of the current measurement window; `None` until the first dispatch.
          download_measurement_start: Option<Instant> = None,
          // Payload bytes counted since the current measurement window started.
          download_measurement_bytes: usize = 0,
      }
  }
  /// Error type produced by [`Channel`], [`ChannelHeaders`] and [`ChannelData`].
  ///
  /// Carries no detail; it is returned when the packet source has gone away or
  /// when an error packet (command `0xa`) is received.
  #[derive(Debug, Hash, PartialEq, Eq, Copy, Clone)]
  pub struct ChannelError;
  /// Receiving side of a channel created by `ChannelManager::allocate`.
  ///
  /// Polled as a stream it yields [`ChannelEvent`]s: headers first, then data.
  pub struct Channel {
      // Packets forwarded by `ChannelManager::dispatch` as `(command, payload)`.
      receiver: mpsc::UnboundedReceiver<(u8, Bytes)>,
      // Which part of the channel (headers, data, closed) is currently being read.
      state: ChannelState,
  }
  /// Header half of a [`Channel`] split with `Channel::split`; yields `(id, payload)` pairs.
  pub struct ChannelHeaders(BiLock<Channel>);
  /// Data half of a [`Channel`] split with `Channel::split`; yields data packets.
  pub struct ChannelData(BiLock<Channel>);
  /// Item type of the [`Channel`] stream.
  pub enum ChannelEvent {
      /// One header entry: a one-byte header id followed by its payload bytes.
      Header(u8, Vec<u8>),
      /// One non-empty data packet.
      Data(Bytes),
  }
  /// Read position of a [`Channel`].
  #[derive(Clone)]
  enum ChannelState {
      /// Reading headers; holds the not-yet-parsed remainder of the current
      /// header packet (empty when a new packet must be received).
      Header(Bytes),
      /// Headers are finished; every further packet is data.
      Data,
      /// The channel ended or failed; polling it again panics.
      Closed,
  }
  35. impl ChannelManager {
  36. pub fn allocate(&self) -> (u16, Channel) {
  37. let (tx, rx) = mpsc::unbounded();
  38. let seq = self.lock(|inner| {
  39. let seq = inner.sequence.get();
  40. inner.channels.insert(seq, tx);
  41. seq
  42. });
  43. let channel = Channel {
  44. receiver: rx,
  45. state: ChannelState::Header(Bytes::new()),
  46. };
  47. (seq, channel)
  48. }
  49. pub(crate) fn dispatch(&self, cmd: u8, mut data: Bytes) {
  50. use std::collections::hash_map::Entry;
  51. let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref());
  52. self.lock(|inner| {
  53. let current_time = Instant::now();
  54. if let Some(download_measurement_start) = inner.download_measurement_start {
  55. if (current_time - download_measurement_start).as_millis() > 1000 {
  56. inner.download_rate_estimate = 1000 * inner.download_measurement_bytes
  57. / (current_time - download_measurement_start).as_millis() as usize;
  58. inner.download_measurement_start = Some(current_time);
  59. inner.download_measurement_bytes = 0;
  60. }
  61. } else {
  62. inner.download_measurement_start = Some(current_time);
  63. }
  64. inner.download_measurement_bytes += data.len();
  65. if let Entry::Occupied(entry) = inner.channels.entry(id) {
  66. let _ = entry.get().unbounded_send((cmd, data));
  67. }
  68. });
  69. }
  70. pub fn get_download_rate_estimate(&self) -> usize {
  71. return self.lock(|inner| inner.download_rate_estimate);
  72. }
  73. }
  74. impl Channel {
  75. fn recv_packet(&mut self) -> Poll<Bytes, ChannelError> {
  76. let (cmd, packet) = match self.receiver.poll() {
  77. Ok(Async::Ready(Some(t))) => t,
  78. Ok(Async::Ready(None)) => return Err(ChannelError), // The channel has been closed.
  79. Ok(Async::NotReady) => return Ok(Async::NotReady),
  80. Err(()) => unreachable!(),
  81. };
  82. if cmd == 0xa {
  83. let code = BigEndian::read_u16(&packet.as_ref()[..2]);
  84. error!("channel error: {} {}", packet.len(), code);
  85. self.state = ChannelState::Closed;
  86. Err(ChannelError)
  87. } else {
  88. Ok(Async::Ready(packet))
  89. }
  90. }
  91. pub fn split(self) -> (ChannelHeaders, ChannelData) {
  92. let (headers, data) = BiLock::new(self);
  93. (ChannelHeaders(headers), ChannelData(data))
  94. }
  95. }
  96. impl Stream for Channel {
  97. type Item = ChannelEvent;
  98. type Error = ChannelError;
  99. fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
  100. loop {
  101. match self.state.clone() {
  102. ChannelState::Closed => panic!("Polling already terminated channel"),
  103. ChannelState::Header(mut data) => {
  104. if data.len() == 0 {
  105. data = try_ready!(self.recv_packet());
  106. }
  107. let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
  108. if length == 0 {
  109. assert_eq!(data.len(), 0);
  110. self.state = ChannelState::Data;
  111. } else {
  112. let header_id = data.split_to(1).as_ref()[0];
  113. let header_data = data.split_to(length - 1).as_ref().to_owned();
  114. self.state = ChannelState::Header(data);
  115. let event = ChannelEvent::Header(header_id, header_data);
  116. return Ok(Async::Ready(Some(event)));
  117. }
  118. }
  119. ChannelState::Data => {
  120. let data = try_ready!(self.recv_packet());
  121. if data.len() == 0 {
  122. self.receiver.close();
  123. self.state = ChannelState::Closed;
  124. return Ok(Async::Ready(None));
  125. } else {
  126. let event = ChannelEvent::Data(data);
  127. return Ok(Async::Ready(Some(event)));
  128. }
  129. }
  130. }
  131. }
  132. }
  133. }
  134. impl Stream for ChannelData {
  135. type Item = Bytes;
  136. type Error = ChannelError;
  137. fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
  138. let mut channel = match self.0.poll_lock() {
  139. Async::Ready(c) => c,
  140. Async::NotReady => return Ok(Async::NotReady),
  141. };
  142. loop {
  143. match try_ready!(channel.poll()) {
  144. Some(ChannelEvent::Header(..)) => (),
  145. Some(ChannelEvent::Data(data)) => return Ok(Async::Ready(Some(data))),
  146. None => return Ok(Async::Ready(None)),
  147. }
  148. }
  149. }
  150. }
  151. impl Stream for ChannelHeaders {
  152. type Item = (u8, Vec<u8>);
  153. type Error = ChannelError;
  154. fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
  155. let mut channel = match self.0.poll_lock() {
  156. Async::Ready(c) => c,
  157. Async::NotReady => return Ok(Async::NotReady),
  158. };
  159. match try_ready!(channel.poll()) {
  160. Some(ChannelEvent::Header(id, data)) => Ok(Async::Ready(Some((id, data)))),
  161. Some(ChannelEvent::Data(..)) | None => Ok(Async::Ready(None)),
  162. }
  163. }
  164. }