Răsfoiți Sursa

mercury: Register subscription for all channel aliases

When subscribing to a channel, we may actually end up subscribed to other
alias channels. We must track these as well in order to redirect received
messages properly.
Paul Lietar 10 ani în urmă
părinte
comite
4806f3e85a
4 a modificat fișierele cu 44 adăugiri și 19 ștergeri
  1. 1 0
      protocol/build.rs
  2. 1 0
      protocol/src/lib.rs
  3. 41 18
      src/mercury.rs
  4. 1 1
      src/spirc.rs

+ 1 - 0
protocol/build.rs

@@ -43,6 +43,7 @@ fn main() {
             &proto.join("authentication.proto"),
             &proto.join("mercury.proto"),
             &proto.join("metadata.proto"),
+            &proto.join("pubsub.proto"),
             &proto.join("spirc.proto"),
     ]).unwrap();
 }

+ 1 - 0
protocol/src/lib.rs

@@ -7,5 +7,6 @@ mod_path! keyexchange (concat!(env!("OUT_DIR"), "/keyexchange.rs"));
 mod_path! authentication (concat!(env!("OUT_DIR"), "/authentication.rs"));
 mod_path! mercury (concat!(env!("OUT_DIR"), "/mercury.rs"));
 mod_path! metadata (concat!(env!("OUT_DIR"), "/metadata.rs"));
+mod_path! pubsub (concat!(env!("OUT_DIR"), "/pubsub.rs"));
 mod_path! spirc (concat!(env!("OUT_DIR"), "/spirc.rs"));
 

+ 41 - 18
src/mercury.rs

@@ -1,5 +1,5 @@
 use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt};
-use eventual::{self, Async};
+use eventual;
 use protobuf::{self, Message};
 use std::collections::HashMap;
 use std::io::{Cursor, Read, Write};
@@ -9,7 +9,6 @@ use std::sync::mpsc;
 use librespot_protocol as protocol;
 use session::Session;
 use connection::PacketHandler;
-use util::IgnoreExt;
 
 #[derive(Debug, PartialEq, Eq)]
 pub enum MercuryMethod {
@@ -32,10 +31,16 @@ pub struct MercuryResponse {
     pub payload: Vec<Vec<u8>>
 }
 
+enum MercuryCallback {
+    Future(eventual::Complete<MercuryResponse, ()>),
+    Subscription(mpsc::Sender<MercuryResponse>),
+    Channel,
+}
+
 pub struct MercuryPending {
     parts: Vec<Vec<u8>>,
     partial: Option<Vec<u8>>,
-    callback: Option<eventual::Complete<MercuryResponse, ()>>
+    callback: MercuryCallback
 }
 
 pub struct MercuryManager {
@@ -64,9 +69,10 @@ impl MercuryManager {
         }
     }
 
-    pub fn request(&mut self, session: &Session, req: MercuryRequest)
-        -> eventual::Future<MercuryResponse, ()> {
-
+    fn request_with_callback(&mut self,
+                             session: &Session,
+                             req: MercuryRequest, 
+                             cb: MercuryCallback) {
         let mut seq = [0u8; 4];
         BigEndian::write_u32(&mut seq, self.next_seq);
         self.next_seq += 1;
@@ -80,27 +86,30 @@ impl MercuryManager {
 
         session.send_packet(cmd, &data).unwrap();
 
-        let (tx, rx) = eventual::Future::pair();
         self.pending.insert(seq.to_vec(), MercuryPending{
             parts: Vec::new(),
             partial: None,
-            callback: Some(tx),
+            callback: cb,
         });
+    }
 
+    pub fn request(&mut self, session: &Session, req: MercuryRequest)
+            -> eventual::Future<MercuryResponse, ()> {
+        let (tx, rx) = eventual::Future::pair();
+        self.request_with_callback(session, req, MercuryCallback::Future(tx));
         rx
     }
 
     pub fn subscribe(&mut self, session: &Session, uri: String)
         -> mpsc::Receiver<MercuryResponse> {
         let (tx, rx) = mpsc::channel();
-        self.subscriptions.insert(uri.clone(), tx);
 
-        self.request(session, MercuryRequest{
+        self.request_with_callback(session, MercuryRequest{
             method: MercuryMethod::SUB,
             uri: uri,
             content_type: None,
             payload: Vec::new()
-        }).fire();
+        }, MercuryCallback::Subscription(tx));
 
         rx
     }
@@ -113,7 +122,17 @@ impl MercuryManager {
         buffer
     }
 
-    fn complete_request(&mut self, cmd: u8, mut pending: MercuryPending) {
+    fn complete_subscription(&mut self,
+                             response: MercuryResponse,
+                             tx: mpsc::Sender<MercuryResponse>) {
+        for sub_data in response.payload {
+            if let Ok(mut sub) = protobuf::parse_from_bytes::<protocol::pubsub::Subscription>(&sub_data) {
+                self.subscriptions.insert(sub.take_uri(), tx.clone());
+            }
+        }
+    }
+
+    fn complete_request(&mut self, mut pending: MercuryPending) {
         let header_data = pending.parts.remove(0);
         let header : protocol::mercury::Header =
             protobuf::parse_from_bytes(&header_data).unwrap();
@@ -123,10 +142,14 @@ impl MercuryManager {
             payload: pending.parts
         };
 
-        if cmd == 0xb5 {
-            self.subscriptions.get(header.get_uri()).map(|ch| ch.send(response).ignore());
-        } else {
-            pending.callback.map(|cb| cb.complete(response));
+        match pending.callback {
+            MercuryCallback::Future(tx) => tx.complete(response),
+            MercuryCallback::Subscription(tx) => self.complete_subscription(response, tx),
+            MercuryCallback::Channel => {
+                self.subscriptions
+                    .get(header.get_uri()).unwrap()
+                    .send(response).unwrap()
+            }
         }
     }
 
@@ -176,7 +199,7 @@ impl PacketHandler for MercuryManager {
             MercuryPending {
                 parts: Vec::new(),
                 partial: None,
-                callback: None,
+                callback: MercuryCallback::Channel,
             }
         } else {
             println!("Ignore seq {:?} cmd {}", seq, cmd);
@@ -198,7 +221,7 @@ impl PacketHandler for MercuryManager {
         }
 
         if flags == 0x1 {
-            self.complete_request(cmd, pending);
+            self.complete_request(pending);
         } else {
             self.pending.insert(seq, pending);
         }

+ 1 - 1
src/spirc.rs

@@ -89,7 +89,7 @@ impl <'s, D: SpircDelegate> SpircManager<'s, D> {
     }
 
     pub fn run(&mut self) {
-        let rx = self.session.mercury_sub(format!("hm://remote/3/user/{}/", 
+        let rx = self.session.mercury_sub(format!("hm://remote/user/{}/",
                     self.session.0.data.read().unwrap().canonical_username.clone()));
         let updates = self.delegate.updates();