__init__.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
import copy
import datetime
import json
import os
import select
import ssl
import subprocess
import sys
import time
import traceback
import urllib.parse
import urllib.request

try:
    import Xlib.display
    from Xlib import X, XK
except ImportError:
    Xlib = False
  17. """
  18. official api documentation:
  19. https://github.com/ToontownRewritten/api-doc/blob/master/login.md
  20. https://github.com/ToontownRewritten/api-doc/blob/master/invasions.md
  21. """
  22. INVASIONS_API_URL = 'https://www.toontownrewritten.com/api/invasions?format=json'
  23. LOGIN_API_URL = 'https://www.toontownrewritten.com/api/login?format=json'
  24. if sys.platform == 'darwin':
  25. TOONTOWN_LIBRARY_PATH = os.path.join(
  26. os.path.expanduser('~'), 'Library',
  27. 'Application Support', 'Toontown Rewritten',
  28. )
  29. TOONTOWN_ENGINE_DEFAULT_PATH = os.path.join(
  30. TOONTOWN_LIBRARY_PATH,
  31. 'Toontown Rewritten',
  32. )
  33. else:
  34. TOONTOWN_LIBRARY_PATH = None
  35. TOONTOWN_ENGINE_DEFAULT_PATH = None
  36. if Xlib:
  37. EXTENDED_KEYBOARD_CONTROLS_DEFAULT_MAPPING = {
  38. XK.XK_w: XK.XK_Up,
  39. XK.XK_a: XK.XK_Left,
  40. XK.XK_s: XK.XK_Down,
  41. XK.XK_d: XK.XK_Right,
  42. }
  43. def start_engine(engine_path, gameserver, playcookie, **kwargs):
  44. env = {
  45. 'TTR_GAMESERVER': gameserver,
  46. 'TTR_PLAYCOOKIE': playcookie,
  47. }
  48. if sys.platform == 'darwin':
  49. env['DYLD_LIBRARY_PATH'] = os.path.join(
  50. TOONTOWN_LIBRARY_PATH,
  51. 'Libraries.bundle',
  52. )
  53. env['DYLD_FRAMEWORK_PATH'] = os.path.join(
  54. TOONTOWN_LIBRARY_PATH,
  55. 'Frameworks',
  56. )
  57. elif sys.platform == 'linux' and 'XAUTHORITY' in os.environ:
  58. """
  59. Fix for TTREngine reporting:
  60. > :display:x11display(error): Could not open display ":0.0".
  61. > :ToonBase: Default graphics pipe is glxGraphicsPipe (OpenGL).
  62. > :ToonBase(warning): Unable to open 'onscreen' window.
  63. > Traceback (most recent call last):
  64. > File "<compiled '__voltorbmain__'>", line 0, in <module>
  65. > [...]
  66. > File "<compiled 'direct.vlt8f63e471.ShowBase'>", line 0, in vltf05fd21b
  67. > Exception: Could not open window.
  68. """
  69. env['XAUTHORITY'] = os.environ['XAUTHORITY']
  70. return subprocess.Popen(
  71. args=[engine_path],
  72. cwd=os.path.dirname(engine_path),
  73. env=env,
  74. **kwargs,
  75. )
  76. def api_request(url, params=None, validate_ssl_cert=True):
  77. resp = urllib.request.urlopen(
  78. url=url,
  79. data=urllib.parse.urlencode(params).encode('ascii')
  80. if params else None,
  81. context=None if validate_ssl_cert
  82. else ssl._create_unverified_context(),
  83. )
  84. return json.loads(resp.read().decode('ascii'))
  85. class LoginSuccessful:
  86. def __init__(self, playcookie, gameserver):
  87. self.playcookie = playcookie
  88. self.gameserver = gameserver
  89. class LoginDelayed:
  90. def __init__(self, queue_token):
  91. self.queue_token = queue_token
  92. def login(username=None, password=None,
  93. queue_token=None, validate_ssl_cert=True):
  94. if username is not None and queue_token is None:
  95. assert password is not None
  96. req_params = {
  97. 'username': username,
  98. 'password': password,
  99. }
  100. elif username is None and queue_token is not None:
  101. req_params = {
  102. 'queueToken': queue_token,
  103. }
  104. else:
  105. raise Exception('either specify username or queue token')
  106. resp_data = api_request(
  107. url=LOGIN_API_URL,
  108. params=req_params,
  109. validate_ssl_cert=validate_ssl_cert,
  110. )
  111. if resp_data['success'] == 'true':
  112. return LoginSuccessful(
  113. playcookie=resp_data['cookie'],
  114. gameserver=resp_data['gameserver'],
  115. )
  116. elif resp_data['success'] == 'delayed':
  117. return LoginDelayed(
  118. queue_token=resp_data['queueToken'],
  119. )
  120. else:
  121. raise Exception(repr(resp_data))
  122. def x_find_window(parent_window, filter_callback):
  123. matching = []
  124. for child_window in parent_window.query_tree().children:
  125. if filter_callback(child_window):
  126. matching.append(child_window)
  127. matching += x_find_window(child_window, filter_callback)
  128. return matching
  129. def x_find_window_by_pid(display, pid):
  130. pid_prop = display.intern_atom('_NET_WM_PID')
  131. def filter_callback(window):
  132. prop = window.get_full_property(pid_prop, X.AnyPropertyType)
  133. return prop and prop.value.tolist() == [pid]
  134. return x_find_window(display.screen().root, filter_callback)
  135. class ExtendedControls:
  136. def __init__(self, engine_process, toggle_keysym_name):
  137. if not Xlib:
  138. raise Exception('\n'.join([
  139. 'Extended keyboard controls require xlib for python to be installed.',
  140. 'Depending on your system run',
  141. '\t$ sudo apt-get install python3-xlib',
  142. 'or',
  143. '\t$ pip3 install --user xlib',
  144. ]))
  145. self._engine_process = engine_process
  146. self._xdisplay = Xlib.display.Display()
  147. self._toggle_keysym = XK.string_to_keysym(toggle_keysym_name)
  148. if self._toggle_keysym == X.NoSymbol:
  149. raise Exception("Extended keyboard controls toggle:"
  150. + " Unknown keysym name '{}'".format(toggle_keysym_name))
  151. keyboard_mapping = copy.deepcopy(
  152. EXTENDED_KEYBOARD_CONTROLS_DEFAULT_MAPPING
  153. )
  154. if self._toggle_keysym in keyboard_mapping:
  155. del keyboard_mapping[self._toggle_keysym]
  156. print("INFO Extended Controls:"
  157. + " Ignoring mapping for toggle key '{}'".format(toggle_keysym_name))
  158. self._keyboard_mapping = keyboard_mapping
  159. self._engine_window = None
  160. self._enabled = False
  161. def _wait_for_engine_window(self, timeout_seconds=20, search_interval_seconds=2):
  162. start_epoch = time.time()
  163. while self._engine_process.poll() is None and (time.time() - start_epoch) <= timeout_seconds:
  164. windows = x_find_window_by_pid(
  165. self._xdisplay,
  166. self._engine_process.pid,
  167. )
  168. assert len(windows) <= 1
  169. if len(windows) == 1:
  170. return windows[0]
  171. time.sleep(search_interval_seconds)
  172. return None
  173. def run(self):
  174. self._engine_window = self._wait_for_engine_window()
  175. if not self._engine_window:
  176. raise Exception('Could not find the game\'s window.')
  177. self._grab_key(
  178. self._xdisplay.keysym_to_keycode(self._toggle_keysym),
  179. )
  180. if not self.enabled:
  181. keysym_name = XK.keysym_to_string(self._toggle_keysym)
  182. print("INFO Extended Controls are currently disabled."
  183. + " Press key '{}' to enable.".format(keysym_name))
  184. while self._engine_process.poll() is None:
  185. # TODO don't block here, engine might have already been stopped
  186. self._handle_xevent(self._xdisplay.next_event())
  187. def _handle_xevent(self, xevent):
  188. # TODO investigate why some release events get lost
  189. if isinstance(xevent, Xlib.protocol.event.KeyPress) \
  190. or isinstance(xevent, Xlib.protocol.event.KeyRelease):
  191. self._handle_xkeyevent(xevent)
  192. def _handle_xkeyevent(self, xkeyevent):
  193. # TODO map keycodes instead of keysyms
  194. keysym_in = self._xdisplay.keycode_to_keysym(
  195. xkeyevent.detail,
  196. index=0,
  197. )
  198. if keysym_in == self._toggle_keysym:
  199. if isinstance(xkeyevent, Xlib.protocol.event.KeyPress):
  200. self.toggle()
  201. else:
  202. if self.enabled and keysym_in in self._keyboard_mapping:
  203. keysym_out = self._keyboard_mapping[keysym_in]
  204. else:
  205. keysym_out = keysym_in
  206. self._engine_window.send_event(type(xkeyevent)(
  207. window=self._engine_window,
  208. detail=self._xdisplay.keysym_to_keycode(keysym_out),
  209. state=0,
  210. root_x=xkeyevent.root_x,
  211. root_y=xkeyevent.root_y,
  212. event_x=xkeyevent.event_x,
  213. event_y=xkeyevent.event_y,
  214. child=xkeyevent.child,
  215. root=xkeyevent.root,
  216. time=xkeyevent.time, # X.CurrentTime
  217. same_screen=xkeyevent.same_screen,
  218. ))
  219. def enable(self):
  220. for keysym in self._keyboard_mapping.keys():
  221. self._grab_key(
  222. self._xdisplay.keysym_to_keycode(keysym),
  223. )
  224. self._enabled = True
  225. print("INFO Enabled Extended Controls")
  226. def disable(self):
  227. for keysym in self._keyboard_mapping.keys():
  228. self._ungrab_key(
  229. self._xdisplay.keysym_to_keycode(keysym),
  230. )
  231. self._enabled = False
  232. print("INFO Disabled Extended Controls")
  233. @property
  234. def enabled(self):
  235. return self._enabled
  236. def toggle(self):
  237. if self.enabled:
  238. self.disable()
  239. else:
  240. self.enable()
  241. def _grab_key(self, keycode):
  242. self._engine_window.grab_key(
  243. keycode,
  244. X.AnyModifier,
  245. # owner_events
  246. # https://stackoverflow.com/questions/32122360/x11-will-xgrabpointer-prevent-other-apps-from-any-mouse-event
  247. # False,
  248. True,
  249. X.GrabModeAsync,
  250. X.GrabModeAsync,
  251. )
  252. def _ungrab_key(self, keycode):
  253. self._engine_window.ungrab_key(keycode, X.AnyModifier)
  254. def launch(engine_path, username, password, validate_ssl_certs=True,
  255. cpu_limit_percent=None, enable_extended_keyboard_controls=False,
  256. extended_keyboard_control_toggle_keysym_name=None):
  257. result = login(
  258. username=username,
  259. password=password,
  260. validate_ssl_cert=validate_ssl_certs,
  261. )
  262. if isinstance(result, LoginDelayed):
  263. result = login(
  264. queue_token=result.queue_token,
  265. validate_ssl_cert=validate_ssl_certs,
  266. )
  267. if isinstance(result, LoginSuccessful):
  268. p = start_engine(
  269. engine_path=engine_path,
  270. gameserver=result.gameserver,
  271. playcookie=result.playcookie,
  272. )
  273. if cpu_limit_percent is not None:
  274. subprocess.Popen(args=[
  275. 'cpulimit',
  276. '--pid', str(p.pid),
  277. '--limit', str(cpu_limit_percent),
  278. # '--verbose',
  279. ])
  280. if enable_extended_keyboard_controls:
  281. try:
  282. ExtendedControls(
  283. engine_process=p,
  284. toggle_keysym_name=extended_keyboard_control_toggle_keysym_name,
  285. ).run()
  286. except Exception as e:
  287. if isinstance(e, KeyboardInterrupt):
  288. raise e
  289. else:
  290. traceback.print_exc()
  291. if p.poll() is None:
  292. p.wait()
  293. else:
  294. raise Exception(repr(result))
  295. class InvasionProgress:
  296. def __init__(self, district, date, cog_type,
  297. despawned_number, total_number):
  298. self.district = district
  299. self.date = date
  300. self.cog_type = cog_type
  301. self.despawned_number = despawned_number
  302. self.total_number = total_number
  303. @property
  304. def remaining_number(self):
  305. return self.total_number - self.despawned_number
  306. def request_active_invasions(validate_ssl_certs=True):
  307. resp_data = api_request(INVASIONS_API_URL)
  308. if resp_data['error'] is not None:
  309. raise Exception(resp_data['error'])
  310. else:
  311. invs = {}
  312. for district, inv_data in resp_data['invasions'].items():
  313. despawned_number, total_number = inv_data['progress'].split('/')
  314. invs[district] = InvasionProgress(
  315. district=district,
  316. date=datetime.datetime.utcfromtimestamp(inv_data['asOf']),
  317. cog_type=inv_data['type'],
  318. despawned_number=int(despawned_number),
  319. total_number=int(total_number),
  320. )
  321. return invs