session.rs 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. use std::io;
  2. use std::sync::{Arc, RwLock, Weak};
  3. use std::sync::atomic::{AtomicUsize, Ordering};
  4. use std::time::{SystemTime, UNIX_EPOCH};
  5. use byteorder::{BigEndian, ByteOrder};
  6. use bytes::Bytes;
  7. use futures::{Async, Future, IntoFuture, Poll, Stream};
  8. use futures::sync::mpsc;
  9. use tokio_core::reactor::{Handle, Remote};
  10. use apresolve::apresolve_or_fallback;
  11. use audio_key::AudioKeyManager;
  12. use authentication::Credentials;
  13. use cache::Cache;
  14. use channel::ChannelManager;
  15. use component::Lazy;
  16. use config::SessionConfig;
  17. use connection;
  18. use mercury::MercuryManager;
  /// Mutable per-session state; `SessionInternal` keeps it behind an `RwLock`.
  struct SessionData {
      /// Username the session was created with.
      canonical_username: String,
      /// Country string taken from the payload of a `0x1b` packet; empty until one arrives.
      country: String,
      /// Server timestamp minus the local Unix time, in seconds, as computed from the
      /// most recent `0x4` packet. Starts at `0`.
      time_delta: i64,
      /// `true` once the session has been marked invalid.
      invalid: bool,
  }
  25. struct SessionInternal {
  26. config: SessionConfig,
  27. data: RwLock<SessionData>,
  28. tx_connection: mpsc::UnboundedSender<(u8, Vec<u8>)>,
  29. audio_key: Lazy<AudioKeyManager>,
  30. channel: Lazy<ChannelManager>,
  31. mercury: Lazy<MercuryManager>,
  32. cache: Option<Arc<Cache>>,
  33. handle: Remote,
  34. session_id: usize,
  35. }
  /// Process-wide counter; each newly created session takes the current value as its
  /// `session_id` and increments it.
  static SESSION_COUNTER: AtomicUsize = AtomicUsize::new(0);
  37. #[derive(Clone)]
  38. pub struct Session(Arc<SessionInternal>);
  39. impl Session {
  40. pub fn connect(
  41. config: SessionConfig,
  42. credentials: Credentials,
  43. cache: Option<Cache>,
  44. handle: Handle,
  45. ) -> Box<Future<Item = Session, Error = io::Error>> {
  46. let access_point = apresolve_or_fallback::<io::Error>(&handle, &config.proxy, &config.ap_port);
  47. let handle_ = handle.clone();
  48. let proxy = config.proxy.clone();
  49. let connection = access_point.and_then(move |addr| {
  50. info!("Connecting to AP \"{}\"", addr);
  51. connection::connect(addr, &handle_, &proxy)
  52. });
  53. let device_id = config.device_id.clone();
  54. let authentication = connection
  55. .and_then(move |connection| connection::authenticate(connection, credentials, device_id));
  56. let result = authentication.map(move |(transport, reusable_credentials)| {
  57. info!("Authenticated as \"{}\" !", reusable_credentials.username);
  58. if let Some(ref cache) = cache {
  59. cache.save_credentials(&reusable_credentials);
  60. }
  61. let (session, task) = Session::create(
  62. &handle,
  63. transport,
  64. config,
  65. cache,
  66. reusable_credentials.username.clone(),
  67. );
  68. handle.spawn(task.map_err(|e| {
  69. error!("{:?}", e);
  70. }));
  71. session
  72. });
  73. Box::new(result)
  74. }
  75. fn create(
  76. handle: &Handle,
  77. transport: connection::Transport,
  78. config: SessionConfig,
  79. cache: Option<Cache>,
  80. username: String,
  81. ) -> (Session, Box<Future<Item = (), Error = io::Error>>) {
  82. let (sink, stream) = transport.split();
  83. let (sender_tx, sender_rx) = mpsc::unbounded();
  84. let session_id = SESSION_COUNTER.fetch_add(1, Ordering::Relaxed);
  85. debug!("new Session[{}]", session_id);
  86. let session = Session(Arc::new(SessionInternal {
  87. config: config,
  88. data: RwLock::new(SessionData {
  89. country: String::new(),
  90. canonical_username: username,
  91. invalid: false,
  92. time_delta: 0,
  93. }),
  94. tx_connection: sender_tx,
  95. cache: cache.map(Arc::new),
  96. audio_key: Lazy::new(),
  97. channel: Lazy::new(),
  98. mercury: Lazy::new(),
  99. handle: handle.remote().clone(),
  100. session_id: session_id,
  101. }));
  102. let sender_task = sender_rx
  103. .map_err(|e| -> io::Error { panic!(e) })
  104. .forward(sink)
  105. .map(|_| ());
  106. let receiver_task = DispatchTask(stream, session.weak());
  107. let task = Box::new((receiver_task, sender_task).into_future().map(|((), ())| ()));
  108. (session, task)
  109. }
  110. pub fn audio_key(&self) -> &AudioKeyManager {
  111. self.0.audio_key.get(|| AudioKeyManager::new(self.weak()))
  112. }
  113. pub fn channel(&self) -> &ChannelManager {
  114. self.0.channel.get(|| ChannelManager::new(self.weak()))
  115. }
  116. pub fn mercury(&self) -> &MercuryManager {
  117. self.0.mercury.get(|| MercuryManager::new(self.weak()))
  118. }
  119. pub fn time_delta(&self) -> i64 {
  120. self.0.data.read().unwrap().time_delta
  121. }
  122. pub fn spawn<F, R>(&self, f: F)
  123. where
  124. F: FnOnce(&Handle) -> R + Send + 'static,
  125. R: IntoFuture<Item = (), Error = ()>,
  126. R::Future: 'static,
  127. {
  128. self.0.handle.spawn(f)
  129. }
  130. fn debug_info(&self) {
  131. debug!(
  132. "Session[{}] strong={} weak={}",
  133. self.0.session_id,
  134. Arc::strong_count(&self.0),
  135. Arc::weak_count(&self.0)
  136. );
  137. }
  138. #[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))]
  139. fn dispatch(&self, cmd: u8, data: Bytes) {
  140. match cmd {
  141. 0x4 => {
  142. let server_timestamp = BigEndian::read_u32(data.as_ref()) as i64;
  143. let timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
  144. Ok(dur) => dur,
  145. Err(err) => err.duration(),
  146. }
  147. .as_secs() as i64;
  148. self.0.data.write().unwrap().time_delta = server_timestamp - timestamp;
  149. self.debug_info();
  150. self.send_packet(0x49, vec![0, 0, 0, 0]);
  151. }
  152. 0x4a => (),
  153. 0x1b => {
  154. let country = String::from_utf8(data.as_ref().to_owned()).unwrap();
  155. info!("Country: {:?}", country);
  156. self.0.data.write().unwrap().country = country;
  157. }
  158. 0x9 | 0xa => self.channel().dispatch(cmd, data),
  159. 0xd | 0xe => self.audio_key().dispatch(cmd, data),
  160. 0xb2...0xb6 => self.mercury().dispatch(cmd, data),
  161. _ => (),
  162. }
  163. }
  164. pub fn send_packet(&self, cmd: u8, data: Vec<u8>) {
  165. self.0.tx_connection.unbounded_send((cmd, data)).unwrap();
  166. }
  167. pub fn cache(&self) -> Option<&Arc<Cache>> {
  168. self.0.cache.as_ref()
  169. }
  170. fn config(&self) -> &SessionConfig {
  171. &self.0.config
  172. }
  173. pub fn username(&self) -> String {
  174. self.0.data.read().unwrap().canonical_username.clone()
  175. }
  176. pub fn country(&self) -> String {
  177. self.0.data.read().unwrap().country.clone()
  178. }
  179. pub fn device_id(&self) -> &str {
  180. &self.config().device_id
  181. }
  182. fn weak(&self) -> SessionWeak {
  183. SessionWeak(Arc::downgrade(&self.0))
  184. }
  185. pub fn session_id(&self) -> usize {
  186. self.0.session_id
  187. }
  188. pub fn shutdown(&self) {
  189. debug!("Invalidating session[{}]", self.0.session_id);
  190. self.0.data.write().unwrap().invalid = true;
  191. }
  192. pub fn is_invalid(&self) -> bool {
  193. self.0.data.read().unwrap().invalid
  194. }
  195. }
  196. #[derive(Clone)]
  197. pub struct SessionWeak(Weak<SessionInternal>);
  198. impl SessionWeak {
  199. fn try_upgrade(&self) -> Option<Session> {
  200. self.0.upgrade().map(Session)
  201. }
  202. pub(crate) fn upgrade(&self) -> Session {
  203. self.try_upgrade().expect("Session died")
  204. }
  205. }
  206. impl Drop for SessionInternal {
  207. fn drop(&mut self) {
  208. debug!("drop Session[{}]", self.session_id);
  209. }
  210. }
  211. struct DispatchTask<S>(S, SessionWeak)
  212. where
  213. S: Stream<Item = (u8, Bytes)>;
  214. impl<S> Future for DispatchTask<S>
  215. where
  216. S: Stream<Item = (u8, Bytes)>,
  217. <S as Stream>::Error: ::std::fmt::Debug,
  218. {
  219. type Item = ();
  220. type Error = S::Error;
  221. fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
  222. let session = match self.1.try_upgrade() {
  223. Some(session) => session,
  224. None => return Ok(Async::Ready(())),
  225. };
  226. loop {
  227. let (cmd, data) = match self.0.poll() {
  228. Ok(Async::Ready(t)) => t,
  229. Ok(Async::NotReady) => return Ok(Async::NotReady),
  230. Err(e) => {
  231. session.shutdown();
  232. return Err(From::from(e));
  233. }
  234. }.expect("connection closed");
  235. session.dispatch(cmd, data);
  236. }
  237. }
  238. }
  239. impl<S> Drop for DispatchTask<S>
  240. where
  241. S: Stream<Item = (u8, Bytes)>,
  242. {
  243. fn drop(&mut self) {
  244. debug!("drop Dispatch");
  245. }
  246. }