login_manager.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. # systemctl-mqtt - MQTT client triggering & reporting shutdown on systemd-based systems
  2. #
  3. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import datetime
  18. import getpass
  19. import json
  20. import logging
  21. import jeepney
  22. import jeepney.io.blocking
  23. import systemctl_mqtt._dbus
  24. _LOGGER = logging.getLogger(__name__)
  25. _LOGIN_MANAGER_OBJECT_PATH = "/org/freedesktop/login1"
  26. _LOGIN_MANAGER_INTERFACE = "org.freedesktop.login1.Manager"
  27. def _get_username() -> str | None:
  28. try:
  29. return getpass.getuser()
  30. except OSError:
  31. # > Traceback (most recent call last):
  32. # > File "/usr/local/lib/python3.13/getpass.py", line 173, in getuser
  33. # > return pwd.getpwuid(os.getuid())[0]
  34. # > ~~~~~~~~~~~~^^^^^^^^^^^^^
  35. # > KeyError: 'getpwuid(): uid not found: 100'
  36. #
  37. # > The above exception was the direct cause of the following exception:
  38. # > …
  39. # > OSError: No username set in the environment
  40. return None
  41. def _log_interactive_authorization_required(
  42. *, action_label: str, action_id: str
  43. ) -> None:
  44. _LOGGER.error(
  45. """failed to %s: interactive authorization required
  46. create %s and insert the following rule:
  47. polkit.addRule(function(action, subject) {
  48. if(action.id === %s && subject.user === %s) {
  49. return polkit.Result.YES;
  50. }
  51. });
  52. """,
  53. action_label,
  54. "/etc/polkit-1/rules.d/50-systemctl-mqtt.rules",
  55. json.dumps(action_id),
  56. json.dumps(_get_username() or "USERNAME"),
  57. )
  58. def get_login_manager_signal_match_rule(member: str) -> jeepney.MatchRule:
  59. return jeepney.MatchRule(
  60. type="signal",
  61. interface=_LOGIN_MANAGER_INTERFACE,
  62. member=member,
  63. path=_LOGIN_MANAGER_OBJECT_PATH,
  64. )
  65. class LoginManager(systemctl_mqtt._dbus.Properties): # pylint: disable=protected-access
  66. """
  67. https://freedesktop.org/wiki/Software/systemd/logind/
  68. $ python3 -m jeepney.bindgen \
  69. --bus unix:path=/var/run/dbus/system_bus_socket \
  70. --name org.freedesktop.login1 --path /org/freedesktop/login1
  71. """
  72. interface = _LOGIN_MANAGER_INTERFACE
  73. def __init__(self):
  74. super().__init__(
  75. object_path=_LOGIN_MANAGER_OBJECT_PATH, bus_name="org.freedesktop.login1"
  76. )
  77. # pylint: disable=invalid-name; inherited method names from Manager object
  78. def ListInhibitors(self) -> jeepney.low_level.Message:
  79. return jeepney.new_method_call(remote_obj=self, method="ListInhibitors")
  80. def LockSessions(self) -> jeepney.low_level.Message:
  81. return jeepney.new_method_call(remote_obj=self, method="LockSessions")
  82. def CanPowerOff(self) -> jeepney.low_level.Message:
  83. return jeepney.new_method_call(remote_obj=self, method="CanPowerOff")
  84. def ScheduleShutdown(
  85. self, *, action: str, time: datetime.datetime
  86. ) -> jeepney.low_level.Message:
  87. return jeepney.new_method_call(
  88. remote_obj=self,
  89. method="ScheduleShutdown",
  90. signature="st",
  91. body=(action, int(time.timestamp() * 1e6)), # (type, usec)
  92. )
  93. def Suspend(self, *, interactive: bool) -> jeepney.low_level.Message:
  94. return jeepney.new_method_call(
  95. remote_obj=self, method="Suspend", signature="b", body=(interactive,)
  96. )
  97. def Inhibit(
  98. self, *, what: str, who: str, why: str, mode: str
  99. ) -> jeepney.low_level.Message:
  100. return jeepney.new_method_call(
  101. remote_obj=self,
  102. method="Inhibit",
  103. signature="ssss",
  104. body=(what, who, why, mode),
  105. )
  106. def get_login_manager_proxy() -> jeepney.io.blocking.Proxy:
  107. # https://jeepney.readthedocs.io/en/latest/integrate.html
  108. # https://gitlab.com/takluyver/jeepney/-/blob/master/examples/aio_notify.py
  109. return jeepney.io.blocking.Proxy(
  110. msggen=LoginManager(),
  111. connection=jeepney.io.blocking.open_dbus_connection(
  112. bus="SYSTEM",
  113. # > dbus-broker[…]: Peer :1.… is being disconnected as it does not
  114. # . support receiving file descriptors it requested.
  115. enable_fds=True,
  116. ),
  117. )
  118. def _log_shutdown_inhibitors(login_manager_proxy: jeepney.io.blocking.Proxy) -> None:
  119. if _LOGGER.getEffectiveLevel() > logging.DEBUG:
  120. return
  121. found_inhibitor = False
  122. try:
  123. # https://www.freedesktop.org/wiki/Software/systemd/inhibit/
  124. (inhibitors,) = login_manager_proxy.ListInhibitors()
  125. for what, who, why, mode, uid, pid in inhibitors:
  126. if "shutdown" in what:
  127. found_inhibitor = True
  128. _LOGGER.debug(
  129. "detected shutdown inhibitor %s (pid=%u, uid=%u, mode=%s): %s",
  130. who,
  131. pid,
  132. uid,
  133. mode,
  134. why,
  135. )
  136. except jeepney.wrappers.DBusErrorResponse as exc:
  137. _LOGGER.warning("failed to fetch shutdown inhibitors: %s", exc)
  138. return
  139. if not found_inhibitor:
  140. _LOGGER.debug("no shutdown inhibitor locks found")
  141. def schedule_shutdown(*, action: str, delay: datetime.timedelta) -> None:
  142. # https://github.com/systemd/systemd/blob/v237/src/systemctl/systemctl.c#L8553
  143. assert action in ["poweroff", "reboot"], action
  144. time = datetime.datetime.now() + delay
  145. # datetime.datetime.isoformat(timespec=) not available in python3.5
  146. # https://github.com/python/cpython/blob/v3.5.9/Lib/datetime.py#L1552
  147. _LOGGER.info("scheduling %s for %s", action, time.strftime("%Y-%m-%d %H:%M:%S"))
  148. login_manager = get_login_manager_proxy()
  149. try:
  150. # $ gdbus introspect --system --dest org.freedesktop.login1 \
  151. # --object-path /org/freedesktop/login1 | grep -A 1 ScheduleShutdown
  152. # ScheduleShutdown(in s arg_0,
  153. # in t arg_1);
  154. # $ gdbus call --system --dest org.freedesktop.login1 \
  155. # --object-path /org/freedesktop/login1 \
  156. # --method org.freedesktop.login1.Manager.ScheduleShutdown \
  157. # poweroff "$(date --date=10min +%s)000000"
  158. # $ dbus-send --type=method_call --print-reply --system --dest=org.freedesktop.login1 \
  159. # /org/freedesktop/login1 \
  160. # org.freedesktop.login1.Manager.ScheduleShutdown \
  161. # string:poweroff "uint64:$(date --date=10min +%s)000000"
  162. login_manager.ScheduleShutdown(action=action, time=time)
  163. except jeepney.wrappers.DBusErrorResponse as exc:
  164. if exc.name == "org.freedesktop.DBus.Error.InteractiveAuthorizationRequired":
  165. _log_interactive_authorization_required(
  166. action_label="schedule " + action,
  167. action_id="org.freedesktop.login1."
  168. + {"poweroff": "power-off"}.get(action, action),
  169. )
  170. else:
  171. _LOGGER.error("failed to schedule %s: %s", action, exc)
  172. _log_shutdown_inhibitors(login_manager)
  173. def suspend() -> None:
  174. _LOGGER.info("suspending system")
  175. get_login_manager_proxy().Suspend(interactive=False)
  176. def lock_all_sessions() -> None:
  177. """
  178. $ loginctl lock-sessions
  179. """
  180. _LOGGER.info("instruct all sessions to activate screen locks")
  181. login_manager = get_login_manager_proxy()
  182. try:
  183. login_manager.LockSessions()
  184. except jeepney.wrappers.DBusErrorResponse as exc:
  185. if exc.name == "org.freedesktop.DBus.Error.InteractiveAuthorizationRequired":
  186. _log_interactive_authorization_required(
  187. action_label="lock all sessions",
  188. action_id="org.freedesktop.login1.lock-sessions",
  189. )
  190. else:
  191. _LOGGER.error("failed to lock all sessions: %s", exc)