sender.rs 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. use std::collections::VecDeque;
  2. use futures::{Async, Poll, Future, Sink, StartSend, AsyncSink};
  3. use super::*;
  4. pub struct MercurySender {
  5. mercury: MercuryManager,
  6. uri: String,
  7. pending: VecDeque<MercuryFuture<MercuryResponse>>,
  8. }
  9. impl MercurySender {
  10. // TODO: pub(super) when stable
  11. pub(crate) fn new(mercury: MercuryManager, uri: String) -> MercurySender {
  12. MercurySender {
  13. mercury: mercury,
  14. uri: uri,
  15. pending: VecDeque::new(),
  16. }
  17. }
  18. }
  19. impl Clone for MercurySender {
  20. fn clone(&self) -> MercurySender {
  21. MercurySender {
  22. mercury: self.mercury.clone(),
  23. uri: self.uri.clone(),
  24. pending: VecDeque::new(),
  25. }
  26. }
  27. }
  28. impl Sink for MercurySender {
  29. type SinkItem = Vec<u8>;
  30. type SinkError = MercuryError;
  31. fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
  32. let task = self.mercury.send(self.uri.clone(), item);
  33. self.pending.push_back(task);
  34. Ok(AsyncSink::Ready)
  35. }
  36. fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
  37. loop {
  38. match self.pending.front_mut() {
  39. Some(task) => {
  40. try_ready!(task.poll());
  41. }
  42. None => {
  43. return Ok(Async::Ready(()));
  44. }
  45. }
  46. self.pending.pop_front();
  47. }
  48. }
  49. }