| 
					
				 | 
			
			
				@@ -101,9 +101,10 @@ impl MercuryManager { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     pub fn subscribe<T: Into<String>>(&self, uri: T) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         -> BoxFuture<mpsc::UnboundedReceiver<MercuryResponse>, MercuryError> 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        let uri = uri.into(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         let request = self.request(MercuryRequest { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             method: MercuryMethod::SUB, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            uri: uri.into(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            uri: uri.clone(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             content_type: None, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             payload: Vec::new(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         }); 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -116,8 +117,11 @@ impl MercuryManager { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 for sub in response.payload { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     let mut sub : protocol::pubsub::Subscription 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                         = protobuf::parse_from_bytes(&sub).unwrap(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    let uri = sub.take_uri(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    inner.subscriptions.insert(uri, tx.clone()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    let sub_uri = sub.take_uri(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    debug!("subscribed {} ({})", uri, sub_uri); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    inner.subscriptions.insert(sub_uri, tx.clone()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                 } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -181,21 +185,29 @@ impl MercuryManager { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         let response = MercuryResponse { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             uri: header.get_uri().to_owned(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            status_code: header.get_status_code(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             payload: pending.parts, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         }; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        if cmd == 0xb5 { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            self.lock(|inner| { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                use std::collections::hash_map::Entry; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                if let Entry::Occupied(entry) = inner.subscriptions.entry(response.uri.clone()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    // TODO: send unsub message 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                    if entry.get().send(response).is_err() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        entry.remove(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if response.status_code >= 400 { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            warn!("error {} for uri {}", response.status_code, &response.uri); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if let Some(cb) = pending.callback { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                cb.complete(Err(MercuryError)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } else { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if cmd == 0xb5 { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                self.lock(|inner| { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    use std::collections::hash_map::Entry; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    if let Entry::Occupied(entry) = inner.subscriptions.entry(response.uri.clone()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        // TODO: send unsub message 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        if entry.get().send(response).is_err() { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                            entry.remove(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				                     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            }) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        } else if let Some(cb) = pending.callback { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            cb.complete(Ok(response)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                }) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } else if let Some(cb) = pending.callback { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                cb.complete(Ok(response)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 |