// adaptor.rs (2.2 KB)

  1. use futures::future::ok;
  2. use futures::sync::mpsc;
  3. use futures::sync::oneshot;
  4. use futures::{Future, Sink, Stream, BoxFuture, IntoFuture};
  5. use std::thread;
  6. use tokio_core::reactor::Core;
  7. use tokio_core::reactor::Handle;
  /// Sending half produced by `adapt`.
  ///
  /// Thin wrapper around an `mpsc::UnboundedSender<T>`; `adapt` wires the
  /// matching receiver so that everything sent here is forwarded into the
  /// sink half of the adapted transport.
  pub struct SinkAdaptor<T>(mpsc::UnboundedSender<T>);
  /// Receiving half produced by `adapt`.
  ///
  /// Wraps an `mpsc::Receiver` whose items are `Result<T, E>` values. The
  /// receiver is kept in an `Option` so that `recv` can move it out while
  /// waiting for the next item and put it back afterwards.
  pub struct StreamAdaptor<T, E>(Option<mpsc::Receiver<Result<T, E>>>);
  10. impl <T> SinkAdaptor<T> {
  11. pub fn send(&mut self, item: T) {
  12. mpsc::UnboundedSender::send(&mut self.0, item).unwrap();
  13. }
  14. }
  15. impl <T, E> StreamAdaptor<T, E> {
  16. pub fn recv(&mut self) -> Result<T, E> {
  17. let receiver = self.0.take().unwrap();
  18. let receiving = receiver.into_future();
  19. let (packet, receiver) = receiving.wait().map_err(|(e, _)| e).unwrap();
  20. self.0 = Some(receiver);
  21. packet.unwrap()
  22. }
  23. }
  24. pub fn adapt<S, E>(transport: S) -> (SinkAdaptor<S::SinkItem>,
  25. StreamAdaptor<S::Item, E>,
  26. BoxFuture<(), E>)
  27. where S: Sink<SinkError=E> + Stream<Error=E> + Send + 'static,
  28. S::Item: Send + 'static,
  29. S::SinkItem: Send + 'static,
  30. E: Send + 'static,
  31. {
  32. let (receiver_tx, receiver_rx) = mpsc::channel(0);
  33. let (sender_tx, sender_rx) = mpsc::unbounded();
  34. let (sink, stream) = transport.split();
  35. let receiver_task = stream
  36. .then(ok::<_, mpsc::SendError<_>>)
  37. .forward(receiver_tx).map(|_| ())
  38. .map_err(|e| -> E { panic!(e) });
  39. let sender_task = sender_rx
  40. .map_err(|e| -> E { panic!(e) })
  41. .forward(sink).map(|_| ());
  42. let task = (receiver_task, sender_task).into_future()
  43. .map(|((), ())| ()).boxed();
  44. (SinkAdaptor(sender_tx),
  45. StreamAdaptor(Some(receiver_rx)), task)
  46. }
  47. pub fn adapt_future<F, U>(f: F) -> oneshot::Receiver<Result<U::Item, U::Error>>
  48. where F: FnOnce(Handle) -> U + Send + 'static,
  49. U: IntoFuture,
  50. U::Item: Send + 'static,
  51. U::Error: Send + 'static,
  52. {
  53. let (tx, rx) = oneshot::channel();
  54. thread::spawn(move || {
  55. let mut core = Core::new().unwrap();
  56. let handle = core.handle();
  57. let task = f(handle).into_future();
  58. let result = core.run(task);
  59. tx.complete(result);
  60. });
  61. rx
  62. }