controls.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. import copy
  2. import os
  3. import select
  4. import time
  5. import psutil
  6. import Xlib.display
  7. from Xlib import XK, X, Xatom
  8. from rescriptoon.actions import *
  9. EXTENDED_CONTROLS_DEFAULT_TOGGLE_KEYSYM_NAME = "grave"
  10. EXTENDED_CONTROLS_PID_XPROPERTY_NAME = "_TOONCHER_EXTENDED_CONTROLS_PID"
  11. TOONTOWN_WINDOW_NAME = "Toontown Rewritten"
  12. EXTENDED_CONTROLS_DEFAULT_KEYSYM_MAPPINGS = {
  13. XK.XK_w: RewriteKeyEventAction(keysym=XK.XK_Up, target_engine_index=0),
  14. XK.XK_a: RewriteKeyEventAction(keysym=XK.XK_Left, target_engine_index=0),
  15. XK.XK_s: RewriteKeyEventAction(keysym=XK.XK_Down, target_engine_index=0),
  16. XK.XK_d: RewriteKeyEventAction(keysym=XK.XK_Right, target_engine_index=0),
  17. XK.XK_Control_L: RewriteKeyEventAction(
  18. keysym=XK.XK_Control_L, target_engine_index=0
  19. ),
  20. XK.XK_v: RewriteKeyEventAction(keysym=XK.XK_Delete, target_engine_index=0),
  21. XK.XK_i: RewriteKeyEventAction(keysym=XK.XK_Up, target_engine_index=1),
  22. XK.XK_j: RewriteKeyEventAction(keysym=XK.XK_Left, target_engine_index=1),
  23. XK.XK_k: RewriteKeyEventAction(keysym=XK.XK_Down, target_engine_index=1),
  24. XK.XK_l: RewriteKeyEventAction(keysym=XK.XK_Right, target_engine_index=1),
  25. XK.XK_slash: RewriteKeyEventAction(keysym=XK.XK_Control_L, target_engine_index=1),
  26. XK.XK_n: RewriteKeyEventAction(keysym=XK.XK_Delete, target_engine_index=1),
  27. XK.XK_space: RewriteKeyEventAction(
  28. keysym=XK.XK_Control_L, target_engine_index=TargetEngine.All
  29. ),
  30. XK.XK_e: SelectGagAction(
  31. target_engine_index=0, column_index=4, factor_y=-0.047
  32. ), # elephant trunk
  33. XK.XK_o: SelectGagAction(
  34. target_engine_index=1, column_index=4, factor_y=-0.047
  35. ), # elephant trunk
  36. XK.XK_f: SelectGagAction(
  37. target_engine_index=0, column_index=5, factor_y=-0.047
  38. ), # foghorn
  39. XK.XK_semicolon: SelectGagAction(
  40. target_engine_index=1, column_index=5, factor_y=-0.047
  41. ), # foghorn
  42. }
  43. def x_find_window(parent_window, filter_callback):
  44. matching = []
  45. for child_window in parent_window.query_tree().children:
  46. if filter_callback(child_window):
  47. matching.append(child_window)
  48. matching += x_find_window(child_window, filter_callback)
  49. return matching
  50. def x_find_window_by_pid(display, pid):
  51. pid_prop = display.intern_atom("_NET_WM_PID")
  52. def filter_callback(window):
  53. prop = window.get_full_property(pid_prop, X.AnyPropertyType)
  54. return prop and prop.value.tolist() == [pid]
  55. return x_find_window(display.screen().root, filter_callback)
  56. def x_wait_for_event(xdisplay, timeout_seconds):
  57. """ Wait up to `timeout_seconds` seconds for a xevent.
  58. Return True, if a xevent is available.
  59. Return False, if the timeout was reached. """
  60. rlist = select.select(
  61. [xdisplay.display.socket], # rlist
  62. [], # wlist
  63. [], # xlist
  64. timeout_seconds, # timeout [seconds]
  65. )[0]
  66. return len(rlist) > 0
  67. class ExtendedControls:
  68. def __init__(
  69. self,
  70. primary_engine_pid,
  71. primary_engine_window_name=None,
  72. toggle_keysym_name=EXTENDED_CONTROLS_DEFAULT_TOGGLE_KEYSYM_NAME,
  73. ):
  74. self._primary_engine_pid = primary_engine_pid
  75. self._primary_engine_window_name = primary_engine_window_name
  76. self._xdisplay = Xlib.display.Display()
  77. self._toggle_keysym = XK.string_to_keysym(toggle_keysym_name)
  78. if self._toggle_keysym == X.NoSymbol:
  79. raise Exception(
  80. "Extended keyboard controls toggle:"
  81. + " Unknown keysym name '{}'".format(toggle_keysym_name)
  82. )
  83. self._keysym_mappings = copy.deepcopy(
  84. EXTENDED_CONTROLS_DEFAULT_KEYSYM_MAPPINGS,
  85. )
  86. if self._toggle_keysym in self._keysym_mappings:
  87. print(
  88. "INFO Extended Controls:"
  89. + " Ignoring mapping for toggle key '{}'".format(toggle_keysym_name)
  90. )
  91. self._keysym_mappings[self._toggle_keysym] = ToggleExtendedControlsAction()
  92. self._default_action = ForwardKeyEventAction()
  93. self._primary_engine_window = None
  94. self._engine_windows_by_target_index = None
  95. self._active_key_registry = {}
  96. self._enabled = False
  97. @property
  98. def engine_running(self):
  99. return psutil.pid_exists(self._primary_engine_pid)
  100. @property
  101. def xdisplay(self):
  102. return self._xdisplay
  103. @property
  104. def primary_engine_window(self):
  105. return self._primary_engine_window
  106. def _wait_for_engine_window(self, timeout_seconds=20, search_interval_seconds=2):
  107. start_epoch = time.time()
  108. while self.engine_running and (time.time() - start_epoch) <= timeout_seconds:
  109. windows = x_find_window_by_pid(self._xdisplay, self._primary_engine_pid,)
  110. assert len(windows) <= 1
  111. if len(windows) == 1:
  112. return windows[0]
  113. time.sleep(search_interval_seconds)
  114. return None
  115. def run(self):
  116. self._primary_engine_window = self._wait_for_engine_window()
  117. if not self._primary_engine_window:
  118. raise Exception("Could not find the game's window.")
  119. self._grab_key(self._xdisplay.keysym_to_keycode(self._toggle_keysym),)
  120. self._primary_engine_window.change_property(
  121. self.xdisplay.intern_atom(EXTENDED_CONTROLS_PID_XPROPERTY_NAME),
  122. Xatom.CARDINAL,
  123. format=32,
  124. data=[os.getpid()],
  125. mode=X.PropModeReplace,
  126. )
  127. if self._primary_engine_window_name:
  128. self._primary_engine_window.set_wm_name(self._primary_engine_window_name,)
  129. print(
  130. "INFO Changed engine's window name to {!r}".format(
  131. self._primary_engine_window_name,
  132. )
  133. )
  134. if not self.enabled:
  135. keysym_name = XK.keysym_to_string(self._toggle_keysym)
  136. print(
  137. "INFO Extended Controls are currently disabled."
  138. + " Press key '{}' to enable.".format(keysym_name)
  139. )
  140. while self.engine_running:
  141. while self.xdisplay.pending_events():
  142. self._handle_xevent(self._xdisplay.next_event())
  143. self._check_active_key_registry()
  144. # keep timeout low for _check_active_key_registry()
  145. # to be called frequently
  146. x_wait_for_event(self.xdisplay, timeout_seconds=0.05)
  147. def _handle_xevent(self, xevent):
  148. if isinstance(xevent, Xlib.protocol.event.KeyPress) or isinstance(
  149. xevent, Xlib.protocol.event.KeyRelease
  150. ):
  151. self._handle_xkeyevent(xevent)
  152. def _handle_xkeyevent(self, xkeyevent):
  153. self._update_active_key_registry(xkeyevent)
  154. keysym_in = self._xdisplay.keycode_to_keysym(xkeyevent.detail, index=0,)
  155. if keysym_in in self._keysym_mappings:
  156. action = self._keysym_mappings[keysym_in]
  157. else:
  158. action = self._default_action
  159. action.execute(self, xkeyevent)
  160. def enable(self):
  161. for keysym in self._keysym_mappings.keys():
  162. if keysym != self._toggle_keysym:
  163. self._grab_key(self._xdisplay.keysym_to_keycode(keysym),)
  164. self._enabled = True
  165. # reset cache
  166. self._engine_windows_by_target_index = None
  167. print("INFO Enabled Extended Controls")
  168. def disable(self):
  169. for keysym in self._keysym_mappings.keys():
  170. if keysym != self._toggle_keysym:
  171. self._ungrab_key(self._xdisplay.keysym_to_keycode(keysym),)
  172. self._enabled = False
  173. print("INFO Disabled Extended Controls")
  174. @property
  175. def enabled(self):
  176. return self._enabled
  177. def toggle(self):
  178. if self.enabled:
  179. self.disable()
  180. else:
  181. self.enable()
  182. def _grab_key(self, keycode):
  183. self.primary_engine_window.grab_key(
  184. keycode,
  185. X.AnyModifier,
  186. # owner_events
  187. # https://stackoverflow.com/questions/32122360/x11-will-xgrabpointer-prevent-other-apps-from-any-mouse-event
  188. # False,
  189. True,
  190. X.GrabModeAsync,
  191. X.GrabModeAsync,
  192. )
  193. def _ungrab_key(self, keycode):
  194. self.primary_engine_window.ungrab_key(keycode, X.AnyModifier)
  195. def find_engine_windows(self):
  196. controls_xprop = self.xdisplay.intern_atom(
  197. EXTENDED_CONTROLS_PID_XPROPERTY_NAME,
  198. )
  199. return x_find_window(
  200. self.xdisplay.screen().root,
  201. lambda w: w.get_wm_name() == TOONTOWN_WINDOW_NAME
  202. or w.get_full_property(controls_xprop, X.AnyPropertyType),
  203. )
  204. @property
  205. def engine_windows_by_target_index(self):
  206. if not self._engine_windows_by_target_index:
  207. win_by_index = {}
  208. for target_index, win in enumerate(self.find_engine_windows()):
  209. print(
  210. "INFO Engine window {} has no target index, assuming {}".format(
  211. win.id, target_index,
  212. )
  213. )
  214. if not target_index in win_by_index:
  215. win_by_index[target_index] = []
  216. win_by_index[target_index].append(win)
  217. self._engine_windows_by_target_index = win_by_index
  218. return self._engine_windows_by_target_index
  219. @property
  220. def engine_windows(self):
  221. return [w for g in self.engine_windows_by_target_index.values() for w in g]
  222. def _update_active_key_registry(self, xkeyevent):
  223. # see self._check_active_key_registry
  224. keycode = xkeyevent.detail
  225. if isinstance(xkeyevent, Xlib.protocol.event.KeyPress):
  226. self._active_key_registry[keycode] = xkeyevent
  227. elif keycode in self._active_key_registry:
  228. del self._active_key_registry[keycode]
  229. def _check_active_key_registry(self):
  230. """
  231. WORKAROUND
  232. For an unknown reason some key release events don't get queued
  233. when multiple keys are being released simultaneously.
  234. So we keep a hashmap of supposedly currently pressed keys
  235. and periodically compare it with xdispaly.query_keymap().
  236. ref: https://stackoverflow.com/q/18160792/5894777
  237. """
  238. # https://tronche.com/gui/x/xlib/input/XQueryKeymap.html
  239. keymap = self.xdisplay.query_keymap()
  240. missed_releases = []
  241. for keycode, press_event in self._active_key_registry.items():
  242. byte_index = keycode >> 3
  243. bit_index = keycode & ((1 << 3) - 1)
  244. if not keymap[byte_index] & (1 << bit_index):
  245. print("DEBUG missed release event of key {}".format(keycode))
  246. missed_releases.append(
  247. Xlib.protocol.event.KeyRelease(
  248. window=press_event.window,
  249. detail=press_event.detail,
  250. state=press_event.state,
  251. root_x=press_event.root_x,
  252. root_y=press_event.root_y,
  253. event_x=press_event.event_x,
  254. event_y=press_event.event_y,
  255. child=press_event.child,
  256. root=press_event.root,
  257. time=X.CurrentTime,
  258. same_screen=press_event.same_screen,
  259. )
  260. )
  261. for release_event in missed_releases:
  262. self._handle_xkeyevent(release_event)