session.rs 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. use std::io;
  2. use std::sync::atomic::{AtomicUsize, Ordering};
  3. use std::sync::{Arc, RwLock, Weak};
  4. use std::time::{SystemTime, UNIX_EPOCH};
  5. use byteorder::{BigEndian, ByteOrder};
  6. use bytes::Bytes;
  7. use futures::sync::mpsc;
  8. use futures::{Async, Future, IntoFuture, Poll, Stream};
  9. use tokio_core::reactor::{Handle, Remote};
  10. use crate::apresolve::apresolve_or_fallback;
  11. use crate::audio_key::AudioKeyManager;
  12. use crate::authentication::Credentials;
  13. use crate::cache::Cache;
  14. use crate::channel::ChannelManager;
  15. use crate::component::Lazy;
  16. use crate::config::SessionConfig;
  17. use crate::connection;
  18. use crate::mercury::MercuryManager;
  19. struct SessionData {
  20. country: String,
  21. time_delta: i64,
  22. canonical_username: String,
  23. invalid: bool,
  24. }
  25. struct SessionInternal {
  26. config: SessionConfig,
  27. data: RwLock<SessionData>,
  28. tx_connection: mpsc::UnboundedSender<(u8, Vec<u8>)>,
  29. audio_key: Lazy<AudioKeyManager>,
  30. channel: Lazy<ChannelManager>,
  31. mercury: Lazy<MercuryManager>,
  32. cache: Option<Arc<Cache>>,
  33. handle: Remote,
  34. session_id: usize,
  35. }
  36. static SESSION_COUNTER: AtomicUsize = AtomicUsize::new(0);
  37. #[derive(Clone)]
  38. pub struct Session(Arc<SessionInternal>);
  39. impl Session {
  40. pub fn connect(
  41. config: SessionConfig,
  42. credentials: Credentials,
  43. cache: Option<Cache>,
  44. handle: Handle,
  45. ) -> Box<dyn Future<Item = Session, Error = io::Error>> {
  46. let access_point =
  47. apresolve_or_fallback::<io::Error>(&handle, &config.proxy, &config.ap_port);
  48. let handle_ = handle.clone();
  49. let proxy = config.proxy.clone();
  50. let connection = access_point.and_then(move |addr| {
  51. info!("Connecting to AP \"{}\"", addr);
  52. connection::connect(addr, &handle_, &proxy)
  53. });
  54. let device_id = config.device_id.clone();
  55. let authentication = connection.and_then(move |connection| {
  56. connection::authenticate(connection, credentials, device_id)
  57. });
  58. let result = authentication.map(move |(transport, reusable_credentials)| {
  59. info!("Authenticated as \"{}\" !", reusable_credentials.username);
  60. if let Some(ref cache) = cache {
  61. cache.save_credentials(&reusable_credentials);
  62. }
  63. let (session, task) = Session::create(
  64. &handle,
  65. transport,
  66. config,
  67. cache,
  68. reusable_credentials.username.clone(),
  69. );
  70. handle.spawn(task.map_err(|e| {
  71. error!("{:?}", e);
  72. }));
  73. session
  74. });
  75. Box::new(result)
  76. }
  77. fn create(
  78. handle: &Handle,
  79. transport: connection::Transport,
  80. config: SessionConfig,
  81. cache: Option<Cache>,
  82. username: String,
  83. ) -> (Session, Box<dyn Future<Item = (), Error = io::Error>>) {
  84. let (sink, stream) = transport.split();
  85. let (sender_tx, sender_rx) = mpsc::unbounded();
  86. let session_id = SESSION_COUNTER.fetch_add(1, Ordering::Relaxed);
  87. debug!("new Session[{}]", session_id);
  88. let session = Session(Arc::new(SessionInternal {
  89. config: config,
  90. data: RwLock::new(SessionData {
  91. country: String::new(),
  92. canonical_username: username,
  93. invalid: false,
  94. time_delta: 0,
  95. }),
  96. tx_connection: sender_tx,
  97. cache: cache.map(Arc::new),
  98. audio_key: Lazy::new(),
  99. channel: Lazy::new(),
  100. mercury: Lazy::new(),
  101. handle: handle.remote().clone(),
  102. session_id: session_id,
  103. }));
  104. let sender_task = sender_rx
  105. .map_err(|e| -> io::Error { panic!(e) })
  106. .forward(sink)
  107. .map(|_| ());
  108. let receiver_task = DispatchTask(stream, session.weak());
  109. let task = Box::new(
  110. (receiver_task, sender_task)
  111. .into_future()
  112. .map(|((), ())| ()),
  113. );
  114. (session, task)
  115. }
  116. pub fn audio_key(&self) -> &AudioKeyManager {
  117. self.0.audio_key.get(|| AudioKeyManager::new(self.weak()))
  118. }
  119. pub fn channel(&self) -> &ChannelManager {
  120. self.0.channel.get(|| ChannelManager::new(self.weak()))
  121. }
  122. pub fn mercury(&self) -> &MercuryManager {
  123. self.0.mercury.get(|| MercuryManager::new(self.weak()))
  124. }
  125. pub fn time_delta(&self) -> i64 {
  126. self.0.data.read().unwrap().time_delta
  127. }
  128. pub fn spawn<F, R>(&self, f: F)
  129. where
  130. F: FnOnce(&Handle) -> R + Send + 'static,
  131. R: IntoFuture<Item = (), Error = ()>,
  132. R::Future: 'static,
  133. {
  134. self.0.handle.spawn(f)
  135. }
  136. fn debug_info(&self) {
  137. debug!(
  138. "Session[{}] strong={} weak={}",
  139. self.0.session_id,
  140. Arc::strong_count(&self.0),
  141. Arc::weak_count(&self.0)
  142. );
  143. }
  144. #[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))]
  145. fn dispatch(&self, cmd: u8, data: Bytes) {
  146. match cmd {
  147. 0x4 => {
  148. let server_timestamp = BigEndian::read_u32(data.as_ref()) as i64;
  149. let timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
  150. Ok(dur) => dur,
  151. Err(err) => err.duration(),
  152. }
  153. .as_secs() as i64;
  154. self.0.data.write().unwrap().time_delta = server_timestamp - timestamp;
  155. self.debug_info();
  156. self.send_packet(0x49, vec![0, 0, 0, 0]);
  157. }
  158. 0x4a => (),
  159. 0x1b => {
  160. let country = String::from_utf8(data.as_ref().to_owned()).unwrap();
  161. info!("Country: {:?}", country);
  162. self.0.data.write().unwrap().country = country;
  163. }
  164. 0x9 | 0xa => self.channel().dispatch(cmd, data),
  165. 0xd | 0xe => self.audio_key().dispatch(cmd, data),
  166. 0xb2..=0xb6 => self.mercury().dispatch(cmd, data),
  167. _ => (),
  168. }
  169. }
  170. pub fn send_packet(&self, cmd: u8, data: Vec<u8>) {
  171. self.0.tx_connection.unbounded_send((cmd, data)).unwrap();
  172. }
  173. pub fn cache(&self) -> Option<&Arc<Cache>> {
  174. self.0.cache.as_ref()
  175. }
  176. fn config(&self) -> &SessionConfig {
  177. &self.0.config
  178. }
  179. pub fn username(&self) -> String {
  180. self.0.data.read().unwrap().canonical_username.clone()
  181. }
  182. pub fn country(&self) -> String {
  183. self.0.data.read().unwrap().country.clone()
  184. }
  185. pub fn device_id(&self) -> &str {
  186. &self.config().device_id
  187. }
  188. fn weak(&self) -> SessionWeak {
  189. SessionWeak(Arc::downgrade(&self.0))
  190. }
  191. pub fn session_id(&self) -> usize {
  192. self.0.session_id
  193. }
  194. pub fn shutdown(&self) {
  195. debug!("Invalidating session[{}]", self.0.session_id);
  196. self.0.data.write().unwrap().invalid = true;
  197. }
  198. pub fn is_invalid(&self) -> bool {
  199. self.0.data.read().unwrap().invalid
  200. }
  201. }
  202. #[derive(Clone)]
  203. pub struct SessionWeak(Weak<SessionInternal>);
  204. impl SessionWeak {
  205. fn try_upgrade(&self) -> Option<Session> {
  206. self.0.upgrade().map(Session)
  207. }
  208. pub(crate) fn upgrade(&self) -> Session {
  209. self.try_upgrade().expect("Session died")
  210. }
  211. }
  212. impl Drop for SessionInternal {
  213. fn drop(&mut self) {
  214. debug!("drop Session[{}]", self.session_id);
  215. }
  216. }
  217. struct DispatchTask<S>(S, SessionWeak)
  218. where
  219. S: Stream<Item = (u8, Bytes)>;
  220. impl<S> Future for DispatchTask<S>
  221. where
  222. S: Stream<Item = (u8, Bytes)>,
  223. <S as Stream>::Error: ::std::fmt::Debug,
  224. {
  225. type Item = ();
  226. type Error = S::Error;
  227. fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
  228. let session = match self.1.try_upgrade() {
  229. Some(session) => session,
  230. None => return Ok(Async::Ready(())),
  231. };
  232. loop {
  233. let (cmd, data) = match self.0.poll() {
  234. Ok(Async::Ready(t)) => t,
  235. Ok(Async::NotReady) => return Ok(Async::NotReady),
  236. Err(e) => {
  237. session.shutdown();
  238. return Err(From::from(e));
  239. }
  240. }
  241. .expect("connection closed");
  242. session.dispatch(cmd, data);
  243. }
  244. }
  245. }
  246. impl<S> Drop for DispatchTask<S>
  247. where
  248. S: Stream<Item = (u8, Bytes)>,
  249. {
  250. fn drop(&mut self) {
  251. debug!("drop Dispatch");
  252. }
  253. }