|
@@ -1,19 +1,19 @@
|
|
|
use bytes::Bytes;
|
|
|
use crypto::digest::Digest;
|
|
|
use crypto::sha1::Sha1;
|
|
|
+use futures::{Async, Future, IntoFuture, Poll, Stream};
|
|
|
use futures::sync::mpsc;
|
|
|
-use futures::{Future, Stream, IntoFuture, Poll, Async};
|
|
|
use std::io;
|
|
|
-use std::sync::{RwLock, Arc, Weak};
|
|
|
+use std::sync::{Arc, RwLock, Weak};
|
|
|
+use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
|
|
|
use tokio_core::reactor::{Handle, Remote};
|
|
|
-use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
|
|
|
|
|
|
use apresolve::apresolve_or_fallback;
|
|
|
use authentication::Credentials;
|
|
|
use cache::Cache;
|
|
|
use component::Lazy;
|
|
|
-use connection;
|
|
|
use config::SessionConfig;
|
|
|
+use connection;
|
|
|
|
|
|
use audio_key::AudioKeyManager;
|
|
|
use channel::ChannelManager;
|
|
@@ -40,7 +40,7 @@ pub struct SessionInternal {
|
|
|
session_id: usize,
|
|
|
}
|
|
|
|
|
|
-static SESSION_COUNTER : AtomicUsize = ATOMIC_USIZE_INIT;
|
|
|
+static SESSION_COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
pub struct Session(pub Arc<SessionInternal>);
|
|
@@ -52,13 +52,14 @@ pub fn device_id(name: &str) -> String {
|
|
|
}
|
|
|
|
|
|
impl Session {
|
|
|
- pub fn connect(config: SessionConfig, credentials: Credentials,
|
|
|
- cache: Option<Cache>, handle: Handle)
|
|
|
- -> Box<Future<Item=Session, Error=io::Error>>
|
|
|
- {
|
|
|
+ pub fn connect(
|
|
|
+ config: SessionConfig,
|
|
|
+ credentials: Credentials,
|
|
|
+ cache: Option<Cache>,
|
|
|
+ handle: Handle,
|
|
|
+ ) -> Box<Future<Item = Session, Error = io::Error>> {
|
|
|
let access_point = apresolve_or_fallback::<io::Error>(&handle);
|
|
|
|
|
|
-
|
|
|
let handle_ = handle.clone();
|
|
|
let connection = access_point.and_then(move |addr| {
|
|
|
info!("Connecting to AP \"{}\"", addr);
|
|
@@ -66,9 +67,8 @@ impl Session {
|
|
|
});
|
|
|
|
|
|
let device_id = config.device_id.clone();
|
|
|
- let authentication = connection.and_then(move |connection| {
|
|
|
- connection::authenticate(connection, credentials, device_id)
|
|
|
- });
|
|
|
+ let authentication = connection
|
|
|
+ .and_then(move |connection| connection::authenticate(connection, credentials, device_id));
|
|
|
|
|
|
let result = authentication.map(move |(transport, reusable_credentials)| {
|
|
|
info!("Authenticated as \"{}\" !", reusable_credentials.username);
|
|
@@ -77,21 +77,28 @@ impl Session {
|
|
|
}
|
|
|
|
|
|
let (session, task) = Session::create(
|
|
|
- &handle, transport, config, cache, reusable_credentials.username.clone()
|
|
|
+ &handle,
|
|
|
+ transport,
|
|
|
+ config,
|
|
|
+ cache,
|
|
|
+ reusable_credentials.username.clone(),
|
|
|
);
|
|
|
|
|
|
handle.spawn(task.map_err(|e| panic!(e)));
|
|
|
|
|
|
session
|
|
|
});
|
|
|
-
|
|
|
+
|
|
|
Box::new(result)
|
|
|
}
|
|
|
|
|
|
- fn create(handle: &Handle, transport: connection::Transport,
|
|
|
- config: SessionConfig, cache: Option<Cache>, username: String)
|
|
|
- -> (Session, Box<Future<Item = (), Error = io::Error>>)
|
|
|
- {
|
|
|
+ fn create(
|
|
|
+ handle: &Handle,
|
|
|
+ transport: connection::Transport,
|
|
|
+ config: SessionConfig,
|
|
|
+ cache: Option<Cache>,
|
|
|
+ username: String,
|
|
|
+ ) -> (Session, Box<Future<Item = (), Error = io::Error>>) {
|
|
|
let (sink, stream) = transport.split();
|
|
|
|
|
|
let (sender_tx, sender_rx) = mpsc::unbounded();
|
|
@@ -121,11 +128,15 @@ impl Session {
|
|
|
|
|
|
let sender_task = sender_rx
|
|
|
.map_err(|e| -> io::Error { panic!(e) })
|
|
|
- .forward(sink).map(|_| ());
|
|
|
+ .forward(sink)
|
|
|
+ .map(|_| ());
|
|
|
let receiver_task = DispatchTask(stream, session.weak());
|
|
|
|
|
|
- let task = Box::new((receiver_task, sender_task).into_future()
|
|
|
- .map(|((), ())| ()));
|
|
|
+ let task = Box::new(
|
|
|
+ (receiver_task, sender_task)
|
|
|
+ .into_future()
|
|
|
+ .map(|((), ())| ()),
|
|
|
+ );
|
|
|
|
|
|
(session, task)
|
|
|
}
|
|
@@ -143,16 +154,21 @@ impl Session {
|
|
|
}
|
|
|
|
|
|
pub fn spawn<F, R>(&self, f: F)
|
|
|
- where F: FnOnce(&Handle) -> R + Send + 'static,
|
|
|
- R: IntoFuture<Item=(), Error=()>,
|
|
|
- R::Future: 'static
|
|
|
+ where
|
|
|
+ F: FnOnce(&Handle) -> R + Send + 'static,
|
|
|
+ R: IntoFuture<Item = (), Error = ()>,
|
|
|
+ R::Future: 'static,
|
|
|
{
|
|
|
self.0.handle.spawn(f)
|
|
|
}
|
|
|
|
|
|
fn debug_info(&self) {
|
|
|
- debug!("Session[{}] strong={} weak={}",
|
|
|
- self.0.session_id, Arc::strong_count(&self.0), Arc::weak_count(&self.0));
|
|
|
+ debug!(
|
|
|
+ "Session[{}] strong={} weak={}",
|
|
|
+ self.0.session_id,
|
|
|
+ Arc::strong_count(&self.0),
|
|
|
+ Arc::weak_count(&self.0)
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
#[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))]
|
|
@@ -161,7 +177,7 @@ impl Session {
|
|
|
0x4 => {
|
|
|
self.debug_info();
|
|
|
self.send_packet(0x49, data.as_ref().to_owned());
|
|
|
- },
|
|
|
+ }
|
|
|
0x4a => (),
|
|
|
0x1b => {
|
|
|
let country = String::from_utf8(data.as_ref().to_owned()).unwrap();
|
|
@@ -229,10 +245,12 @@ impl Drop for SessionInternal {
|
|
|
}
|
|
|
|
|
|
struct DispatchTask<S>(S, SessionWeak)
|
|
|
- where S: Stream<Item = (u8, Bytes)>;
|
|
|
+where
|
|
|
+ S: Stream<Item = (u8, Bytes)>;
|
|
|
|
|
|
-impl <S> Future for DispatchTask<S>
|
|
|
- where S: Stream<Item = (u8, Bytes)>
|
|
|
+impl<S> Future for DispatchTask<S>
|
|
|
+where
|
|
|
+ S: Stream<Item = (u8, Bytes)>,
|
|
|
{
|
|
|
type Item = ();
|
|
|
type Error = S::Error;
|
|
@@ -240,9 +258,7 @@ impl <S> Future for DispatchTask<S>
|
|
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
|
let session = match self.1.try_upgrade() {
|
|
|
Some(session) => session,
|
|
|
- None => {
|
|
|
- return Ok(Async::Ready(()))
|
|
|
- },
|
|
|
+ None => return Ok(Async::Ready(())),
|
|
|
};
|
|
|
|
|
|
loop {
|
|
@@ -252,8 +268,9 @@ impl <S> Future for DispatchTask<S>
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-impl <S> Drop for DispatchTask<S>
|
|
|
- where S: Stream<Item = (u8, Bytes)>
|
|
|
+impl<S> Drop for DispatchTask<S>
|
|
|
+where
|
|
|
+ S: Stream<Item = (u8, Bytes)>,
|
|
|
{
|
|
|
fn drop(&mut self) {
|
|
|
debug!("drop Dispatch");
|