__init__.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. import copy
  2. import datetime
  3. import json
  4. import os
  5. import ssl
  6. import subprocess
  7. import sys
  8. import time
  9. import traceback
  10. import urllib.parse
  11. import urllib.request
  12. try:
  13. import Xlib.display
  14. from Xlib import X, XK
  15. except ImportError:
  16. Xlib = False
  17. """
  18. official api documentation:
  19. https://github.com/ToontownRewritten/api-doc/blob/master/login.md
  20. https://github.com/ToontownRewritten/api-doc/blob/master/invasions.md
  21. """
  22. INVASIONS_API_URL = 'https://www.toontownrewritten.com/api/invasions?format=json'
  23. LOGIN_API_URL = 'https://www.toontownrewritten.com/api/login?format=json'
  24. if sys.platform == 'darwin':
  25. TOONTOWN_LIBRARY_PATH = os.path.join(
  26. os.path.expanduser('~'), 'Library',
  27. 'Application Support', 'Toontown Rewritten',
  28. )
  29. TOONTOWN_ENGINE_DEFAULT_PATH = os.path.join(
  30. TOONTOWN_LIBRARY_PATH,
  31. 'Toontown Rewritten',
  32. )
  33. else:
  34. TOONTOWN_LIBRARY_PATH = None
  35. TOONTOWN_ENGINE_DEFAULT_PATH = None
  36. if Xlib:
  37. EXTENDED_KEYBOARD_CONTROLS_DEFAULT_MAPPING = {
  38. XK.XK_w: XK.XK_Up,
  39. XK.XK_a: XK.XK_Left,
  40. XK.XK_s: XK.XK_Down,
  41. XK.XK_d: XK.XK_Right,
  42. }
  43. def start_engine(engine_path, gameserver, playcookie, **kwargs):
  44. env = {
  45. 'TTR_GAMESERVER': gameserver,
  46. 'TTR_PLAYCOOKIE': playcookie,
  47. }
  48. if sys.platform == 'darwin':
  49. env['DYLD_LIBRARY_PATH'] = os.path.join(
  50. TOONTOWN_LIBRARY_PATH,
  51. 'Libraries.bundle',
  52. )
  53. env['DYLD_FRAMEWORK_PATH'] = os.path.join(
  54. TOONTOWN_LIBRARY_PATH,
  55. 'Frameworks',
  56. )
  57. elif sys.platform == 'linux' and 'XAUTHORITY' in os.environ:
  58. """
  59. Fix for TTREngine reporting:
  60. > :display:x11display(error): Could not open display ":0.0".
  61. > :ToonBase: Default graphics pipe is glxGraphicsPipe (OpenGL).
  62. > :ToonBase(warning): Unable to open 'onscreen' window.
  63. > Traceback (most recent call last):
  64. > File "<compiled '__voltorbmain__'>", line 0, in <module>
  65. > [...]
  66. > File "<compiled 'direct.vlt8f63e471.ShowBase'>", line 0, in vltf05fd21b
  67. > Exception: Could not open window.
  68. """
  69. env['XAUTHORITY'] = os.environ['XAUTHORITY']
  70. return subprocess.Popen(
  71. args=[engine_path],
  72. cwd=os.path.dirname(engine_path),
  73. env=env,
  74. **kwargs,
  75. )
  76. def api_request(url, params=None, validate_ssl_cert=True):
  77. resp = urllib.request.urlopen(
  78. url=url,
  79. data=urllib.parse.urlencode(params).encode('ascii')
  80. if params else None,
  81. context=None if validate_ssl_cert
  82. else ssl._create_unverified_context(),
  83. )
  84. return json.loads(resp.read().decode('ascii'))
  85. class LoginSuccessful:
  86. def __init__(self, playcookie, gameserver):
  87. self.playcookie = playcookie
  88. self.gameserver = gameserver
  89. class LoginDelayed:
  90. def __init__(self, queue_token):
  91. self.queue_token = queue_token
  92. def login(username=None, password=None,
  93. queue_token=None, validate_ssl_cert=True):
  94. if username is not None and queue_token is None:
  95. assert password is not None
  96. req_params = {
  97. 'username': username,
  98. 'password': password,
  99. }
  100. elif username is None and queue_token is not None:
  101. req_params = {
  102. 'queueToken': queue_token,
  103. }
  104. else:
  105. raise Exception('either specify username or queue token')
  106. resp_data = api_request(
  107. url=LOGIN_API_URL,
  108. params=req_params,
  109. validate_ssl_cert=validate_ssl_cert,
  110. )
  111. if resp_data['success'] == 'true':
  112. return LoginSuccessful(
  113. playcookie=resp_data['cookie'],
  114. gameserver=resp_data['gameserver'],
  115. )
  116. elif resp_data['success'] == 'delayed':
  117. return LoginDelayed(
  118. queue_token=resp_data['queueToken'],
  119. )
  120. else:
  121. raise Exception(repr(resp_data))
  122. def x_find_window(parent_window, filter_callback):
  123. matching = []
  124. for child_window in parent_window.query_tree().children:
  125. if filter_callback(child_window):
  126. matching.append(child_window)
  127. matching += x_find_window(child_window, filter_callback)
  128. return matching
  129. def x_find_window_by_pid(display, pid):
  130. pid_prop = display.intern_atom('_NET_WM_PID')
  131. def filter_callback(window):
  132. prop = window.get_full_property(pid_prop, X.AnyPropertyType)
  133. return prop and prop.value.tolist() == [pid]
  134. return x_find_window(display.screen().root, filter_callback)
  135. def x_grab_key(grab_window, keycode, modifiers=None):
  136. if modifiers is None:
  137. modifiers = X.AnyModifier
  138. grab_window.grab_key(
  139. keycode,
  140. modifiers,
  141. # owner_events
  142. # https://stackoverflow.com/questions/32122360/x11-will-xgrabpointer-prevent-other-apps-from-any-mouse-event
  143. # False,
  144. True,
  145. X.GrabModeAsync,
  146. X.GrabModeAsync,
  147. )
  148. def x_grab_keysym(xdisplay, grab_window, keysym, modifiers=None):
  149. x_grab_key(
  150. grab_window=grab_window,
  151. keycode=xdisplay.keysym_to_keycode(keysym),
  152. modifiers=modifiers,
  153. )
  154. def x_ungrab_keysym(xdisplay, grab_window, keysym, modifiers=None):
  155. if modifiers is None:
  156. modifiers = X.AnyModifier
  157. grab_window.ungrab_key(
  158. xdisplay.keysym_to_keycode(keysym),
  159. modifiers,
  160. )
  161. def wait_for_engine_window(xdisplay, engine_process, timeout_seconds=20, search_interval_seconds=2):
  162. start_epoch = time.time()
  163. while engine_process.poll() is None and (time.time() - start_epoch) <= timeout_seconds:
  164. windows = x_find_window_by_pid(xdisplay, engine_process.pid)
  165. assert len(windows) <= 1
  166. if len(windows) == 1:
  167. return windows[0]
  168. time.sleep(search_interval_seconds)
  169. return None
  170. def _enable_extended_controls(xdisplay, engine_window, keyboard_mapping):
  171. for keysym in keyboard_mapping.keys():
  172. x_grab_keysym(xdisplay, engine_window, keysym)
  173. def _disable_extended_controls(xdisplay, engine_window, keyboard_mapping):
  174. for keysym in keyboard_mapping.keys():
  175. x_ungrab_keysym(xdisplay, engine_window, keysym)
  176. def run_extended_keyboard_controls(engine_process, toggle_keysym_name):
  177. if not Xlib:
  178. raise Exception('\n'.join([
  179. 'Extended keyboard controls require xlib for python to be installed.',
  180. 'Depending on your system run',
  181. '\t$ sudo apt-get install python3-xlib',
  182. 'or',
  183. '\t$ pip3 install --user xlib',
  184. ]))
  185. xdisplay = Xlib.display.Display()
  186. engine_window = wait_for_engine_window(xdisplay, engine_process)
  187. if not engine_window:
  188. raise Exception('Could not find the game\'s window.')
  189. toggle_keysym = XK.string_to_keysym(toggle_keysym_name)
  190. if toggle_keysym == X.NoSymbol:
  191. raise Exception("Extended keyboard controls toggle:"
  192. + " Unknown keysym name '{}'".format(toggle_keysym_name))
  193. x_grab_keysym(xdisplay, engine_window, toggle_keysym)
  194. keyboard_mapping = copy.deepcopy(
  195. EXTENDED_KEYBOARD_CONTROLS_DEFAULT_MAPPING
  196. )
  197. if toggle_keysym in keyboard_mapping:
  198. del keyboard_mapping[toggle_keysym]
  199. print("INFO Extended Controls:"
  200. + " Ignoring mapping for toggle key '{}'".format(toggle_keysym_name))
  201. print("INFO Extended Controls are currently off."
  202. + " Press key '{}' to enable.".format(toggle_keysym_name))
  203. controls_enabled = False
  204. while engine_process.poll() is None:
  205. # TODO don't block here, engine might have already been stopped
  206. xevent = xdisplay.next_event()
  207. # TODO investigate why some release events get lost
  208. if isinstance(xevent, Xlib.protocol.event.KeyPress) \
  209. or isinstance(xevent, Xlib.protocol.event.KeyRelease):
  210. keysym_in = xdisplay.keycode_to_keysym(
  211. xevent.detail,
  212. index=0,
  213. )
  214. if keysym_in == toggle_keysym:
  215. if isinstance(xevent, Xlib.protocol.event.KeyPress):
  216. if controls_enabled:
  217. _disable_extended_controls(
  218. xdisplay,
  219. engine_window,
  220. keyboard_mapping,
  221. )
  222. else:
  223. _enable_extended_controls(
  224. xdisplay,
  225. engine_window,
  226. keyboard_mapping,
  227. )
  228. controls_enabled = not controls_enabled
  229. print("INFO {} Extended Controls".format(
  230. "Enabled" if controls_enabled else "Disabled",
  231. ))
  232. else:
  233. if controls_enabled and keysym_in in keyboard_mapping:
  234. keysym_out = keyboard_mapping[keysym_in]
  235. else:
  236. keysym_out = keysym_in
  237. engine_window.send_event(type(xevent)(
  238. window=engine_window,
  239. detail=xdisplay.keysym_to_keycode(keysym_out),
  240. state=0,
  241. root_x=xevent.root_x,
  242. root_y=xevent.root_y,
  243. event_x=xevent.event_x,
  244. event_y=xevent.event_y,
  245. child=xevent.child,
  246. root=xevent.root,
  247. time=xevent.time, # X.CurrentTime
  248. same_screen=xevent.same_screen,
  249. ))
  250. def launch(engine_path, username, password, validate_ssl_certs=True,
  251. cpu_limit_percent=None, enable_extended_keyboard_controls=False,
  252. extended_keyboard_control_toggle_keysym_name=None):
  253. result = login(
  254. username=username,
  255. password=password,
  256. validate_ssl_cert=validate_ssl_certs,
  257. )
  258. if isinstance(result, LoginDelayed):
  259. result = login(
  260. queue_token=result.queue_token,
  261. validate_ssl_cert=validate_ssl_certs,
  262. )
  263. if isinstance(result, LoginSuccessful):
  264. p = start_engine(
  265. engine_path=engine_path,
  266. gameserver=result.gameserver,
  267. playcookie=result.playcookie,
  268. )
  269. if cpu_limit_percent is not None:
  270. subprocess.Popen(args=[
  271. 'cpulimit',
  272. '--pid', str(p.pid),
  273. '--limit', str(cpu_limit_percent),
  274. # '--verbose',
  275. ])
  276. if enable_extended_keyboard_controls:
  277. try:
  278. run_extended_keyboard_controls(
  279. engine_process=p,
  280. toggle_keysym_name=extended_keyboard_control_toggle_keysym_name,
  281. )
  282. except Exception as e:
  283. if isinstance(e, KeyboardInterrupt):
  284. raise e
  285. else:
  286. traceback.print_exc()
  287. if p.poll() is None:
  288. p.wait()
  289. else:
  290. raise Exception(repr(result))
  291. class InvasionProgress:
  292. def __init__(self, district, date, cog_type,
  293. despawned_number, total_number):
  294. self.district = district
  295. self.date = date
  296. self.cog_type = cog_type
  297. self.despawned_number = despawned_number
  298. self.total_number = total_number
  299. @property
  300. def remaining_number(self):
  301. return self.total_number - self.despawned_number
  302. def request_active_invasions(validate_ssl_certs=True):
  303. resp_data = api_request(INVASIONS_API_URL)
  304. if resp_data['error'] is not None:
  305. raise Exception(resp_data['error'])
  306. else:
  307. invs = {}
  308. for district, inv_data in resp_data['invasions'].items():
  309. despawned_number, total_number = inv_data['progress'].split('/')
  310. invs[district] = InvasionProgress(
  311. district=district,
  312. date=datetime.datetime.utcfromtimestamp(inv_data['asOf']),
  313. cog_type=inv_data['type'],
  314. despawned_number=int(despawned_number),
  315. total_number=int(total_number),
  316. )
  317. return invs