__init__.py 9.5 KB


  1. import copy
  2. import logging
  3. import os
  4. import select
  5. import time
  6. import typing
  7. import Xlib.display
  8. from Xlib import XK, X, Xatom
  9. from rescriptoon._actions import (
  10. LowThrowAction,
  11. RewriteKeyEventAction,
  12. SelectGagAction,
  13. ToggleOverlayAction,
  14. )
  15. from rescriptoon._keys import keysym_to_label
  16. EXTENDED_CONTROLS_DEFAULT_TOGGLE_KEYSYM_NAME = "grave"
  17. TOONTOWN_WINDOW_NAME = "Toontown Rewritten"
  18. _KEYSYM_ACTION_MAPPINGS = {
  19. # pylint: disable=no-member; false positive for XK.*
  20. XK.XK_w: RewriteKeyEventAction(keysym=XK.XK_Up, target_engine_index=0),
  21. XK.XK_a: RewriteKeyEventAction(keysym=XK.XK_Left, target_engine_index=0),
  22. XK.XK_s: RewriteKeyEventAction(keysym=XK.XK_Down, target_engine_index=0),
  23. XK.XK_d: RewriteKeyEventAction(keysym=XK.XK_Right, target_engine_index=0),
  24. XK.XK_Control_L: RewriteKeyEventAction(
  25. keysym=XK.XK_Control_L, target_engine_index=0
  26. ),
  27. XK.XK_v: LowThrowAction(target_engine_index=0),
  28. XK.XK_o: RewriteKeyEventAction(keysym=XK.XK_Up, target_engine_index=1),
  29. XK.XK_k: RewriteKeyEventAction(keysym=XK.XK_Left, target_engine_index=1),
  30. XK.XK_l: RewriteKeyEventAction(keysym=XK.XK_Down, target_engine_index=1),
  31. XK.XK_semicolon: RewriteKeyEventAction(keysym=XK.XK_Right, target_engine_index=1),
  32. XK.XK_slash: RewriteKeyEventAction(keysym=XK.XK_Control_L, target_engine_index=1),
  33. XK.XK_n: LowThrowAction(target_engine_index=1),
  34. XK.XK_space: RewriteKeyEventAction(keysym=XK.XK_Control_L),
  35. # TODO replace gag_name with enum
  36. XK.XK_e: SelectGagAction(
  37. gag_name="elephant trunk",
  38. target_engine_index=0,
  39. column_index=4,
  40. factor_y=-0.047,
  41. ),
  42. XK.XK_i: SelectGagAction(
  43. gag_name="elephant trunk",
  44. target_engine_index=1,
  45. column_index=4,
  46. factor_y=-0.047,
  47. ),
  48. XK.XK_f: SelectGagAction(
  49. gag_name="foghorn", target_engine_index=0, column_index=5, factor_y=-0.047
  50. ),
  51. XK.XK_j: SelectGagAction(
  52. gag_name="foghorn", target_engine_index=1, column_index=5, factor_y=-0.047
  53. ),
  54. }
  55. def _x_walk_children_windows(
  56. parent_window: "Xlib.display.Window",
  57. ) -> typing.Iterator["Xlib.display.Window"]:
  58. yield parent_window
  59. for child_window in parent_window.query_tree().children:
  60. for subchild_window in _x_walk_children_windows(child_window):
  61. yield subchild_window
  62. def _x_wait_for_event(xdisplay, timeout_seconds):
  63. """ Wait up to `timeout_seconds` seconds for a xevent.
  64. Return True, if a xevent is available.
  65. Return False, if the timeout was reached. """
  66. rlist = select.select(
  67. [xdisplay.display.socket], # rlist
  68. [], # wlist
  69. [], # xlist
  70. timeout_seconds, # timeout [seconds]
  71. )[0]
  72. return len(rlist) > 0
  73. class Overlay:
  74. def __init__(
  75. self, toggle_keysym_name=EXTENDED_CONTROLS_DEFAULT_TOGGLE_KEYSYM_NAME,
  76. ):
  77. self._xdisplay = Xlib.display.Display()
  78. self._toggle_keysym = XK.string_to_keysym(toggle_keysym_name)
  79. if self._toggle_keysym == X.NoSymbol:
  80. raise ValueError(
  81. "rescriptoon controls toggle:"
  82. + " unknown keysym name '{}'".format(toggle_keysym_name)
  83. )
  84. self._keysym_mappings = copy.deepcopy(_KEYSYM_ACTION_MAPPINGS)
  85. if self._toggle_keysym in self._keysym_mappings:
  86. logging.warning("ignoring mapping for toggle key %s", toggle_keysym_name)
  87. self._keysym_mappings[self._toggle_keysym] = ToggleOverlayAction()
  88. self._active_key_registry = {}
  89. self._enabled = False
  90. self._engine_windows = None
  91. @property
  92. def xdisplay(self):
  93. return self._xdisplay
  94. @property
  95. def engine_windows(self) -> typing.List["Xlib.display.Window"]:
  96. return self._engine_windows
  97. @property
  98. def _engine_windows_open(self) -> bool:
  99. for window in self._engine_windows:
  100. try:
  101. window.get_wm_state()
  102. except Xlib.error.BadWindow:
  103. logging.info("engine window %x is no longer available", window.id)
  104. return False
  105. return True
  106. def run(self) -> None:
  107. self._engine_windows = self._find_engine_windows()
  108. logging.debug("engine window ids %r", [hex(w.id) for w in self._engine_windows])
  109. if not self._engine_windows:
  110. raise Exception("no toontown window found")
  111. self._grab_key(self.xdisplay.keysym_to_keycode(self._toggle_keysym),)
  112. print("key bindings:")
  113. for keysym, action in self._keysym_mappings.items():
  114. print("{}: {}".format(keysym_to_label(keysym), action.description))
  115. self.enable()
  116. while self._engine_windows_open:
  117. while self.xdisplay.pending_events():
  118. self._handle_xevent(self.xdisplay.next_event())
  119. self._check_active_key_registry()
  120. # keep timeout low for _check_active_key_registry()
  121. # to be called frequently
  122. _x_wait_for_event(self.xdisplay, timeout_seconds=0.05)
  123. self._disable()
  124. def _handle_xevent(self, xevent: Xlib.protocol.rq.Event) -> None:
  125. if isinstance(
  126. xevent, (Xlib.protocol.event.KeyPress, Xlib.protocol.event.KeyRelease)
  127. ):
  128. self._handle_xkeyevent(xevent)
  129. def _handle_xkeyevent(
  130. self,
  131. xkeyevent: typing.Union[
  132. Xlib.protocol.event.KeyPress, Xlib.protocol.event.KeyRelease
  133. ],
  134. ) -> None:
  135. self._update_active_key_registry(xkeyevent)
  136. keysym_in = self.xdisplay.keycode_to_keysym(xkeyevent.detail, index=0,)
  137. try:
  138. action = self._keysym_mappings[keysym_in]
  139. except KeyError:
  140. logging.warning("received key event of unmapped keysym %d", keysym_in)
  141. return
  142. action.execute(self, xkeyevent)
  143. @property
  144. def _toggle_keysym_name(self) -> str:
  145. return XK.keysym_to_string(self._toggle_keysym)
  146. def enable(self):
  147. for keysym in self._keysym_mappings.keys():
  148. if keysym != self._toggle_keysym:
  149. self._grab_key(self.xdisplay.keysym_to_keycode(keysym),)
  150. self._enabled = True
  151. logging.info(
  152. "rescriptoon is now enabled. press %s to disable.",
  153. self._toggle_keysym_name,
  154. )
  155. def _disable(self):
  156. for keysym in self._keysym_mappings.keys():
  157. if keysym != self._toggle_keysym:
  158. self._ungrab_key(self.xdisplay.keysym_to_keycode(keysym),)
  159. self._enabled = False
  160. def disable(self):
  161. self._disable()
  162. logging.info(
  163. "rescriptoon is now disabled. press %s to enable.",
  164. self._toggle_keysym_name,
  165. )
  166. @property
  167. def enabled(self):
  168. return self._enabled
  169. def toggle(self):
  170. if self.enabled:
  171. self.disable()
  172. else:
  173. self.enable()
  174. def _grab_key(self, keycode):
  175. for window in self._engine_windows:
  176. window.grab_key(
  177. keycode,
  178. X.AnyModifier,
  179. # owner_events
  180. # https://stackoverflow.com/questions/32122360/x11-will-xgrabpointer-prevent-other-apps-from-any-mouse-event
  181. # False,
  182. True,
  183. X.GrabModeAsync,
  184. X.GrabModeAsync,
  185. )
  186. def _ungrab_key(self, keycode):
  187. for window in self._engine_windows:
  188. window.ungrab_key(keycode, X.AnyModifier)
  189. def _find_engine_windows(self) -> typing.List["Xlib.display.Window"]:
  190. return list(
  191. filter(
  192. lambda w: w.get_wm_name() == TOONTOWN_WINDOW_NAME,
  193. _x_walk_children_windows(self.xdisplay.screen().root),
  194. )
  195. )
  196. def _update_active_key_registry(self, xkeyevent):
  197. # see self._check_active_key_registry
  198. keycode = xkeyevent.detail
  199. if isinstance(xkeyevent, Xlib.protocol.event.KeyPress):
  200. self._active_key_registry[keycode] = xkeyevent
  201. elif keycode in self._active_key_registry:
  202. del self._active_key_registry[keycode]
  203. def _check_active_key_registry(self):
  204. """
  205. WORKAROUND
  206. For an unknown reason some key release events don't get queued
  207. when multiple keys are being released simultaneously.
  208. So we keep a hashmap of supposedly currently pressed keys
  209. and periodically compare it with xdispaly.query_keymap().
  210. ref: https://stackoverflow.com/q/18160792/5894777
  211. """
  212. # https://tronche.com/gui/x/xlib/input/XQueryKeymap.html
  213. keymap = self.xdisplay.query_keymap()
  214. missed_releases = []
  215. for keycode, press_event in self._active_key_registry.items():
  216. byte_index = keycode >> 3
  217. bit_index = keycode & ((1 << 3) - 1)
  218. if not keymap[byte_index] & (1 << bit_index):
  219. print("DEBUG missed release event of key {}".format(keycode))
  220. missed_releases.append(
  221. Xlib.protocol.event.KeyRelease(
  222. window=press_event.window,
  223. detail=press_event.detail,
  224. state=press_event.state,
  225. root_x=press_event.root_x,
  226. root_y=press_event.root_y,
  227. event_x=press_event.event_x,
  228. event_y=press_event.event_y,
  229. child=press_event.child,
  230. root=press_event.root,
  231. time=X.CurrentTime,
  232. same_screen=press_event.same_screen,
  233. )
  234. )
  235. for release_event in missed_releases:
  236. self._handle_xkeyevent(release_event)