import copy
import os
import select
import time

from tooncher.actions import *

try:
    import psutil
except ImportError:
    psutil = False

try:
    import Xlib.display
    from Xlib import X, XK, Xatom
except ImportError:
    Xlib = False

EXTENDED_CONTROLS_DEFAULT_TOGGLE_KEYSYM_NAME = "grave"
EXTENDED_CONTROLS_PID_XPROPERTY_NAME = "_TOONCHER_EXTENDED_CONTROLS_PID"
TOONTOWN_WINDOW_NAME = "Toontown Rewritten"

if Xlib:
    EXTENDED_CONTROLS_DEFAULT_KEYSYM_MAPPINGS = {
        XK.XK_w: RewriteKeyEventAction(keysym=XK.XK_Up, target_engine_index=0),
        XK.XK_a: RewriteKeyEventAction(keysym=XK.XK_Left, target_engine_index=0),
        XK.XK_s: RewriteKeyEventAction(keysym=XK.XK_Down, target_engine_index=0),
        XK.XK_d: RewriteKeyEventAction(keysym=XK.XK_Right, target_engine_index=0),
        XK.XK_Control_L: RewriteKeyEventAction(
            keysym=XK.XK_Control_L, target_engine_index=0
        ),
        XK.XK_v: RewriteKeyEventAction(keysym=XK.XK_Delete, target_engine_index=0),
        XK.XK_i: RewriteKeyEventAction(keysym=XK.XK_Up, target_engine_index=1),
        XK.XK_j: RewriteKeyEventAction(keysym=XK.XK_Left, target_engine_index=1),
        XK.XK_k: RewriteKeyEventAction(keysym=XK.XK_Down, target_engine_index=1),
        XK.XK_l: RewriteKeyEventAction(keysym=XK.XK_Right, target_engine_index=1),
        XK.XK_slash: RewriteKeyEventAction(
            keysym=XK.XK_Control_L, target_engine_index=1
        ),
        XK.XK_n: RewriteKeyEventAction(keysym=XK.XK_Delete, target_engine_index=1),
        XK.XK_space: RewriteKeyEventAction(
            keysym=XK.XK_Control_L, target_engine_index=TargetEngine.All
        ),
        XK.XK_e: SelectGagAction(
            target_engine_index=0, column_index=4, factor_y=-0.047
        ),  # elephant trunk
        XK.XK_o: SelectGagAction(
            target_engine_index=1, column_index=4, factor_y=-0.047
        ),  # elephant trunk
        XK.XK_f: SelectGagAction(
            target_engine_index=0, column_index=5, factor_y=-0.047
        ),  # foghorn
        XK.XK_semicolon: SelectGagAction(
            target_engine_index=1, column_index=5, factor_y=-0.047
        ),  # foghorn
    }


def x_find_window(parent_window, filter_callback):
    """Recursively collect all descendants of `parent_window` accepted by `filter_callback`."""
    matching = []
    for child_window in parent_window.query_tree().children:
        if filter_callback(child_window):
            matching.append(child_window)
        matching += x_find_window(child_window, filter_callback)
    return matching


def x_find_window_by_pid(display, pid):
    pid_prop = display.intern_atom("_NET_WM_PID")

    def filter_callback(window):
        prop = window.get_full_property(pid_prop, X.AnyPropertyType)
        return prop and prop.value.tolist() == [pid]

    return x_find_window(display.screen().root, filter_callback)


def x_wait_for_event(xdisplay, timeout_seconds):
    """
    Wait up to `timeout_seconds` seconds for an X event.
    Return True if an X event is available.
    Return False if the timeout was reached.
    """
    rlist = select.select(
        [xdisplay.display.socket],  # rlist
        [],  # wlist
        [],  # xlist
        timeout_seconds,  # timeout [seconds]
    )[0]
    return len(rlist) > 0


