sender.rs 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. use std::collections::VecDeque;
  2. use futures::{Async, Poll, Future, Sink, StartSend, AsyncSink};
  3. use super::*;
  4. pub struct MercurySender {
  5. mercury: MercuryManager,
  6. uri: String,
  7. pending: VecDeque<MercuryFuture<MercuryResponse>>,
  8. }
  9. impl MercurySender {
  10. // TODO: pub(super) when stable
  11. pub fn new(mercury: MercuryManager, uri: String) -> MercurySender {
  12. MercurySender {
  13. mercury: mercury,
  14. uri: uri,
  15. pending: VecDeque::new(),
  16. }
  17. }
  18. }
  19. impl Sink for MercurySender {
  20. type SinkItem = Vec<u8>;
  21. type SinkError = MercuryError;
  22. fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
  23. let task = self.mercury.send(self.uri.clone(), item);
  24. self.pending.push_back(task);
  25. Ok(AsyncSink::Ready)
  26. }
  27. fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
  28. loop {
  29. match self.pending.front_mut() {
  30. Some(task) => {
  31. try_ready!(task.poll());
  32. }
  33. None => {
  34. return Ok(Async::Ready(()));
  35. }
  36. }
  37. self.pending.pop_front();
  38. }
  39. }
  40. }