123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302 |
- import datetime
- import json
- import os
- import ssl
- import subprocess
- import sys
- import time
- import traceback
- import urllib.parse
- import urllib.request
- try:
- import Xlib.display
- from Xlib import X, XK
- except ImportError:
- Xlib = False
- """
- official api documentation:
- https://github.com/ToontownRewritten/api-doc/blob/master/login.md
- https://github.com/ToontownRewritten/api-doc/blob/master/invasions.md
- """
- INVASIONS_API_URL = 'https://www.toontownrewritten.com/api/invasions?format=json'
- LOGIN_API_URL = 'https://www.toontownrewritten.com/api/login?format=json'
- if sys.platform == 'darwin':
- TOONTOWN_LIBRARY_PATH = os.path.join(
- os.path.expanduser('~'), 'Library',
- 'Application Support', 'Toontown Rewritten',
- )
- TOONTOWN_ENGINE_DEFAULT_PATH = os.path.join(
- TOONTOWN_LIBRARY_PATH,
- 'Toontown Rewritten',
- )
- else:
- TOONTOWN_LIBRARY_PATH = None
- TOONTOWN_ENGINE_DEFAULT_PATH = None
- if Xlib:
- EXTENDED_KEYBOARD_CONTROLS_MAPPING = {
- XK.XK_w: XK.XK_Up,
- XK.XK_a: XK.XK_Left,
- XK.XK_s: XK.XK_Down,
- XK.XK_d: XK.XK_Right,
- }
- def start_engine(engine_path, gameserver, playcookie, **kwargs):
- env = {
- 'TTR_GAMESERVER': gameserver,
- 'TTR_PLAYCOOKIE': playcookie,
- }
- if sys.platform == 'darwin':
- env['DYLD_LIBRARY_PATH'] = os.path.join(
- TOONTOWN_LIBRARY_PATH,
- 'Libraries.bundle',
- )
- env['DYLD_FRAMEWORK_PATH'] = os.path.join(
- TOONTOWN_LIBRARY_PATH,
- 'Frameworks',
- )
- elif sys.platform == 'linux' and 'XAUTHORITY' in os.environ:
- """
- Fix for TTREngine reporting:
- > :display:x11display(error): Could not open display ":0.0".
- > :ToonBase: Default graphics pipe is glxGraphicsPipe (OpenGL).
- > :ToonBase(warning): Unable to open 'onscreen' window.
- > Traceback (most recent call last):
- > File "<compiled '__voltorbmain__'>", line 0, in <module>
- > [...]
- > File "<compiled 'direct.vlt8f63e471.ShowBase'>", line 0, in vltf05fd21b
- > Exception: Could not open window.
- """
- env['XAUTHORITY'] = os.environ['XAUTHORITY']
- return subprocess.Popen(
- args=[engine_path],
- cwd=os.path.dirname(engine_path),
- env=env,
- **kwargs,
- )
- def api_request(url, params=None, validate_ssl_cert=True):
- resp = urllib.request.urlopen(
- url=url,
- data=urllib.parse.urlencode(params).encode('ascii')
- if params else None,
- context=None if validate_ssl_cert
- else ssl._create_unverified_context(),
- )
- return json.loads(resp.read().decode('ascii'))
- class LoginSuccessful:
- def __init__(self, playcookie, gameserver):
- self.playcookie = playcookie
- self.gameserver = gameserver
- class LoginDelayed:
- def __init__(self, queue_token):
- self.queue_token = queue_token
- def login(username=None, password=None,
- queue_token=None, validate_ssl_cert=True):
- if username is not None and queue_token is None:
- assert password is not None
- req_params = {
- 'username': username,
- 'password': password,
- }
- elif username is None and queue_token is not None:
- req_params = {
- 'queueToken': queue_token,
- }
- else:
- raise Exception('either specify username or queue token')
- resp_data = api_request(
- url=LOGIN_API_URL,
- params=req_params,
- validate_ssl_cert=validate_ssl_cert,
- )
- if resp_data['success'] == 'true':
- return LoginSuccessful(
- playcookie=resp_data['cookie'],
- gameserver=resp_data['gameserver'],
- )
- elif resp_data['success'] == 'delayed':
- return LoginDelayed(
- queue_token=resp_data['queueToken'],
- )
- else:
- raise Exception(repr(resp_data))
- def x_find_window(parent_window, filter_callback):
- matching = []
- for child_window in parent_window.query_tree().children:
- if filter_callback(child_window):
- matching.append(child_window)
- matching += x_find_window(child_window, filter_callback)
- return matching
- def x_find_window_by_pid(display, pid):
- pid_prop = display.intern_atom('_NET_WM_PID')
- def filter_callback(window):
- prop = window.get_full_property(pid_prop, X.AnyPropertyType)
- return prop and prop.value.tolist() == [pid]
- return x_find_window(display.screen().root, filter_callback)
- def x_grab_key(grab_window, keycode, modifiers=None):
- if modifiers is None:
- modifiers = X.AnyModifier
- grab_window.grab_key(
- keycode,
- modifiers,
- # owner_events
- # https://stackoverflow.com/questions/32122360/x11-will-xgrabpointer-prevent-other-apps-from-any-mouse-event
- # False,
- True,
- X.GrabModeAsync,
- X.GrabModeAsync,
- )
- def wait_for_engine_window(xdisplay, engine_process, timeout_seconds=20, search_interval_seconds=2):
- start_epoch = time.time()
- while engine_process.poll() is None and (time.time() - start_epoch) <= timeout_seconds:
- windows = x_find_window_by_pid(xdisplay, engine_process.pid)
- assert len(windows) <= 1
- if len(windows) == 1:
- return windows[0]
- time.sleep(search_interval_seconds)
- return None
- def run_extended_keyboard_controls(engine_process):
- if not Xlib:
- raise Exception('\n'.join([
- 'Extended keyboard controls require xlib for python to be installed.',
- 'Depending on your system run',
- '\t$ sudo apt-get install python3-xlib',
- 'or',
- '\t$ pip3 install --user xlib',
- ]))
- xdisplay = Xlib.display.Display()
- engine_window = wait_for_engine_window(xdisplay, engine_process)
- if not engine_window:
- raise Exception('Could not find the game\'s window.')
- # TODO add toggle to switch on / off
- for keysym in EXTENDED_KEYBOARD_CONTROLS_MAPPING.keys():
- x_grab_key(
- engine_window,
- xdisplay.keysym_to_keycode(keysym),
- )
- while engine_process.poll() is None:
- # TODO don't block here, engine might have already been stopped
- xevent = xdisplay.next_event()
- # TODO investigate why some release events get lost
- if isinstance(xevent, Xlib.protocol.event.KeyPress) \
- or isinstance(xevent, Xlib.protocol.event.KeyRelease):
- keysym_in = xdisplay.keycode_to_keysym(
- xevent.detail,
- index=0,
- )
- if keysym_in in EXTENDED_KEYBOARD_CONTROLS_MAPPING:
- keysym_out = EXTENDED_KEYBOARD_CONTROLS_MAPPING[keysym_in]
- else:
- keysym_out = keysym_in
- engine_window.send_event(type(xevent)(
- window=engine_window,
- detail=xdisplay.keysym_to_keycode(keysym_out),
- state=0,
- root_x=xevent.root_x,
- root_y=xevent.root_y,
- event_x=xevent.event_x,
- event_y=xevent.event_y,
- child=xevent.child,
- root=xevent.root,
- time=xevent.time, # X.CurrentTime
- same_screen=xevent.same_screen,
- ))
- def launch(engine_path, username, password, validate_ssl_certs=True,
- cpu_limit_percent=None, enable_extended_keyboard_controls=False):
- result = login(
- username=username,
- password=password,
- validate_ssl_cert=validate_ssl_certs,
- )
- if isinstance(result, LoginDelayed):
- result = login(
- queue_token=result.queue_token,
- validate_ssl_cert=validate_ssl_certs,
- )
- if isinstance(result, LoginSuccessful):
- p = start_engine(
- engine_path=engine_path,
- gameserver=result.gameserver,
- playcookie=result.playcookie,
- )
- if cpu_limit_percent is not None:
- subprocess.Popen(args=[
- 'cpulimit',
- '--pid', str(p.pid),
- '--limit', str(cpu_limit_percent),
- # '--verbose',
- ])
- if enable_extended_keyboard_controls:
- try:
- run_extended_keyboard_controls(
- engine_process=p,
- )
- except Exception as e:
- if isinstance(e, KeyboardInterrupt):
- raise e
- else:
- traceback.print_exc()
- if p.poll() is None:
- p.wait()
- else:
- raise Exception(repr(result))
- class InvasionProgress:
- def __init__(self, district, date, cog_type,
- despawned_number, total_number):
- self.district = district
- self.date = date
- self.cog_type = cog_type
- self.despawned_number = despawned_number
- self.total_number = total_number
- @property
- def remaining_number(self):
- return self.total_number - self.despawned_number
- def request_active_invasions(validate_ssl_certs=True):
- resp_data = api_request(INVASIONS_API_URL)
- if resp_data['error'] is not None:
- raise Exception(resp_data['error'])
- else:
- invs = {}
- for district, inv_data in resp_data['invasions'].items():
- despawned_number, total_number = inv_data['progress'].split('/')
- invs[district] = InvasionProgress(
- district=district,
- date=datetime.datetime.utcfromtimestamp(inv_data['asOf']),
- cog_type=inv_data['type'],
- despawned_number=int(despawned_number),
- total_number=int(total_number),
- )
- return invs
|