Browse Source

Use EasyBuf instead of copying buffers

Paul Lietar 7 years ago
parent
commit
d2161ff75f
9 changed files with 66 additions and 101 deletions
  1. 7 1
      src/audio_file.rs
  2. 5 4
      src/audio_key.rs
  3. 15 13
      src/channel.rs
  4. 3 1
      src/connection/mod.rs
  5. 15 22
      src/mercury/mod.rs
  6. 5 4
      src/session.rs
  7. 0 51
      src/util/arcvec.rs
  8. 0 2
      src/util/mod.rs
  9. 16 3
      src/util/spotify_id.rs

+ 7 - 1
src/audio_file.rs

@@ -132,9 +132,12 @@ impl AudioFileManager {
         let cache = self.session().cache().cloned();
 
         if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
+            debug!("File {} already in cache", file_id);
             return AudioFileOpen::Cached(future::ok(file));
         }
 
+        debug!("Downloading file {}", file_id);
+
         let (complete_tx, complete_rx) = oneshot::channel();
         let (headers, data) = request_chunk(&self.session(), file_id, 0).split();
 
@@ -153,6 +156,9 @@ impl AudioFileManager {
             complete_rx.map(move |mut file| {
                 if let Some(cache) = session.cache() {
                     cache.save_file(file_id, &mut file);
+                    debug!("File {} complete, saving to cache", file_id);
+                } else {
+                    debug!("File {} complete", file_id);
                 }
             }).or_else(|oneshot::Canceled| Ok(()))
         });
