浏览代码

Merge branch 'master' into ReadMe-Changes

Colm 7 年之前
父节点
当前提交
e104c2d44b

+ 2 - 4
.travis.yml

@@ -1,6 +1,6 @@
 language: rust
 rust:
-  - 1.17.0
+  - 1.18.0
   - stable
   - beta
   - nightly
@@ -24,13 +24,11 @@ before_script:
 script:
     - cargo build --no-default-features
     - cargo build --no-default-features --features "with-tremor"
+    - cargo build --no-default-features --features "with-lewton";
     - cargo build --no-default-features --features "portaudio-backend"
     - cargo build --no-default-features --features "pulseaudio-backend"
     - cargo build --no-default-features --features "alsa-backend"
     - cargo build --no-default-features --target armv7-unknown-linux-gnueabihf
-    - if [[ $TRAVIS_RUST_VERSION != *"1.17.0"* ]]; then
-        cargo build --no-default-features --features "with-lewton";
-      fi
 
 notifications:
     email: false

文件差异内容过多而无法显示
+ 227 - 218
Cargo.lock


+ 1 - 0
Cargo.toml

@@ -46,6 +46,7 @@ serde = "0.9.6"
 serde_derive = "0.9.6"
 serde_json = "0.9.5"
 tokio-core = "0.1.2"
+tokio-io = "0.1"
 tokio-signal = "0.1.2"
 url = "1.3"
 

+ 1 - 1
audio/src/fetch.rs

@@ -340,7 +340,7 @@ impl Seek for AudioFileStreaming {
         // Notify the fetch thread to get the correct block
         // This can fail if fetch thread has completed, in which case the
         // block is ready. Just ignore the error.
-        let _ = self.seek.send(self.position);
+        let _ = self.seek.unbounded_send(self.position);
         Ok(self.position)
     }
 }

+ 2 - 0
core/Cargo.toml

@@ -10,6 +10,7 @@ path = "../protocol"
 [dependencies]
 base64 = "0.5.0"
 byteorder = "1.0"
+bytes = "0.4"
 error-chain = { version = "0.9.0", default_features = false }
 futures = "0.1.8"
 hyper = "0.11.2"
@@ -27,6 +28,7 @@ serde_derive = "0.9.6"
 serde_json = "0.9.5"
 shannon = "0.2.0"
 tokio-core = "0.1.2"
+tokio-io = "0.1"
 uuid = { version = "0.4", features = ["v4"] }
 
 [build-dependencies]

+ 3 - 1
core/build.rs

@@ -39,5 +39,7 @@ pub fn build_id() -> &'static str {{
     protobuf_macros::expand("src/lib.in.rs", &out.join("lib.rs")).unwrap();
 
     println!("cargo:rerun-if-changed=src/lib.in.rs");
-    println!("cargo:rerun-if-changed=src/connection");
+    println!("cargo:rerun-if-changed=src/connection/mod.rs");
+    println!("cargo:rerun-if-changed=src/connection/codec.rs");
+    println!("cargo:rerun-if-changed=src/connection/handshake.rs");
 }

+ 5 - 5
core/src/audio_key.rs

@@ -1,9 +1,9 @@
 use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
+use bytes::Bytes;
 use futures::sync::oneshot;
 use futures::{Async, Future, Poll};
 use std::collections::HashMap;
 use std::io::Write;
-use tokio_core::io::EasyBuf;
 
 use util::SeqGenerator;
 use util::{SpotifyId, FileId};
