"""Launcher helpers for Toontown Rewritten.

Logs in through the public login API, starts the game engine as a child
process, optionally forwards remapped key events to the game window via
X11 key grabs, and queries the invasions API.

Official API documentation:
https://github.com/ToontownRewritten/api-doc/blob/master/login.md
https://github.com/ToontownRewritten/api-doc/blob/master/invasions.md
"""

import copy
import datetime
import json
import os
import ssl
import subprocess
import sys
import time
import traceback
import urllib.parse
import urllib.request

try:
    import Xlib.display
    from Xlib import X, XK
except ImportError:
    # python-xlib is optional; ExtendedControls checks this flag and refuses
    # to start when the package is missing.
    Xlib = False

INVASIONS_API_URL = 'https://www.toontownrewritten.com/api/invasions?format=json'
LOGIN_API_URL = 'https://www.toontownrewritten.com/api/login?format=json'

if sys.platform == 'darwin':
    TOONTOWN_LIBRARY_PATH = os.path.join(
        os.path.expanduser('~'),
        'Library',
        'Application Support',
        'Toontown Rewritten',
    )
    TOONTOWN_ENGINE_DEFAULT_PATH = os.path.join(
        TOONTOWN_LIBRARY_PATH,
        'Toontown Rewritten',
    )
else:
    TOONTOWN_LIBRARY_PATH = None
    TOONTOWN_ENGINE_DEFAULT_PATH = None

if Xlib:
    # keysym pressed by the user -> keysym forwarded to the game window
    EXTENDED_KEYBOARD_CONTROLS_DEFAULT_MAPPING = {
        XK.XK_w: XK.XK_Up,
        XK.XK_a: XK.XK_Left,
        XK.XK_s: XK.XK_Down,
        XK.XK_d: XK.XK_Right,
    }


def start_engine(engine_path, gameserver, playcookie, **kwargs):
    """Start the game engine as a child process.

    The child receives only the environment variables assembled here; the
    launcher's own environment is not passed on.

    :param engine_path: path of the engine executable; its directory is used
        as the child's working directory
    :param gameserver: value exported as ``TTR_GAMESERVER``
    :param playcookie: value exported as ``TTR_PLAYCOOKIE``
    :param kwargs: extra keyword arguments forwarded to ``subprocess.Popen``
    :return: the ``subprocess.Popen`` object of the started engine
    """
    env = {
        'TTR_GAMESERVER': gameserver,
        'TTR_PLAYCOOKIE': playcookie,
    }
    if sys.platform == 'darwin':
        env['DYLD_LIBRARY_PATH'] = os.path.join(
            TOONTOWN_LIBRARY_PATH,
            'Libraries.bundle',
        )
        env['DYLD_FRAMEWORK_PATH'] = os.path.join(
            TOONTOWN_LIBRARY_PATH,
            'Frameworks',
        )
    elif sys.platform == 'linux' and 'XAUTHORITY' in os.environ:
        # Fix for TTREngine reporting:
        # > :display:x11display(error): Could not open display ":0.0".
        # > :ToonBase: Default graphics pipe is glxGraphicsPipe (OpenGL).
        # > :ToonBase(warning): Unable to open 'onscreen' window.
        # > Traceback (most recent call last):
        # > File "", line 0, in
        # > [...]
        # > File "", line 0, in vltf05fd21b
        # > Exception: Could not open window.
        env['XAUTHORITY'] = os.environ['XAUTHORITY']
    return subprocess.Popen(
        args=[engine_path],
        cwd=os.path.dirname(engine_path),
        env=env,
        **kwargs
    )


def api_request(url, params=None, validate_ssl_cert=True):
    """Request ``url`` and return the decoded JSON response.

    :param url: URL to open
    :param params: optional mapping sent url-encoded as the request body
        (which turns the request into a POST); a falsy value sends no body
    :param validate_ssl_cert: when false, TLS certificates are not verified
    :return: the parsed JSON document
    """
    if params:
        data = urllib.parse.urlencode(params).encode('ascii')
    else:
        data = None
    if validate_ssl_cert:
        context = None
    else:
        context = ssl._create_unverified_context()
    # Close the response deterministically instead of leaking the connection.
    with urllib.request.urlopen(url=url, data=data, context=context) as resp:
        body = resp.read()
    # JSON is UTF-8 on the wire; decoding as ASCII failed on any non-ASCII
    # character in the response.
    return json.loads(body.decode('utf-8'))


