controls.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. import copy
  2. import select
  3. import time
  4. from tooncher.actions import *
  5. try:
  6. import psutil
  7. except ImportError:
  8. psutil = False
  9. try:
  10. import Xlib.display
  11. from Xlib import X, XK
  12. except ImportError:
  13. Xlib = False
  14. EXTENDED_CONTROLS_DEFAULT_TOGGLE_KEYSYM_NAME = 'grave'
  15. TOONTOWN_WINDOW_NAME = 'Toontown Rewritten'
  16. if Xlib:
  17. EXTENDED_CONTROLS_DEFAULT_KEYSYM_MAPPINGS = {
  18. XK.XK_w: RewriteKeyEventAction(keysym=XK.XK_Up),
  19. XK.XK_a: RewriteKeyEventAction(keysym=XK.XK_Left),
  20. XK.XK_s: RewriteKeyEventAction(keysym=XK.XK_Down),
  21. XK.XK_d: RewriteKeyEventAction(keysym=XK.XK_Right),
  22. XK.XK_i: RewriteKeyEventAction(
  23. keysym=XK.XK_Up,
  24. target_engine=TargetEngine.NonPrimary,
  25. ),
  26. XK.XK_j: RewriteKeyEventAction(
  27. keysym=XK.XK_Left,
  28. target_engine=TargetEngine.NonPrimary,
  29. ),
  30. XK.XK_k: RewriteKeyEventAction(
  31. keysym=XK.XK_Down,
  32. target_engine=TargetEngine.NonPrimary,
  33. ),
  34. XK.XK_l: RewriteKeyEventAction(
  35. keysym=XK.XK_Right,
  36. target_engine=TargetEngine.NonPrimary,
  37. ),
  38. XK.XK_space: RewriteKeyEventAction(
  39. keysym=XK.XK_Control_L,
  40. target_engine=TargetEngine.All,
  41. ),
  42. XK.XK_e: SelectGagAction(factor_x=0.038, factor_y=-0.047), # elephant trunk
  43. XK.XK_f: SelectGagAction(factor_x=0.119, factor_y=-0.047), # foghorn
  44. }
  45. def x_find_window(parent_window, filter_callback):
  46. matching = []
  47. for child_window in parent_window.query_tree().children:
  48. if filter_callback(child_window):
  49. matching.append(child_window)
  50. matching += x_find_window(child_window, filter_callback)
  51. return matching
  52. def x_find_window_by_pid(display, pid):
  53. pid_prop = display.intern_atom('_NET_WM_PID')
  54. def filter_callback(window):
  55. prop = window.get_full_property(pid_prop, X.AnyPropertyType)
  56. return prop and prop.value.tolist() == [pid]
  57. return x_find_window(display.screen().root, filter_callback)
  58. def x_wait_for_event(xdisplay, timeout_seconds):
  59. """ Wait up to `timeout_seconds` seconds for a xevent.
  60. Return True, if a xevent is available.
  61. Return False, if the timeout was reached. """
  62. rlist = select.select(
  63. [xdisplay.display.socket], # rlist
  64. [], # wlist
  65. [], # xlist
  66. timeout_seconds, # timeout [seconds]
  67. )[0]
  68. return len(rlist) > 0
  69. class ExtendedControls:
  70. def __init__(self, engine_pid,
  71. toggle_keysym_name=EXTENDED_CONTROLS_DEFAULT_TOGGLE_KEYSYM_NAME):
  72. if not psutil:
  73. raise Exception('\n'.join([
  74. 'Extended keyboard controls require the python lib psutil to be installed.',
  75. 'Depending on your system run',
  76. '\t$ sudo apt-get install python3-psutil',
  77. 'or',
  78. '\t$ pip3 install --user psutil',
  79. ]))
  80. if not Xlib:
  81. raise Exception('\n'.join([
  82. 'Extended keyboard controls require xlib for python to be installed.',
  83. 'Depending on your system run',
  84. '\t$ sudo apt-get install python3-xlib',
  85. 'or',
  86. '\t$ pip3 install --user xlib',
  87. ]))
  88. self._engine_pid = engine_pid
  89. self._xdisplay = Xlib.display.Display()
  90. self._toggle_keysym = XK.string_to_keysym(toggle_keysym_name)
  91. if self._toggle_keysym == X.NoSymbol:
  92. raise Exception("Extended keyboard controls toggle:"
  93. + " Unknown keysym name '{}'".format(toggle_keysym_name))
  94. self._keysym_mappings = copy.deepcopy(
  95. EXTENDED_CONTROLS_DEFAULT_KEYSYM_MAPPINGS,
  96. )
  97. if self._toggle_keysym in self._keysym_mappings:
  98. print("INFO Extended Controls:"
  99. + " Ignoring mapping for toggle key '{}'".format(toggle_keysym_name))
  100. self._keysym_mappings[self._toggle_keysym] \
  101. = ToggleExtendedControlsAction()
  102. self._default_action = ForwardKeyEventAction()
  103. self._engine_window = None
  104. self._other_engine_windows = None
  105. self._active_key_registry = {}
  106. self._enabled = False
  107. @property
  108. def engine_running(self):
  109. return psutil.pid_exists(self._engine_pid)
  110. @property
  111. def xdisplay(self):
  112. return self._xdisplay
  113. @property
  114. def engine_window(self):
  115. return self._engine_window
  116. def _wait_for_engine_window(self, timeout_seconds=20, search_interval_seconds=2):
  117. start_epoch = time.time()
  118. while self.engine_running and (time.time() - start_epoch) <= timeout_seconds:
  119. windows = x_find_window_by_pid(
  120. self._xdisplay,
  121. self._engine_pid,
  122. )
  123. assert len(windows) <= 1
  124. if len(windows) == 1:
  125. return windows[0]
  126. time.sleep(search_interval_seconds)
  127. return None
  128. def run(self):
  129. self._engine_window = self._wait_for_engine_window()
  130. if not self._engine_window:
  131. raise Exception('Could not find the game\'s window.')
  132. self._grab_key(
  133. self._xdisplay.keysym_to_keycode(self._toggle_keysym),
  134. )
  135. if not self.enabled:
  136. keysym_name = XK.keysym_to_string(self._toggle_keysym)
  137. print("INFO Extended Controls are currently disabled."
  138. + " Press key '{}' to enable.".format(keysym_name))
  139. while self.engine_running:
  140. while self.xdisplay.pending_events():
  141. self._handle_xevent(self._xdisplay.next_event())
  142. self._check_active_key_registry()
  143. # keep timeout low for _check_active_key_registry()
  144. # to be called frequently
  145. x_wait_for_event(self.xdisplay, timeout_seconds=0.05)
  146. def _handle_xevent(self, xevent):
  147. if isinstance(xevent, Xlib.protocol.event.KeyPress) \
  148. or isinstance(xevent, Xlib.protocol.event.KeyRelease):
  149. self._handle_xkeyevent(xevent)
  150. def _handle_xkeyevent(self, xkeyevent):
  151. self._update_active_key_registry(xkeyevent)
  152. keysym_in = self._xdisplay.keycode_to_keysym(
  153. xkeyevent.detail,
  154. index=0,
  155. )
  156. if keysym_in in self._keysym_mappings:
  157. action = self._keysym_mappings[keysym_in]
  158. else:
  159. action = self._default_action
  160. action.execute(self, xkeyevent)
  161. def enable(self):
  162. for keysym in self._keysym_mappings.keys():
  163. if keysym != self._toggle_keysym:
  164. self._grab_key(
  165. self._xdisplay.keysym_to_keycode(keysym),
  166. )
  167. self._enabled = True
  168. # reset cache
  169. self._other_engine_windows = None
  170. print("INFO Enabled Extended Controls")
  171. def disable(self):
  172. for keysym in self._keysym_mappings.keys():
  173. if keysym != self._toggle_keysym:
  174. self._ungrab_key(
  175. self._xdisplay.keysym_to_keycode(keysym),
  176. )
  177. self._enabled = False
  178. print("INFO Disabled Extended Controls")
  179. @property
  180. def enabled(self):
  181. return self._enabled
  182. def toggle(self):
  183. if self.enabled:
  184. self.disable()
  185. else:
  186. self.enable()
  187. def _grab_key(self, keycode):
  188. self._engine_window.grab_key(
  189. keycode,
  190. X.AnyModifier,
  191. # owner_events
  192. # https://stackoverflow.com/questions/32122360/x11-will-xgrabpointer-prevent-other-apps-from-any-mouse-event
  193. # False,
  194. True,
  195. X.GrabModeAsync,
  196. X.GrabModeAsync,
  197. )
  198. def _ungrab_key(self, keycode):
  199. self._engine_window.ungrab_key(keycode, X.AnyModifier)
  200. def find_other_engine_windows(self):
  201. return x_find_window(
  202. self.xdisplay.screen().root,
  203. lambda w: w.get_wm_name() == TOONTOWN_WINDOW_NAME
  204. and w.id != self.engine_window.id,
  205. )
  206. @property
  207. def other_engine_windows(self):
  208. if not self._other_engine_windows:
  209. self._other_engine_windows = self.find_other_engine_windows()
  210. return self._other_engine_windows
  211. def _update_active_key_registry(self, xkeyevent):
  212. # see self._check_active_key_registry
  213. keycode = xkeyevent.detail
  214. if isinstance(xkeyevent, Xlib.protocol.event.KeyPress):
  215. self._active_key_registry[keycode] = xkeyevent
  216. elif keycode in self._active_key_registry:
  217. del self._active_key_registry[keycode]
  218. def _check_active_key_registry(self):
  219. """
  220. WORKAROUND
  221. For an unknown reason some key release events don't get queued
  222. when multiple keys are being released simultaneously.
  223. So we keep a hashmap of supposedly currently pressed keys
  224. and periodically compare it with xdispaly.query_keymap().
  225. ref: https://stackoverflow.com/q/18160792/5894777
  226. """
  227. # https://tronche.com/gui/x/xlib/input/XQueryKeymap.html
  228. keymap = self.xdisplay.query_keymap()
  229. missed_releases = []
  230. for keycode, press_event in self._active_key_registry.items():
  231. if not (1 << (keycode & ((1 << 3) - 1))) & keymap[keycode >> 3]:
  232. print("WARN missed release event of key {}".format(keycode))
  233. missed_releases.append(Xlib.protocol.event.KeyRelease(
  234. window=press_event.window,
  235. detail=press_event.detail,
  236. state=press_event.state,
  237. root_x=press_event.root_x,
  238. root_y=press_event.root_y,
  239. event_x=press_event.event_x,
  240. event_y=press_event.event_y,
  241. child=press_event.child,
  242. root=press_event.root,
  243. time=X.CurrentTime,
  244. same_screen=press_event.same_screen,
  245. ))
  246. for release_event in missed_releases:
  247. self._handle_xkeyevent(release_event)