Browse Source

do not panic on connection reset

Reinier Balt 7 years ago
parent
commit
2aea0e8fe6
3 changed files with 32 additions and 2 deletions
  1. 4 0
      connect/src/spirc.rs
  2. 24 2
      core/src/session.rs
  3. 4 0
      playback/src/player.rs

+ 4 - 0
connect/src/spirc.rs

@@ -301,6 +301,10 @@ impl Future for SpircTask {
         loop {
             let mut progress = false;
 
+            if self.session.is_invalid() {
+                return Ok(Async::Ready(()));
+            }
+
             if !self.shutdown {
                 match self.subscription.poll().unwrap() {
                     Async::Ready(Some(frame)) => {

+ 24 - 2
core/src/session.rs

@@ -20,6 +20,7 @@ use mercury::MercuryManager;
 struct SessionData {
     country: String,
     canonical_username: String,
+    invalid: bool,
 }
 
 struct SessionInternal {
@@ -77,7 +78,9 @@ impl Session {
                 reusable_credentials.username.clone(),
             );
 
-            handle.spawn(task.map_err(|e| panic!(e)));
+            handle.spawn(task.map_err(|e| {
+                error!("{:?}", e);
+            }));
 
             session
         });
@@ -104,6 +107,7 @@ impl Session {
             data: RwLock::new(SessionData {
                 country: String::new(),
                 canonical_username: username,
+                invalid: false,
             }),
 
             tx_connection: sender_tx,
@@ -212,6 +216,15 @@ impl Session {
     pub fn session_id(&self) -> usize {
         self.0.session_id
     }
+
+    pub fn shutdown(&self) {
+        debug!("Invalidating session[{}]", self.0.session_id);
+        self.0.data.write().unwrap().invalid = true;
+    }
+
+    pub fn is_invalid(&self) -> bool {
+        self.0.data.read().unwrap().invalid
+    }
 }
 
 #[derive(Clone)]
@@ -240,6 +253,7 @@ where
 impl<S> Future for DispatchTask<S>
 where
     S: Stream<Item = (u8, Bytes)>,
+    <S as Stream>::Error: ::std::fmt::Debug,
 {
     type Item = ();
     type Error = S::Error;
@@ -251,7 +265,15 @@ where
         };
 
         loop {
-            let (cmd, data) = try_ready!(self.0.poll()).expect("connection closed");
+            let (cmd, data) = match self.0.poll() {
+                Ok(Async::Ready(t)) => t,
+                Ok(Async::NotReady) => return Ok(Async::NotReady),
+                Err(e) => {
+                    session.shutdown();
+                    return Err(From::from(e));
+                }
+            }.expect("connection closed");
+
             session.dispatch(cmd, data);
         }
     }

+ 4 - 0
playback/src/player.rs

@@ -341,6 +341,10 @@ impl PlayerInternal {
                     self.handle_packet(packet, current_normalisation_factor);
                 }
             }
+
+            if self.session.is_invalid() {
+                return;
+            }
         }
     }