class LoginSuccessful:
    """Login result carrying the credentials needed to start the engine."""

    def __init__(self, playcookie, gameserver):
        self.playcookie = playcookie
        self.gameserver = gameserver


class LoginDelayed:
    """Login result telling the caller to retry with ``queue_token``."""

    def __init__(self, queue_token):
        self.queue_token = queue_token

    def __repr__(self):
        # launch() puts repr(result) into its error message; make it readable.
        return '{}(queue_token={!r})'.format(
            type(self).__name__, self.queue_token,
        )


def login(username=None, password=None, queue_token=None,
          validate_ssl_cert=True):
    """Log in with either username/password or a queue token.

    :param username: account name; requires ``password``
    :param password: account password
    :param queue_token: token from a previous :class:`LoginDelayed` result
    :param validate_ssl_cert: forwarded to :func:`api_request`
    :return: :class:`LoginSuccessful` or :class:`LoginDelayed`
    :raises AssertionError: if ``username`` is given without ``password``
    :raises Exception: if neither or both of ``username`` and ``queue_token``
        are given, or if the API reports any other outcome
    """
    if username is not None and queue_token is None:
        if password is None:
            # Raised explicitly (not via ``assert``) so the check survives
            # ``python -O``; the exception type is the one callers got before.
            raise AssertionError(
                'password is required when logging in with a username',
            )
        req_params = {
            'username': username,
            'password': password,
        }
    elif username is None and queue_token is not None:
        req_params = {
            'queueToken': queue_token,
        }
    else:
        raise Exception('either specify username or queue token')
    resp_data = api_request(
        url=LOGIN_API_URL,
        params=req_params,
        validate_ssl_cert=validate_ssl_cert,
    )
    status = resp_data['success']
    if status == 'true':
        return LoginSuccessful(
            playcookie=resp_data['cookie'],
            gameserver=resp_data['gameserver'],
        )
    if status == 'delayed':
        return LoginDelayed(
            queue_token=resp_data['queueToken'],
        )
    raise Exception(repr(resp_data))


def x_find_window(parent_window, filter_callback):
    """Collect all descendants of ``parent_window`` accepted by the filter.

    The tree is walked depth-first; a matching window is listed before its
    own descendants. ``parent_window`` itself is never tested.

    :param parent_window: window whose ``query_tree().children`` are searched
    :param filter_callback: called with each window; truthy means "match"
    :return: list of matching windows
    """
    matching = []
    for child_window in parent_window.query_tree().children:
        if filter_callback(child_window):
            matching.append(child_window)
        matching.extend(x_find_window(child_window, filter_callback))
    return matching


def x_find_window_by_pid(display, pid):
    """Return all windows whose ``_NET_WM_PID`` property equals ``pid``."""
    pid_prop = display.intern_atom('_NET_WM_PID')

    def filter_callback(window):
        prop = window.get_full_property(pid_prop, X.AnyPropertyType)
        return prop and prop.value.tolist() == [pid]

    return x_find_window(display.screen().root, filter_callback)


def wait_for_engine_window(xdisplay, engine_process,
                           timeout_seconds=20, search_interval_seconds=2):
    """Poll until the engine process owns a window.

    :param xdisplay: X display to search
    :param engine_process: process object providing ``poll()`` and ``pid``
    :param timeout_seconds: stop searching after this many seconds
    :param search_interval_seconds: pause between two searches
    :return: the window, or ``None`` if the process exited or the timeout
        elapsed before a window showed up
    """
    # Monotonic clock: the timeout must not be affected by wall-clock jumps.
    started = time.monotonic()
    while (engine_process.poll() is None
           and (time.monotonic() - started) <= timeout_seconds):
        windows = x_find_window_by_pid(xdisplay, engine_process.pid)
        assert len(windows) <= 1
        if len(windows) == 1:
            return windows[0]
        time.sleep(search_interval_seconds)
    return None


