ソースを参照

Remove usage of deprecated tokio_core::io

Thomas Bächler 7 年 前
コミット
d36017d6f0

+ 3 - 0
Cargo.lock

@@ -295,6 +295,7 @@ dependencies = [
  "serde_derive 0.9.15 (registry+https://github.com/rust-lang/crates.io-index)",
  "serde_json 0.9.10 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-signal 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
  "url 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "vergen 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -324,6 +325,7 @@ version = "0.1.0"
 dependencies = [
  "base64 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "byteorder 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
  "error-chain 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
  "hyper 0.11.14 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -343,6 +345,7 @@ dependencies = [
  "serde_json 0.9.10 (registry+https://github.com/rust-lang/crates.io-index)",
  "shannon 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "uuid 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "vergen 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
 ]

+ 1 - 0
Cargo.toml

@@ -46,6 +46,7 @@ serde = "0.9.6"
 serde_derive = "0.9.6"
 serde_json = "0.9.5"
 tokio-core = "0.1.2"
+tokio-io = "0.1"
 tokio-signal = "0.1.2"
 url = "1.3"
 

+ 2 - 0
core/Cargo.toml

@@ -10,6 +10,7 @@ path = "../protocol"
 [dependencies]
 base64 = "0.5.0"
 byteorder = "1.0"
+bytes = "0.4"
 error-chain = { version = "0.9.0", default_features = false }
 futures = "0.1.8"
 hyper = "0.11.2"
@@ -27,6 +28,7 @@ serde_derive = "0.9.6"
 serde_json = "0.9.5"
 shannon = "0.2.0"
 tokio-core = "0.1.2"
+tokio-io = "0.1"
 uuid = { version = "0.4", features = ["v4"] }
 
 [build-dependencies]

+ 3 - 1
core/build.rs

@@ -39,5 +39,7 @@ pub fn build_id() -> &'static str {{
     protobuf_macros::expand("src/lib.in.rs", &out.join("lib.rs")).unwrap();
 
     println!("cargo:rerun-if-changed=src/lib.in.rs");
-    println!("cargo:rerun-if-changed=src/connection");
+    println!("cargo:rerun-if-changed=src/connection/mod.rs");
+    println!("cargo:rerun-if-changed=src/connection/codec.rs");
+    println!("cargo:rerun-if-changed=src/connection/handshake.rs");
 }

+ 3 - 3
core/src/audio_key.rs

@@ -1,9 +1,9 @@
 use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
+use bytes::Bytes;
 use futures::sync::oneshot;
 use futures::{Async, Future, Poll};
 use std::collections::HashMap;
 use std::io::Write;
-use tokio_core::io::EasyBuf;
 
 use util::SeqGenerator;
 use util::{SpotifyId, FileId};
