login_manager.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. # systemctl-mqtt - MQTT client triggering & reporting shutdown on systemd-based systems
  2. #
  3. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import datetime
  18. import getpass
  19. import json
  20. import logging
  21. import typing
  22. import jeepney
  23. import jeepney.io.blocking
  24. import systemctl_mqtt._dbus
  25. _LOGGER = logging.getLogger(__name__)
  26. _LOGIN_MANAGER_OBJECT_PATH = "/org/freedesktop/login1"
  27. _LOGIN_MANAGER_INTERFACE = "org.freedesktop.login1.Manager"
  28. def _get_username() -> typing.Optional[str]:
  29. try:
  30. return getpass.getuser()
  31. except OSError:
  32. # > Traceback (most recent call last):
  33. # > File "/usr/local/lib/python3.13/getpass.py", line 173, in getuser
  34. # > return pwd.getpwuid(os.getuid())[0]
  35. # > ~~~~~~~~~~~~^^^^^^^^^^^^^
  36. # > KeyError: 'getpwuid(): uid not found: 100'
  37. #
  38. # > The above exception was the direct cause of the following exception:
  39. # > …
  40. # > OSError: No username set in the environment
  41. return None
  42. def _log_interactive_authorization_required(
  43. *, action_label: str, action_id: str
  44. ) -> None:
  45. _LOGGER.error(
  46. """failed to %s: interactive authorization required
  47. create %s and insert the following rule:
  48. polkit.addRule(function(action, subject) {
  49. if(action.id === %s && subject.user === %s) {
  50. return polkit.Result.YES;
  51. }
  52. });
  53. """,
  54. action_label,
  55. "/etc/polkit-1/rules.d/50-systemctl-mqtt.rules",
  56. json.dumps(action_id),
  57. json.dumps(_get_username() or "USERNAME"),
  58. )
  59. def get_login_manager_signal_match_rule(member: str) -> jeepney.MatchRule:
  60. return jeepney.MatchRule(
  61. type="signal",
  62. interface=_LOGIN_MANAGER_INTERFACE,
  63. member=member,
  64. path=_LOGIN_MANAGER_OBJECT_PATH,
  65. )
  66. class LoginManager(systemctl_mqtt._dbus.Properties): # pylint: disable=protected-access
  67. """
  68. https://freedesktop.org/wiki/Software/systemd/logind/
  69. $ python3 -m jeepney.bindgen \
  70. --bus unix:path=/var/run/dbus/system_bus_socket \
  71. --name org.freedesktop.login1 --path /org/freedesktop/login1
  72. """
  73. interface = _LOGIN_MANAGER_INTERFACE
  74. def __init__(self):
  75. super().__init__(
  76. object_path=_LOGIN_MANAGER_OBJECT_PATH, bus_name="org.freedesktop.login1"
  77. )
  78. # pylint: disable=invalid-name; inherited method names from Manager object
  79. def ListInhibitors(self) -> jeepney.low_level.Message:
  80. return jeepney.new_method_call(remote_obj=self, method="ListInhibitors")
  81. def LockSessions(self) -> jeepney.low_level.Message:
  82. return jeepney.new_method_call(remote_obj=self, method="LockSessions")
  83. def CanPowerOff(self) -> jeepney.low_level.Message:
  84. return jeepney.new_method_call(remote_obj=self, method="CanPowerOff")
  85. def ScheduleShutdown(
  86. self, *, action: str, time: datetime.datetime
  87. ) -> jeepney.low_level.Message:
  88. return jeepney.new_method_call(
  89. remote_obj=self,
  90. method="ScheduleShutdown",
  91. signature="st",
  92. body=(action, int(time.timestamp() * 1e6)), # (type, usec)
  93. )
  94. def Suspend(self, *, interactive: bool) -> jeepney.low_level.Message:
  95. return jeepney.new_method_call(
  96. remote_obj=self, method="Suspend", signature="b", body=(interactive,)
  97. )
  98. def Inhibit(
  99. self, *, what: str, who: str, why: str, mode: str
  100. ) -> jeepney.low_level.Message:
  101. return jeepney.new_method_call(
  102. remote_obj=self,
  103. method="Inhibit",
  104. signature="ssss",
  105. body=(what, who, why, mode),
  106. )
  107. def get_login_manager_proxy() -> jeepney.io.blocking.Proxy:
  108. # https://jeepney.readthedocs.io/en/latest/integrate.html
  109. # https://gitlab.com/takluyver/jeepney/-/blob/master/examples/aio_notify.py
  110. return jeepney.io.blocking.Proxy(
  111. msggen=LoginManager(),
  112. connection=jeepney.io.blocking.open_dbus_connection(
  113. bus="SYSTEM",
  114. # > dbus-broker[…]: Peer :1.… is being disconnected as it does not
  115. # . support receiving file descriptors it requested.
  116. enable_fds=True,
  117. ),
  118. )
  119. def _log_shutdown_inhibitors(login_manager_proxy: jeepney.io.blocking.Proxy) -> None:
  120. if _LOGGER.getEffectiveLevel() > logging.DEBUG:
  121. return
  122. found_inhibitor = False
  123. try:
  124. # https://www.freedesktop.org/wiki/Software/systemd/inhibit/
  125. (inhibitors,) = login_manager_proxy.ListInhibitors()
  126. for what, who, why, mode, uid, pid in inhibitors:
  127. if "shutdown" in what:
  128. found_inhibitor = True
  129. _LOGGER.debug(
  130. "detected shutdown inhibitor %s (pid=%u, uid=%u, mode=%s): %s",
  131. who,
  132. pid,
  133. uid,
  134. mode,
  135. why,
  136. )
  137. except jeepney.wrappers.DBusErrorResponse as exc:
  138. _LOGGER.warning("failed to fetch shutdown inhibitors: %s", exc)
  139. return
  140. if not found_inhibitor:
  141. _LOGGER.debug("no shutdown inhibitor locks found")
  142. def schedule_shutdown(*, action: str, delay: datetime.timedelta) -> None:
  143. # https://github.com/systemd/systemd/blob/v237/src/systemctl/systemctl.c#L8553
  144. assert action in ["poweroff", "reboot"], action
  145. time = datetime.datetime.now() + delay
  146. # datetime.datetime.isoformat(timespec=) not available in python3.5
  147. # https://github.com/python/cpython/blob/v3.5.9/Lib/datetime.py#L1552
  148. _LOGGER.info("scheduling %s for %s", action, time.strftime("%Y-%m-%d %H:%M:%S"))
  149. login_manager = get_login_manager_proxy()
  150. try:
  151. # $ gdbus introspect --system --dest org.freedesktop.login1 \
  152. # --object-path /org/freedesktop/login1 | grep -A 1 ScheduleShutdown
  153. # ScheduleShutdown(in s arg_0,
  154. # in t arg_1);
  155. # $ gdbus call --system --dest org.freedesktop.login1 \
  156. # --object-path /org/freedesktop/login1 \
  157. # --method org.freedesktop.login1.Manager.ScheduleShutdown \
  158. # poweroff "$(date --date=10min +%s)000000"
  159. # $ dbus-send --type=method_call --print-reply --system --dest=org.freedesktop.login1 \
  160. # /org/freedesktop/login1 \
  161. # org.freedesktop.login1.Manager.ScheduleShutdown \
  162. # string:poweroff "uint64:$(date --date=10min +%s)000000"
  163. login_manager.ScheduleShutdown(action=action, time=time)
  164. except jeepney.wrappers.DBusErrorResponse as exc:
  165. if exc.name == "org.freedesktop.DBus.Error.InteractiveAuthorizationRequired":
  166. _log_interactive_authorization_required(
  167. action_label="schedule " + action,
  168. action_id="org.freedesktop.login1."
  169. + {"poweroff": "power-off"}.get(action, action),
  170. )
  171. else:
  172. _LOGGER.error("failed to schedule %s: %s", action, exc)
  173. _log_shutdown_inhibitors(login_manager)
  174. def suspend() -> None:
  175. _LOGGER.info("suspending system")
  176. get_login_manager_proxy().Suspend(interactive=False)
  177. def lock_all_sessions() -> None:
  178. """
  179. $ loginctl lock-sessions
  180. """
  181. _LOGGER.info("instruct all sessions to activate screen locks")
  182. login_manager = get_login_manager_proxy()
  183. try:
  184. login_manager.LockSessions()
  185. except jeepney.wrappers.DBusErrorResponse as exc:
  186. if exc.name == "org.freedesktop.DBus.Error.InteractiveAuthorizationRequired":
  187. _log_interactive_authorization_required(
  188. action_label="lock all sessions",
  189. action_id="org.freedesktop.login1.lock-sessions",
  190. )
  191. else:
  192. _LOGGER.error("failed to lock all sessions: %s", exc)