class ExtendedControls:
    """Forward (optionally remapped) key events to the engine window.

    While enabled, the keys of the mapping are grabbed on the engine window
    and re-sent to it as the mapped keysym. The toggle key switches the
    remapping on and off and is never forwarded.
    """

    def __init__(self, engine_process, toggle_keysym_name):
        """
        :param engine_process: process object of the running engine
        :param toggle_keysym_name: X keysym name of the toggle key
        :raises Exception: if python-xlib is missing or the keysym name is
            unknown
        """
        if not Xlib:
            raise Exception('\n'.join([
                'Extended keyboard controls require xlib for python to be installed.',
                'Depending on your system run',
                '\t$ sudo apt-get install python3-xlib',
                'or',
                '\t$ pip3 install --user xlib',
            ]))
        self._engine_process = engine_process
        self._xdisplay = Xlib.display.Display()
        self._toggle_keysym = XK.string_to_keysym(toggle_keysym_name)
        if self._toggle_keysym == X.NoSymbol:
            raise Exception("Extended keyboard controls toggle:"
                            + " Unknown keysym name '{}'".format(toggle_keysym_name))
        keyboard_mapping = copy.deepcopy(
            EXTENDED_KEYBOARD_CONTROLS_DEFAULT_MAPPING
        )
        # The toggle key cannot be remapped at the same time.
        if self._toggle_keysym in keyboard_mapping:
            del keyboard_mapping[self._toggle_keysym]
            print("INFO Extended Controls:"
                  + " Ignoring mapping for toggle key '{}'".format(toggle_keysym_name))
        self._keyboard_mapping = keyboard_mapping
        self._engine_window = None
        self._enabled = False

    def run(self):
        """Locate the engine window and process X events until the engine
        process has exited.

        :raises Exception: if the engine window cannot be found
        """
        self._engine_window = wait_for_engine_window(
            self._xdisplay,
            self._engine_process,
        )
        if not self._engine_window:
            raise Exception("Could not find the game's window.")
        # The toggle key stays grabbed regardless of the enabled state.
        self._grab_key(
            self._xdisplay.keysym_to_keycode(self._toggle_keysym),
        )
        if not self.enabled:
            keysym_name = XK.keysym_to_string(self._toggle_keysym)
            print("INFO Extended Controls are currently disabled."
                  + " Press key '{}' to enable.".format(keysym_name))
        while self._engine_process.poll() is None:
            # TODO don't block here, engine might have already been stopped
            self._handle_xevent(self._xdisplay.next_event())

    def _handle_xevent(self, xevent):
        """Dispatch key press/release events; ignore everything else."""
        # TODO investigate why some release events get lost
        key_event_types = (
            Xlib.protocol.event.KeyPress,
            Xlib.protocol.event.KeyRelease,
        )
        if isinstance(xevent, key_event_types):
            self._handle_xkeyevent(xevent)

    def _handle_xkeyevent(self, xkeyevent):
        """Toggle on the toggle key, otherwise re-send the event to the
        engine window with the (possibly remapped) keysym."""
        # TODO map keycodes instead of keysyms
        keysym_in = self._xdisplay.keycode_to_keysym(
            xkeyevent.detail,
            index=0,
        )
        if keysym_in == self._toggle_keysym:
            # Only the press toggles; the release is swallowed.
            if isinstance(xkeyevent, Xlib.protocol.event.KeyPress):
                self.toggle()
            return
        if self.enabled and keysym_in in self._keyboard_mapping:
            keysym_out = self._keyboard_mapping[keysym_in]
        else:
            keysym_out = keysym_in
        self._engine_window.send_event(type(xkeyevent)(
            window=self._engine_window,
            detail=self._xdisplay.keysym_to_keycode(keysym_out),
            state=0,
            root_x=xkeyevent.root_x,
            root_y=xkeyevent.root_y,
            event_x=xkeyevent.event_x,
            event_y=xkeyevent.event_y,
            child=xkeyevent.child,
            root=xkeyevent.root,
            time=xkeyevent.time,  # X.CurrentTime
            same_screen=xkeyevent.same_screen,
        ))

    def enable(self):
        """Grab every mapped key on the engine window and turn remapping on."""
        for keysym in self._keyboard_mapping:
            self._grab_key(
                self._xdisplay.keysym_to_keycode(keysym),
            )
        self._enabled = True
        print("INFO Enabled Extended Controls")

    def disable(self):
        """Release every mapped key and turn remapping off."""
        for keysym in self._keyboard_mapping:
            self._ungrab_key(
                self._xdisplay.keysym_to_keycode(keysym),
            )
        self._enabled = False
        print("INFO Disabled Extended Controls")

    @property
    def enabled(self):
        """Whether remapping is currently active."""
        return self._enabled

    def toggle(self):
        """Switch between enabled and disabled."""
        if self.enabled:
            self.disable()
        else:
            self.enable()

    def _grab_key(self, keycode):
        """Grab ``keycode`` with any modifier on the engine window."""
        self._engine_window.grab_key(
            keycode,
            X.AnyModifier,
            # owner_events
            # https://stackoverflow.com/questions/32122360/x11-will-xgrabpointer-prevent-other-apps-from-any-mouse-event
            # False,
            True,
            X.GrabModeAsync,
            X.GrabModeAsync,
        )

    def _ungrab_key(self, keycode):
        """Undo :meth:`_grab_key` for ``keycode``."""
        self._engine_window.ungrab_key(keycode, X.AnyModifier)


