Просмотр исходного кода

Refactor the whole architecture.

Use less threads, makes it much simpler to reason about.
Paul Lietar 9 лет назад
Родитель
Сommit
2a2f227bef
10 измененных файлов с 296 добавлено и 539 удалено
  1. 34 47
      src/audio_file.rs
  2. 25 71
      src/audio_key.rs
  3. 5 61
      src/connection.rs
  4. 29 37
      src/main.rs
  5. 68 95
      src/mercury.rs
  6. 36 76
      src/metadata.rs
  7. 2 15
      src/player.rs
  8. 63 70
      src/session.rs
  9. 22 62
      src/stream.rs
  10. 12 5
      src/util/mod.rs

+ 34 - 47
src/audio_file.rs

@@ -5,22 +5,25 @@ use std::io::{self, SeekFrom};
 use std::slice::bytes::copy_memory;
 use std::sync::{Arc, Condvar, Mutex};
 use std::sync::mpsc::{self, TryRecvError};
+use std::thread;
 
-use stream::{StreamRequest, StreamEvent};
+use stream::StreamEvent;
 use util::FileId;
-use std::thread;
+use session::Session;
 
 const CHUNK_SIZE : usize = 0x40000;
 
-#[derive(Clone)]
-pub struct AudioFile {
+pub struct AudioFile<'s> {
     position: usize,
     seek: mpsc::Sender<u64>,
     shared: Arc<AudioFileShared>,
+
+    #[allow(dead_code)]
+    thread: thread::JoinGuard<'s, ()>,
 }
 
 struct AudioFileShared {
-    fileid: FileId,
+    file_id: FileId,
     size: usize,
     data: Mutex<AudioFileData>,
     cond: Condvar
@@ -31,38 +34,24 @@ struct AudioFileData {
     bitmap: BitSet,
 }
 
-impl AudioFile {
-    pub fn new(fileid: FileId, streams: mpsc::Sender<StreamRequest>) -> AudioFile {
-        let (tx, rx) = mpsc::channel();
-
-        streams.send(StreamRequest {
-            id: fileid,
-            offset: 0,
-            size: 1,
-            callback: tx
-        }).unwrap();
-
-        let size = {
-            let mut size = None;
-            for event in rx.iter() {
+impl <'s> AudioFile <'s> {
+    pub fn new(session: &Session, file_id: FileId) -> AudioFile {
+        let mut it = session.stream(file_id, 0, 1).into_iter()
+            .filter_map(|event| {
                 match event {
-                    StreamEvent::Header(id, data) => {
-                        if id == 0x3 {
-                            size = Some(BigEndian::read_u32(&data) * 4);
-                            break;
-                        }
-                    },
-                    StreamEvent::Data(_) => break
+                    StreamEvent::Header(id, ref data) if id == 0x3 => {
+                        Some(BigEndian::read_u32(data) as usize * 4)
+                    }
+                    _ => None
                 }
-            }
-            size.unwrap() as usize
-        };
+            });
+        
+        let size = it.next().unwrap();
 
         let bufsize = size + (CHUNK_SIZE - size % CHUNK_SIZE); 
-        let (tx, rx) = mpsc::channel();
 
         let shared = Arc::new(AudioFileShared {
-            fileid: fileid,
+            file_id: file_id,
             size: size,
             data: Mutex::new(AudioFileData {
                 buffer: vec![0u8; bufsize],
@@ -71,25 +60,23 @@ impl AudioFile {
             cond: Condvar::new(),
         });
         
+        let shared_ = shared.clone();
+        let (seek_tx, seek_rx) = mpsc::channel();
+
         let file = AudioFile {
+            thread: thread::scoped( move || { AudioFile::fetch(session, shared_, seek_rx); }),
             position: 0,
-            seek: tx,
-            shared: shared.clone(),
+            seek: seek_tx,
+            shared: shared,
         };
 
-        thread::spawn( move || { AudioFile::fetch(shared, streams, rx); });
-
         file
     }
 
-    fn fetch_chunk(shared: &Arc<AudioFileShared>, streams: &mpsc::Sender<StreamRequest>, index: usize) {
-        let (tx, rx) = mpsc::channel();
-        streams.send(StreamRequest {
-            id: shared.fileid,
-            offset: (index * CHUNK_SIZE / 4) as u32,
-            size: (CHUNK_SIZE / 4) as u32,
-            callback: tx
-        }).unwrap();
+    fn fetch_chunk(session: &Session, shared: &Arc<AudioFileShared>, index: usize) {
+        let rx = session.stream(shared.file_id,
+                     (index * CHUNK_SIZE / 4) as u32,
+                     (CHUNK_SIZE / 4) as u32);
 
         let mut offset = 0usize;
         for event in rx.iter() {
@@ -114,7 +101,7 @@ impl AudioFile {
         }
     }
 
-    fn fetch(shared: Arc<AudioFileShared>, streams: mpsc::Sender<StreamRequest>, seek: mpsc::Receiver<u64>) {
+    fn fetch(session: &Session, shared: Arc<AudioFileShared>, seek: mpsc::Receiver<u64>) {
         let mut index = 0;
         loop {
             index = if index * CHUNK_SIZE < shared.size {
@@ -138,13 +125,13 @@ impl AudioFile {
             }
 
             if index * CHUNK_SIZE < shared.size {
-                AudioFile::fetch_chunk(&shared, &streams, index) 
+                AudioFile::fetch_chunk(session, &shared, index) 
             }
         }
     }
 }
 
-impl io::Read for AudioFile {
+impl <'s> io::Read for AudioFile <'s> {
     fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
         let index = self.position / CHUNK_SIZE;
         let offset = self.position % CHUNK_SIZE;
@@ -163,7 +150,7 @@ impl io::Read for AudioFile {
     }
 }
 
-impl io::Seek for AudioFile {
+impl <'s> io::Seek for AudioFile <'s> {
     fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
         let newpos = match pos {
             SeekFrom::Start(offset) => offset as i64,

+ 25 - 71
src/audio_key.rs

@@ -1,109 +1,63 @@
 use std::collections::HashMap;
-use std::sync::mpsc;
+use std::sync::{mpsc, Future};
 use std::io::{Cursor, Write};
 use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt};
 use readall::ReadAllExt;
 
-use connection::Packet;
-use util::{SpotifyId, FileId};
-use util::Either::{Left, Right};
-use subsystem::Subsystem;
+use util::{SpotifyId, FileId, IgnoreExt};
+use session::Session;
+use connection::PacketHandler;
 
-pub struct AudioKeyRequest {
-    pub track: SpotifyId,
-    pub file: FileId,
-    pub callback: AudioKeyCallback,
-}
 pub type AudioKey = [u8; 16];
-pub struct AudioKeyResponse(pub AudioKey);
-pub type AudioKeyCallback = mpsc::Sender<AudioKeyResponse>;
-
 type AudioKeyId = u32;
+
 pub struct AudioKeyManager {
     next_seq: AudioKeyId,
-    callbacks: HashMap<AudioKeyId, AudioKeyCallback>,
-
-    requests: mpsc::Receiver<AudioKeyRequest>,
-    packet_rx: mpsc::Receiver<Packet>,
-    packet_tx: mpsc::Sender<Packet>,
+    callbacks: HashMap<AudioKeyId, mpsc::Sender<AudioKey>>,
 }
 
 impl AudioKeyManager {
-    pub fn new(tx: mpsc::Sender<Packet>) -> (AudioKeyManager,
-                                             mpsc::Sender<AudioKeyRequest>,
-                                             mpsc::Sender<Packet>) {
-        let (req_tx, req_rx) = mpsc::channel();
-        let (pkt_tx, pkt_rx) = mpsc::channel();
-
-        (AudioKeyManager {
+    pub fn new() -> AudioKeyManager {
+        AudioKeyManager {
             next_seq: 1,
             callbacks: HashMap::new(),
-
-            requests: req_rx,
-            packet_rx: pkt_rx,
-            packet_tx: tx
-        }, req_tx, pkt_tx)
+        }
     }
 
-    fn request(&mut self, req: AudioKeyRequest) {
+    pub fn request(&mut self, session: &Session, track: SpotifyId, file: FileId)
+        -> Future<AudioKey> {
+        let (tx, rx) = mpsc::channel();
+
         let seq = self.next_seq;
         self.next_seq += 1;
 
         let mut data : Vec<u8> = Vec::new();
-        data.write(&req.file).unwrap();
-        data.write(&req.track.to_raw()).unwrap();
+        data.write(&file).unwrap();
+        data.write(&track.to_raw()).unwrap();
         data.write_u32::<BigEndian>(seq).unwrap();
         data.write_u16::<BigEndian>(0x0000).unwrap();
 
-        self.packet_tx.send(Packet {
-            cmd: 0xc,
-            data: data
-        }).unwrap();
+        session.send_packet(0xc, &data).unwrap();
+
+        self.callbacks.insert(seq, tx);
 
-        self.callbacks.insert(seq, req.callback);
+        Future::from_receiver(rx)
     }
+}
 
-    fn packet(&mut self, packet: Packet) {
-        assert_eq!(packet.cmd, 0xd);
+impl PacketHandler for AudioKeyManager {
+    fn handle(&mut self, cmd: u8, data: Vec<u8>) {
+        assert_eq!(cmd, 0xd);
 
-        let mut data = Cursor::new(&packet.data as &[u8]);
+        let mut data = Cursor::new(data);
         let seq = data.read_u32::<BigEndian>().unwrap();
         let mut key = [0u8; 16];
         data.read_all(&mut key).unwrap();
 
         match self.callbacks.remove(&seq) {
-            Some(callback) => callback.send(AudioKeyResponse(key)).unwrap(),
+            Some(callback) => callback.send(key).ignore(),
             None => ()
         };
     }
 }
 
-
-impl Subsystem for AudioKeyManager {
-    fn run(mut self) {
-        loop {
-            match {
-                let requests = &self.requests;
-                let packets = &self.packet_rx;
-
-                select!{
-                    r = requests.recv() => {
-                        Left(r.unwrap())
-                    },
-                    p = packets.recv() => {
-                        Right(p.unwrap())
-                    }
-                }
-            } {
-                Left(req) => {
-                    self.request(req);
-                }
-                Right(pkt) => {
-                    self.packet(pkt);
-                }
-            }
-
-        }
-    }
-}
-

+ 5 - 61
src/connection.rs

@@ -6,7 +6,6 @@ use std::io;
 use std::io::Write;
 use std::net::TcpStream;
 use std::result;
-use std::sync::mpsc;
 
 use keys::SharedKeys;
 
@@ -84,7 +83,7 @@ impl PlainConnection {
 }
 
 impl CipherConnection {
-    pub fn send_encrypted_packet(&mut self, cmd: u8, data: &[u8]) -> Result<()> {
+    pub fn send_packet(&mut self, cmd: u8, data: &[u8]) -> Result<()> {
         try!(self.stream.write_u8(cmd)); try!(self.stream.write_u16::<BigEndian>(data.len() as u16));
         try!(self.stream.write(data));
 
@@ -107,70 +106,15 @@ impl CipherConnection {
     }
 }
 
-pub struct Packet {
-    pub cmd: u8,
-    pub data: Vec<u8>
+pub trait PacketHandler {
+    fn handle(&mut self, cmd: u8, data: Vec<u8>);
 }
 
-pub struct SendThread {
-    connection: CipherConnection,
-    receiver: mpsc::Receiver<Packet>,
-}
-impl SendThread {
-    pub fn new(connection: CipherConnection)
-      -> (SendThread, mpsc::Sender<Packet>) {
-        let (tx, rx) = mpsc::channel();
-        (SendThread {
-            connection: connection,
-            receiver: rx
-        }, tx)
-    }
-
-    pub fn run(mut self) {
-        for req in self.receiver {
-            self.connection.send_encrypted_packet(
-                req.cmd, &req.data).unwrap();
-        }
-    }
-}
-
-pub struct PacketDispatch {
-    pub main: mpsc::Sender<Packet>,
-    pub stream: mpsc::Sender<Packet>,
-    pub mercury: mpsc::Sender<Packet>,
-    pub audio_key: mpsc::Sender<Packet>,
-}
-
-pub struct RecvThread {
-    connection: CipherConnection,
-    dispatch: PacketDispatch
-}
-
-impl RecvThread {
-    pub fn new(connection: CipherConnection, dispatch: PacketDispatch)
-        -> RecvThread {
-        RecvThread {
-            connection: connection,
-            dispatch: dispatch
-        }
-    }
-
-    pub fn run(mut self) {
-        loop {
-            let (cmd, data) = self.connection.recv_packet().unwrap();
-            let packet = Packet {
-                cmd: cmd,
-                data: data
-            };
-
+/*
             match packet.cmd {
                 0x09 => &self.dispatch.stream,
                 0xd | 0xe => &self.dispatch.audio_key,
                 0xb2...0xb6 => &self.dispatch.mercury,
                 _ => &self.dispatch.main,
             }.send(packet).unwrap();
-
-        }
-    }
-}
-
+            */

+ 29 - 37
src/main.rs

@@ -1,7 +1,8 @@
 #![crate_name = "librespot"]
 
-#![feature(plugin,zero_one,iter_arith,slice_position_elem,slice_bytes,bitset,mpsc_select,arc_weak,append)]
-#![allow(unused_imports,dead_code)]
+#![feature(plugin,scoped,zero_one,iter_arith,slice_position_elem,slice_bytes,bitset,arc_weak,append,future)]
+#![allow(deprecated)]
+//#![allow(unused_imports,dead_code)]
 
 #![plugin(protobuf_macros)]
 #[macro_use] extern crate lazy_static;
@@ -38,10 +39,10 @@ use std::clone::Clone;
 use std::fs::File;
 use std::io::{Read, Write};
 use std::path::Path;
-use std::sync::mpsc;
 use protobuf::core::Message;
+use std::thread;
 
-use metadata::{MetadataCache, AlbumRef, ArtistRef, TrackRef};
+use metadata::{AlbumRef, ArtistRef, TrackRef};
 use session::{Config, Session};
 use util::SpotifyId;
 use util::version::version_string;
@@ -69,12 +70,17 @@ fn main() {
     session.login(username.clone(), password);
     session.poll();
 
-    let ident = session.config.device_id.clone();
-    SpircManager{
-        session: session,
+    let poll_thread = thread::scoped(|| {
+        loop {
+            session.poll();
+        }
+    });
+
+    SpircManager {
+        session: &session,
         username: username.clone(),
         name: name.clone(),
-        ident: ident,
+        ident: session.config.device_id.clone(),
         device_type: 5,
 
         state_update_id: 0,
@@ -88,21 +94,17 @@ fn main() {
         state: PlayerState::new()
     }.run();
 
-    /*
-    loop {
-        session.poll();
-    }
-    */
+    poll_thread.join();
 }
 
-fn print_track(cache: &mut MetadataCache, track_id: SpotifyId) {
-    let track : TrackRef = cache.get(track_id);
+fn print_track(session: &Session, track_id: SpotifyId) {
+    let track : TrackRef = session.metadata(track_id);
 
     let album : AlbumRef = {
         let handle = track.wait();
         let data = handle.unwrap();
         eprintln!("{}", data.name);
-        cache.get(data.album)
+        session.metadata(data.album)
     };
 
     let artists : Vec<ArtistRef> = {
@@ -110,7 +112,7 @@ fn print_track(cache: &mut MetadataCache, track_id: SpotifyId) {
         let data = handle.unwrap();
         eprintln!("{}", data.name);
         data.artists.iter().map(|id| {
-            cache.get(*id)
+            session.metadata(*id)
         }).collect()
     };
 
@@ -159,7 +161,6 @@ impl PlayerState {
     }
 
     fn import(&mut self, state: &protocol::spirc::State) {
-        //println!("{:?}", state);
         self.status = state.get_status();
 
         self.context_uri = state.get_context_uri().to_string();
@@ -203,8 +204,8 @@ impl PlayerState {
     }
 }
 
-struct SpircManager {
-    session: Session,
+struct SpircManager<'s> {
+    session: &'s Session,
     username: String,
     state_update_id: i64,
     seq_nr: u32,
@@ -221,24 +222,16 @@ struct SpircManager {
     state: PlayerState
 }
 
-impl SpircManager {
+impl <'s> SpircManager<'s> {
     fn run(&mut self) {
-        let (tx, rx) = mpsc::channel();
-
-        self.session.mercury.send(MercuryRequest{
-            method: MercuryMethod::SUB,
-            uri: format!("hm://remote/user/{}/v23", self.username),
-            content_type: None,
-            callback: Some(tx),
-            payload: Vec::new()
-        }).unwrap();
+        let rx = self.session
+            .mercury_sub(format!("hm://remote/user/{}/v23", self.username))
+            .into_iter().map(|pkt| {
+            protobuf::parse_from_bytes::<protocol::spirc::Frame>(pkt.payload.front().unwrap()).unwrap()
+        });
 
         self.notify(None);
 
-        let rx = rx.into_iter().map(|pkt| {
-            protobuf::parse_from_bytes::<protocol::spirc::Frame>(pkt.payload.front().unwrap()).unwrap()
-        });
-        
         for frame in rx {
             println!("{:?} {} {} {} {}",
                      frame.get_typ(),
@@ -328,13 +321,12 @@ impl SpircManager {
             pkt.set_state(self.state.export());
         }
 
-        self.session.mercury.send(MercuryRequest{
+        self.session.mercury(MercuryRequest{
             method: MercuryMethod::SEND,
             uri: format!("hm://remote/user/{}", self.username),
             content_type: None,
-            callback: None,
             payload: vec![ pkt.write_to_bytes().unwrap() ]
-        }).unwrap();
+        });
     }
 
     fn device_state(&mut self) -> protocol::spirc::DeviceState {

+ 68 - 95
src/mercury.rs

@@ -5,12 +5,12 @@ use std::collections::{HashMap, LinkedList};
 use std::io::{Cursor, Read, Write};
 use std::fmt;
 use std::mem::replace;
-use std::sync::mpsc;
+use std::sync::{mpsc, Future};
 
-use connection::Packet;
 use librespot_protocol as protocol;
-use subsystem::Subsystem;
-use util::Either::{Left, Right};
+use session::Session;
+use connection::PacketHandler;
+use util::IgnoreExt;
 
 #[derive(Debug, PartialEq, Eq)]
 pub enum MercuryMethod {
@@ -24,7 +24,6 @@ pub struct MercuryRequest {
     pub method: MercuryMethod,
     pub uri: String,
     pub content_type: Option<String>,
-    pub callback: Option<MercuryCallback>,
     pub payload: Vec<Vec<u8>>
 }
 
@@ -34,22 +33,16 @@ pub struct MercuryResponse {
     pub payload: LinkedList<Vec<u8>>
 }
 
-pub type MercuryCallback = mpsc::Sender<MercuryResponse>;
-
 pub struct MercuryPending {
     parts: LinkedList<Vec<u8>>,
     partial: Option<Vec<u8>>,
-    callback: Option<MercuryCallback>
+    callback: Option<mpsc::Sender<MercuryResponse>>
 }
 
 pub struct MercuryManager {
     next_seq: u32,
     pending: HashMap<Vec<u8>, MercuryPending>,
-    subscriptions: HashMap<String, MercuryCallback>,
-
-    requests: mpsc::Receiver<MercuryRequest>,
-    packet_tx: mpsc::Sender<Packet>,
-    packet_rx: mpsc::Receiver<Packet>,
+    subscriptions: HashMap<String, mpsc::Sender<MercuryResponse>>,
 }
 
 impl fmt::Display for MercuryMethod {
@@ -64,24 +57,17 @@ impl fmt::Display for MercuryMethod {
 }
 
 impl MercuryManager {
-    pub fn new(tx: mpsc::Sender<Packet>) -> (MercuryManager,
-                                             mpsc::Sender<MercuryRequest>,
-                                             mpsc::Sender<Packet>) {
-        let (req_tx, req_rx) = mpsc::channel();
-        let (pkt_tx, pkt_rx) = mpsc::channel();
-
-        (MercuryManager {
+    pub fn new() -> MercuryManager {
+        MercuryManager {
             next_seq: 0,
             pending: HashMap::new(),
             subscriptions: HashMap::new(),
-
-            requests: req_rx,
-            packet_rx: pkt_rx,
-            packet_tx: tx,
-        }, req_tx, pkt_tx)
+        }
     }
 
-    fn request(&mut self, req: MercuryRequest) {
+    pub fn request(&mut self, session: &Session, req: MercuryRequest)
+        -> Future<MercuryResponse> {
+
         let mut seq = [0u8; 4];
         BigEndian::write_u32(&mut seq, self.next_seq);
         self.next_seq += 1;
@@ -93,20 +79,31 @@ impl MercuryManager {
             _ => 0xb2,
         };
 
-        self.packet_tx.send(Packet {
-            cmd: cmd,
-            data: data
-        }).unwrap();
+        session.send_packet(cmd, &data).unwrap();
 
-        if req.method != MercuryMethod::SUB {
-            self.pending.insert(seq.to_vec(), MercuryPending{
-                parts: LinkedList::new(),
-                partial: None,
-                callback: req.callback,
-            });
-        } else if let Some(cb) = req.callback {
-            self.subscriptions.insert(req.uri, cb);
-        }
+        let (tx, rx) = mpsc::channel();
+        self.pending.insert(seq.to_vec(), MercuryPending{
+            parts: LinkedList::new(),
+            partial: None,
+            callback: Some(tx),
+        });
+
+        Future::from_receiver(rx)
+    }
+
+    pub fn subscribe(&mut self, session: &Session, uri: String)
+        -> mpsc::Receiver<MercuryResponse> {
+        let (tx, rx) = mpsc::channel();
+        self.subscriptions.insert(uri.clone(), tx);
+
+        self.request(session, MercuryRequest{
+            method: MercuryMethod::SUB,
+            uri: uri,
+            content_type: None,
+            payload: Vec::new()
+        });
+
+        rx
     }
 
     fn parse_part(mut s: &mut Read) -> Vec<u8> {
@@ -133,14 +130,44 @@ impl MercuryManager {
         };
 
         if let Some(ref ch) = callback {
+             // Ignore send error.
+             // It simply means the receiver was closed
             ch.send(MercuryResponse{
                 uri: header.get_uri().to_string(),
                 payload: pending.parts
-            }).unwrap();
+            }).ignore();
         }
     }
 
-    fn handle_packet(&mut self, cmd: u8, data: Vec<u8>) {
+    fn encode_request(&self, seq: &[u8], req: &MercuryRequest) -> Vec<u8> {
+        let mut packet = Vec::new();
+        packet.write_u16::<BigEndian>(seq.len() as u16).unwrap();
+        packet.write_all(seq).unwrap();
+        packet.write_u8(1).unwrap(); // Flags: FINAL
+        packet.write_u16::<BigEndian>(1 + req.payload.len() as u16).unwrap(); // Part count
+
+        let mut header = protobuf_init!(protocol::mercury::Header::new(), {
+            uri: req.uri.clone(),
+            method: req.method.to_string(),
+        });
+        if let Some(ref content_type) = req.content_type {
+            header.set_content_type(content_type.clone());
+        }
+
+        packet.write_u16::<BigEndian>(header.compute_size() as u16).unwrap();
+        header.write_to_writer(&mut packet).unwrap();
+
+        for p in &req.payload {
+            packet.write_u16::<BigEndian>(p.len() as u16).unwrap();
+            packet.write(&p).unwrap();
+        }
+
+        packet
+    }
+}
+
+impl PacketHandler for MercuryManager {
+    fn handle(&mut self, cmd: u8, data: Vec<u8>) {
         let mut packet = Cursor::new(data);
 
         let seq = {
@@ -185,59 +212,5 @@ impl MercuryManager {
             self.pending.insert(seq, pending);
         }
     }
-
-    fn encode_request(&self, seq: &[u8], req: &MercuryRequest) -> Vec<u8> {
-        let mut packet = Vec::new();
-        packet.write_u16::<BigEndian>(seq.len() as u16).unwrap();
-        packet.write_all(seq).unwrap();
-        packet.write_u8(1).unwrap(); // Flags: FINAL
-        packet.write_u16::<BigEndian>(1 + req.payload.len() as u16).unwrap(); // Part count
-
-        let mut header = protobuf_init!(protocol::mercury::Header::new(), {
-            uri: req.uri.clone(),
-            method: req.method.to_string(),
-        });
-        if let Some(ref content_type) = req.content_type {
-            header.set_content_type(content_type.clone());
-        }
-
-        packet.write_u16::<BigEndian>(header.compute_size() as u16).unwrap();
-        header.write_to_writer(&mut packet).unwrap();
-
-        for p in &req.payload {
-            packet.write_u16::<BigEndian>(p.len() as u16).unwrap();
-            packet.write(&p).unwrap();
-        }
-
-        packet
-    }
-}
-
-impl Subsystem for MercuryManager {
-    fn run(mut self) {
-        loop {
-            match {
-                let requests = &self.requests;
-                let packets = &self.packet_rx;
-
-                select!{
-                    r = requests.recv() => {
-                        Left(r.unwrap())
-                    },
-                    p = packets.recv() => {
-                        Right(p.unwrap())
-                    }
-                }
-            } {
-                Left(req) => {
-                    self.request(req);
-                }
-                Right(pkt) => {
-                    self.handle_packet(pkt.cmd, pkt.data);
-                }
-            }
-
-        }
-    }
 }
 

+ 36 - 76
src/metadata.rs

@@ -3,13 +3,13 @@ use std::any::{Any, TypeId};
 use std::collections::HashMap;
 use std::fmt;
 use std::slice::bytes::copy_memory;
-use std::sync::{mpsc, Arc, Condvar, Mutex, MutexGuard, Weak};
+use std::sync::{Arc, Condvar, Mutex, MutexGuard, Weak};
 use std::thread;
 
 use librespot_protocol as protocol;
 use mercury::{MercuryRequest, MercuryMethod};
-use subsystem::Subsystem;
 use util::{SpotifyId, FileId};
+use session::Session;
 
 pub trait MetadataTrait : Send + Any + 'static {
     type Message: protobuf::MessageStatic;
@@ -119,40 +119,6 @@ pub type TrackRef = MetadataRef<Track>;
 pub type AlbumRef = MetadataRef<Album>;
 pub type ArtistRef = MetadataRef<Artist>;
 
-pub struct MetadataCache {
-    metadata: mpsc::Sender<MetadataRequest>,
-    cache: HashMap<(SpotifyId, TypeId), Box<Any + 'static>>
-}
-
-impl MetadataCache {
-    pub fn new(metadata: mpsc::Sender<MetadataRequest>) -> MetadataCache {
-        MetadataCache {
-            metadata: metadata,
-            cache: HashMap::new()
-        }
-    }
-
-    pub fn get<T: MetadataTrait>(&mut self, id: SpotifyId)
-      -> MetadataRef<T> {
-        let key = (id, TypeId::of::<T>());
-
-        self.cache.get(&key)
-            .and_then(|x| x.downcast_ref::<Weak<Metadata<T>>>())
-            .and_then(|x| x.upgrade())
-            .unwrap_or_else(|| {
-                let x : MetadataRef<T> = Arc::new(Metadata{
-                    id: id,
-                    state: Mutex::new(MetadataState::Loading),
-                    cond: Condvar::new()
-                });
-
-                self.cache.insert(key, Box::new(x.downgrade()));
-                self.metadata.send(T::request(x.clone())).unwrap();
-                x
-            })
-    }
-}
-
 impl <T: MetadataTrait> Metadata<T> {
     pub fn id(&self) -> SpotifyId {
         self.id
@@ -214,34 +180,46 @@ pub enum MetadataRequest {
 }
 
 pub struct MetadataManager {
-    requests: mpsc::Receiver<MetadataRequest>,
-    mercury: mpsc::Sender<MercuryRequest>
+    cache: HashMap<(SpotifyId, TypeId), Box<Any + Send>>
 }
 
 impl MetadataManager {
-    pub fn new(mercury: mpsc::Sender<MercuryRequest>) -> (MetadataManager,
-                                                          mpsc::Sender<MetadataRequest>) {
-        let (tx, rx) = mpsc::channel();
-        (MetadataManager {
-            requests: rx,
-            mercury: mercury
-        }, tx)
+    pub fn new() -> MetadataManager {
+        MetadataManager {
+            cache: HashMap::new()
+        }
     }
 
-    fn load<T: MetadataTrait> (&self, object: MetadataRef<T>) {
-        let mercury = self.mercury.clone();
-        thread::spawn(move || {
-            let (tx, rx) = mpsc::channel();
-            
-            mercury.send(MercuryRequest {
-                method: MercuryMethod::GET,
-                uri: format!("{}/{}", T::base_url(), object.id.to_base16()),
-                content_type: None,
-                callback: Some(tx),
-                payload: Vec::new()
-            }).unwrap();
+    pub fn get<T: MetadataTrait>(&mut self, session: &Session, id: SpotifyId)
+      -> MetadataRef<T> {
+        let key = (id, TypeId::of::<T>());
+
+        self.cache.get(&key)
+            .and_then(|x| x.downcast_ref::<Weak<Metadata<T>>>())
+            .and_then(|x| x.upgrade())
+            .unwrap_or_else(|| {
+                let x : MetadataRef<T> = Arc::new(Metadata{
+                    id: id,
+                    state: Mutex::new(MetadataState::Loading),
+                    cond: Condvar::new()
+                });
 
-            let response = rx.recv().unwrap();
+                self.cache.insert(key, Box::new(x.downgrade()));
+                self.load(session, x.clone());
+                x
+            })
+    }
+
+    fn load<T: MetadataTrait> (&self, session: &Session, object: MetadataRef<T>) {
+        let rx = session.mercury(MercuryRequest {
+            method: MercuryMethod::GET,
+            uri: format!("{}/{}", T::base_url(), object.id.to_base16()),
+            content_type: None,
+            payload: Vec::new()
+        });
+
+        thread::spawn(move || {
+            let response = rx.into_inner();
 
             let msg : T::Message = protobuf::parse_from_bytes(
                 response.payload.front().unwrap()).unwrap();
@@ -251,21 +229,3 @@ impl MetadataManager {
     }
 }
 
-impl Subsystem for MetadataManager {
-    fn run(self) {
-        for req in self.requests.iter() {
-            match req {
-                MetadataRequest::Artist(artist) => {
-                    self.load(artist)
-                }
-                MetadataRequest::Album(album) => {
-                    self.load(album)
-                }
-                MetadataRequest::Track(track) => {
-                    self.load(track)
-                }
-            }
-        }
-    }
-}
-

+ 2 - 15
src/player.rs

@@ -1,8 +1,6 @@
 use portaudio;
-use std::sync::mpsc;
 use vorbis;
 
-use audio_key::{AudioKeyRequest, AudioKeyResponse};
 use metadata::TrackRef;
 use session::Session;
 use audio_file::AudioFile;
@@ -15,24 +13,13 @@ impl Player {
     pub fn play(session: &Session, track: TrackRef) {
         let file_id = *track.wait().unwrap().files.first().unwrap();
 
-        let key = {
-            let (tx, rx) = mpsc::channel();
-
-            session.audio_key.send(AudioKeyRequest {
-                track: track.id(),
-                file: file_id,
-                callback: tx
-            }).unwrap();
-            
-            let AudioKeyResponse(key) = rx.recv().unwrap();
-            key
-        };
+        let key = session.audio_key(track.id(), file_id).into_inner();
 
         let mut decoder = 
             vorbis::Decoder::new(
                 Subfile::new(
                         AudioDecrypt::new(key,
-                            AudioFile::new(file_id, session.stream.clone())), 0xa7)).unwrap();
+                            AudioFile::new(session, file_id)), 0xa7)).unwrap();
         //decoder.time_seek(60f64).unwrap();
 
         portaudio::initialize().unwrap();

+ 63 - 70
src/session.rs

@@ -2,17 +2,19 @@ use crypto::digest::Digest;
 use crypto::sha1::Sha1;
 use protobuf::{self, Message};
 use rand::thread_rng;
-use std::sync::mpsc;
-use std::thread;
+use std::sync::{Mutex, Arc, Future, mpsc};
 
-use audio_key;
-use connection::{PlainConnection, Packet, PacketDispatch, SendThread, RecvThread};
+use connection::{self, PlainConnection, CipherConnection};
 use keys::PrivateKeys;
 use librespot_protocol as protocol;
-use mercury;
-use metadata;
-use stream;
-use subsystem::Subsystem;
+use util::{SpotifyId, FileId};
+
+use mercury::{MercuryManager, MercuryRequest, MercuryResponse};
+use metadata::{MetadataManager, MetadataRef, MetadataTrait};
+use stream::{StreamManager, StreamEvent};
+use audio_key::{AudioKeyManager, AudioKey};
+use connection::PacketHandler;
+
 use util;
 
 pub struct Config {
@@ -24,15 +26,16 @@ pub struct Config {
 pub struct Session {
     pub config: Config,
 
-    packet_rx: mpsc::Receiver<Packet>,
-    pub packet_tx: mpsc::Sender<Packet>,
-
-    pub audio_key: mpsc::Sender<audio_key::AudioKeyRequest>,
-    pub mercury: mpsc::Sender<mercury::MercuryRequest>,
-    pub metadata: mpsc::Sender<metadata::MetadataRequest>,
-    pub stream: mpsc::Sender<stream::StreamRequest>,
+    mercury: Mutex<MercuryManager>,
+    metadata: Mutex<MetadataManager>,
+    stream: Mutex<StreamManager>,
+    audio_key: Mutex<AudioKeyManager>,
+    rx_connection: Mutex<CipherConnection>,
+    tx_connection: Mutex<CipherConnection>,
 }
 
+type SessionRef = Arc<Session>;
+
 impl Session {
     pub fn new(mut config: Config) -> Session {
         config.device_id = {
@@ -105,41 +108,16 @@ impl Session {
 
         let cipher_connection = connection.setup_cipher(shared_keys);
 
-        let (send_thread, tx) = SendThread::new(cipher_connection.clone());
-
-        let (main_tx, rx) = mpsc::channel();
-        let (mercury, mercury_req, mercury_pkt)
-            = mercury::MercuryManager::new(tx.clone());
-        let (metadata, metadata_req)
-            = metadata::MetadataManager::new(mercury_req.clone());
-        let (stream, stream_req, stream_pkt)
-            = stream::StreamManager::new(tx.clone());
-        let (audio_key, audio_key_req, audio_key_pkt)
-            = audio_key::AudioKeyManager::new(tx.clone());
-
-        let recv_thread = RecvThread::new(cipher_connection, PacketDispatch {
-            main: main_tx,
-            stream: stream_pkt,
-            mercury: mercury_pkt,
-            audio_key: audio_key_pkt
-        });
-
-        thread::spawn(move || send_thread.run());
-        thread::spawn(move || recv_thread.run());
-
-        mercury.start();
-        metadata.start();
-        stream.start();
-        audio_key.start();
-
         Session {
             config: config,
-            packet_rx: rx,
-            packet_tx: tx,
-            mercury: mercury_req,
-            metadata: metadata_req,
-            stream: stream_req,
-            audio_key: audio_key_req,
+
+            rx_connection: Mutex::new(cipher_connection.clone()),
+            tx_connection: Mutex::new(cipher_connection),
+
+            mercury: Mutex::new(MercuryManager::new()),
+            metadata: Mutex::new(MetadataManager::new()),
+            stream: Mutex::new(StreamManager::new()),
+            audio_key: Mutex::new(AudioKeyManager::new()),
         }
     }
 
@@ -166,32 +144,47 @@ impl Session {
             }
         });
 
-        self.packet_tx.send(Packet {
-            cmd: 0xab,
-            data: packet.write_to_bytes().unwrap()
-        }).unwrap();
+        self.send_packet(0xab, &packet.write_to_bytes().unwrap()).unwrap();
     }
 
     pub fn poll(&self) {
-        let packet = self.packet_rx.recv().unwrap();
-
-        match packet.cmd {
-            0x4 => { // PING
-                self.packet_tx.send(Packet {
-                    cmd: 0x49,
-                    data: packet.data
-                }).unwrap();
-            }
-            0x4a => { // PONG
-            }
-            0xac => { // AUTHENTICATED
-                eprintln!("Authentication succeedded");
-            }
-            0xad => {
-                eprintln!("Authentication failed");
-            }
+        let (cmd, data) =
+            self.rx_connection.lock().unwrap().recv_packet().unwrap();
+
+        match cmd {
+            0x4 => self.send_packet(0x49, &data).unwrap(),
+            0x4a => (),
+            0x9  => self.stream.lock().unwrap().handle(cmd, data),
+            0xd | 0xe => self.audio_key.lock().unwrap().handle(cmd, data),
+            0xb2...0xb6 => self.mercury.lock().unwrap().handle(cmd, data),
+            0xac => eprintln!("Authentication succeedded"),
+            0xad => eprintln!("Authentication failed"),
             _ => ()
-        };
+        }
+    }
+
+    pub fn send_packet(&self, cmd: u8, data: &[u8]) -> connection::Result<()> {
+        self.tx_connection.lock().unwrap().send_packet(cmd, data)
+    }
+    
+    pub fn audio_key(&self, track: SpotifyId, file: FileId) -> Future<AudioKey> {
+        self.audio_key.lock().unwrap().request(self, track, file)
+    }
+
+    pub fn stream(&self, file: FileId, offset: u32, size: u32) -> mpsc::Receiver<StreamEvent> {
+        self.stream.lock().unwrap().request(self, file, offset, size)
+    }
+
+    pub fn metadata<T: MetadataTrait>(&self, id: SpotifyId) -> MetadataRef<T> {
+        self.metadata.lock().unwrap().get(self, id)
+    }
+
+    pub fn mercury(&self, req: MercuryRequest) -> Future<MercuryResponse> {
+        self.mercury.lock().unwrap().request(self, req)
+    }
+
+    pub fn mercury_sub(&self, uri: String) -> mpsc::Receiver<MercuryResponse> {
+        self.mercury.lock().unwrap().subscribe(self, uri)
     }
 }
 

+ 22 - 62
src/stream.rs

@@ -3,18 +3,9 @@ use std::collections::HashMap;
 use std::io::{Cursor, Seek, SeekFrom, Write};
 use std::sync::mpsc;
 
-use connection::Packet;
 use util::{ArcVec, FileId};
-use util::Either::{Left, Right};
-use subsystem::Subsystem;
-
-pub type StreamCallback = mpsc::Sender<StreamEvent>;
-pub struct StreamRequest {
-    pub id: FileId,
-    pub offset: u32,
-    pub size: u32,
-    pub callback: StreamCallback
-}
+use connection::PacketHandler;
+use session::Session;
 
 #[derive(Debug)]
 pub enum StreamEvent {
@@ -31,36 +22,28 @@ enum ChannelMode {
 
 struct Channel {
     mode: ChannelMode,
-    callback: StreamCallback
+    callback: mpsc::Sender<StreamEvent>
 }
 
 pub struct StreamManager {
     next_id: ChannelId,
     channels: HashMap<ChannelId, Channel>,
-
-    requests: mpsc::Receiver<StreamRequest>,
-    packet_rx: mpsc::Receiver<Packet>,
-    packet_tx: mpsc::Sender<Packet>,
 }
 
 impl StreamManager {
-    pub fn new(tx: mpsc::Sender<Packet>) -> (StreamManager,
-                                             mpsc::Sender<StreamRequest>,
-                                             mpsc::Sender<Packet>) {
-        let (req_tx, req_rx) = mpsc::channel();
-        let (pkt_tx, pkt_rx) = mpsc::channel();
-
-        (StreamManager {
+    pub fn new() -> StreamManager {
+        StreamManager {
             next_id: 0,
             channels: HashMap::new(),
-
-            requests: req_rx,
-            packet_rx: pkt_rx,
-            packet_tx: tx
-        }, req_tx, pkt_tx)
+        }
     }
 
-    fn request(&mut self, req: StreamRequest) {
+    pub fn request(&mut self, session: &Session,
+                   file: FileId, offset: u32, size: u32)
+        -> mpsc::Receiver<StreamEvent> {
+
+        let (tx, rx) = mpsc::channel();
+
         let channel_id = self.next_id;
         self.next_id += 1;
 
@@ -72,22 +55,23 @@ impl StreamManager {
         data.write_u32::<BigEndian>(0x00000000).unwrap();
         data.write_u32::<BigEndian>(0x00009C40).unwrap();
         data.write_u32::<BigEndian>(0x00020000).unwrap();
-        data.write(&req.id).unwrap();
-        data.write_u32::<BigEndian>(req.offset).unwrap();
-        data.write_u32::<BigEndian>(req.offset + req.size).unwrap();
+        data.write(&file).unwrap();
+        data.write_u32::<BigEndian>(offset).unwrap();
+        data.write_u32::<BigEndian>(offset + size).unwrap();
 
-        self.packet_tx.send(Packet {
-            cmd: 0x8,
-            data: data
-        }).unwrap();
+        session.send_packet(0x8, &data).unwrap();
 
         self.channels.insert(channel_id, Channel {
             mode: ChannelMode::Header,
-            callback: req.callback
+            callback: tx
         });
+
+        rx
     }
+}
 
-    fn packet(&mut self, data: Vec<u8>) {
+impl PacketHandler for StreamManager {
+    fn handle(&mut self, _cmd: u8, data: Vec<u8>) {
         let data = ArcVec::new(data);
         let mut packet = Cursor::new(&data as &[u8]);
 
@@ -140,27 +124,3 @@ impl StreamManager {
     }
 }
 
-impl Subsystem for StreamManager {
-    fn run(mut self) {
-        loop {
-            match {
-                let requests = &self.requests;
-                let packets = &self.packet_rx;
-
-                select!{
-                    r = requests.recv() => {
-                        Left(r.unwrap())
-                    },
-                    p = packets.recv() => {
-                        Right(p.unwrap())
-                    }
-                }
-            } {
-                Left(req) => self.request(req),
-                Right(pkt) => self.packet(pkt.data)
-            }
-        }
-    }
-}
-
-

+ 12 - 5
src/util/mod.rs

@@ -47,11 +47,6 @@ pub mod version {
     }
 }
 
-pub enum Either<S,T> {
-    Left(S),
-    Right(T)
-}
-
 pub fn hexdump(data: &[u8]) {
     for b in data.iter() {
         eprint!("{:02X} ", b);
@@ -59,3 +54,15 @@ pub fn hexdump(data: &[u8]) {
     eprintln!("");
 }
 
+pub trait IgnoreExt {
+    fn ignore(self);
+}
+
+impl <T, E> IgnoreExt for Result<T, E> {
+    fn ignore(self) {
+        match self {
+            Ok(_)  => (),
+            Err(_) => (),
+        }
+    }
+}