123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303 |
- import copy
- import os
- import select
- import time
- from tooncher.actions import *
- try:
- import psutil
- except ImportError:
- psutil = False
- try:
- import Xlib.display
- from Xlib import X, XK, Xatom
- except ImportError:
- Xlib = False
- EXTENDED_CONTROLS_DEFAULT_TOGGLE_KEYSYM_NAME = 'grave'
- EXTENDED_CONTROLS_PID_XPROPERTY_NAME = '_TOONCHER_EXTENDED_CONTROLS_PID'
- TOONTOWN_WINDOW_NAME = 'Toontown Rewritten'
- if Xlib:
- EXTENDED_CONTROLS_DEFAULT_KEYSYM_MAPPINGS = {
- XK.XK_w: RewriteKeyEventAction(keysym=XK.XK_Up, target_engine_index=0),
- XK.XK_a: RewriteKeyEventAction(keysym=XK.XK_Left, target_engine_index=0),
- XK.XK_s: RewriteKeyEventAction(keysym=XK.XK_Down, target_engine_index=0),
- XK.XK_d: RewriteKeyEventAction(keysym=XK.XK_Right, target_engine_index=0),
- XK.XK_Control_L: RewriteKeyEventAction(keysym=XK.XK_Control_L, target_engine_index=0),
- XK.XK_v: RewriteKeyEventAction(keysym=XK.XK_Delete, target_engine_index=0),
- XK.XK_i: RewriteKeyEventAction(keysym=XK.XK_Up, target_engine_index=1),
- XK.XK_j: RewriteKeyEventAction(keysym=XK.XK_Left, target_engine_index=1),
- XK.XK_k: RewriteKeyEventAction(keysym=XK.XK_Down, target_engine_index=1),
- XK.XK_l: RewriteKeyEventAction(keysym=XK.XK_Right, target_engine_index=1),
- XK.XK_slash: RewriteKeyEventAction(keysym=XK.XK_Control_L, target_engine_index=1),
- XK.XK_n: RewriteKeyEventAction(keysym=XK.XK_Delete, target_engine_index=1),
- XK.XK_space: RewriteKeyEventAction(keysym=XK.XK_Control_L, target_engine_index=TargetEngine.All),
- XK.XK_e: SelectGagAction(target_engine_index=0, column_index=4, factor_y=-0.047), # elephant trunk
- XK.XK_o: SelectGagAction(target_engine_index=1, column_index=4, factor_y=-0.047), # elephant trunk
- XK.XK_f: SelectGagAction(target_engine_index=0, column_index=5, factor_y=-0.047), # foghorn
- XK.XK_semicolon: SelectGagAction(target_engine_index=1, column_index=5, factor_y=-0.047), # foghorn
- }
- def x_find_window(parent_window, filter_callback):
- matching = []
- for child_window in parent_window.query_tree().children:
- if filter_callback(child_window):
- matching.append(child_window)
- matching += x_find_window(child_window, filter_callback)
- return matching
- def x_find_window_by_pid(display, pid):
- pid_prop = display.intern_atom('_NET_WM_PID')
- def filter_callback(window):
- prop = window.get_full_property(pid_prop, X.AnyPropertyType)
- return prop and prop.value.tolist() == [pid]
- return x_find_window(display.screen().root, filter_callback)
- def x_wait_for_event(xdisplay, timeout_seconds):
- """ Wait up to `timeout_seconds` seconds for a xevent.
- Return True, if a xevent is available.
- Return False, if the timeout was reached. """
- rlist = select.select(
- [xdisplay.display.socket], # rlist
- [], # wlist
- [], # xlist
- timeout_seconds, # timeout [seconds]
- )[0]
- return len(rlist) > 0
- class ExtendedControls:
- def __init__(self, primary_engine_pid, primary_engine_window_name=None,
- toggle_keysym_name=EXTENDED_CONTROLS_DEFAULT_TOGGLE_KEYSYM_NAME):
- if not psutil:
- raise Exception('\n'.join([
- 'Extended keyboard controls require the python lib psutil to be installed.',
- 'Depending on your system run',
- '\t$ sudo apt-get install python3-psutil',
- 'or',
- '\t$ pip3 install --user psutil',
- ]))
- if not Xlib:
- raise Exception('\n'.join([
- 'Extended keyboard controls require xlib for python to be installed.',
- 'Depending on your system run',
- '\t$ sudo apt-get install python3-xlib',
- 'or',
- '\t$ pip3 install --user xlib',
- ]))
- self._primary_engine_pid = primary_engine_pid
- self._primary_engine_window_name = primary_engine_window_name
- self._xdisplay = Xlib.display.Display()
- self._toggle_keysym = XK.string_to_keysym(toggle_keysym_name)
- if self._toggle_keysym == X.NoSymbol:
- raise Exception("Extended keyboard controls toggle:"
- + " Unknown keysym name '{}'".format(toggle_keysym_name))
- self._keysym_mappings = copy.deepcopy(
- EXTENDED_CONTROLS_DEFAULT_KEYSYM_MAPPINGS,
- )
- if self._toggle_keysym in self._keysym_mappings:
- print("INFO Extended Controls:"
- + " Ignoring mapping for toggle key '{}'".format(toggle_keysym_name))
- self._keysym_mappings[self._toggle_keysym] \
- = ToggleExtendedControlsAction()
- self._default_action = ForwardKeyEventAction()
- self._primary_engine_window = None
- self._engine_windows_by_target_index = None
- self._active_key_registry = {}
- self._enabled = False
- @property
- def engine_running(self):
- return psutil.pid_exists(self._primary_engine_pid)
- @property
- def xdisplay(self):
- return self._xdisplay
- @property
- def primary_engine_window(self):
- return self._primary_engine_window
- def _wait_for_engine_window(self, timeout_seconds=20, search_interval_seconds=2):
- start_epoch = time.time()
- while self.engine_running and (time.time() - start_epoch) <= timeout_seconds:
- windows = x_find_window_by_pid(
- self._xdisplay,
- self._primary_engine_pid,
- )
- assert len(windows) <= 1
- if len(windows) == 1:
- return windows[0]
- time.sleep(search_interval_seconds)
- return None
- def run(self):
- self._primary_engine_window = self._wait_for_engine_window()
- if not self._primary_engine_window:
- raise Exception('Could not find the game\'s window.')
- self._grab_key(
- self._xdisplay.keysym_to_keycode(self._toggle_keysym),
- )
- self._primary_engine_window.change_property(
- self.xdisplay.intern_atom(EXTENDED_CONTROLS_PID_XPROPERTY_NAME),
- Xatom.CARDINAL,
- format=32,
- data=[os.getpid()],
- mode=X.PropModeReplace,
- )
- if self._primary_engine_window_name:
- self._primary_engine_window.set_wm_name(
- self._primary_engine_window_name,
- )
- print("INFO Changed engine's window name to {!r}".format(
- self._primary_engine_window_name,
- ))
- if not self.enabled:
- keysym_name = XK.keysym_to_string(self._toggle_keysym)
- print("INFO Extended Controls are currently disabled."
- + " Press key '{}' to enable.".format(keysym_name))
- while self.engine_running:
- while self.xdisplay.pending_events():
- self._handle_xevent(self._xdisplay.next_event())
- self._check_active_key_registry()
- # keep timeout low for _check_active_key_registry()
- # to be called frequently
- x_wait_for_event(self.xdisplay, timeout_seconds=0.05)
- def _handle_xevent(self, xevent):
- if isinstance(xevent, Xlib.protocol.event.KeyPress) \
- or isinstance(xevent, Xlib.protocol.event.KeyRelease):
- self._handle_xkeyevent(xevent)
- def _handle_xkeyevent(self, xkeyevent):
- self._update_active_key_registry(xkeyevent)
- keysym_in = self._xdisplay.keycode_to_keysym(
- xkeyevent.detail,
- index=0,
- )
- if keysym_in in self._keysym_mappings:
- action = self._keysym_mappings[keysym_in]
- else:
- action = self._default_action
- action.execute(self, xkeyevent)
- def enable(self):
- for keysym in self._keysym_mappings.keys():
- if keysym != self._toggle_keysym:
- self._grab_key(
- self._xdisplay.keysym_to_keycode(keysym),
- )
- self._enabled = True
- # reset cache
- self._engine_windows_by_target_index = None
- print("INFO Enabled Extended Controls")
- def disable(self):
- for keysym in self._keysym_mappings.keys():
- if keysym != self._toggle_keysym:
- self._ungrab_key(
- self._xdisplay.keysym_to_keycode(keysym),
- )
- self._enabled = False
- print("INFO Disabled Extended Controls")
- @property
- def enabled(self):
- return self._enabled
- def toggle(self):
- if self.enabled:
- self.disable()
- else:
- self.enable()
- def _grab_key(self, keycode):
- self.primary_engine_window.grab_key(
- keycode,
- X.AnyModifier,
- # owner_events
- # https://stackoverflow.com/questions/32122360/x11-will-xgrabpointer-prevent-other-apps-from-any-mouse-event
- # False,
- True,
- X.GrabModeAsync,
- X.GrabModeAsync,
- )
- def _ungrab_key(self, keycode):
- self.primary_engine_window.ungrab_key(keycode, X.AnyModifier)
- def find_engine_windows(self):
- controls_xprop = self.xdisplay.intern_atom(
- EXTENDED_CONTROLS_PID_XPROPERTY_NAME,
- )
- return x_find_window(
- self.xdisplay.screen().root,
- lambda w: w.get_wm_name() == TOONTOWN_WINDOW_NAME
- or w.get_full_property(controls_xprop, X.AnyPropertyType),
- )
- @property
- def engine_windows_by_target_index(self):
- if not self._engine_windows_by_target_index:
- win_by_index = {}
- for target_index, win in enumerate(self.find_engine_windows()):
- print('INFO Engine window {} has no target index, assuming {}'.format(
- win.id,
- target_index,
- ))
- if not target_index in win_by_index:
- win_by_index[target_index] = []
- win_by_index[target_index].append(win)
- self._engine_windows_by_target_index = win_by_index
- return self._engine_windows_by_target_index
- @property
- def engine_windows(self):
- return [w for g in self.engine_windows_by_target_index.values() for w in g]
- def _update_active_key_registry(self, xkeyevent):
- # see self._check_active_key_registry
- keycode = xkeyevent.detail
- if isinstance(xkeyevent, Xlib.protocol.event.KeyPress):
- self._active_key_registry[keycode] = xkeyevent
- elif keycode in self._active_key_registry:
- del self._active_key_registry[keycode]
- def _check_active_key_registry(self):
- """
- WORKAROUND
- For an unknown reason some key release events don't get queued
- when multiple keys are being released simultaneously.
- So we keep a hashmap of supposedly currently pressed keys
- and periodically compare it with xdispaly.query_keymap().
- ref: https://stackoverflow.com/q/18160792/5894777
- """
- # https://tronche.com/gui/x/xlib/input/XQueryKeymap.html
- keymap = self.xdisplay.query_keymap()
- missed_releases = []
- for keycode, press_event in self._active_key_registry.items():
- byte_index = keycode >> 3
- bit_index = keycode & ((1 << 3) - 1)
- if not keymap[byte_index] & (1 << bit_index):
- print("DEBUG missed release event of key {}".format(keycode))
- missed_releases.append(Xlib.protocol.event.KeyRelease(
- window=press_event.window,
- detail=press_event.detail,
- state=press_event.state,
- root_x=press_event.root_x,
- root_y=press_event.root_y,
- event_x=press_event.event_x,
- event_y=press_event.event_y,
- child=press_event.child,
- root=press_event.root,
- time=X.CurrentTime,
- same_screen=press_event.same_screen,
- ))
- for release_event in missed_releases:
- self._handle_xkeyevent(release_event)
|