proxytunnel.rs 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. use std::io;
  2. use std::str::FromStr;
  3. use futures::{Async, Future, Poll};
  4. use httparse;
  5. use hyper::Uri;
  6. use tokio_io::io::{read, write_all, Read, Window, WriteAll};
  7. use tokio_io::{AsyncRead, AsyncWrite};
  8. pub struct ProxyTunnel<T> {
  9. state: ProxyState<T>,
  10. }
  11. enum ProxyState<T> {
  12. ProxyConnect(WriteAll<T, Vec<u8>>),
  13. ProxyResponse(Read<T, Window<Vec<u8>>>),
  14. }
  15. pub fn connect<T: AsyncRead + AsyncWrite>(connection: T, connect_url: &str) -> ProxyTunnel<T> {
  16. let proxy = proxy_connect(connection, connect_url);
  17. ProxyTunnel {
  18. state: ProxyState::ProxyConnect(proxy),
  19. }
  20. }
  21. impl<T: AsyncRead + AsyncWrite> Future for ProxyTunnel<T> {
  22. type Item = T;
  23. type Error = io::Error;
  24. fn poll(&mut self) -> Poll<Self::Item, io::Error> {
  25. use self::ProxyState::*;
  26. loop {
  27. self.state = match self.state {
  28. ProxyConnect(ref mut write) => {
  29. let (connection, mut accumulator) = try_ready!(write.poll());
  30. let capacity = accumulator.capacity();
  31. accumulator.resize(capacity, 0);
  32. let window = Window::new(accumulator);
  33. let read = read(connection, window);
  34. ProxyResponse(read)
  35. }
  36. ProxyResponse(ref mut read_f) => {
  37. let (connection, mut window, bytes_read) = try_ready!(read_f.poll());
  38. if bytes_read == 0 {
  39. return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy"));
  40. }
  41. let data_end = window.start() + bytes_read;
  42. let buf = window.get_ref()[0..data_end].to_vec();
  43. let mut headers = [httparse::EMPTY_HEADER; 16];
  44. let mut response = httparse::Response::new(&mut headers);
  45. let status = match response.parse(&buf) {
  46. Ok(status) => status,
  47. Err(err) => {
  48. return Err(io::Error::new(io::ErrorKind::Other, err.to_string()));
  49. }
  50. };
  51. if status.is_complete() {
  52. if let Some(code) = response.code {
  53. if code == 200 {
  54. // Proxy says all is well
  55. return Ok(Async::Ready(connection));
  56. } else {
  57. let reason = response.reason.unwrap_or("no reason");
  58. let msg = format!("Proxy responded with {}: {}", code, reason);
  59. return Err(io::Error::new(io::ErrorKind::Other, msg));
  60. }
  61. } else {
  62. return Err(io::Error::new(
  63. io::ErrorKind::Other,
  64. "Malformed response from proxy",
  65. ));
  66. }
  67. } else {
  68. if data_end >= window.end() {
  69. // Allocate some more buffer space
  70. let newsize = data_end + 100;
  71. window.get_mut().resize(newsize, 0);
  72. window.set_end(newsize);
  73. }
  74. // We did not get a full header
  75. window.set_start(data_end);
  76. let read = read(connection, window);
  77. ProxyResponse(read)
  78. }
  79. }
  80. }
  81. }
  82. }
  83. }
  84. fn proxy_connect<T: AsyncWrite>(connection: T, connect_url: &str) -> WriteAll<T, Vec<u8>> {
  85. let uri = Uri::from_str(connect_url).unwrap();
  86. let buffer = format!(
  87. "CONNECT {0}:{1} HTTP/1.1\r\n\
  88. \r\n",
  89. uri.host().expect(&format!("No host in {}", uri)),
  90. uri.port().expect(&format!("No port in {}", uri))
  91. )
  92. .into_bytes();
  93. write_all(connection, buffer)
  94. }