Эх сурвалжийг харах

Create channel in spirc instead of in mercury and use MercuryResponseSender implemented by macro

Daniel Romero 8 жил өмнө
parent
commit
c8ee08663d
3 өөрчлөгдсөн 42 нэмэгдсэн , 25 устгасан
  1. 5 9
      src/mercury.rs
  2. 4 3
      src/session.rs
  3. 33 13
      src/spirc.rs

+ 5 - 9
src/mercury.rs

@@ -4,10 +4,10 @@ use protobuf::{self, Message};
 use std::collections::HashMap;
 use std::io::{Cursor, Read, Write};
 use std::mem::replace;
-use std::sync::mpsc;
 
 use protocol;
 use session::{Session, PacketHandler};
+use spirc::MercuryResponseSender;
 
 #[derive(Debug, PartialEq, Eq)]
 pub enum MercuryMethod {
@@ -32,7 +32,7 @@ pub struct MercuryResponse {
 
 enum MercuryCallback {
     Future(eventual::Complete<MercuryResponse, ()>),
-    Subscription(mpsc::Sender<MercuryResponse>),
+    Subscription(MercuryResponseSender),
     Channel,
 }
 
@@ -45,7 +45,7 @@ pub struct MercuryPending {
 pub struct MercuryManager {
     next_seq: u32,
     pending: HashMap<Vec<u8>, MercuryPending>,
-    subscriptions: HashMap<String, mpsc::Sender<MercuryResponse>>,
+    subscriptions: HashMap<String, MercuryResponseSender>,
 }
 
 impl ToString for MercuryMethod {
@@ -103,9 +103,7 @@ impl MercuryManager {
         rx
     }
 
-    pub fn subscribe(&mut self, session: &Session, uri: String) -> mpsc::Receiver<MercuryResponse> {
-        let (tx, rx) = mpsc::channel();
-
+    pub fn subscribe(&mut self, session: &Session, uri: String, tx: MercuryResponseSender) {
         self.request_with_callback(session,
                                    MercuryRequest {
                                        method: MercuryMethod::SUB,
@@ -114,8 +112,6 @@ impl MercuryManager {
                                        payload: Vec::new(),
                                    },
                                    MercuryCallback::Subscription(tx));
-
-        rx
     }
 
     fn parse_part(mut s: &mut Read) -> Vec<u8> {
@@ -128,7 +124,7 @@ impl MercuryManager {
 
     fn complete_subscription(&mut self,
                              response: MercuryResponse,
-                             tx: mpsc::Sender<MercuryResponse>) {
+                             tx: MercuryResponseSender) {
         for sub_data in response.payload {
             if let Ok(mut sub) =
                    protobuf::parse_from_bytes::<protocol::pubsub::Subscription>(&sub_data) {

+ 4 - 3
src/session.rs

@@ -9,7 +9,7 @@ use protobuf::{self, Message};
 use rand::thread_rng;
 use std::io::{Read, Write, Cursor};
 use std::result::Result;
-use std::sync::{Mutex, RwLock, Arc, mpsc};
+use std::sync::{Mutex, RwLock, Arc};
 use std::str::FromStr;
 
 use album_cover::AlbumCover;
@@ -24,6 +24,7 @@ use mercury::{MercuryManager, MercuryRequest, MercuryResponse};
 use metadata::{MetadataManager, MetadataRef, MetadataTrait};
 use protocol;
 use stream::StreamManager;
+use spirc::MercuryResponseSender;
 use util::{self, SpotifyId, FileId, ReadSeek};
 use version;
 
@@ -320,8 +321,8 @@ impl Session {
         self.0.mercury.lock().unwrap().request(self, req)
     }
 
-    pub fn mercury_sub(&self, uri: String) -> mpsc::Receiver<MercuryResponse> {
-        self.0.mercury.lock().unwrap().subscribe(self, uri)
+    pub fn mercury_sub(&self, uri: String, tx: MercuryResponseSender) {
+        self.0.mercury.lock().unwrap().subscribe(self, uri, tx)
     }
 
     pub fn cache(&self) -> &Cache {

+ 33 - 13
src/spirc.rs

@@ -1,10 +1,10 @@
 use eventual::Async;
 use protobuf::{self, Message, RepeatedField};
 use std::borrow::Cow;
-use std::sync::{Mutex, Arc};
+use std::sync::{mpsc, Mutex, Arc};
 use std::collections::HashMap;
 
-use mercury::{MercuryRequest, MercuryMethod};
+use mercury::{MercuryRequest, MercuryMethod, MercuryResponse};
 use player::{Player, PlayerState};
 use mixer::Mixer;
 use session::Session;
@@ -56,6 +56,18 @@ pub struct State {
     pub end_of_track: bool,
 }
 
+pub struct UpdateMessage;
+
+pub enum SpircMessage {
+    MercuryMsg(MercuryResponse),
+    UpdateMsg(UpdateMessage)
+}
+
+implement_sender!(name => MercuryResponseSender, 
+                  wrap => MercuryResponse, 
+                  with => SpircMessage, 
+                  variant => MercuryMsg);
+
 impl SpircManager {
     pub fn new(session: Session, player: Player, mixer: Box<Mixer + Send>) -> SpircManager {
         let ident = session.device_id().to_owned();
@@ -92,8 +104,11 @@ impl SpircManager {
     pub fn run(&self) {
         let rx = {
             let mut internal = self.0.lock().unwrap();
+            let (tx, rx) = mpsc::channel::<SpircMessage>();
 
-            let rx = internal.session.mercury_sub(internal.uri());
+            let mercury_response_sender = MercuryResponseSender::create(tx.clone());
+
+            internal.session.mercury_sub(internal.uri(), mercury_response_sender);
 
             internal.notify(true, None);
 
@@ -110,18 +125,23 @@ impl SpircManager {
             rx
         };
 
-        for pkt in rx {
-            let data = pkt.payload.first().unwrap();
-            let frame = protobuf::parse_from_bytes::<protocol::spirc::Frame>(data).unwrap();
+        for msg in rx {
+            match msg {
+            SpircMessage::MercuryMsg(pkt) => {
+                let data = pkt.payload.first().unwrap();
+                let frame = protobuf::parse_from_bytes::<protocol::spirc::Frame>(data).unwrap();
 
-            debug!("{:?} {:?} {} {} {}",
-                     frame.get_typ(),
-                     frame.get_device_state().get_name(),
-                     frame.get_ident(),
-                     frame.get_seq_nr(),
-                     frame.get_state_update_id());
+                debug!("{:?} {:?} {} {} {}",
+                        frame.get_typ(),
+                        frame.get_device_state().get_name(),
+                        frame.get_ident(),
+                        frame.get_seq_nr(),
+                        frame.get_state_update_id());
 
-            self.0.lock().unwrap().handle(frame);
+                self.0.lock().unwrap().handle(frame);
+            }
+            _ => {}
+            }
         }
     }