Browse Source

Enable Mercury to be shut down and all pending requests being cancelled.

Konstantin Seiler 4 years ago
parent
commit
3fe3849588
2 changed files with 32 additions and 15 deletions
  1. 31 15
      core/src/mercury/mod.rs
  2. 1 0
      core/src/session.rs

+ 31 - 15
core/src/mercury/mod.rs

@@ -20,6 +20,7 @@ component! {
         sequence: SeqGenerator<u64> = SeqGenerator::new(0),
         pending: HashMap<Vec<u8>, MercuryPending> = HashMap::new(),
         subscriptions: Vec<(String, mpsc::UnboundedSender<MercuryResponse>)> = Vec::new(),
+        is_shutdown: bool = false,
     }
 }
 
@@ -61,7 +62,11 @@ impl MercuryManager {
         };
 
         let seq = self.next_seq();
-        self.lock(|inner| inner.pending.insert(seq.clone(), pending));
+        self.lock(|inner| {
+            if !inner.is_shutdown {
+                inner.pending.insert(seq.clone(), pending);
+            }
+        });
 
         let cmd = req.method.command();
         let data = req.encode(&seq);
@@ -109,21 +114,23 @@ impl MercuryManager {
             let (tx, rx) = mpsc::unbounded();
 
             manager.lock(move |inner| {
-                debug!("subscribed uri={} count={}", uri, response.payload.len());
-                if response.payload.len() > 0 {
-                    // Old subscription protocol, watch the provided list of URIs
-                    for sub in response.payload {
-                        let mut sub: protocol::pubsub::Subscription =
-                            protobuf::parse_from_bytes(&sub).unwrap();
-                        let sub_uri = sub.take_uri();
-
-                        debug!("subscribed sub_uri={}", sub_uri);
-
-                        inner.subscriptions.push((sub_uri, tx.clone()));
+                if !inner.is_shutdown {
+                    debug!("subscribed uri={} count={}", uri, response.payload.len());
+                    if response.payload.len() > 0 {
+                        // Old subscription protocol, watch the provided list of URIs
+                        for sub in response.payload {
+                            let mut sub: protocol::pubsub::Subscription =
+                                protobuf::parse_from_bytes(&sub).unwrap();
+                            let sub_uri = sub.take_uri();
+
+                            debug!("subscribed sub_uri={}", sub_uri);
+
+                            inner.subscriptions.push((sub_uri, tx.clone()));
+                        }
+                    } else {
+                        // New subscription protocol, watch the requested URI
+                        inner.subscriptions.push((uri, tx));
                     }
-                } else {
-                    // New subscription protocol, watch the requested URI
-                    inner.subscriptions.push((uri, tx));
                 }
             });
 
@@ -222,4 +229,13 @@ impl MercuryManager {
             }
         }
     }
+
+    pub(crate) fn shutdown(&self) {
+        self.lock(|inner| {
+            inner.is_shutdown = true;
+            // destroy the sending halves of the channels to signal everyone who is waiting for something.
+            inner.pending.clear();
+            inner.subscriptions.clear();
+        });
+    }
 }

+ 1 - 0
core/src/session.rs

@@ -237,6 +237,7 @@ impl Session {
     pub fn shutdown(&self) {
         debug!("Invalidating session[{}]", self.0.session_id);
         self.0.data.write().unwrap().invalid = true;
+        self.mercury().shutdown();
     }
 
     pub fn is_invalid(&self) -> bool {