class ExtendedControls:
    def __init__(
        self,
        primary_engine_pid,
        primary_engine_window_name=None,
        toggle_keysym_name=EXTENDED_CONTROLS_DEFAULT_TOGGLE_KEYSYM_NAME,
    ):
        if not psutil:
            raise Exception(
                "\n".join(
                    [
                        "Extended keyboard controls require the python lib psutil to be installed.",
                        "Depending on your system run",
                        "\t$ sudo apt-get install python3-psutil",
                        "or",
                        "\t$ pip3 install --user psutil",
                    ]
                )
            )
        if not Xlib:
            raise Exception(
                "\n".join(
                    [
                        "Extended keyboard controls require xlib for python to be installed.",
                        "Depending on your system run",
                        "\t$ sudo apt-get install python3-xlib",
                        "or",
                        "\t$ pip3 install --user xlib",
                    ]
                )
            )
        self._primary_engine_pid = primary_engine_pid
        self._primary_engine_window_name = primary_engine_window_name
        self._xdisplay = Xlib.display.Display()
        self._toggle_keysym = XK.string_to_keysym(toggle_keysym_name)
        if self._toggle_keysym == X.NoSymbol:
            raise Exception(
                "Extended keyboard controls toggle:"
                + " Unknown keysym name '{}'".format(toggle_keysym_name)
            )
        self._keysym_mappings = copy.deepcopy(
            EXTENDED_CONTROLS_DEFAULT_KEYSYM_MAPPINGS,
        )
        if self._toggle_keysym in self._keysym_mappings:
            print(
                "INFO Extended Controls:"
                + " Ignoring mapping for toggle key '{}'".format(toggle_keysym_name)
            )
        self._keysym_mappings[self._toggle_keysym] = ToggleExtendedControlsAction()
        self._default_action = ForwardKeyEventAction()
        self._primary_engine_window = None
        self._engine_windows_by_target_index = None
        self._active_key_registry = {}
        self._enabled = False

    @property
    def engine_running(self):
        return psutil.pid_exists(self._primary_engine_pid)

    @property
    def xdisplay(self):
        return self._xdisplay

    @property
    def primary_engine_window(self):
        return self._primary_engine_window

    def _wait_for_engine_window(self, timeout_seconds=20, search_interval_seconds=2):
        start_epoch = time.time()
        while self.engine_running and (time.time() - start_epoch) <= timeout_seconds:
            windows = x_find_window_by_pid(self._xdisplay, self._primary_engine_pid)
            assert len(windows) <= 1
            if len(windows) == 1:
                return windows[0]
            time.sleep(search_interval_seconds)
        return None

    def run(self):
        self._primary_engine_window = self._wait_for_engine_window()
        if not self._primary_engine_window:
            raise Exception("Could not find the game's window.")
        self._grab_key(self._xdisplay.keysym_to_keycode(self._toggle_keysym))
        # mark the window as controlled by this process
        self._primary_engine_window.change_property(
            self.xdisplay.intern_atom(EXTENDED_CONTROLS_PID_XPROPERTY_NAME),
            Xatom.CARDINAL,
            format=32,
            data=[os.getpid()],
            mode=X.PropModeReplace,
        )
        if self._primary_engine_window_name:
            self._primary_engine_window.set_wm_name(self._primary_engine_window_name)
            print(
                "INFO Changed engine's window name to {!r}".format(
                    self._primary_engine_window_name,
                )
            )
        if not self.enabled:
            keysym_name = XK.keysym_to_string(self._toggle_keysym)
            print(
                "INFO Extended Controls are currently disabled."
                + " Press key '{}' to enable.".format(keysym_name)
            )
        while self.engine_running:
            while self.xdisplay.pending_events():
                self._handle_xevent(self._xdisplay.next_event())
            self._check_active_key_registry()
            # keep timeout low for _check_active_key_registry()
            # to be called frequently
            x_wait_for_event(self.xdisplay, timeout_seconds=0.05)

    def _handle_xevent(self, xevent):
        if isinstance(
            xevent, (Xlib.protocol.event.KeyPress, Xlib.protocol.event.KeyRelease)
        ):
            self._handle_xkeyevent(xevent)

    def _handle_xkeyevent(self, xkeyevent):
        self._update_active_key_registry(xkeyevent)
        keysym_in = self._xdisplay.keycode_to_keysym(xkeyevent.detail, index=0)
        if keysym_in in self._keysym_mappings:
            action = self._keysym_mappings[keysym_in]
        else:
            action = self._default_action
        action.execute(self, xkeyevent)

    def enable(self):
        for keysym in self._keysym_mappings.keys():
            if keysym != self._toggle_keysym:
                self._grab_key(self._xdisplay.keysym_to_keycode(keysym))
        self._enabled = True
        # reset cache
        self._engine_windows_by_target_index = None
        print("INFO Enabled Extended Controls")

    def disable(self):
        for keysym in self._keysym_mappings.keys():
            if keysym != self._toggle_keysym:
                self._ungrab_key(self._xdisplay.keysym_to_keycode(keysym))
        self._enabled = False
        print("INFO Disabled Extended Controls")

    @property
    def enabled(self):
        return self._enabled

    def toggle(self):
        if self.enabled:
            self.disable()
        else:
            self.enable()

    def _grab_key(self, keycode):
        self.primary_engine_window.grab_key(
            keycode,
            X.AnyModifier,
            # owner_events
            # https://stackoverflow.com/questions/32122360/x11-will-xgrabpointer-prevent-other-apps-from-any-mouse-event
            # False,
            True,
            X.GrabModeAsync,
            X.GrabModeAsync,
        )

    def _ungrab_key(self, keycode):
        self.primary_engine_window.ungrab_key(keycode, X.AnyModifier)

    def find_engine_windows(self):
        controls_xprop = self.xdisplay.intern_atom(
            EXTENDED_CONTROLS_PID_XPROPERTY_NAME,
        )
        return x_find_window(
            self.xdisplay.screen().root,
            lambda w: w.get_wm_name() == TOONTOWN_WINDOW_NAME
            or w.get_full_property(controls_xprop, X.AnyPropertyType),
        )

    @property
    def engine_windows_by_target_index(self):
        if not self._engine_windows_by_target_index:
            win_by_index = {}
            for target_index, win in enumerate(self.find_engine_windows()):
                print(
                    "INFO Engine window {} has no target index, assuming {}".format(
                        win.id, target_index,
                    )
                )
                if target_index not in win_by_index:
                    win_by_index[target_index] = []
                win_by_index[target_index].append(win)
            self._engine_windows_by_target_index = win_by_index
        return self._engine_windows_by_target_index

    @property
    def engine_windows(self):
        return [w for g in self.engine_windows_by_target_index.values() for w in g]

    def _update_active_key_registry(self, xkeyevent):
        # see self._check_active_key_registry
        keycode = xkeyevent.detail
        if isinstance(xkeyevent, Xlib.protocol.event.KeyPress):
            self._active_key_registry[keycode] = xkeyevent
        elif keycode in self._active_key_registry:
            del self._active_key_registry[keycode]

    def _check_active_key_registry(self):
        """
        WORKAROUND
        For an unknown reason some key release events don't get queued
        when multiple keys are being released simultaneously.
        So we keep a hashmap of supposedly currently pressed keys
        and periodically compare it with xdisplay.query_keymap().
        ref: https://stackoverflow.com/q/18160792/5894777
        """
        # https://tronche.com/gui/x/xlib/input/XQueryKeymap.html
        keymap = self.xdisplay.query_keymap()
        missed_releases = []
        for keycode, press_event in self._active_key_registry.items():
            # the keymap is a bit vector: one bit per keycode, 8 keycodes per byte
            byte_index = keycode >> 3
            bit_index = keycode & ((1 << 3) - 1)
            if not keymap[byte_index] & (1 << bit_index):
                print("DEBUG missed release event of key {}".format(keycode))
                missed_releases.append(
                    Xlib.protocol.event.KeyRelease(
                        window=press_event.window,
                        detail=press_event.detail,
                        state=press_event.state,
                        root_x=press_event.root_x,
                        root_y=press_event.root_y,
                        event_x=press_event.event_x,
                        event_y=press_event.event_y,
                        child=press_event.child,
                        root=press_event.root,
                        time=X.CurrentTime,
                        same_screen=press_event.same_screen,
                    )
                )
        # dispatch after the loop, since handling a release mutates the registry
        for release_event in missed_releases:
            self._handle_xkeyevent(release_event)