proxytunnel.rs 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. use std::error::Error;
  2. use std::io;
  3. use std::str::FromStr;
  4. use futures::{Async, Future, Poll};
  5. use httparse;
  6. use hyper::Uri;
  7. use tokio_io::io::{read, write_all, Read, Window, WriteAll};
  8. use tokio_io::{AsyncRead, AsyncWrite};
  9. pub struct ProxyTunnel<T> {
  10. state: ProxyState<T>,
  11. }
  12. enum ProxyState<T> {
  13. ProxyConnect(WriteAll<T, Vec<u8>>),
  14. ProxyResponse(Read<T, Window<Vec<u8>>>),
  15. }
  16. pub fn connect<T: AsyncRead + AsyncWrite>(connection: T, connect_url: &str) -> ProxyTunnel<T> {
  17. let proxy = proxy_connect(connection, connect_url);
  18. ProxyTunnel {
  19. state: ProxyState::ProxyConnect(proxy),
  20. }
  21. }
  22. impl<T: AsyncRead + AsyncWrite> Future for ProxyTunnel<T> {
  23. type Item = T;
  24. type Error = io::Error;
  25. fn poll(&mut self) -> Poll<Self::Item, io::Error> {
  26. use self::ProxyState::*;
  27. loop {
  28. self.state = match self.state {
  29. ProxyConnect(ref mut write) => {
  30. let (connection, mut accumulator) = try_ready!(write.poll());
  31. let capacity = accumulator.capacity();
  32. accumulator.resize(capacity, 0);
  33. let window = Window::new(accumulator);
  34. let read = read(connection, window);
  35. ProxyResponse(read)
  36. }
  37. ProxyResponse(ref mut read_f) => {
  38. let (connection, mut window, bytes_read) = try_ready!(read_f.poll());
  39. if bytes_read == 0 {
  40. return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy"));
  41. }
  42. let data_end = window.start() + bytes_read;
  43. let buf = window.get_ref()[0..data_end].to_vec();
  44. let mut headers = [httparse::EMPTY_HEADER; 16];
  45. let mut response = httparse::Response::new(&mut headers);
  46. let status = match response.parse(&buf) {
  47. Ok(status) => status,
  48. Err(err) => {
  49. return Err(io::Error::new(io::ErrorKind::Other, err.description()))
  50. }
  51. };
  52. if status.is_complete() {
  53. if let Some(code) = response.code {
  54. if code == 200 {
  55. // Proxy says all is well
  56. return Ok(Async::Ready(connection));
  57. } else {
  58. let reason = response.reason.unwrap_or("no reason");
  59. let msg = format!("Proxy responded with {}: {}", code, reason);
  60. return Err(io::Error::new(io::ErrorKind::Other, msg));
  61. }
  62. } else {
  63. return Err(io::Error::new(
  64. io::ErrorKind::Other,
  65. "Malformed response from proxy",
  66. ));
  67. }
  68. } else {
  69. if data_end >= window.end() {
  70. // Allocate some more buffer space
  71. let newsize = data_end + 100;
  72. window.get_mut().resize(newsize, 0);
  73. window.set_end(newsize);
  74. }
  75. // We did not get a full header
  76. window.set_start(data_end);
  77. let read = read(connection, window);
  78. ProxyResponse(read)
  79. }
  80. }
  81. }
  82. }
  83. }
  84. }
  85. fn proxy_connect<T: AsyncWrite>(connection: T, connect_url: &str) -> WriteAll<T, Vec<u8>> {
  86. let uri = Uri::from_str(connect_url).unwrap();
  87. let buffer = format!(
  88. "CONNECT {0}:{1} HTTP/1.1\r\n\
  89. \r\n",
  90. uri.host().expect(&format!("No host in {}", uri)),
  91. uri.port().expect(&format!("No port in {}", uri))
  92. )
  93. .into_bytes();
  94. write_all(connection, buffer)
  95. }