@@ -275,7 +281,7 @@ impl Future for AudioFileFetch {
                     progress = true;
 
                     self.output.as_mut().unwrap()
-                        .write_all(&data).unwrap();
+                        .write_all(data.as_ref()).unwrap();
                 }
                  Ok(Async::Ready(None)) => {
                     progress = true;

+ 5 - 4
src/audio_key.rs

@@ -3,6 +3,7 @@ use futures::sync::oneshot;
 use futures::{Async, Future, Poll};
 use std::collections::HashMap;
 use std::io::Write;
+use tokio_core::io::EasyBuf;
 
 use util::SeqGenerator;
 use util::{SpotifyId, FileId};
@@ -21,8 +22,8 @@ component! {
 }
 
 impl AudioKeyManager {
-    pub fn dispatch(&self, cmd: u8, data: Vec<u8>) {
-        let seq = BigEndian::read_u32(&data[..4]);
+    pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) {
+        let seq = BigEndian::read_u32(data.drain_to(4).as_ref());
 
         let sender = self.lock(|inner| inner.pending.remove(&seq));
 
@@ -30,11 +31,11 @@ impl AudioKeyManager {
             match cmd {
                 0xd => {
                     let mut key = [0u8; 16];
-                    key.copy_from_slice(&data[4..20]);
+                    key.copy_from_slice(data.as_ref());
                     sender.complete(Ok(AudioKey(key)));
                 }
                 0xe => {
-                    warn!("error audio key {:x} {:x}", data[4], data[5]);
+                    warn!("error audio key {:x} {:x}", data.as_ref()[0], data.as_ref()[1]);
                     sender.complete(Err(AudioKeyError));
                 }
                 _ => (),

+ 15 - 13
src/channel.rs

@@ -9,7 +9,7 @@ use util::SeqGenerator;
 component! {
     ChannelManager : ChannelManagerInner {
         sequence: SeqGenerator<u16> = SeqGenerator::new(0),
-        channels: HashMap<u16, mpsc::UnboundedSender<(u8, Vec<u8>)>> = HashMap::new(),
+        channels: HashMap<u16, mpsc::UnboundedSender<(u8, EasyBuf)>> = HashMap::new(),
     }
 }
 
@@ -17,7 +17,7 @@ component! {
 pub struct ChannelError;
 
 pub struct Channel {
-    receiver: mpsc::UnboundedReceiver<(u8, Vec<u8>)>,
+    receiver: mpsc::UnboundedReceiver<(u8, EasyBuf)>,
     state: ChannelState,
 }
 
@@ -26,7 +26,7 @@ pub struct ChannelData(BiLock<Channel>);
 
 pub enum ChannelEvent {
     Header(u8, Vec<u8>),
-    Data(Vec<u8>),
+    Data(EasyBuf),
 }
 
 #[derive(Clone)]
@@ -54,21 +54,21 @@ impl ChannelManager {
         (seq, channel)
     }
 
-    pub fn dispatch(&self, cmd: u8, data: Vec<u8>) {
+    pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) {
         use std::collections::hash_map::Entry;
 
-        let id: u16 = BigEndian::read_u16(&data[..2]);
+        let id: u16 = BigEndian::read_u16(data.drain_to(2).as_ref());
 
         self.lock(|inner| {
             if let Entry::Occupied(entry) = inner.channels.entry(id) {
-                let _ = entry.get().send((cmd, data[2..].to_owned()));
+                let _ = entry.get().send((cmd, data));
             }
         });
     }
 }
 
 impl Channel {
-    fn recv_packet(&mut self) -> Poll<Vec<u8>, ChannelError> {
+    fn recv_packet(&mut self) -> Poll<EasyBuf, ChannelError> {
         let (cmd, packet) = match self.receiver.poll() {
             Ok(Async::Ready(t)) => t.expect("channel closed"),
             Ok(Async::NotReady) => return Ok(Async::NotReady),
@@ -76,7 +76,7 @@ impl Channel {
         };
 
         if cmd == 0xa {
-            let code = BigEndian::read_u16(&packet[..2]);
+            let code = BigEndian::read_u16(&packet.as_ref()[..2]);
             error!("channel error: {} {}", packet.len(), code);
 
             self.state = ChannelState::Closed;
@@ -104,7 +104,7 @@ impl Stream for Channel {
                 ChannelState::Closed => panic!("Polling already terminated channel"),
                 ChannelState::Header(mut data) => {
                     if data.len() == 0 {
-                        data = EasyBuf::from(try_ready!(self.recv_packet()));
+                        data = try_ready!(self.recv_packet());
                     }
 
                     let length = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
@@ -117,18 +117,20 @@ impl Stream for Channel {
 
                         self.state = ChannelState::Header(data);
 
-                        return Ok(Async::Ready(Some(ChannelEvent::Header(header_id, header_data))));
+                        let event = ChannelEvent::Header(header_id, header_data);
+                        return Ok(Async::Ready(Some(event)));
                     }
                 }
 
                 ChannelState::Data => {
                     let data = try_ready!(self.recv_packet());
-                    if data.is_empty() {
+                    if data.len() == 0 {
                         self.receiver.close();
                         self.state = ChannelState::Closed;
                         return Ok(Async::Ready(None));
                     } else {
-                        return Ok(Async::Ready(Some(ChannelEvent::Data(data))));
+                        let event = ChannelEvent::Data(data);
+                        return Ok(Async::Ready(Some(event)));
                     }
                 }
             }
@@ -137,7 +139,7 @@ impl Stream for Channel {
 }
 
 impl Stream for ChannelData {
-    type Item = Vec<u8>;
+    type Item = EasyBuf;
     type Error = ChannelError;
 
     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {

+ 3 - 1
src/connection/mod.rs

@@ -27,7 +27,9 @@ pub fn connect<A: ToSocketAddrs>(addr: A, handle: &Handle) -> BoxFuture<Transpor
     connection.boxed()
 }
 
-pub fn authenticate(transport: Transport, credentials: Credentials, device_id: String) -> BoxFuture<(Transport, Credentials), io::Error> {
+pub fn authenticate(transport: Transport, credentials: Credentials, device_id: String)
+    -> BoxFuture<(Transport, Credentials), io::Error>
+{
     use protocol::authentication::{APWelcome, ClientResponseEncrypted, CpuFamily, Os};
 
     let packet = protobuf_init!(ClientResponseEncrypted::new(), {

+ 15 - 22
src/mercury/mod.rs

@@ -1,11 +1,11 @@
-use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
+use byteorder::{BigEndian, ByteOrder};
 use futures::sync::{oneshot, mpsc};
 use futures::{Async, Poll, BoxFuture, Future};
 use protobuf;
 use protocol;
 use std::collections::HashMap;
-use std::io::Read;
 use std::mem;
+use tokio_core::io::EasyBuf;
 
 use util::SeqGenerator;
 
@@ -125,16 +125,12 @@ impl MercuryManager {
         }).boxed()
     }
 
-    pub fn dispatch(&self, cmd: u8, data: Vec<u8>) {
-        let mut packet = ::std::io::Cursor::new(data);
-        let seq = {
-            let len = packet.read_u16::<BigEndian>().unwrap() as usize;
-            let mut seq = vec![0; len];
-            packet.read_exact(&mut seq).unwrap();
-            seq
-        };
-        let flags = packet.read_u8().unwrap();
-        let count = packet.read_u16::<BigEndian>().unwrap() as usize;
+    pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) {
+        let seq_len = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
+        let seq = data.drain_to(seq_len).as_ref().to_owned();
+
+        let flags = data.drain_to(1).as_ref()[0];
+        let count = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
 
         let pending = self.lock(|inner| inner.pending.remove(&seq));
 
@@ -154,10 +150,10 @@ impl MercuryManager {
         };
 
         for i in 0..count {
-            let mut part = Self::parse_part(&mut packet);
-            if let Some(mut data) = mem::replace(&mut pending.partial, None) {
-                data.append(&mut part);
-                part = data;
+            let mut part = Self::parse_part(&mut data);
+            if let Some(mut partial) = mem::replace(&mut pending.partial, None) {
+                partial.extend_from_slice(&part);
+                part = partial;
             }
 
             if i == count - 1 && (flags == 2) {
@@ -174,12 +170,9 @@ impl MercuryManager {
         }
     }
 
-    fn parse_part<T: Read>(s: &mut T) -> Vec<u8> {
-        let size = s.read_u16::<BigEndian>().unwrap() as usize;
-        let mut buffer = vec![0; size];
-        s.read_exact(&mut buffer).unwrap();
-
-        buffer
+    fn parse_part(data: &mut EasyBuf) -> Vec<u8> {
+        let size = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
+        data.drain_to(size).as_ref().to_owned()
     }
 
     fn complete_request(&self, cmd: u8, mut pending: MercuryPending) {

+ 5 - 4
src/session.rs

@@ -7,6 +7,7 @@ use std::io;
 use std::result::Result;
 use std::str::FromStr;
 use std::sync::{RwLock, Arc, Weak};
+use tokio_core::io::EasyBuf;
 use tokio_core::reactor::{Handle, Remote};
 
 use apresolve::apresolve_or_fallback;
@@ -121,7 +122,6 @@ impl Session {
               config: Config, cache: Option<Cache>, username: String)
         -> (Session, BoxFuture<(), io::Error>)
     {
-        let transport = transport.map(|(cmd, data)| (cmd, data.as_ref().to_owned()));
         let (sink, stream) = transport.split();
 
         let (sender_tx, sender_rx) = mpsc::unbounded();
@@ -193,12 +193,13 @@ impl Session {
     }
 
     #[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))]
-    fn dispatch(&self, cmd: u8, data: Vec<u8>) {
+    fn dispatch(&self, cmd: u8, data: EasyBuf) {
         match cmd {
-            0x4 => self.send_packet(0x49, data),
+            0x4 => self.send_packet(0x49, data.as_ref().to_owned()),
             0x4a => (),
             0x1b => {
-                self.0.data.write().unwrap().country = String::from_utf8(data).unwrap();
+                let country = String::from_utf8(data.as_ref().to_owned()).unwrap();
+                self.0.data.write().unwrap().country = country;
             }
 
             0x9 | 0xa => self.channel().dispatch(cmd, data),

+ 0 - 51
src/util/arcvec.rs

@@ -1,51 +0,0 @@
-use std::sync::Arc;
-use std::fmt;
-use std::ops::Deref;
-
-#[derive(Clone)]
-pub struct ArcVec<T> {
-    data: Arc<Vec<T>>,
-    offset: usize,
-    length: usize,
-}
-
-impl<T> ArcVec<T> {
-    pub fn new(data: Vec<T>) -> ArcVec<T> {
-        let length = data.len();
-        ArcVec {
-            data: Arc::new(data),
-            offset: 0,
-            length: length,
-        }
-    }
-
-    pub fn offset(mut self, offset: usize) -> ArcVec<T> {
-        assert!(offset <= self.length);
-
-        self.offset += offset;
-        self.length -= offset;
-
-        self
-    }
-
-    pub fn limit(mut self, length: usize) -> ArcVec<T> {
-        assert!(length <= self.length);
-        self.length = length;
-
-        self
-    }
-}
-
-impl<T> Deref for ArcVec<T> {
-    type Target = [T];
-
-    fn deref(&self) -> &[T] {
-        &self.data[self.offset..self.offset + self.length]
-    }
-}
-
-impl<T: fmt::Debug> fmt::Debug for ArcVec<T> {
-    fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
-        self.deref().fmt(formatter)
-    }
-}

+ 0 - 2
src/util/mod.rs

@@ -12,12 +12,10 @@ use std::time::{UNIX_EPOCH, SystemTime};
 
 mod int128;
 mod spotify_id;
-mod arcvec;
 mod subfile;
 
 pub use util::int128::u128;
 pub use util::spotify_id::{SpotifyId, FileId};
-pub use util::arcvec::ArcVec;
 pub use util::subfile::Subfile;
 
 pub fn rand_vec<G: Rng, R: Rand>(rng: &mut G, size: usize) -> Vec<R> {

+ 16 - 3
src/util/spotify_id.rs

@@ -1,4 +1,5 @@
 use std;
+use std::fmt;
 use util::u128;
 use byteorder::{BigEndian, ByteOrder};
 use std::ascii::AsciiExt;
@@ -6,9 +7,6 @@ use std::ascii::AsciiExt;
 #[derive(Debug,Copy,Clone,PartialEq,Eq,Hash)]
 pub struct SpotifyId(u128);
 
-#[derive(Debug,Copy,Clone,PartialEq,Eq,Hash)]
-pub struct FileId(pub [u8; 20]);
-
 const BASE62_DIGITS: &'static [u8] =
     b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
 const BASE16_DIGITS: &'static [u8] = b"0123456789abcdef";
@@ -79,6 +77,9 @@ impl SpotifyId {
     }
 }
 
+#[derive(Copy,Clone,PartialEq,Eq,PartialOrd,Ord,Hash)]
+pub struct FileId(pub [u8; 20]);
+
 impl FileId {
     pub fn to_base16(&self) -> String {
         self.0
@@ -88,3 +89,15 @@ impl FileId {
             .concat()
     }
 }
+
+impl fmt::Debug for FileId {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.debug_tuple("FileId").field(&self.to_base16()).finish()
+    }
+}
+
+impl fmt::Display for FileId {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.write_str(&self.to_base16())
+    }
+}