def launch(engine_path, username, password, validate_ssl_certs=True,
           cpu_limit_percent=None, enable_extended_keyboard_controls=False,
           extended_keyboard_control_toggle_keysym_name=None):
    """Log in, start the engine and block until it exits.

    :param engine_path: path of the engine executable
    :param username: account name
    :param password: account password
    :param validate_ssl_certs: when false, TLS certificates are not verified
    :param cpu_limit_percent: if not ``None``, a ``cpulimit`` process is
        started for the engine with this limit
    :param enable_extended_keyboard_controls: run :class:`ExtendedControls`
        while the engine is running
    :param extended_keyboard_control_toggle_keysym_name: keysym name of the
        key toggling the extended controls
    :raises Exception: if the login does not end up successful
    """
    result = login(
        username=username,
        password=password,
        validate_ssl_cert=validate_ssl_certs,
    )
    # NOTE(review): a delayed login is retried exactly once and without any
    # pause; if the answer is still "delayed" the launch fails below.
    # Confirm against the login API documentation whether polling is expected.
    if isinstance(result, LoginDelayed):
        result = login(
            queue_token=result.queue_token,
            validate_ssl_cert=validate_ssl_certs,
        )
    if not isinstance(result, LoginSuccessful):
        raise Exception(repr(result))
    p = start_engine(
        engine_path=engine_path,
        gameserver=result.gameserver,
        playcookie=result.playcookie,
    )
    if cpu_limit_percent is not None:
        subprocess.Popen(args=[
            'cpulimit',
            '--pid', str(p.pid),
            '--limit', str(cpu_limit_percent),
            # '--verbose',
        ])
    if enable_extended_keyboard_controls:
        try:
            ExtendedControls(
                engine_process=p,
                toggle_keysym_name=extended_keyboard_control_toggle_keysym_name,
            ).run()
        except Exception:
            # Best effort: report the problem and keep waiting for the game.
            # KeyboardInterrupt is not an Exception subclass, so it
            # propagates on its own.
            traceback.print_exc()
    if p.poll() is None:
        p.wait()


class InvasionProgress:
    """Snapshot of one invasion as reported by the invasions API."""

    def __init__(self, district, date, cog_type, despawned_number,
                 total_number):
        self.district = district
        self.date = date
        self.cog_type = cog_type
        self.despawned_number = despawned_number
        self.total_number = total_number

    @property
    def remaining_number(self):
        """``total_number`` minus ``despawned_number``."""
        return self.total_number - self.despawned_number


def _parse_invasions(resp_data):
    """Convert a decoded invasions API response into InvasionProgress objects
    keyed by district."""
    if resp_data['error'] is not None:
        raise Exception(resp_data['error'])
    invs = {}
    for district, inv_data in resp_data['invasions'].items():
        despawned_number, total_number = inv_data['progress'].split('/')
        # Naive UTC datetime, i.e. what the deprecated utcfromtimestamp()
        # returned.
        as_of = datetime.datetime.fromtimestamp(
            inv_data['asOf'], datetime.timezone.utc,
        ).replace(tzinfo=None)
        invs[district] = InvasionProgress(
            district=district,
            date=as_of,
            cog_type=inv_data['type'],
            despawned_number=int(despawned_number),
            total_number=int(total_number),
        )
    return invs


def request_active_invasions(validate_ssl_certs=True):
    """Fetch the currently active invasions.

    :param validate_ssl_certs: when false, TLS certificates are not verified
    :return: dict mapping district name to :class:`InvasionProgress`
    :raises Exception: if the API response carries an error
    """
    resp_data = api_request(
        INVASIONS_API_URL,
        # This flag used to be ignored; honour it.
        validate_ssl_cert=validate_ssl_certs,
    )
    return _parse_invasions(resp_data)