@@ -22,8 +22,8 @@ component! {
 }
 
 impl AudioKeyManager {
-    pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) {
-        let seq = BigEndian::read_u32(data.drain_to(4).as_ref());
+    pub fn dispatch(&self, cmd: u8, mut data: Bytes) {
+        let seq = BigEndian::read_u32(data.split_to(4).as_ref());
 
         let sender = self.lock(|inner| inner.pending.remove(&seq));
 

+ 13 - 13
core/src/channel.rs

@@ -1,15 +1,15 @@
 use byteorder::{BigEndian, ByteOrder};
+use bytes::Bytes;
 use futures::sync::{BiLock, mpsc};
 use futures::{Poll, Async, Stream};
 use std::collections::HashMap;
-use tokio_core::io::EasyBuf;
 
 use util::SeqGenerator;
 
 component! {
     ChannelManager : ChannelManagerInner {
         sequence: SeqGenerator<u16> = SeqGenerator::new(0),
-        channels: HashMap<u16, mpsc::UnboundedSender<(u8, EasyBuf)>> = HashMap::new(),
+        channels: HashMap<u16, mpsc::UnboundedSender<(u8, Bytes)>> = HashMap::new(),
     }
 }
 
@@ -17,7 +17,7 @@ component! {
 pub struct ChannelError;
 
 pub struct Channel {
-    receiver: mpsc::UnboundedReceiver<(u8, EasyBuf)>,
+    receiver: mpsc::UnboundedReceiver<(u8, Bytes)>,
     state: ChannelState,
 }
 
@@ -26,12 +26,12 @@ pub struct ChannelData(BiLock<Channel>);
 
 pub enum ChannelEvent {
     Header(u8, Vec<u8>),
-    Data(EasyBuf),
+    Data(Bytes),
 }
 
 #[derive(Clone)]
 enum ChannelState {
-    Header(EasyBuf),
+    Header(Bytes),
     Data,
     Closed,
 }
@@ -48,16 +48,16 @@ impl ChannelManager {
 
         let channel = Channel {
             receiver: rx,
-            state: ChannelState::Header(EasyBuf::new()),
+            state: ChannelState::Header(Bytes::new()),
         };
 
         (seq, channel)
     }
 
-    pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) {
+    pub fn dispatch(&self, cmd: u8, mut data: Bytes) {
         use std::collections::hash_map::Entry;
 
-        let id: u16 = BigEndian::read_u16(data.drain_to(2).as_ref());
+        let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref());
 
         self.lock(|inner| {
             if let Entry::Occupied(entry) = inner.channels.entry(id) {
@@ -68,7 +68,7 @@ impl ChannelManager {
 }
 
 impl Channel {
-    fn recv_packet(&mut self) -> Poll<EasyBuf, ChannelError> {
+    fn recv_packet(&mut self) -> Poll<Bytes, ChannelError> {
         let (cmd, packet) = match self.receiver.poll() {
             Ok(Async::Ready(t)) => t.expect("channel closed"),
             Ok(Async::NotReady) => return Ok(Async::NotReady),
@@ -107,13 +107,13 @@ impl Stream for Channel {
                         data = try_ready!(self.recv_packet());
                     }
 
-                    let length = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
+                    let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
                     if length == 0 {
                         assert_eq!(data.len(), 0);
                         self.state = ChannelState::Data;
                     } else {
-                        let header_id = data.drain_to(1).as_ref()[0];
-                        let header_data = data.drain_to(length - 1).as_ref().to_owned();
+                        let header_id = data.split_to(1).as_ref()[0];
+                        let header_data = data.split_to(length - 1).as_ref().to_owned();
 
                         self.state = ChannelState::Header(data);
 
@@ -139,7 +139,7 @@ impl Stream for Channel {
 }
 
 impl Stream for ChannelData {
-    type Item = EasyBuf;
+    type Item = Bytes;
     type Error = ChannelError;
 
     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {

+ 21 - 14
core/src/connection/codec.rs

@@ -1,7 +1,8 @@
-use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
+use byteorder::{BigEndian, ByteOrder};
+use bytes::{Bytes, BytesMut, BufMut};
 use shannon::Shannon;
 use std::io;
-use tokio_core::io::{Codec, EasyBuf};
+use tokio_io::codec::{Decoder, Encoder};
 
 const HEADER_SIZE: usize = 3;
 const MAC_SIZE: usize = 4;
@@ -34,16 +35,17 @@ impl APCodec {
     }
 }
 
-impl Codec for APCodec {
-    type Out = (u8, Vec<u8>);
-    type In = (u8, EasyBuf);
+impl Encoder for APCodec {
+    type Item = (u8, Vec<u8>);
+    type Error = io::Error;
 
-    fn encode(&mut self, item: (u8, Vec<u8>), buf: &mut Vec<u8>) -> io::Result<()> {
+    fn encode(&mut self, item: (u8, Vec<u8>), buf: &mut BytesMut) -> io::Result<()> {
         let (cmd, payload) = item;
         let offset = buf.len();
 
-        buf.write_u8(cmd).unwrap();
-        buf.write_u16::<BigEndian>(payload.len() as u16).unwrap();
+        buf.reserve(3 + payload.len());
+        buf.put_u8(cmd);
+        buf.put_u16::<BigEndian>(payload.len() as u16);
         buf.extend_from_slice(&payload);
 
         self.encode_cipher.nonce_u32(self.encode_nonce);
@@ -57,12 +59,17 @@ impl Codec for APCodec {
 
         Ok(())
     }
+}
+
+impl Decoder for APCodec {
+    type Item = (u8, Bytes);
+    type Error = io::Error;
 
-    fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<(u8, EasyBuf)>> {
+    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<(u8, Bytes)>> {
         if let DecodeState::Header = self.decode_state {
             if buf.len() >= HEADER_SIZE {
                 let mut header = [0u8; HEADER_SIZE];
-                header.copy_from_slice(buf.drain_to(HEADER_SIZE).as_slice());
+                header.copy_from_slice(buf.split_to(HEADER_SIZE).as_ref());
 
                 self.decode_cipher.nonce_u32(self.decode_nonce);
                 self.decode_nonce += 1;
@@ -79,13 +86,13 @@ impl Codec for APCodec {
             if buf.len() >= size + MAC_SIZE {
                 self.decode_state = DecodeState::Header;
 
-                let mut payload = buf.drain_to(size + MAC_SIZE);
+                let mut payload = buf.split_to(size + MAC_SIZE);
 
-                self.decode_cipher.decrypt(&mut payload.get_mut()[..size]);
+                self.decode_cipher.decrypt(&mut payload.get_mut(..size).unwrap());
                 let mac = payload.split_off(size);
-                self.decode_cipher.check_mac(mac.as_slice())?;
+                self.decode_cipher.check_mac(mac.as_ref())?;
 
-                return Ok(Some((cmd, payload)));
+                return Ok(Some((cmd, payload.freeze())));
             }
         }
 

+ 11 - 9
core/src/connection/handshake.rs

@@ -3,9 +3,11 @@ use crypto::hmac::Hmac;
 use crypto::mac::Mac;use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
 use protobuf::{self, Message, MessageStatic};
 use rand::thread_rng;
-use std::io::{self, Read, Write};
+use std::io::{self, Read};
 use std::marker::PhantomData;
-use tokio_core::io::{Io, Framed, write_all, WriteAll, read_exact, ReadExact, Window};
+use tokio_io::{AsyncRead, AsyncWrite};
+use tokio_io::codec::Framed;
+use tokio_io::io::{write_all, WriteAll, read_exact, ReadExact, Window};
 use futures::{Poll, Async, Future};
 
 use diffie_hellman::DHLocalKeys;
@@ -25,7 +27,7 @@ enum HandshakeState<T> {
     ClientResponse(Option<APCodec>, WriteAll<T, Vec<u8>>),
 }
 
-pub fn handshake<T: Io>(connection: T) -> Handshake<T> {
+pub fn handshake<T: AsyncRead + AsyncWrite>(connection: T) -> Handshake<T> {
     let local_keys = DHLocalKeys::random(&mut thread_rng());
     let client_hello = client_hello(connection, local_keys.public_key());
 
@@ -35,7 +37,7 @@ pub fn handshake<T: Io>(connection: T) -> Handshake<T> {
     }
 }
 
-impl <T: Io> Future for Handshake<T> {
+impl <T: AsyncRead + AsyncWrite> Future for Handshake<T> {
     type Item = Framed<T, APCodec>;
     type Error = io::Error;
 
@@ -78,7 +80,7 @@ impl <T: Io> Future for Handshake<T> {
     }
 }
 
-fn client_hello<T: Write>(connection: T, gc: Vec<u8>) -> WriteAll<T, Vec<u8>> {
+fn client_hello<T: AsyncWrite>(connection: T, gc: Vec<u8>) -> WriteAll<T, Vec<u8>> {
     let packet = protobuf_init!(ClientHello::new(), {
         build_info => {
             product: protocol::keyexchange::Product::PRODUCT_PARTNER,
@@ -104,7 +106,7 @@ fn client_hello<T: Write>(connection: T, gc: Vec<u8>) -> WriteAll<T, Vec<u8>> {
     write_all(connection, buffer)
 }
 
-fn client_response<T: Write>(connection: T, challenge: Vec<u8>) -> WriteAll<T, Vec<u8>> {
+fn client_response<T: AsyncWrite>(connection: T, challenge: Vec<u8>) -> WriteAll<T, Vec<u8>> {
     let packet = protobuf_init!(ClientResponsePlaintext::new(), {
         login_crypto_response.diffie_hellman => {
             hmac: challenge
@@ -126,14 +128,14 @@ enum RecvPacket<T, M: MessageStatic> {
     Body(ReadExact<T, Window<Vec<u8>>>, PhantomData<M>),
 }
 
-fn recv_packet<T, M>(connection: T, acc: Vec<u8>) -> RecvPacket<T, M>
+fn recv_packet<T: AsyncRead, M>(connection: T, acc: Vec<u8>) -> RecvPacket<T, M>
     where T: Read,
           M: MessageStatic
 {
     RecvPacket::Header(read_into_accumulator(connection, 4, acc), PhantomData)
 }
 
-impl <T, M> Future for RecvPacket<T, M>
+impl <T: AsyncRead, M> Future for RecvPacket<T, M>
     where T: Read,
           M: MessageStatic
 {
@@ -165,7 +167,7 @@ impl <T, M> Future for RecvPacket<T, M>
     }
 }
 
-fn read_into_accumulator<T: Read>(connection: T, size: usize, mut acc: Vec<u8>) -> ReadExact<T, Window<Vec<u8>>> {
+fn read_into_accumulator<T: AsyncRead>(connection: T, size: usize, mut acc: Vec<u8>) -> ReadExact<T, Window<Vec<u8>>> {
     let offset = acc.len();
     acc.resize(offset + size, 0);
 

+ 1 - 1
core/src/connection/mod.rs

@@ -9,7 +9,7 @@ use std::io;
 use std::net::ToSocketAddrs;
 use tokio_core::net::TcpStream;
 use tokio_core::reactor::Handle;
-use tokio_core::io::Framed;
+use tokio_io::codec::Framed;
 use protobuf::{self, Message};
 
 use authentication::Credentials;

+ 2 - 3
core/src/lib.rs

@@ -1,8 +1,5 @@
 #![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))]
 
-// TODO: many items from tokio-core::io have been deprecated in favour of tokio-io
-#![allow(deprecated)]
-
 #[macro_use] extern crate error_chain;
 #[macro_use] extern crate futures;
 #[macro_use] extern crate lazy_static;
@@ -11,6 +8,7 @@
 
 extern crate base64;
 extern crate byteorder;
+extern crate bytes;
 extern crate crypto;
 extern crate hyper;
 extern crate num_bigint;
@@ -23,6 +21,7 @@ extern crate serde;
 extern crate serde_json;
 extern crate shannon;
 extern crate tokio_core;
+extern crate tokio_io;
 extern crate uuid;
 
 extern crate librespot_protocol as protocol;

+ 9 - 9
core/src/mercury/mod.rs

@@ -1,11 +1,11 @@
 use byteorder::{BigEndian, ByteOrder};
+use bytes::Bytes;
 use futures::sync::{oneshot, mpsc};
 use futures::{Async, Poll, Future};
 use protobuf;
 use protocol;
 use std::collections::HashMap;
 use std::mem;
-use tokio_core::io::EasyBuf;
 
 use util::SeqGenerator;
 
@@ -136,12 +136,12 @@ impl MercuryManager {
         }))
     }
 
-    pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) {
-        let seq_len = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
-        let seq = data.drain_to(seq_len).as_ref().to_owned();
+    pub fn dispatch(&self, cmd: u8, mut data: Bytes) {
+        let seq_len = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
+        let seq = data.split_to(seq_len).as_ref().to_owned();
 
-        let flags = data.drain_to(1).as_ref()[0];
-        let count = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
+        let flags = data.split_to(1).as_ref()[0];
+        let count = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
 
         let pending = self.lock(|inner| inner.pending.remove(&seq));
 
@@ -181,9 +181,9 @@ impl MercuryManager {
         }
     }
 
-    fn parse_part(data: &mut EasyBuf) -> Vec<u8> {
-        let size = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
-        data.drain_to(size).as_ref().to_owned()
+    fn parse_part(data: &mut Bytes) -> Vec<u8> {
+        let size = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
+        data.split_to(size).as_ref().to_owned()
     }
 
     fn complete_request(&self, cmd: u8, mut pending: MercuryPending) {

+ 5 - 5
core/src/session.rs

@@ -1,10 +1,10 @@
+use bytes::Bytes;
 use crypto::digest::Digest;
 use crypto::sha1::Sha1;
 use futures::sync::mpsc;
 use futures::{Future, Stream, IntoFuture, Poll, Async};
 use std::io;
 use std::sync::{RwLock, Arc, Weak};
-use tokio_core::io::EasyBuf;
 use tokio_core::reactor::{Handle, Remote};
 use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
 
@@ -156,7 +156,7 @@ impl Session {
     }
 
     #[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))]
-    fn dispatch(&self, cmd: u8, data: EasyBuf) {
+    fn dispatch(&self, cmd: u8, data: Bytes) {
         match cmd {
             0x4 => {
                 self.debug_info();
@@ -229,10 +229,10 @@ impl Drop for SessionInternal {
 }
 
 struct DispatchTask<S>(S, SessionWeak)
-    where S: Stream<Item = (u8, EasyBuf)>;
+    where S: Stream<Item = (u8, Bytes)>;
 
 impl <S> Future for DispatchTask<S>
-    where S: Stream<Item = (u8, EasyBuf)>
+    where S: Stream<Item = (u8, Bytes)>
 {
     type Item = ();
     type Error = S::Error;
@@ -253,7 +253,7 @@ impl <S> Future for DispatchTask<S>
 }
 
 impl <S> Drop for DispatchTask<S>
-    where S: Stream<Item = (u8, EasyBuf)>
+    where S: Stream<Item = (u8, Bytes)>
 {
     fn drop(&mut self) {
         debug!("drop Dispatch");

+ 2 - 4
src/main.rs

@@ -1,12 +1,10 @@
-// TODO: many items from tokio-core::io have been deprecated in favour of tokio-io
-#![allow(deprecated)]
-
 #[macro_use] extern crate log;
 extern crate env_logger;
 extern crate futures;
 extern crate getopts;
 extern crate librespot;
 extern crate tokio_core;
+extern crate tokio_io;
 extern crate tokio_signal;
 
 use env_logger::LogBuilder;
@@ -17,7 +15,7 @@ use std::path::PathBuf;
 use std::process::exit;
 use std::str::FromStr;
 use tokio_core::reactor::{Handle, Core};
-use tokio_core::io::IoStream;
+use tokio_io::IoStream;
 use std::mem;
 
 use librespot::core::authentication::{get_credentials, Credentials};