__init__.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. import datetime
  2. import json
  3. import os
  4. import ssl
  5. import subprocess
  6. import sys
  7. import time
  8. import traceback
  9. import urllib.parse
  10. import urllib.request
  11. try:
  12. import Xlib.display
  13. from Xlib import X, XK
  14. except ImportError:
  15. Xlib = False
  16. """
  17. official api documentation:
  18. https://github.com/ToontownRewritten/api-doc/blob/master/login.md
  19. https://github.com/ToontownRewritten/api-doc/blob/master/invasions.md
  20. """
  21. INVASIONS_API_URL = 'https://www.toontownrewritten.com/api/invasions?format=json'
  22. LOGIN_API_URL = 'https://www.toontownrewritten.com/api/login?format=json'
  23. if sys.platform == 'darwin':
  24. TOONTOWN_LIBRARY_PATH = os.path.join(
  25. os.path.expanduser('~'), 'Library',
  26. 'Application Support', 'Toontown Rewritten',
  27. )
  28. TOONTOWN_ENGINE_DEFAULT_PATH = os.path.join(
  29. TOONTOWN_LIBRARY_PATH,
  30. 'Toontown Rewritten',
  31. )
  32. else:
  33. TOONTOWN_LIBRARY_PATH = None
  34. TOONTOWN_ENGINE_DEFAULT_PATH = None
  35. if Xlib:
  36. EXTENDED_KEYBOARD_CONTROLS_MAPPING = {
  37. XK.XK_w: XK.XK_Up,
  38. XK.XK_a: XK.XK_Left,
  39. XK.XK_s: XK.XK_Down,
  40. XK.XK_d: XK.XK_Right,
  41. }
  42. def start_engine(engine_path, gameserver, playcookie, **kwargs):
  43. env = {
  44. 'TTR_GAMESERVER': gameserver,
  45. 'TTR_PLAYCOOKIE': playcookie,
  46. }
  47. if sys.platform == 'darwin':
  48. env['DYLD_LIBRARY_PATH'] = os.path.join(
  49. TOONTOWN_LIBRARY_PATH,
  50. 'Libraries.bundle',
  51. )
  52. env['DYLD_FRAMEWORK_PATH'] = os.path.join(
  53. TOONTOWN_LIBRARY_PATH,
  54. 'Frameworks',
  55. )
  56. elif sys.platform == 'linux' and 'XAUTHORITY' in os.environ:
  57. """
  58. Fix for TTREngine reporting:
  59. > :display:x11display(error): Could not open display ":0.0".
  60. > :ToonBase: Default graphics pipe is glxGraphicsPipe (OpenGL).
  61. > :ToonBase(warning): Unable to open 'onscreen' window.
  62. > Traceback (most recent call last):
  63. > File "<compiled '__voltorbmain__'>", line 0, in <module>
  64. > [...]
  65. > File "<compiled 'direct.vlt8f63e471.ShowBase'>", line 0, in vltf05fd21b
  66. > Exception: Could not open window.
  67. """
  68. env['XAUTHORITY'] = os.environ['XAUTHORITY']
  69. return subprocess.Popen(
  70. args=[engine_path],
  71. cwd=os.path.dirname(engine_path),
  72. env=env,
  73. **kwargs,
  74. )
  75. def api_request(url, params=None, validate_ssl_cert=True):
  76. resp = urllib.request.urlopen(
  77. url=url,
  78. data=urllib.parse.urlencode(params).encode('ascii')
  79. if params else None,
  80. context=None if validate_ssl_cert
  81. else ssl._create_unverified_context(),
  82. )
  83. return json.loads(resp.read().decode('ascii'))
  84. class LoginSuccessful:
  85. def __init__(self, playcookie, gameserver):
  86. self.playcookie = playcookie
  87. self.gameserver = gameserver
  88. class LoginDelayed:
  89. def __init__(self, queue_token):
  90. self.queue_token = queue_token
  91. def login(username=None, password=None,
  92. queue_token=None, validate_ssl_cert=True):
  93. if username is not None and queue_token is None:
  94. assert password is not None
  95. req_params = {
  96. 'username': username,
  97. 'password': password,
  98. }
  99. elif username is None and queue_token is not None:
  100. req_params = {
  101. 'queueToken': queue_token,
  102. }
  103. else:
  104. raise Exception('either specify username or queue token')
  105. resp_data = api_request(
  106. url=LOGIN_API_URL,
  107. params=req_params,
  108. validate_ssl_cert=validate_ssl_cert,
  109. )
  110. if resp_data['success'] == 'true':
  111. return LoginSuccessful(
  112. playcookie=resp_data['cookie'],
  113. gameserver=resp_data['gameserver'],
  114. )
  115. elif resp_data['success'] == 'delayed':
  116. return LoginDelayed(
  117. queue_token=resp_data['queueToken'],
  118. )
  119. else:
  120. raise Exception(repr(resp_data))
  121. def x_find_window(parent_window, filter_callback):
  122. matching = []
  123. for child_window in parent_window.query_tree().children:
  124. if filter_callback(child_window):
  125. matching.append(child_window)
  126. matching += x_find_window(child_window, filter_callback)
  127. return matching
  128. def x_find_window_by_pid(display, pid):
  129. pid_prop = display.intern_atom('_NET_WM_PID')
  130. def filter_callback(window):
  131. prop = window.get_full_property(pid_prop, X.AnyPropertyType)
  132. return prop and prop.value.tolist() == [pid]
  133. return x_find_window(display.screen().root, filter_callback)
  134. def x_grab_key(grab_window, keycode, modifiers=None):
  135. if modifiers is None:
  136. modifiers = X.AnyModifier
  137. grab_window.grab_key(
  138. keycode,
  139. modifiers,
  140. # owner_events
  141. # https://stackoverflow.com/questions/32122360/x11-will-xgrabpointer-prevent-other-apps-from-any-mouse-event
  142. # False,
  143. True,
  144. X.GrabModeAsync,
  145. X.GrabModeAsync,
  146. )
  147. def wait_for_engine_window(xdisplay, engine_process, timeout_seconds=20, search_interval_seconds=2):
  148. start_epoch = time.time()
  149. while engine_process.poll() is None and (time.time() - start_epoch) <= timeout_seconds:
  150. windows = x_find_window_by_pid(xdisplay, engine_process.pid)
  151. assert len(windows) <= 1
  152. if len(windows) == 1:
  153. return windows[0]
  154. time.sleep(search_interval_seconds)
  155. return None
  156. def run_extended_keyboard_controls(engine_process):
  157. if not Xlib:
  158. raise Exception('\n'.join([
  159. 'Extended keyboard controls require xlib for python to be installed.',
  160. 'Depending on your system run',
  161. '\t$ sudo apt-get install python3-xlib',
  162. 'or',
  163. '\t$ pip3 install --user xlib',
  164. ]))
  165. xdisplay = Xlib.display.Display()
  166. engine_window = wait_for_engine_window(xdisplay, engine_process)
  167. if not engine_window:
  168. raise Exception('Could not find the game\'s window.')
  169. # TODO add toggle to switch on / off
  170. for keysym in EXTENDED_KEYBOARD_CONTROLS_MAPPING.keys():
  171. x_grab_key(
  172. engine_window,
  173. xdisplay.keysym_to_keycode(keysym),
  174. )
  175. while engine_process.poll() is None:
  176. # TODO don't block here, engine might have already been stopped
  177. xevent = xdisplay.next_event()
  178. # TODO investigate why some release events get lost
  179. if isinstance(xevent, Xlib.protocol.event.KeyPress) \
  180. or isinstance(xevent, Xlib.protocol.event.KeyRelease):
  181. keysym_in = xdisplay.keycode_to_keysym(
  182. xevent.detail,
  183. index=0,
  184. )
  185. if keysym_in in EXTENDED_KEYBOARD_CONTROLS_MAPPING:
  186. keysym_out = EXTENDED_KEYBOARD_CONTROLS_MAPPING[keysym_in]
  187. else:
  188. keysym_out = keysym_in
  189. engine_window.send_event(type(xevent)(
  190. window=engine_window,
  191. detail=xdisplay.keysym_to_keycode(keysym_out),
  192. state=0,
  193. root_x=xevent.root_x,
  194. root_y=xevent.root_y,
  195. event_x=xevent.event_x,
  196. event_y=xevent.event_y,
  197. child=xevent.child,
  198. root=xevent.root,
  199. time=xevent.time, # X.CurrentTime
  200. same_screen=xevent.same_screen,
  201. ))
  202. def launch(engine_path, username, password, validate_ssl_certs=True,
  203. cpu_limit_percent=None, enable_extended_keyboard_controls=False):
  204. result = login(
  205. username=username,
  206. password=password,
  207. validate_ssl_cert=validate_ssl_certs,
  208. )
  209. if isinstance(result, LoginDelayed):
  210. result = login(
  211. queue_token=result.queue_token,
  212. validate_ssl_cert=validate_ssl_certs,
  213. )
  214. if isinstance(result, LoginSuccessful):
  215. p = start_engine(
  216. engine_path=engine_path,
  217. gameserver=result.gameserver,
  218. playcookie=result.playcookie,
  219. )
  220. if cpu_limit_percent is not None:
  221. subprocess.Popen(args=[
  222. 'cpulimit',
  223. '--pid', str(p.pid),
  224. '--limit', str(cpu_limit_percent),
  225. # '--verbose',
  226. ])
  227. if enable_extended_keyboard_controls:
  228. try:
  229. run_extended_keyboard_controls(
  230. engine_process=p,
  231. )
  232. except Exception as e:
  233. if isinstance(e, KeyboardInterrupt):
  234. raise e
  235. else:
  236. traceback.print_exc()
  237. if p.poll() is None:
  238. p.wait()
  239. else:
  240. raise Exception(repr(result))
  241. class InvasionProgress:
  242. def __init__(self, district, date, cog_type,
  243. despawned_number, total_number):
  244. self.district = district
  245. self.date = date
  246. self.cog_type = cog_type
  247. self.despawned_number = despawned_number
  248. self.total_number = total_number
  249. @property
  250. def remaining_number(self):
  251. return self.total_number - self.despawned_number
  252. def request_active_invasions(validate_ssl_certs=True):
  253. resp_data = api_request(INVASIONS_API_URL)
  254. if resp_data['error'] is not None:
  255. raise Exception(resp_data['error'])
  256. else:
  257. invs = {}
  258. for district, inv_data in resp_data['invasions'].items():
  259. despawned_number, total_number = inv_data['progress'].split('/')
  260. invs[district] = InvasionProgress(
  261. district=district,
  262. date=datetime.datetime.utcfromtimestamp(inv_data['asOf']),
  263. cog_type=inv_data['type'],
  264. despawned_number=int(despawned_number),
  265. total_number=int(total_number),
  266. )
  267. return invs