controls.py 10 KB


  1. import copy
  2. import select
  3. import time
  4. from tooncher.actions import *
  5. try:
  6. import psutil
  7. except ImportError:
  8. psutil = False
  9. try:
  10. import Xlib.display
  11. from Xlib import X, XK
  12. except ImportError:
  13. Xlib = False
  14. EXTENDED_CONTROLS_DEFAULT_TOGGLE_KEYSYM_NAME = 'grave'
  15. TOONTOWN_WINDOW_NAME = 'Toontown Rewritten'
  16. if Xlib:
  17. EXTENDED_CONTROLS_DEFAULT_KEYSYM_MAPPINGS = {
  18. XK.XK_w: RewriteKeyEventAction(keysym=XK.XK_Up),
  19. XK.XK_a: RewriteKeyEventAction(keysym=XK.XK_Left),
  20. XK.XK_s: RewriteKeyEventAction(keysym=XK.XK_Down),
  21. XK.XK_d: RewriteKeyEventAction(keysym=XK.XK_Right),
  22. XK.XK_v: RewriteKeyEventAction(keysym=XK.XK_Delete),
  23. XK.XK_i: RewriteKeyEventAction(
  24. keysym=XK.XK_Up,
  25. target_engine=TargetEngine.NonPrimary,
  26. ),
  27. XK.XK_j: RewriteKeyEventAction(
  28. keysym=XK.XK_Left,
  29. target_engine=TargetEngine.NonPrimary,
  30. ),
  31. XK.XK_k: RewriteKeyEventAction(
  32. keysym=XK.XK_Down,
  33. target_engine=TargetEngine.NonPrimary,
  34. ),
  35. XK.XK_l: RewriteKeyEventAction(
  36. keysym=XK.XK_Right,
  37. target_engine=TargetEngine.NonPrimary,
  38. ),
  39. XK.XK_slash: RewriteKeyEventAction(
  40. keysym=XK.XK_Control_L,
  41. target_engine=TargetEngine.NonPrimary,
  42. ),
  43. XK.XK_n: RewriteKeyEventAction(
  44. keysym=XK.XK_Delete,
  45. target_engine=TargetEngine.NonPrimary,
  46. ),
  47. XK.XK_space: RewriteKeyEventAction(
  48. keysym=XK.XK_Control_L,
  49. target_engine=TargetEngine.All,
  50. ),
  51. XK.XK_e: SelectGagAction(factor_x=0.038, factor_y=-0.047), # elephant trunk
  52. XK.XK_f: SelectGagAction(factor_x=0.119, factor_y=-0.047), # foghorn
  53. }
  54. def x_find_window(parent_window, filter_callback):
  55. matching = []
  56. for child_window in parent_window.query_tree().children:
  57. if filter_callback(child_window):
  58. matching.append(child_window)
  59. matching += x_find_window(child_window, filter_callback)
  60. return matching
  61. def x_find_window_by_pid(display, pid):
  62. pid_prop = display.intern_atom('_NET_WM_PID')
  63. def filter_callback(window):
  64. prop = window.get_full_property(pid_prop, X.AnyPropertyType)
  65. return prop and prop.value.tolist() == [pid]
  66. return x_find_window(display.screen().root, filter_callback)
  67. def x_wait_for_event(xdisplay, timeout_seconds):
  68. """ Wait up to `timeout_seconds` seconds for a xevent.
  69. Return True, if a xevent is available.
  70. Return False, if the timeout was reached. """
  71. rlist = select.select(
  72. [xdisplay.display.socket], # rlist
  73. [], # wlist
  74. [], # xlist
  75. timeout_seconds, # timeout [seconds]
  76. )[0]
  77. return len(rlist) > 0
  78. class ExtendedControls:
  79. def __init__(self, primary_engine_pid,
  80. toggle_keysym_name=EXTENDED_CONTROLS_DEFAULT_TOGGLE_KEYSYM_NAME):
  81. if not psutil:
  82. raise Exception('\n'.join([
  83. 'Extended keyboard controls require the python lib psutil to be installed.',
  84. 'Depending on your system run',
  85. '\t$ sudo apt-get install python3-psutil',
  86. 'or',
  87. '\t$ pip3 install --user psutil',
  88. ]))
  89. if not Xlib:
  90. raise Exception('\n'.join([
  91. 'Extended keyboard controls require xlib for python to be installed.',
  92. 'Depending on your system run',
  93. '\t$ sudo apt-get install python3-xlib',
  94. 'or',
  95. '\t$ pip3 install --user xlib',
  96. ]))
  97. self._primary_engine_pid = primary_engine_pid
  98. self._xdisplay = Xlib.display.Display()
  99. self._toggle_keysym = XK.string_to_keysym(toggle_keysym_name)
  100. if self._toggle_keysym == X.NoSymbol:
  101. raise Exception("Extended keyboard controls toggle:"
  102. + " Unknown keysym name '{}'".format(toggle_keysym_name))
  103. self._keysym_mappings = copy.deepcopy(
  104. EXTENDED_CONTROLS_DEFAULT_KEYSYM_MAPPINGS,
  105. )
  106. if self._toggle_keysym in self._keysym_mappings:
  107. print("INFO Extended Controls:"
  108. + " Ignoring mapping for toggle key '{}'".format(toggle_keysym_name))
  109. self._keysym_mappings[self._toggle_keysym] \
  110. = ToggleExtendedControlsAction()
  111. self._default_action = ForwardKeyEventAction()
  112. self._primary_engine_window = None
  113. self._other_engine_windows = None
  114. self._active_key_registry = {}
  115. self._enabled = False
  116. @property
  117. def engine_running(self):
  118. return psutil.pid_exists(self._primary_engine_pid)
  119. @property
  120. def xdisplay(self):
  121. return self._xdisplay
  122. @property
  123. def primary_engine_window(self):
  124. return self._primary_engine_window
  125. def _wait_for_engine_window(self, timeout_seconds=20, search_interval_seconds=2):
  126. start_epoch = time.time()
  127. while self.engine_running and (time.time() - start_epoch) <= timeout_seconds:
  128. windows = x_find_window_by_pid(
  129. self._xdisplay,
  130. self._primary_engine_pid,
  131. )
  132. assert len(windows) <= 1
  133. if len(windows) == 1:
  134. return windows[0]
  135. time.sleep(search_interval_seconds)
  136. return None
  137. def run(self):
  138. self._primary_engine_window = self._wait_for_engine_window()
  139. if not self._primary_engine_window:
  140. raise Exception('Could not find the game\'s window.')
  141. self._grab_key(
  142. self._xdisplay.keysym_to_keycode(self._toggle_keysym),
  143. )
  144. if not self.enabled:
  145. keysym_name = XK.keysym_to_string(self._toggle_keysym)
  146. print("INFO Extended Controls are currently disabled."
  147. + " Press key '{}' to enable.".format(keysym_name))
  148. while self.engine_running:
  149. while self.xdisplay.pending_events():
  150. self._handle_xevent(self._xdisplay.next_event())
  151. self._check_active_key_registry()
  152. # keep timeout low for _check_active_key_registry()
  153. # to be called frequently
  154. x_wait_for_event(self.xdisplay, timeout_seconds=0.05)
  155. def _handle_xevent(self, xevent):
  156. if isinstance(xevent, Xlib.protocol.event.KeyPress) \
  157. or isinstance(xevent, Xlib.protocol.event.KeyRelease):
  158. self._handle_xkeyevent(xevent)
  159. def _handle_xkeyevent(self, xkeyevent):
  160. self._update_active_key_registry(xkeyevent)
  161. keysym_in = self._xdisplay.keycode_to_keysym(
  162. xkeyevent.detail,
  163. index=0,
  164. )
  165. if keysym_in in self._keysym_mappings:
  166. action = self._keysym_mappings[keysym_in]
  167. else:
  168. action = self._default_action
  169. action.execute(self, xkeyevent)
  170. def enable(self):
  171. for keysym in self._keysym_mappings.keys():
  172. if keysym != self._toggle_keysym:
  173. self._grab_key(
  174. self._xdisplay.keysym_to_keycode(keysym),
  175. )
  176. self._enabled = True
  177. # reset cache
  178. self._other_engine_windows = None
  179. print("INFO Enabled Extended Controls")
  180. def disable(self):
  181. for keysym in self._keysym_mappings.keys():
  182. if keysym != self._toggle_keysym:
  183. self._ungrab_key(
  184. self._xdisplay.keysym_to_keycode(keysym),
  185. )
  186. self._enabled = False
  187. print("INFO Disabled Extended Controls")
  188. @property
  189. def enabled(self):
  190. return self._enabled
  191. def toggle(self):
  192. if self.enabled:
  193. self.disable()
  194. else:
  195. self.enable()
  196. def _grab_key(self, keycode):
  197. self.primary_engine_window.grab_key(
  198. keycode,
  199. X.AnyModifier,
  200. # owner_events
  201. # https://stackoverflow.com/questions/32122360/x11-will-xgrabpointer-prevent-other-apps-from-any-mouse-event
  202. # False,
  203. True,
  204. X.GrabModeAsync,
  205. X.GrabModeAsync,
  206. )
  207. def _ungrab_key(self, keycode):
  208. self.primary_engine_window.ungrab_key(keycode, X.AnyModifier)
  209. def find_other_engine_windows(self):
  210. return x_find_window(
  211. self.xdisplay.screen().root,
  212. lambda w: w.get_wm_name() == TOONTOWN_WINDOW_NAME
  213. and w.id != self.primary_engine_window.id,
  214. )
  215. @property
  216. def other_engine_windows(self):
  217. if not self._other_engine_windows:
  218. self._other_engine_windows = self.find_other_engine_windows()
  219. return self._other_engine_windows
  220. def _update_active_key_registry(self, xkeyevent):
  221. # see self._check_active_key_registry
  222. keycode = xkeyevent.detail
  223. if isinstance(xkeyevent, Xlib.protocol.event.KeyPress):
  224. self._active_key_registry[keycode] = xkeyevent
  225. elif keycode in self._active_key_registry:
  226. del self._active_key_registry[keycode]
  227. def _check_active_key_registry(self):
  228. """
  229. WORKAROUND
  230. For an unknown reason some key release events don't get queued
  231. when multiple keys are being released simultaneously.
  232. So we keep a hashmap of supposedly currently pressed keys
  233. and periodically compare it with xdispaly.query_keymap().
  234. ref: https://stackoverflow.com/q/18160792/5894777
  235. """
  236. # https://tronche.com/gui/x/xlib/input/XQueryKeymap.html
  237. keymap = self.xdisplay.query_keymap()
  238. missed_releases = []
  239. for keycode, press_event in self._active_key_registry.items():
  240. byte_index = keycode >> 3
  241. bit_index = keycode & ((1 << 3) - 1)
  242. if not keymap[byte_index] & (1 << bit_index):
  243. print("DEBUG missed release event of key {}".format(keycode))
  244. missed_releases.append(Xlib.protocol.event.KeyRelease(
  245. window=press_event.window,
  246. detail=press_event.detail,
  247. state=press_event.state,
  248. root_x=press_event.root_x,
  249. root_y=press_event.root_y,
  250. event_x=press_event.event_x,
  251. event_y=press_event.event_y,
  252. child=press_event.child,
  253. root=press_event.root,
  254. time=X.CurrentTime,
  255. same_screen=press_event.same_screen,
  256. ))
  257. for release_event in missed_releases:
  258. self._handle_xkeyevent(release_event)