__init__.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. import copy
  2. import logging
  3. import os
  4. import select
  5. import time
  6. import typing
  7. import Xlib.display
  8. from Xlib import XK, X, Xatom
  9. from rescriptoon._actions import (
  10. LowThrowAction,
  11. RewriteKeyEventAction,
  12. SelectGagAction,
  13. ToggleOverlayAction,
  14. )
  15. from rescriptoon._keys import keysym_to_label
  16. EXTENDED_CONTROLS_DEFAULT_TOGGLE_KEYSYM_NAME = "grave"
  17. TOONTOWN_WINDOW_NAME = "Toontown Rewritten"
  18. _KEYSYM_ACTION_MAPPINGS = {
  19. # pylint: disable=no-member; false positive for XK.*
  20. XK.XK_w: RewriteKeyEventAction(keysym=XK.XK_Up, target_engine_index=0),
  21. XK.XK_a: RewriteKeyEventAction(keysym=XK.XK_Left, target_engine_index=0),
  22. XK.XK_s: RewriteKeyEventAction(keysym=XK.XK_Down, target_engine_index=0),
  23. XK.XK_d: RewriteKeyEventAction(keysym=XK.XK_Right, target_engine_index=0),
  24. XK.XK_Control_L: RewriteKeyEventAction(
  25. keysym=XK.XK_Control_L, target_engine_index=0
  26. ),
  27. XK.XK_v: LowThrowAction(target_engine_index=0),
  28. XK.XK_o: RewriteKeyEventAction(keysym=XK.XK_Up, target_engine_index=1),
  29. XK.XK_k: RewriteKeyEventAction(keysym=XK.XK_Left, target_engine_index=1),
  30. XK.XK_l: RewriteKeyEventAction(keysym=XK.XK_Down, target_engine_index=1),
  31. XK.XK_semicolon: RewriteKeyEventAction(keysym=XK.XK_Right, target_engine_index=1),
  32. XK.XK_slash: RewriteKeyEventAction(keysym=XK.XK_Control_L, target_engine_index=1),
  33. XK.XK_n: LowThrowAction(target_engine_index=1),
  34. XK.XK_space: RewriteKeyEventAction(keysym=XK.XK_Control_L),
  35. # TODO replace gag_name with enum
  36. XK.XK_e: SelectGagAction(
  37. gag_name="elephant trunk",
  38. target_engine_index=0,
  39. column_index=4,
  40. factor_y=-0.047,
  41. ),
  42. XK.XK_i: SelectGagAction(
  43. gag_name="elephant trunk",
  44. target_engine_index=1,
  45. column_index=4,
  46. factor_y=-0.047,
  47. ),
  48. XK.XK_f: SelectGagAction(
  49. gag_name="foghorn", target_engine_index=0, column_index=5, factor_y=-0.047
  50. ),
  51. XK.XK_j: SelectGagAction(
  52. gag_name="foghorn", target_engine_index=1, column_index=5, factor_y=-0.047
  53. ),
  54. }
  55. def _x_walk_children_windows(
  56. parent_window: "Xlib.display.Window",
  57. ) -> typing.Iterator["Xlib.display.Window"]:
  58. yield parent_window
  59. for child_window in parent_window.query_tree().children:
  60. for subchild_window in _x_walk_children_windows(child_window):
  61. yield subchild_window
  62. def _x_wait_for_event(xdisplay, timeout_seconds):
  63. """ Wait up to `timeout_seconds` seconds for a xevent.
  64. Return True, if a xevent is available.
  65. Return False, if the timeout was reached. """
  66. rlist = select.select(
  67. [xdisplay.display.socket], # rlist
  68. [], # wlist
  69. [], # xlist
  70. timeout_seconds, # timeout [seconds]
  71. )[0]
  72. return len(rlist) > 0
  73. class Overlay:
  74. def __init__(
  75. self, toggle_keysym_name=EXTENDED_CONTROLS_DEFAULT_TOGGLE_KEYSYM_NAME,
  76. ):
  77. self._xdisplay = Xlib.display.Display()
  78. self._toggle_keysym = XK.string_to_keysym(toggle_keysym_name)
  79. if self._toggle_keysym == X.NoSymbol:
  80. raise ValueError(
  81. "rescriptoon controls toggle:"
  82. + " unknown keysym name '{}'".format(toggle_keysym_name)
  83. )
  84. self._keysym_mappings = copy.deepcopy(_KEYSYM_ACTION_MAPPINGS)
  85. if self._toggle_keysym in self._keysym_mappings:
  86. logging.warning("ignoring mapping for toggle key %s", toggle_keysym_name)
  87. self._keysym_mappings[self._toggle_keysym] = ToggleOverlayAction()
  88. self._active_key_registry = {}
  89. self._enabled = False
  90. self._engine_windows = None
  91. @property
  92. def xdisplay(self):
  93. return self._xdisplay
  94. @property
  95. def engine_windows(self) -> typing.List["Xlib.display.Window"]:
  96. return self._engine_windows
  97. @property
  98. def _engine_windows_open(self) -> bool:
  99. for window in self._engine_windows:
  100. try:
  101. window.get_wm_state()
  102. except Xlib.error.BadWindow:
  103. logging.info("engine window %x is no longer available", window.id)
  104. return False
  105. return True
  106. def run(self) -> None:
  107. self._engine_windows = self._find_engine_windows()
  108. logging.debug("engine window ids %r", [hex(w.id) for w in self._engine_windows])
  109. if not self._engine_windows:
  110. raise Exception("no toontown window found")
  111. self._grab_key(self.xdisplay.keysym_to_keycode(self._toggle_keysym),)
  112. print("key bindings:")
  113. for keysym, action in self._keysym_mappings.items():
  114. print("{}: {}".format(keysym_to_label(keysym), action.description))
  115. self.enable()
  116. while self._engine_windows_open:
  117. while self.xdisplay.pending_events():
  118. self._handle_xevent(self.xdisplay.next_event())
  119. self._check_active_key_registry()
  120. # keep timeout low for _check_active_key_registry()
  121. # to be called frequently
  122. _x_wait_for_event(self.xdisplay, timeout_seconds=0.05)
  123. self._disable()
  124. def _handle_xevent(self, xevent: Xlib.protocol.rq.Event) -> None:
  125. if isinstance(
  126. xevent, (Xlib.protocol.event.KeyPress, Xlib.protocol.event.KeyRelease)
  127. ):
  128. self._handle_xkeyevent(xevent)
  129. def _handle_xkeyevent(
  130. self,
  131. xkeyevent: typing.Union[
  132. Xlib.protocol.event.KeyPress, Xlib.protocol.event.KeyRelease
  133. ],
  134. ) -> None:
  135. self._update_active_key_registry(xkeyevent)
  136. keysym_in = self.xdisplay.keycode_to_keysym(xkeyevent.detail, index=0,)
  137. try:
  138. action = self._keysym_mappings[keysym_in]
  139. except KeyError:
  140. logging.warning("received key event of unmapped keysym %d", keysym_in)
  141. return
  142. action.execute(self, xkeyevent)
  143. @property
  144. def _toggle_keysym_name(self) -> str:
  145. return XK.keysym_to_string(self._toggle_keysym)
  146. def enable(self):
  147. for keysym in self._keysym_mappings.keys():
  148. if keysym != self._toggle_keysym:
  149. self._grab_key(self.xdisplay.keysym_to_keycode(keysym),)
  150. self._enabled = True
  151. logging.info(
  152. "rescriptoon is now enabled. press %s to disable.",
  153. self._toggle_keysym_name,
  154. )
  155. def _disable(self):
  156. for keysym in self._keysym_mappings.keys():
  157. if keysym != self._toggle_keysym:
  158. self._ungrab_key(self.xdisplay.keysym_to_keycode(keysym),)
  159. self._enabled = False
  160. def disable(self):
  161. self._disable()
  162. logging.info(
  163. "rescriptoon is now disabled. press %s to enable.",
  164. self._toggle_keysym_name,
  165. )
  166. @property
  167. def enabled(self):
  168. return self._enabled
  169. def toggle(self):
  170. if self.enabled:
  171. self.disable()
  172. else:
  173. self.enable()
  174. def _grab_key(self, keycode):
  175. for window in self._engine_windows:
  176. window.grab_key(
  177. keycode,
  178. X.AnyModifier,
  179. # owner_events
  180. # https://stackoverflow.com/questions/32122360/x11-will-xgrabpointer-prevent-other-apps-from-any-mouse-event
  181. # False,
  182. True,
  183. X.GrabModeAsync,
  184. X.GrabModeAsync,
  185. )
  186. def _ungrab_key(self, keycode):
  187. for window in self._engine_windows:
  188. window.ungrab_key(keycode, X.AnyModifier)
  189. def _find_engine_windows(self) -> typing.List["Xlib.display.Window"]:
  190. return list(
  191. filter(
  192. lambda w: w.get_wm_name() == TOONTOWN_WINDOW_NAME,
  193. _x_walk_children_windows(self.xdisplay.screen().root),
  194. )
  195. )
  196. def _update_active_key_registry(self, xkeyevent):
  197. # see self._check_active_key_registry
  198. keycode = xkeyevent.detail
  199. if isinstance(xkeyevent, Xlib.protocol.event.KeyPress):
  200. self._active_key_registry[keycode] = xkeyevent
  201. elif keycode in self._active_key_registry:
  202. del self._active_key_registry[keycode]
  203. def _check_active_key_registry(self):
  204. """
  205. WORKAROUND
  206. For an unknown reason some key release events don't get queued
  207. when multiple keys are being released simultaneously.
  208. So we keep a hashmap of supposedly currently pressed keys
  209. and periodically compare it with xdispaly.query_keymap().
  210. ref: https://stackoverflow.com/q/18160792/5894777
  211. """
  212. # https://tronche.com/gui/x/xlib/input/XQueryKeymap.html
  213. keymap = self.xdisplay.query_keymap()
  214. missed_releases = []
  215. for keycode, press_event in self._active_key_registry.items():
  216. byte_index = keycode >> 3
  217. bit_index = keycode & ((1 << 3) - 1)
  218. if not keymap[byte_index] & (1 << bit_index):
  219. print("DEBUG missed release event of key {}".format(keycode))
  220. missed_releases.append(
  221. Xlib.protocol.event.KeyRelease(
  222. window=press_event.window,
  223. detail=press_event.detail,
  224. state=press_event.state,
  225. root_x=press_event.root_x,
  226. root_y=press_event.root_y,
  227. event_x=press_event.event_x,
  228. event_y=press_event.event_y,
  229. child=press_event.child,
  230. root=press_event.root,
  231. time=X.CurrentTime,
  232. same_screen=press_event.same_screen,
  233. )
  234. )
  235. for release_event in missed_releases:
  236. self._handle_xkeyevent(release_event)