controls.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. import copy
  2. import select
  3. import time
  4. from tooncher.actions import *
  5. try:
  6. import psutil
  7. except ImportError:
  8. psutil = False
  9. try:
  10. import Xlib.display
  11. from Xlib import X, XK
  12. except ImportError:
  13. Xlib = False
# Keysym name of the key that switches the extended controls on and off
# unless ExtendedControls is given a different `toggle_keysym_name`.
EXTENDED_CONTROLS_DEFAULT_TOGGLE_KEYSYM_NAME = 'grave'
# Window title that ExtendedControls.find_engine_windows() compares
# against to recognise engine windows.
TOONTOWN_WINDOW_NAME = 'Toontown Rewritten'
# The mapping is built from Xlib keysym constants, so it only exists
# when the optional Xlib import above succeeded.
if Xlib:
    # Default key bindings: keysym of the pressed key -> action to execute.
    EXTENDED_CONTROLS_DEFAULT_KEYSYM_MAPPINGS = {
        # w/a/s/d are rewritten to the arrow keys of engine 0,
        # left control is passed on as left control and v becomes delete.
        XK.XK_w: RewriteKeyEventAction(keysym=XK.XK_Up, target_engine_index=0),
        XK.XK_a: RewriteKeyEventAction(keysym=XK.XK_Left, target_engine_index=0),
        XK.XK_s: RewriteKeyEventAction(keysym=XK.XK_Down, target_engine_index=0),
        XK.XK_d: RewriteKeyEventAction(keysym=XK.XK_Right, target_engine_index=0),
        XK.XK_Control_L: RewriteKeyEventAction(keysym=XK.XK_Control_L, target_engine_index=0),
        XK.XK_v: RewriteKeyEventAction(keysym=XK.XK_Delete, target_engine_index=0),
        # i/j/k/l are rewritten to the arrow keys of engine 1,
        # slash becomes left control and n becomes delete.
        XK.XK_i: RewriteKeyEventAction(keysym=XK.XK_Up, target_engine_index=1),
        XK.XK_j: RewriteKeyEventAction(keysym=XK.XK_Left, target_engine_index=1),
        XK.XK_k: RewriteKeyEventAction(keysym=XK.XK_Down, target_engine_index=1),
        XK.XK_l: RewriteKeyEventAction(keysym=XK.XK_Right, target_engine_index=1),
        XK.XK_slash: RewriteKeyEventAction(keysym=XK.XK_Control_L, target_engine_index=1),
        XK.XK_n: RewriteKeyEventAction(keysym=XK.XK_Delete, target_engine_index=1),
        # space is rewritten to left control for every engine at once.
        XK.XK_space: RewriteKeyEventAction(keysym=XK.XK_Control_L, target_engine_index=TargetEngine.All),
        # Gag selection; both engines share the same factor_x / factor_y
        # values per gag (how SelectGagAction interprets the factors is
        # defined in tooncher.actions, not here).
        XK.XK_e: SelectGagAction(target_engine_index=0, factor_x=0.038, factor_y=-0.047), # elephant trunk
        XK.XK_o: SelectGagAction(target_engine_index=1, factor_x=0.038, factor_y=-0.047), # elephant trunk
        XK.XK_f: SelectGagAction(target_engine_index=0, factor_x=0.119, factor_y=-0.047), # foghorn
        XK.XK_semicolon: SelectGagAction(target_engine_index=1, factor_x=0.119, factor_y=-0.047), # foghorn
    }
  36. def x_find_window(parent_window, filter_callback):
  37. matching = []
  38. for child_window in parent_window.query_tree().children:
  39. if filter_callback(child_window):
  40. matching.append(child_window)
  41. matching += x_find_window(child_window, filter_callback)
  42. return matching
  43. def x_find_window_by_pid(display, pid):
  44. pid_prop = display.intern_atom('_NET_WM_PID')
  45. def filter_callback(window):
  46. prop = window.get_full_property(pid_prop, X.AnyPropertyType)
  47. return prop and prop.value.tolist() == [pid]
  48. return x_find_window(display.screen().root, filter_callback)
  49. def x_wait_for_event(xdisplay, timeout_seconds):
  50. """ Wait up to `timeout_seconds` seconds for a xevent.
  51. Return True, if a xevent is available.
  52. Return False, if the timeout was reached. """
  53. rlist = select.select(
  54. [xdisplay.display.socket], # rlist
  55. [], # wlist
  56. [], # xlist
  57. timeout_seconds, # timeout [seconds]
  58. )[0]
  59. return len(rlist) > 0
  60. class ExtendedControls:
  61. def __init__(self, primary_engine_pid,
  62. toggle_keysym_name=EXTENDED_CONTROLS_DEFAULT_TOGGLE_KEYSYM_NAME):
  63. if not psutil:
  64. raise Exception('\n'.join([
  65. 'Extended keyboard controls require the python lib psutil to be installed.',
  66. 'Depending on your system run',
  67. '\t$ sudo apt-get install python3-psutil',
  68. 'or',
  69. '\t$ pip3 install --user psutil',
  70. ]))
  71. if not Xlib:
  72. raise Exception('\n'.join([
  73. 'Extended keyboard controls require xlib for python to be installed.',
  74. 'Depending on your system run',
  75. '\t$ sudo apt-get install python3-xlib',
  76. 'or',
  77. '\t$ pip3 install --user xlib',
  78. ]))
  79. self._primary_engine_pid = primary_engine_pid
  80. self._xdisplay = Xlib.display.Display()
  81. self._toggle_keysym = XK.string_to_keysym(toggle_keysym_name)
  82. if self._toggle_keysym == X.NoSymbol:
  83. raise Exception("Extended keyboard controls toggle:"
  84. + " Unknown keysym name '{}'".format(toggle_keysym_name))
  85. self._keysym_mappings = copy.deepcopy(
  86. EXTENDED_CONTROLS_DEFAULT_KEYSYM_MAPPINGS,
  87. )
  88. if self._toggle_keysym in self._keysym_mappings:
  89. print("INFO Extended Controls:"
  90. + " Ignoring mapping for toggle key '{}'".format(toggle_keysym_name))
  91. self._keysym_mappings[self._toggle_keysym] \
  92. = ToggleExtendedControlsAction()
  93. self._default_action = ForwardKeyEventAction()
  94. self._primary_engine_window = None
  95. self._engine_windows_by_target_index = None
  96. self._active_key_registry = {}
  97. self._enabled = False
  98. @property
  99. def engine_running(self):
  100. return psutil.pid_exists(self._primary_engine_pid)
  101. @property
  102. def xdisplay(self):
  103. return self._xdisplay
  104. @property
  105. def primary_engine_window(self):
  106. return self._primary_engine_window
  107. def _wait_for_engine_window(self, timeout_seconds=20, search_interval_seconds=2):
  108. start_epoch = time.time()
  109. while self.engine_running and (time.time() - start_epoch) <= timeout_seconds:
  110. windows = x_find_window_by_pid(
  111. self._xdisplay,
  112. self._primary_engine_pid,
  113. )
  114. assert len(windows) <= 1
  115. if len(windows) == 1:
  116. return windows[0]
  117. time.sleep(search_interval_seconds)
  118. return None
  119. def run(self):
  120. self._primary_engine_window = self._wait_for_engine_window()
  121. if not self._primary_engine_window:
  122. raise Exception('Could not find the game\'s window.')
  123. self._grab_key(
  124. self._xdisplay.keysym_to_keycode(self._toggle_keysym),
  125. )
  126. if not self.enabled:
  127. keysym_name = XK.keysym_to_string(self._toggle_keysym)
  128. print("INFO Extended Controls are currently disabled."
  129. + " Press key '{}' to enable.".format(keysym_name))
  130. while self.engine_running:
  131. while self.xdisplay.pending_events():
  132. self._handle_xevent(self._xdisplay.next_event())
  133. self._check_active_key_registry()
  134. # keep timeout low for _check_active_key_registry()
  135. # to be called frequently
  136. x_wait_for_event(self.xdisplay, timeout_seconds=0.05)
  137. def _handle_xevent(self, xevent):
  138. if isinstance(xevent, Xlib.protocol.event.KeyPress) \
  139. or isinstance(xevent, Xlib.protocol.event.KeyRelease):
  140. self._handle_xkeyevent(xevent)
  141. def _handle_xkeyevent(self, xkeyevent):
  142. self._update_active_key_registry(xkeyevent)
  143. keysym_in = self._xdisplay.keycode_to_keysym(
  144. xkeyevent.detail,
  145. index=0,
  146. )
  147. if keysym_in in self._keysym_mappings:
  148. action = self._keysym_mappings[keysym_in]
  149. else:
  150. action = self._default_action
  151. action.execute(self, xkeyevent)
  152. def enable(self):
  153. for keysym in self._keysym_mappings.keys():
  154. if keysym != self._toggle_keysym:
  155. self._grab_key(
  156. self._xdisplay.keysym_to_keycode(keysym),
  157. )
  158. self._enabled = True
  159. # reset cache
  160. self._engine_windows_by_target_index = None
  161. print("INFO Enabled Extended Controls")
  162. def disable(self):
  163. for keysym in self._keysym_mappings.keys():
  164. if keysym != self._toggle_keysym:
  165. self._ungrab_key(
  166. self._xdisplay.keysym_to_keycode(keysym),
  167. )
  168. self._enabled = False
  169. print("INFO Disabled Extended Controls")
  170. @property
  171. def enabled(self):
  172. return self._enabled
  173. def toggle(self):
  174. if self.enabled:
  175. self.disable()
  176. else:
  177. self.enable()
  178. def _grab_key(self, keycode):
  179. self.primary_engine_window.grab_key(
  180. keycode,
  181. X.AnyModifier,
  182. # owner_events
  183. # https://stackoverflow.com/questions/32122360/x11-will-xgrabpointer-prevent-other-apps-from-any-mouse-event
  184. # False,
  185. True,
  186. X.GrabModeAsync,
  187. X.GrabModeAsync,
  188. )
  189. def _ungrab_key(self, keycode):
  190. self.primary_engine_window.ungrab_key(keycode, X.AnyModifier)
  191. def find_engine_windows(self):
  192. return x_find_window(
  193. self.xdisplay.screen().root,
  194. lambda w: w.get_wm_name() == TOONTOWN_WINDOW_NAME,
  195. )
  196. @property
  197. def engine_windows_by_target_index(self):
  198. if not self._engine_windows_by_target_index:
  199. win_by_index = {}
  200. for target_index, win in enumerate(self.find_engine_windows()):
  201. print('INFO Engine window {} has no target index, assuming {}'.format(
  202. win.id,
  203. target_index,
  204. ))
  205. if not target_index in win_by_index:
  206. win_by_index[target_index] = []
  207. win_by_index[target_index].append(win)
  208. self._engine_windows_by_target_index = win_by_index
  209. return self._engine_windows_by_target_index
  210. @property
  211. def engine_windows(self):
  212. return [w for g in self.engine_windows_by_target_index.values() for w in g]
  213. def _update_active_key_registry(self, xkeyevent):
  214. # see self._check_active_key_registry
  215. keycode = xkeyevent.detail
  216. if isinstance(xkeyevent, Xlib.protocol.event.KeyPress):
  217. self._active_key_registry[keycode] = xkeyevent
  218. elif keycode in self._active_key_registry:
  219. del self._active_key_registry[keycode]
  220. def _check_active_key_registry(self):
  221. """
  222. WORKAROUND
  223. For an unknown reason some key release events don't get queued
  224. when multiple keys are being released simultaneously.
  225. So we keep a hashmap of supposedly currently pressed keys
  226. and periodically compare it with xdispaly.query_keymap().
  227. ref: https://stackoverflow.com/q/18160792/5894777
  228. """
  229. # https://tronche.com/gui/x/xlib/input/XQueryKeymap.html
  230. keymap = self.xdisplay.query_keymap()
  231. missed_releases = []
  232. for keycode, press_event in self._active_key_registry.items():
  233. byte_index = keycode >> 3
  234. bit_index = keycode & ((1 << 3) - 1)
  235. if not keymap[byte_index] & (1 << bit_index):
  236. print("DEBUG missed release event of key {}".format(keycode))
  237. missed_releases.append(Xlib.protocol.event.KeyRelease(
  238. window=press_event.window,
  239. detail=press_event.detail,
  240. state=press_event.state,
  241. root_x=press_event.root_x,
  242. root_y=press_event.root_y,
  243. event_x=press_event.event_x,
  244. event_y=press_event.event_y,
  245. child=press_event.child,
  246. root=press_event.root,
  247. time=X.CurrentTime,
  248. same_screen=press_event.same_screen,
  249. ))
  250. for release_event in missed_releases:
  251. self._handle_xkeyevent(release_event)