session.rs 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. use bytes::Bytes;
  2. use futures::sync::mpsc;
  3. use futures::{Async, Future, IntoFuture, Poll, Stream};
  4. use std::io;
  5. use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
  6. use std::sync::{Arc, RwLock, Weak};
  7. use tokio_core::reactor::{Handle, Remote};
  8. use apresolve::apresolve_or_fallback;
  9. use authentication::Credentials;
  10. use cache::Cache;
  11. use component::Lazy;
  12. use config::SessionConfig;
  13. use connection;
  14. use audio_key::AudioKeyManager;
  15. use channel::ChannelManager;
  16. use mercury::MercuryManager;
  /// Session state that can change after the session has been created.
  ///
  /// `SessionInternal` keeps this behind a `RwLock`.
  struct SessionData {
      /// Country string received from the server; empty until one has been dispatched.
      country: String,
      /// Username the session was created with.
      canonical_username: String,
      /// Set by `Session::shutdown` and reported by `Session::is_invalid`.
      invalid: bool,
  }
  22. struct SessionInternal {
  23. config: SessionConfig,
  24. data: RwLock<SessionData>,
  25. tx_connection: mpsc::UnboundedSender<(u8, Vec<u8>)>,
  26. audio_key: Lazy<AudioKeyManager>,
  27. channel: Lazy<ChannelManager>,
  28. mercury: Lazy<MercuryManager>,
  29. cache: Option<Arc<Cache>>,
  30. handle: Remote,
  31. session_id: usize,
  32. }
  /// Source of `session_id` values: `Session::create` reads the current value and increments it.
  static SESSION_COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
  34. #[derive(Clone)]
  35. pub struct Session(Arc<SessionInternal>);
  36. impl Session {
  37. pub fn connect(
  38. config: SessionConfig,
  39. credentials: Credentials,
  40. cache: Option<Cache>,
  41. handle: Handle,
  42. ) -> Box<Future<Item = Session, Error = io::Error>> {
  43. let access_point = apresolve_or_fallback::<io::Error>(&handle, &config.proxy);
  44. let handle_ = handle.clone();
  45. let proxy = config.proxy.clone();
  46. let connection = access_point.and_then(move |addr| {
  47. info!("Connecting to AP \"{}\"", addr);
  48. connection::connect(addr, &handle_, &proxy)
  49. });
  50. let device_id = config.device_id.clone();
  51. let authentication = connection
  52. .and_then(move |connection| connection::authenticate(connection, credentials, device_id));
  53. let result = authentication.map(move |(transport, reusable_credentials)| {
  54. info!("Authenticated as \"{}\" !", reusable_credentials.username);
  55. if let Some(ref cache) = cache {
  56. cache.save_credentials(&reusable_credentials);
  57. }
  58. let (session, task) = Session::create(
  59. &handle,
  60. transport,
  61. config,
  62. cache,
  63. reusable_credentials.username.clone(),
  64. );
  65. handle.spawn(task.map_err(|e| {
  66. error!("{:?}", e);
  67. }));
  68. session
  69. });
  70. Box::new(result)
  71. }
  72. fn create(
  73. handle: &Handle,
  74. transport: connection::Transport,
  75. config: SessionConfig,
  76. cache: Option<Cache>,
  77. username: String,
  78. ) -> (Session, Box<Future<Item = (), Error = io::Error>>) {
  79. let (sink, stream) = transport.split();
  80. let (sender_tx, sender_rx) = mpsc::unbounded();
  81. let session_id = SESSION_COUNTER.fetch_add(1, Ordering::Relaxed);
  82. debug!("new Session[{}]", session_id);
  83. let session = Session(Arc::new(SessionInternal {
  84. config: config,
  85. data: RwLock::new(SessionData {
  86. country: String::new(),
  87. canonical_username: username,
  88. invalid: false,
  89. }),
  90. tx_connection: sender_tx,
  91. cache: cache.map(Arc::new),
  92. audio_key: Lazy::new(),
  93. channel: Lazy::new(),
  94. mercury: Lazy::new(),
  95. handle: handle.remote().clone(),
  96. session_id: session_id,
  97. }));
  98. let sender_task = sender_rx
  99. .map_err(|e| -> io::Error { panic!(e) })
  100. .forward(sink)
  101. .map(|_| ());
  102. let receiver_task = DispatchTask(stream, session.weak());
  103. let task = Box::new((receiver_task, sender_task).into_future().map(|((), ())| ()));
  104. (session, task)
  105. }
  106. pub fn audio_key(&self) -> &AudioKeyManager {
  107. self.0.audio_key.get(|| AudioKeyManager::new(self.weak()))
  108. }
  109. pub fn channel(&self) -> &ChannelManager {
  110. self.0.channel.get(|| ChannelManager::new(self.weak()))
  111. }
  112. pub fn mercury(&self) -> &MercuryManager {
  113. self.0.mercury.get(|| MercuryManager::new(self.weak()))
  114. }
  115. pub fn spawn<F, R>(&self, f: F)
  116. where
  117. F: FnOnce(&Handle) -> R + Send + 'static,
  118. R: IntoFuture<Item = (), Error = ()>,
  119. R::Future: 'static,
  120. {
  121. self.0.handle.spawn(f)
  122. }
  123. fn debug_info(&self) {
  124. debug!(
  125. "Session[{}] strong={} weak={}",
  126. self.0.session_id,
  127. Arc::strong_count(&self.0),
  128. Arc::weak_count(&self.0)
  129. );
  130. }
  131. #[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))]
  132. fn dispatch(&self, cmd: u8, data: Bytes) {
  133. match cmd {
  134. 0x4 => {
  135. self.debug_info();
  136. self.send_packet(0x49, data.as_ref().to_owned());
  137. }
  138. 0x4a => (),
  139. 0x1b => {
  140. let country = String::from_utf8(data.as_ref().to_owned()).unwrap();
  141. info!("Country: {:?}", country);
  142. self.0.data.write().unwrap().country = country;
  143. }
  144. 0x9 | 0xa => self.channel().dispatch(cmd, data),
  145. 0xd | 0xe => self.audio_key().dispatch(cmd, data),
  146. 0xb2...0xb6 => self.mercury().dispatch(cmd, data),
  147. _ => (),
  148. }
  149. }
  150. pub fn send_packet(&self, cmd: u8, data: Vec<u8>) {
  151. self.0.tx_connection.unbounded_send((cmd, data)).unwrap();
  152. }
  153. pub fn cache(&self) -> Option<&Arc<Cache>> {
  154. self.0.cache.as_ref()
  155. }
  156. fn config(&self) -> &SessionConfig {
  157. &self.0.config
  158. }
  159. pub fn username(&self) -> String {
  160. self.0.data.read().unwrap().canonical_username.clone()
  161. }
  162. pub fn country(&self) -> String {
  163. self.0.data.read().unwrap().country.clone()
  164. }
  165. pub fn device_id(&self) -> &str {
  166. &self.config().device_id
  167. }
  168. fn weak(&self) -> SessionWeak {
  169. SessionWeak(Arc::downgrade(&self.0))
  170. }
  171. pub fn session_id(&self) -> usize {
  172. self.0.session_id
  173. }
  174. pub fn shutdown(&self) {
  175. debug!("Invalidating session[{}]", self.0.session_id);
  176. self.0.data.write().unwrap().invalid = true;
  177. }
  178. pub fn is_invalid(&self) -> bool {
  179. self.0.data.read().unwrap().invalid
  180. }
  181. }
  182. #[derive(Clone)]
  183. pub struct SessionWeak(Weak<SessionInternal>);
  184. impl SessionWeak {
  185. fn try_upgrade(&self) -> Option<Session> {
  186. self.0.upgrade().map(Session)
  187. }
  188. pub(crate) fn upgrade(&self) -> Session {
  189. self.try_upgrade().expect("Session died")
  190. }
  191. }
  192. impl Drop for SessionInternal {
  193. fn drop(&mut self) {
  194. debug!("drop Session[{}]", self.session_id);
  195. }
  196. }
  197. struct DispatchTask<S>(S, SessionWeak)
  198. where
  199. S: Stream<Item = (u8, Bytes)>;
  200. impl<S> Future for DispatchTask<S>
  201. where
  202. S: Stream<Item = (u8, Bytes)>,
  203. <S as Stream>::Error: ::std::fmt::Debug,
  204. {
  205. type Item = ();
  206. type Error = S::Error;
  207. fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
  208. let session = match self.1.try_upgrade() {
  209. Some(session) => session,
  210. None => return Ok(Async::Ready(())),
  211. };
  212. loop {
  213. let (cmd, data) = match self.0.poll() {
  214. Ok(Async::Ready(t)) => t,
  215. Ok(Async::NotReady) => return Ok(Async::NotReady),
  216. Err(e) => {
  217. session.shutdown();
  218. return Err(From::from(e));
  219. }
  220. }.expect("connection closed");
  221. session.dispatch(cmd, data);
  222. }
  223. }
  224. }
  225. impl<S> Drop for DispatchTask<S>
  226. where
  227. S: Stream<Item = (u8, Bytes)>,
  228. {
  229. fn drop(&mut self) {
  230. debug!("drop Dispatch");
  231. }
  232. }