@@ -22,8 +22,8 @@ component! {
 }
 
 impl AudioKeyManager {
-    pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) {
-        let seq = BigEndian::read_u32(data.drain_to(4).as_ref());
+    pub fn dispatch(&self, cmd: u8, mut data: Bytes) {
+        let seq = BigEndian::read_u32(data.split_to(4).as_ref());
 
         let sender = self.lock(|inner| inner.pending.remove(&seq));
 
@@ -32,11 +32,11 @@ impl AudioKeyManager {
                 0xd => {
                     let mut key = [0u8; 16];
                     key.copy_from_slice(data.as_ref());
-                    sender.complete(Ok(AudioKey(key)));
+                    let _ = sender.send(Ok(AudioKey(key)));
                 }
                 0xe => {
                     warn!("error audio key {:x} {:x}", data.as_ref()[0], data.as_ref()[1]);
-                    sender.complete(Err(AudioKeyError));
+                    let _ = sender.send(Err(AudioKeyError));
                 }
                 _ => (),
             }

+ 14 - 14
core/src/channel.rs

@@ -1,15 +1,15 @@
 use byteorder::{BigEndian, ByteOrder};
+use bytes::Bytes;
 use futures::sync::{BiLock, mpsc};
 use futures::{Poll, Async, Stream};
 use std::collections::HashMap;
-use tokio_core::io::EasyBuf;
 
 use util::SeqGenerator;
 
 component! {
     ChannelManager : ChannelManagerInner {
         sequence: SeqGenerator<u16> = SeqGenerator::new(0),
-        channels: HashMap<u16, mpsc::UnboundedSender<(u8, EasyBuf)>> = HashMap::new(),
+        channels: HashMap<u16, mpsc::UnboundedSender<(u8, Bytes)>> = HashMap::new(),
     }
 }
 
@@ -17,7 +17,7 @@ component! {
 pub struct ChannelError;
 
 pub struct Channel {
-    receiver: mpsc::UnboundedReceiver<(u8, EasyBuf)>,
+    receiver: mpsc::UnboundedReceiver<(u8, Bytes)>,
     state: ChannelState,
 }
 
@@ -26,12 +26,12 @@ pub struct ChannelData(BiLock<Channel>);
 
 pub enum ChannelEvent {
     Header(u8, Vec<u8>),
-    Data(EasyBuf),
+    Data(Bytes),
 }
 
 #[derive(Clone)]
 enum ChannelState {
-    Header(EasyBuf),
+    Header(Bytes),
     Data,
     Closed,
 }
@@ -48,27 +48,27 @@ impl ChannelManager {
 
         let channel = Channel {
             receiver: rx,
-            state: ChannelState::Header(EasyBuf::new()),
+            state: ChannelState::Header(Bytes::new()),
         };
 
         (seq, channel)
     }
 
-    pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) {
+    pub fn dispatch(&self, cmd: u8, mut data: Bytes) {
         use std::collections::hash_map::Entry;
 
-        let id: u16 = BigEndian::read_u16(data.drain_to(2).as_ref());
+        let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref());
 
         self.lock(|inner| {
             if let Entry::Occupied(entry) = inner.channels.entry(id) {
-                let _ = entry.get().send((cmd, data));
+                let _ = entry.get().unbounded_send((cmd, data));
             }
         });
     }
 }
 
 impl Channel {
-    fn recv_packet(&mut self) -> Poll<EasyBuf, ChannelError> {
+    fn recv_packet(&mut self) -> Poll<Bytes, ChannelError> {
         let (cmd, packet) = match self.receiver.poll() {
             Ok(Async::Ready(t)) => t.expect("channel closed"),
             Ok(Async::NotReady) => return Ok(Async::NotReady),
@@ -107,13 +107,13 @@ impl Stream for Channel {
                         data = try_ready!(self.recv_packet());
                     }
 
-                    let length = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
+                    let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
                     if length == 0 {
                         assert_eq!(data.len(), 0);
                         self.state = ChannelState::Data;
                     } else {
-                        let header_id = data.drain_to(1).as_ref()[0];
-                        let header_data = data.drain_to(length - 1).as_ref().to_owned();
+                        let header_id = data.split_to(1).as_ref()[0];
+                        let header_data = data.split_to(length - 1).as_ref().to_owned();
 
                         self.state = ChannelState::Header(data);
 
@@ -139,7 +139,7 @@ impl Stream for Channel {
 }
 
 impl Stream for ChannelData {
-    type Item = EasyBuf;
+    type Item = Bytes;
     type Error = ChannelError;
 
     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {

+ 21 - 14
core/src/connection/codec.rs

@@ -1,7 +1,8 @@
-use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
+use byteorder::{BigEndian, ByteOrder};
+use bytes::{Bytes, BytesMut, BufMut};
 use shannon::Shannon;
 use std::io;
-use tokio_core::io::{Codec, EasyBuf};
+use tokio_io::codec::{Decoder, Encoder};
 
 const HEADER_SIZE: usize = 3;
 const MAC_SIZE: usize = 4;
@@ -34,16 +35,17 @@ impl APCodec {
     }
 }
 
-impl Codec for APCodec {
-    type Out = (u8, Vec<u8>);
-    type In = (u8, EasyBuf);
+impl Encoder for APCodec {
+    type Item = (u8, Vec<u8>);
+    type Error = io::Error;
 
-    fn encode(&mut self, item: (u8, Vec<u8>), buf: &mut Vec<u8>) -> io::Result<()> {
+    fn encode(&mut self, item: (u8, Vec<u8>), buf: &mut BytesMut) -> io::Result<()> {
         let (cmd, payload) = item;
         let offset = buf.len();
 
-        buf.write_u8(cmd).unwrap();
-        buf.write_u16::<BigEndian>(payload.len() as u16).unwrap();
+        buf.reserve(3 + payload.len());
+        buf.put_u8(cmd);
+        buf.put_u16::<BigEndian>(payload.len() as u16);
         buf.extend_from_slice(&payload);
 
         self.encode_cipher.nonce_u32(self.encode_nonce);
@@ -57,12 +59,17 @@ impl Codec for APCodec {
 
         Ok(())
     }
+}
+
+impl Decoder for APCodec {
+    type Item = (u8, Bytes);
+    type Error = io::Error;
 
-    fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<(u8, EasyBuf)>> {
+    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<(u8, Bytes)>> {
         if let DecodeState::Header = self.decode_state {
             if buf.len() >= HEADER_SIZE {
                 let mut header = [0u8; HEADER_SIZE];
-                header.copy_from_slice(buf.drain_to(HEADER_SIZE).as_slice());
+                header.copy_from_slice(buf.split_to(HEADER_SIZE).as_ref());
 
                 self.decode_cipher.nonce_u32(self.decode_nonce);
                 self.decode_nonce += 1;
@@ -79,13 +86,13 @@ impl Codec for APCodec {
             if buf.len() >= size + MAC_SIZE {
                 self.decode_state = DecodeState::Header;
 
-                let mut payload = buf.drain_to(size + MAC_SIZE);
+                let mut payload = buf.split_to(size + MAC_SIZE);
 
-                self.decode_cipher.decrypt(&mut payload.get_mut()[..size]);
+                self.decode_cipher.decrypt(&mut payload.get_mut(..size).unwrap());
                 let mac = payload.split_off(size);
-                self.decode_cipher.check_mac(mac.as_slice())?;
+                self.decode_cipher.check_mac(mac.as_ref())?;
 
-                return Ok(Some((cmd, payload)));
+                return Ok(Some((cmd, payload.freeze())));
             }
         }
 

+ 11 - 9
core/src/connection/handshake.rs

@@ -3,9 +3,11 @@ use crypto::hmac::Hmac;
 use crypto::mac::Mac;use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
 use protobuf::{self, Message, MessageStatic};
 use rand::thread_rng;
-use std::io::{self, Read, Write};
+use std::io::{self, Read};
 use std::marker::PhantomData;
-use tokio_core::io::{Io, Framed, write_all, WriteAll, read_exact, ReadExact, Window};
+use tokio_io::{AsyncRead, AsyncWrite};
+use tokio_io::codec::Framed;
+use tokio_io::io::{write_all, WriteAll, read_exact, ReadExact, Window};
 use futures::{Poll, Async, Future};
 
 use diffie_hellman::DHLocalKeys;
@@ -25,7 +27,7 @@ enum HandshakeState<T> {
     ClientResponse(Option<APCodec>, WriteAll<T, Vec<u8>>),
 }
 
-pub fn handshake<T: Io>(connection: T) -> Handshake<T> {
+pub fn handshake<T: AsyncRead + AsyncWrite>(connection: T) -> Handshake<T> {
     let local_keys = DHLocalKeys::random(&mut thread_rng());
     let client_hello = client_hello(connection, local_keys.public_key());
 
@@ -35,7 +37,7 @@ pub fn handshake<T: Io>(connection: T) -> Handshake<T> {
     }
 }
 
-impl <T: Io> Future for Handshake<T> {
+impl <T: AsyncRead + AsyncWrite> Future for Handshake<T> {
     type Item = Framed<T, APCodec>;
     type Error = io::Error;
 
@@ -78,7 +80,7 @@ impl <T: Io> Future for Handshake<T> {
     }
 }
 
-fn client_hello<T: Write>(connection: T, gc: Vec<u8>) -> WriteAll<T, Vec<u8>> {
+fn client_hello<T: AsyncWrite>(connection: T, gc: Vec<u8>) -> WriteAll<T, Vec<u8>> {
     let packet = protobuf_init!(ClientHello::new(), {
         build_info => {
             product: protocol::keyexchange::Product::PRODUCT_PARTNER,
@@ -104,7 +106,7 @@ fn client_hello<T: Write>(connection: T, gc: Vec<u8>) -> WriteAll<T, Vec<u8>> {
     write_all(connection, buffer)
 }
 
-fn client_response<T: Write>(connection: T, challenge: Vec<u8>) -> WriteAll<T, Vec<u8>> {
+fn client_response<T: AsyncWrite>(connection: T, challenge: Vec<u8>) -> WriteAll<T, Vec<u8>> {
     let packet = protobuf_init!(ClientResponsePlaintext::new(), {
         login_crypto_response.diffie_hellman => {
             hmac: challenge
@@ -126,14 +128,14 @@ enum RecvPacket<T, M: MessageStatic> {
     Body(ReadExact<T, Window<Vec<u8>>>, PhantomData<M>),
 }
 
-fn recv_packet<T, M>(connection: T, acc: Vec<u8>) -> RecvPacket<T, M>
+fn recv_packet<T: AsyncRead, M>(connection: T, acc: Vec<u8>) -> RecvPacket<T, M>
     where T: Read,
           M: MessageStatic
 {
     RecvPacket::Header(read_into_accumulator(connection, 4, acc), PhantomData)
 }
 
-impl <T, M> Future for RecvPacket<T, M>
+impl <T: AsyncRead, M> Future for RecvPacket<T, M>
     where T: Read,
           M: MessageStatic
 {
@@ -165,7 +167,7 @@ impl <T, M> Future for RecvPacket<T, M>
     }
 }
 
-fn read_into_accumulator<T: Read>(connection: T, size: usize, mut acc: Vec<u8>) -> ReadExact<T, Window<Vec<u8>>> {
+fn read_into_accumulator<T: AsyncRead>(connection: T, size: usize, mut acc: Vec<u8>) -> ReadExact<T, Window<Vec<u8>>> {
     let offset = acc.len();
     acc.resize(offset + size, 0);
 

+ 7 - 7
core/src/connection/mod.rs

@@ -4,12 +4,12 @@ mod handshake;
 pub use self::codec::APCodec;
 pub use self::handshake::handshake;
 
-use futures::{Future, Sink, Stream, BoxFuture};
+use futures::{Future, Sink, Stream};
 use std::io;
 use std::net::ToSocketAddrs;
 use tokio_core::net::TcpStream;
 use tokio_core::reactor::Handle;
-use tokio_core::io::Framed;
+use tokio_io::codec::Framed;
 use protobuf::{self, Message};
 
 use authentication::Credentials;
@@ -17,18 +17,18 @@ use version;
 
 pub type Transport = Framed<TcpStream, APCodec>;
 
-pub fn connect<A: ToSocketAddrs>(addr: A, handle: &Handle) -> BoxFuture<Transport, io::Error> {
+pub fn connect<A: ToSocketAddrs>(addr: A, handle: &Handle) -> Box<Future<Item = Transport, Error = io::Error>> {
     let addr = addr.to_socket_addrs().unwrap().next().unwrap();
     let socket = TcpStream::connect(&addr, handle);
     let connection = socket.and_then(|socket| {
         handshake(socket)
     });
 
-    connection.boxed()
+    Box::new(connection)
 }
 
 pub fn authenticate(transport: Transport, credentials: Credentials, device_id: String)
-    -> BoxFuture<(Transport, Credentials), io::Error>
+    -> Box<Future<Item = (Transport, Credentials), Error = io::Error>>
 {
     use protocol::authentication::{APWelcome, ClientResponseEncrypted, CpuFamily, Os};
 
@@ -50,7 +50,7 @@ pub fn authenticate(transport: Transport, credentials: Credentials, device_id: S
     let cmd = 0xab;
     let data = packet.write_to_bytes().unwrap();
 
-    transport.send((cmd, data)).and_then(|transport| {
+    Box::new(transport.send((cmd, data)).and_then(|transport| {
         transport.into_future().map_err(|(err, _stream)| err)
     }).and_then(|(packet, transport)| {
         match packet {
@@ -71,5 +71,5 @@ pub fn authenticate(transport: Transport, credentials: Credentials, device_id: S
             Some((cmd, _)) => panic!("Unexpected packet {:?}", cmd),
             None => panic!("EOF"),
         }
-    }).boxed()
+    }))
 }

+ 2 - 3
core/src/lib.rs

@@ -1,8 +1,5 @@
 #![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))]
 
-// TODO: many items from tokio-core::io have been deprecated in favour of tokio-io
-#![allow(deprecated)]
-
 #[macro_use] extern crate error_chain;
 #[macro_use] extern crate futures;
 #[macro_use] extern crate lazy_static;
@@ -11,6 +8,7 @@
 
 extern crate base64;
 extern crate byteorder;
+extern crate bytes;
 extern crate crypto;
 extern crate hyper;
 extern crate num_bigint;
@@ -23,6 +21,7 @@ extern crate serde;
 extern crate serde_json;
 extern crate shannon;
 extern crate tokio_core;
+extern crate tokio_io;
 extern crate uuid;
 
 extern crate librespot_protocol as protocol;

+ 16 - 16
core/src/mercury/mod.rs

@@ -1,11 +1,11 @@
 use byteorder::{BigEndian, ByteOrder};
+use bytes::Bytes;
 use futures::sync::{oneshot, mpsc};
-use futures::{Async, Poll, BoxFuture, Future};
+use futures::{Async, Poll, Future};
 use protobuf;
 use protocol;
 use std::collections::HashMap;
 use std::mem;
-use tokio_core::io::EasyBuf;
 
 use util::SeqGenerator;
 
@@ -99,7 +99,7 @@ impl MercuryManager {
     }
 
     pub fn subscribe<T: Into<String>>(&self, uri: T)
-        -> BoxFuture<mpsc::UnboundedReceiver<MercuryResponse>, MercuryError>
+        -> Box<Future<Item = mpsc::UnboundedReceiver<MercuryResponse>, Error = MercuryError>>
     {
         let uri = uri.into();
         let request = self.request(MercuryRequest {
@@ -110,7 +110,7 @@ impl MercuryManager {
         });
 
         let manager = self.clone();
-        request.map(move |response| {
+        Box::new(request.map(move |response| {
             let (tx, rx) = mpsc::unbounded();
 
             manager.lock(move |inner| {
@@ -133,15 +133,15 @@ impl MercuryManager {
             });
 
             rx
-        }).boxed()
+        }))
     }
 
-    pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) {
-        let seq_len = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
-        let seq = data.drain_to(seq_len).as_ref().to_owned();
+    pub fn dispatch(&self, cmd: u8, mut data: Bytes) {
+        let seq_len = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
+        let seq = data.split_to(seq_len).as_ref().to_owned();
 
-        let flags = data.drain_to(1).as_ref()[0];
-        let count = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
+        let flags = data.split_to(1).as_ref()[0];
+        let count = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
 
         let pending = self.lock(|inner| inner.pending.remove(&seq));
 
@@ -181,9 +181,9 @@ impl MercuryManager {
         }
     }
 
-    fn parse_part(data: &mut EasyBuf) -> Vec<u8> {
-        let size = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
-        data.drain_to(size).as_ref().to_owned()
+    fn parse_part(data: &mut Bytes) -> Vec<u8> {
+        let size = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
+        data.split_to(size).as_ref().to_owned()
     }
 
     fn complete_request(&self, cmd: u8, mut pending: MercuryPending) {
@@ -199,7 +199,7 @@ impl MercuryManager {
         if response.status_code >= 400 {
             warn!("error {} for uri {}", response.status_code, &response.uri);
             if let Some(cb) = pending.callback {
-                cb.complete(Err(MercuryError));
+                let _ = cb.send(Err(MercuryError));
             }
         } else {
             if cmd == 0xb5 {
@@ -211,7 +211,7 @@ impl MercuryManager {
 
                             // if send fails, remove from list of subs
                             // TODO: send unsub message
-                            sub.send(response.clone()).is_ok()
+                            sub.unbounded_send(response.clone()).is_ok()
                         } else {
                             // URI doesn't match
                             true
@@ -223,7 +223,7 @@ impl MercuryManager {
                     }
                 })
             } else if let Some(cb) = pending.callback {
-                cb.complete(Ok(response));
+                let _ = cb.send(Ok(response));
             }
         }
     }

+ 10 - 10
core/src/session.rs

@@ -1,10 +1,10 @@
+use bytes::Bytes;
 use crypto::digest::Digest;
 use crypto::sha1::Sha1;
 use futures::sync::mpsc;
-use futures::{Future, Stream, BoxFuture, IntoFuture, Poll, Async};
+use futures::{Future, Stream, IntoFuture, Poll, Async};
 use std::io;
 use std::sync::{RwLock, Arc, Weak};
-use tokio_core::io::EasyBuf;
 use tokio_core::reactor::{Handle, Remote};
 use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
 
@@ -90,7 +90,7 @@ impl Session {
 
     fn create(handle: &Handle, transport: connection::Transport,
               config: SessionConfig, cache: Option<Cache>, username: String)
-        -> (Session, BoxFuture<(), io::Error>)
+        -> (Session, Box<Future<Item = (), Error = io::Error>>)
     {
         let (sink, stream) = transport.split();
 
@@ -124,8 +124,8 @@ impl Session {
             .forward(sink).map(|_| ());
         let receiver_task = DispatchTask(stream, session.weak());
 
-        let task = (receiver_task, sender_task).into_future()
-            .map(|((), ())| ()).boxed();
+        let task = Box::new((receiver_task, sender_task).into_future()
+            .map(|((), ())| ()));
 
         (session, task)
     }
@@ -156,7 +156,7 @@ impl Session {
     }
 
     #[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))]
-    fn dispatch(&self, cmd: u8, data: EasyBuf) {
+    fn dispatch(&self, cmd: u8, data: Bytes) {
         match cmd {
             0x4 => {
                 self.debug_info();
@@ -177,7 +177,7 @@ impl Session {
     }
 
     pub fn send_packet(&self, cmd: u8, data: Vec<u8>) {
-        self.0.tx_connection.send((cmd, data)).unwrap();
+        self.0.tx_connection.unbounded_send((cmd, data)).unwrap();
     }
 
     pub fn cache(&self) -> Option<&Arc<Cache>> {
@@ -229,10 +229,10 @@ impl Drop for SessionInternal {
 }
 
 struct DispatchTask<S>(S, SessionWeak)
-    where S: Stream<Item = (u8, EasyBuf)>;
+    where S: Stream<Item = (u8, Bytes)>;
 
 impl <S> Future for DispatchTask<S>
-    where S: Stream<Item = (u8, EasyBuf)>
+    where S: Stream<Item = (u8, Bytes)>
 {
     type Item = ();
     type Error = S::Error;
@@ -253,7 +253,7 @@ impl <S> Future for DispatchTask<S>
 }
 
 impl <S> Drop for DispatchTask<S>
-    where S: Stream<Item = (u8, EasyBuf)>
+    where S: Stream<Item = (u8, Bytes)>
 {
     fn drop(&mut self) {
         debug!("drop Dispatch");

+ 2 - 0
core/src/util/spotify_id.rs

@@ -2,6 +2,8 @@ use std;
 use std::fmt;
 use util::u128;
 use byteorder::{BigEndian, ByteOrder};
+// Unneeded since 1.21
+#[allow(unused_imports)]
 use std::ascii::AsciiExt;
 
 #[derive(Debug,Copy,Clone,PartialEq,Eq,Hash)]

+ 4 - 4
metadata/src/lib.rs

@@ -8,7 +8,7 @@ extern crate librespot_protocol as protocol;
 
 pub mod cover;
 
-use futures::{Future, BoxFuture};
+use futures::Future;
 use linear_map::LinearMap;
 
 use core::mercury::MercuryError;
@@ -57,17 +57,17 @@ pub trait Metadata : Send + Sized + 'static {
     fn base_url() -> &'static str;
     fn parse(msg: &Self::Message, session: &Session) -> Self;
 
-    fn get(session: &Session, id: SpotifyId) -> BoxFuture<Self, MercuryError> {
+    fn get(session: &Session, id: SpotifyId) -> Box<Future<Item = Self, Error = MercuryError>> {
         let uri = format!("{}/{}", Self::base_url(), id.to_base16());
         let request = session.mercury().get(uri);
 
         let session = session.clone();
-        request.and_then(move |response| {
+        Box::new(request.and_then(move |response| {
             let data = response.payload.first().expect("Empty payload");
             let msg: Self::Message = protobuf::parse_from_bytes(data).unwrap();
 
             Ok(Self::parse(&msg, &session))
-        }).boxed()
+        }))
     }
 }
 

+ 23 - 37
src/discovery.rs

@@ -3,8 +3,8 @@ use crypto::digest::Digest;
 use crypto::mac::Mac;
 use crypto;
 use futures::sync::mpsc;
-use futures::{Future, Stream, BoxFuture, Poll, Async};
-use hyper::server::{Service, NewService, Request, Response, Http};
+use futures::{Future, Stream, Poll};
+use hyper::server::{Service, Request, Response, Http};
 use hyper::{self, Get, Post, StatusCode};
 use mdns;
 use num_bigint::BigUint;
@@ -12,7 +12,6 @@ use rand;
 use std::collections::BTreeMap;
 use std::io;
 use std::sync::Arc;
-use tokio_core::net::TcpListener;
 use tokio_core::reactor::Handle;
 use url;
 
@@ -32,7 +31,7 @@ struct DiscoveryInner {
 }
 
 impl Discovery {
-    pub fn new(config: ConnectConfig, device_id: String)
+    fn new(config: ConnectConfig, device_id: String)
         -> (Discovery, mpsc::UnboundedReceiver<Credentials>)
     {
         let (tx, rx) = mpsc::unbounded();
@@ -136,7 +135,7 @@ impl Discovery {
 
         let credentials = Credentials::with_blob(username.to_owned(), &decrypted, &self.0.device_id);
 
-        self.0.tx.send(credentials).unwrap();
+        self.0.tx.unbounded_send(credentials).unwrap();
 
         let result = json!({
             "status": 101,
@@ -159,7 +158,7 @@ impl Service for Discovery {
     type Request = Request;
     type Response = Response;
     type Error = hyper::Error;
-    type Future = BoxFuture<Response, hyper::Error>;
+    type Future = Box<Future<Item = Response, Error = hyper::Error>>;
 
     fn call(&self, request: Request) -> Self::Future {
         let mut params = BTreeMap::new();
@@ -174,7 +173,7 @@ impl Service for Discovery {
         }
 
         let this = self.clone();
-        body.fold(Vec::new(), |mut acc, chunk| {
+        Box::new(body.fold(Vec::new(), |mut acc, chunk| {
             acc.extend_from_slice(chunk.as_ref());
             Ok::<_, hyper::Error>(acc)
         }).map(move |body| {
@@ -186,25 +185,13 @@ impl Service for Discovery {
                 (Post, Some("addUser")) => this.handle_add_user(&params),
                 _ => this.not_found(),
             }
-        }).boxed()
-    }
-}
-
-impl NewService for Discovery {
-    type Request = Request;
-    type Response = Response;
-    type Error = hyper::Error;
-    type Instance = Self;
-
-    fn new_service(&self) -> io::Result<Self::Instance> {
-        Ok(self.clone())
+        }))
     }
 }
 
 pub struct DiscoveryStream {
     credentials: mpsc::UnboundedReceiver<Credentials>,
     _svc: mdns::Service,
-    task: Box<Future<Item=(), Error=io::Error>>,
 }
 
 pub fn discovery(handle: &Handle, config: ConnectConfig, device_id: String)
@@ -212,15 +199,20 @@ pub fn discovery(handle: &Handle, config: ConnectConfig, device_id: String)
 {
     let (discovery, creds_rx) = Discovery::new(config.clone(), device_id);
 
-    let listener = TcpListener::bind(&"0.0.0.0:0".parse().unwrap(), handle)?;
-    let addr = listener.local_addr()?;
-
-    let http = Http::new();
-    let handle_ = handle.clone();
-    let task = Box::new(listener.incoming().for_each(move |(socket, addr)| {
-        http.bind_connection(&handle_, socket, addr, discovery.clone());
-        Ok(())
-    }));
+    let serve = {
+        let http = Http::new();
+        http.serve_addr_handle(&"0.0.0.0:0".parse().unwrap(), &handle, move || Ok(discovery.clone())).unwrap()
+    };
+    let addr = serve.incoming_ref().local_addr();
+    let server_future = {
+        let handle = handle.clone();
+        serve.for_each(move |connection| {
+                handle.spawn(connection.then(|_| Ok(())));
+                Ok(())
+            })
+            .then(|_| Ok(()))
+    };
+    handle.spawn(server_future);
 
     let responder = mdns::Responder::spawn(&handle)?;
     let svc = responder.register(
@@ -232,20 +224,14 @@ pub fn discovery(handle: &Handle, config: ConnectConfig, device_id: String)
     Ok(DiscoveryStream {
         credentials: creds_rx,
         _svc: svc,
-        task: task,
     })
 }
 
 impl Stream for DiscoveryStream {
     type Item = Credentials;
-    type Error = io::Error;
+    type Error = ();
 
     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
-        match self.task.poll()? {
-            Async::Ready(()) => unreachable!(),
-            Async::NotReady => (),
-        }
-
-        Ok(self.credentials.poll().unwrap())
+        self.credentials.poll()
     }
 }

+ 4 - 4
src/keymaster.rs

@@ -1,4 +1,4 @@
-use futures::{Future, BoxFuture};
+use futures::Future;
 use serde_json;
 
 use core::mercury::MercuryError;
@@ -13,14 +13,14 @@ pub struct Token {
     pub scope: Vec<String>,
 }
 
-pub fn get_token(session: &Session, client_id: &str, scopes: &str) -> BoxFuture<Token, MercuryError> {
+pub fn get_token(session: &Session, client_id: &str, scopes: &str) -> Box<Future<Item = Token, Error = MercuryError>> {
     let url = format!("hm://keymaster/token/authenticated?client_id={}&scope={}",
                       client_id, scopes);
-    session.mercury().get(url).map(move |response| {
+    Box::new(session.mercury().get(url).map(move |response| {
         let data = response.payload.first().expect("Empty payload");
         let data = String::from_utf8(data.clone()).unwrap();
         let token : Token = serde_json::from_str(&data).unwrap();
 
         token
-    }).boxed()
+    }))
 }

+ 0 - 3
src/lib.rs

@@ -2,9 +2,6 @@
 
 #![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))]
 
-// TODO: many items from tokio-core::io have been deprecated in favour of tokio-io
-#![allow(deprecated)]
-
 #[macro_use] extern crate log;
 #[macro_use] extern crate serde_json;
 #[macro_use] extern crate serde_derive;

+ 3 - 5
src/main.rs

@@ -1,12 +1,10 @@
-// TODO: many items from tokio-core::io have been deprecated in favour of tokio-io
-#![allow(deprecated)]
-
 #[macro_use] extern crate log;
 extern crate env_logger;
 extern crate futures;
 extern crate getopts;
 extern crate librespot;
 extern crate tokio_core;
+extern crate tokio_io;
 extern crate tokio_signal;
 
 use env_logger::LogBuilder;
@@ -17,7 +15,7 @@ use std::path::PathBuf;
 use std::process::exit;
 use std::str::FromStr;
 use tokio_core::reactor::{Handle, Core};
-use tokio_core::io::IoStream;
+use tokio_io::IoStream;
 use std::mem;
 
 use librespot::core::authentication::{get_credentials, Credentials};
@@ -264,7 +262,7 @@ impl Main {
             spirc: None,
             spirc_task: None,
             shutdown: false,
-            signal: tokio_signal::ctrl_c(&handle).flatten_stream().boxed(),
+            signal: Box::new(tokio_signal::ctrl_c(&handle).flatten_stream()),
         };
 
         if setup.enable_discovery {

+ 2 - 2
src/player.rs

@@ -155,7 +155,7 @@ impl PlayerState {
         match self {
             Paused { end_of_track, .. } |
             Playing { end_of_track, .. } => {
-                end_of_track.complete(())
+                let _ = end_of_track.send(());
             }
 
             Stopped => warn!("signal_end_of_track from stopped state"),
@@ -313,7 +313,7 @@ impl PlayerInternal {
                     }
 
                     None => {
-                        end_of_track.complete(());
+                        let _ = end_of_track.send(());
                         if self.state.is_playing() {
                             self.run_onstop();
                         }

+ 17 - 19
src/spirc.rs

@@ -1,8 +1,6 @@
 use futures::future;
-use futures::sink::BoxSink;
-use futures::stream::BoxStream;
 use futures::sync::{oneshot, mpsc};
-use futures::{Future, Stream, Sink, Async, Poll, BoxFuture};
+use futures::{Future, Stream, Sink, Async, Poll};
 use protobuf::{self, Message};
 
 use core::config::ConnectConfig;
@@ -30,10 +28,10 @@ pub struct SpircTask {
     device: DeviceState,
     state: State,
 
-    subscription: BoxStream<Frame, MercuryError>,
-    sender: BoxSink<Frame, MercuryError>,
+    subscription: Box<Stream<Item = Frame, Error = MercuryError>>,
+    sender: Box<Sink<SinkItem = Frame, SinkError = MercuryError>>,
     commands: mpsc::UnboundedReceiver<SpircCommand>,
-    end_of_track: BoxFuture<(), oneshot::Canceled>,
+    end_of_track: Box<Future<Item = (), Error = oneshot::Canceled>>,
 
     shutdown: bool,
     session: Session,
@@ -134,10 +132,10 @@ impl Spirc {
 
         let subscription = session.mercury().subscribe(&uri as &str);
         let subscription = subscription.map(|stream| stream.map_err(|_| MercuryError)).flatten_stream();
-        let subscription = subscription.map(|response| -> Frame {
+        let subscription = Box::new(subscription.map(|response| -> Frame {
             let data = response.payload.first().unwrap();
             protobuf::parse_from_bytes(data).unwrap()
-        }).boxed();
+        }));
 
         let sender = Box::new(session.mercury().sender(uri).with(|frame: Frame| {
             Ok(frame.write_to_bytes().unwrap())
@@ -163,7 +161,7 @@ impl Spirc {
             subscription: subscription,
             sender: sender,
             commands: cmd_rx,
-            end_of_track: future::empty().boxed(),
+            end_of_track: Box::new(future::empty()),
 
             shutdown: false,
             session: session.clone(),
@@ -179,28 +177,28 @@ impl Spirc {
     }
 
     pub fn play(&self) {
-        let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::Play);
+        let _ = self.commands.unbounded_send(SpircCommand::Play);
     }
     pub fn play_pause(&self) {
-        let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::PlayPause);
+        let _ = self.commands.unbounded_send(SpircCommand::PlayPause);
     }
     pub fn pause(&self) {
-        let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::Pause);
+        let _ = self.commands.unbounded_send(SpircCommand::Pause);
     }
     pub fn prev(&self) {
-        let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::Prev);
+        let _ = self.commands.unbounded_send(SpircCommand::Prev);
     }
     pub fn next(&self) {
-        let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::Next);
+        let _ = self.commands.unbounded_send(SpircCommand::Next);
     }
     pub fn volume_up(&self) {
-        let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::VolumeUp);
+        let _ = self.commands.unbounded_send(SpircCommand::VolumeUp);
     }
     pub fn volume_down(&self) {
-        let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::VolumeDown);
+        let _ = self.commands.unbounded_send(SpircCommand::VolumeDown);
     }
     pub fn shutdown(&self) {
-        let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::Shutdown);
+        let _ = self.commands.unbounded_send(SpircCommand::Shutdown);
     }
 }
 
@@ -238,7 +236,7 @@ impl Future for SpircTask {
                     }
                     Ok(Async::NotReady) => (),
                     Err(oneshot::Canceled) => {
-                        self.end_of_track = future::empty().boxed()
+                        self.end_of_track = Box::new(future::empty())
                     }
                 }
             }
@@ -587,7 +585,7 @@ impl SpircTask {
             self.state.set_status(PlayStatus::kPlayStatusPause);
         }
 
-        self.end_of_track = end_of_track.boxed();
+        self.end_of_track = Box::new(end_of_track);
     }
 
     fn hello(&mut self) {

部分文件因为文件数量过多而无法显示