__init__.py 16 KB


  1. # systemctl-mqtt - MQTT client triggering & reporting shutdown on systemd-based systems
  2. #
  3. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import abc
  18. import argparse
  19. import asyncio
  20. import datetime
  21. import functools
  22. import importlib.metadata
  23. import json
  24. import logging
  25. import os
  26. import pathlib
  27. import socket
  28. import ssl
  29. import threading
  30. import typing
  31. import aiomqtt
  32. import jeepney
  33. import jeepney.bus_messages
  34. import jeepney.io.asyncio
  35. import systemctl_mqtt._dbus
  36. import systemctl_mqtt._homeassistant
  37. import systemctl_mqtt._mqtt
  38. _MQTT_DEFAULT_PORT = 1883
  39. _MQTT_DEFAULT_TLS_PORT = 8883
  40. _ARGUMENT_LOG_LEVEL_MAPPING = {
  41. a: getattr(logging, a.upper())
  42. for a in ("debug", "info", "warning", "error", "critical")
  43. }
  44. _LOGGER = logging.getLogger(__name__)
  45. class _State:
  46. def __init__(
  47. self,
  48. *,
  49. mqtt_topic_prefix: str,
  50. homeassistant_discovery_prefix: str,
  51. homeassistant_discovery_object_id: str,
  52. poweroff_delay: datetime.timedelta,
  53. ) -> None:
  54. self._mqtt_topic_prefix = mqtt_topic_prefix
  55. self._homeassistant_discovery_prefix = homeassistant_discovery_prefix
  56. self._homeassistant_discovery_object_id = homeassistant_discovery_object_id
  57. self._login_manager = systemctl_mqtt._dbus.get_login_manager_proxy()
  58. self._shutdown_lock: typing.Optional[jeepney.fds.FileDescriptor] = None
  59. self._shutdown_lock_mutex = threading.Lock()
  60. self.poweroff_delay = poweroff_delay
  61. @property
  62. def mqtt_topic_prefix(self) -> str:
  63. return self._mqtt_topic_prefix
  64. @property
  65. def shutdown_lock_acquired(self) -> bool:
  66. return self._shutdown_lock is not None
  67. def acquire_shutdown_lock(self) -> None:
  68. with self._shutdown_lock_mutex:
  69. assert self._shutdown_lock is None
  70. # https://www.freedesktop.org/wiki/Software/systemd/inhibit/
  71. (self._shutdown_lock,) = self._login_manager.Inhibit(
  72. what="shutdown",
  73. who="systemctl-mqtt",
  74. why="Report shutdown via MQTT",
  75. mode="delay",
  76. )
  77. assert isinstance(
  78. self._shutdown_lock, jeepney.fds.FileDescriptor
  79. ), self._shutdown_lock
  80. _LOGGER.debug("acquired shutdown inhibitor lock")
  81. def release_shutdown_lock(self) -> None:
  82. with self._shutdown_lock_mutex:
  83. if self._shutdown_lock:
  84. self._shutdown_lock.close()
  85. _LOGGER.debug("released shutdown inhibitor lock")
  86. self._shutdown_lock = None
  87. @property
  88. def _preparing_for_shutdown_topic(self) -> str:
  89. return self.mqtt_topic_prefix + "/preparing-for-shutdown"
  90. async def _publish_preparing_for_shutdown(
  91. self, *, mqtt_client: aiomqtt.Client, active: bool
  92. ) -> None:
  93. topic = self._preparing_for_shutdown_topic
  94. # pylint: disable=protected-access
  95. payload = systemctl_mqtt._mqtt.encode_bool(active)
  96. _LOGGER.info("publishing %r on %s", payload, topic)
  97. await mqtt_client.publish(topic=topic, payload=payload, retain=False)
  98. async def preparing_for_shutdown_handler(
  99. self, active: bool, mqtt_client: aiomqtt.Client
  100. ) -> None:
  101. active = bool(active)
  102. await self._publish_preparing_for_shutdown(
  103. mqtt_client=mqtt_client, active=active
  104. )
  105. if active:
  106. self.release_shutdown_lock()
  107. else:
  108. self.acquire_shutdown_lock()
  109. async def publish_preparing_for_shutdown(self, mqtt_client: aiomqtt.Client) -> None:
  110. try:
  111. ((return_type, active),) = self._login_manager.Get("PreparingForShutdown")
  112. except jeepney.wrappers.DBusErrorResponse as exc:
  113. _LOGGER.error(
  114. "failed to read logind's PreparingForShutdown property: %s", exc
  115. )
  116. return
  117. assert return_type == "b", return_type
  118. assert isinstance(active, bool), active
  119. await self._publish_preparing_for_shutdown(
  120. mqtt_client=mqtt_client, active=active
  121. )
  122. async def publish_homeassistant_device_config(
  123. self, mqtt_client: aiomqtt.Client
  124. ) -> None:
  125. # <discovery_prefix>/<component>/[<node_id>/]<object_id>/config
  126. # https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery
  127. discovery_topic = "/".join(
  128. (
  129. self._homeassistant_discovery_prefix,
  130. "device",
  131. self._homeassistant_discovery_object_id,
  132. "config",
  133. )
  134. )
  135. hostname = (
  136. # pylint: disable=protected-access; function in internal module
  137. systemctl_mqtt._utils.get_hostname()
  138. )
  139. package_metadata = importlib.metadata.metadata(__name__)
  140. unique_id_prefix = "systemctl-mqtt-" + hostname
  141. config = {
  142. "device": {"identifiers": [hostname], "name": hostname},
  143. "origin": {
  144. "name": package_metadata["Name"],
  145. "sw_version": package_metadata["Version"],
  146. "support_url": package_metadata["Home-page"],
  147. },
  148. "components": {
  149. "logind/preparing-for-shutdown": {
  150. "unique_id": unique_id_prefix + "-logind-preparing-for-shutdown",
  151. "object_id": f"{hostname}_logind_preparing_for_shutdown", # entity id
  152. "name": "preparing for shutdown", # home assistant prepends device name
  153. "platform": "binary_sensor",
  154. "state_topic": self._preparing_for_shutdown_topic,
  155. # pylint: disable=protected-access
  156. "payload_on": systemctl_mqtt._mqtt.encode_bool(True),
  157. "payload_off": systemctl_mqtt._mqtt.encode_bool(False),
  158. },
  159. },
  160. }
  161. for mqtt_topic_suffix in _MQTT_TOPIC_SUFFIX_ACTION_MAPPING.keys():
  162. # false positive warning by mypy:
  163. # > Unsupported target for indexed assignment
  164. config["components"]["logind/" + mqtt_topic_suffix] = { # type: ignore
  165. "unique_id": unique_id_prefix + "-logind-" + mqtt_topic_suffix,
  166. "object_id": hostname
  167. + "_logind_"
  168. + mqtt_topic_suffix.replace("-", "_"), # entity id
  169. "name": mqtt_topic_suffix.replace("-", " "),
  170. "platform": "button",
  171. "command_topic": self.mqtt_topic_prefix + "/" + mqtt_topic_suffix,
  172. }
  173. _LOGGER.debug("publishing home assistant config on %s", discovery_topic)
  174. await mqtt_client.publish(
  175. topic=discovery_topic, payload=json.dumps(config), retain=False
  176. )
  177. class _MQTTAction(metaclass=abc.ABCMeta):
  178. @abc.abstractmethod
  179. def trigger(self, state: _State) -> None:
  180. pass # pragma: no cover
  181. def __str__(self) -> str:
  182. return type(self).__name__
  183. class _MQTTActionSchedulePoweroff(_MQTTAction):
  184. # pylint: disable=too-few-public-methods
  185. def trigger(self, state: _State) -> None:
  186. # pylint: disable=protected-access
  187. systemctl_mqtt._dbus.schedule_shutdown(
  188. action="poweroff", delay=state.poweroff_delay
  189. )
  190. class _MQTTActionLockAllSessions(_MQTTAction):
  191. # pylint: disable=too-few-public-methods
  192. def trigger(self, state: _State) -> None:
  193. # pylint: disable=protected-access
  194. systemctl_mqtt._dbus.lock_all_sessions()
  195. class _MQTTActionSuspend(_MQTTAction):
  196. # pylint: disable=too-few-public-methods
  197. def trigger(self, state: _State) -> None:
  198. # pylint: disable=protected-access
  199. systemctl_mqtt._dbus.suspend()
# Maps MQTT topic suffixes to the action triggered by a message on
# "<mqtt topic prefix>/<suffix>".  The keys are also used to derive the
# Home Assistant button components in the discovery config.
_MQTT_TOPIC_SUFFIX_ACTION_MAPPING: typing.Dict[str, _MQTTAction] = {
    "poweroff": _MQTTActionSchedulePoweroff(),
    "lock-all-sessions": _MQTTActionLockAllSessions(),
    "suspend": _MQTTActionSuspend(),
}
  205. async def _mqtt_message_loop(*, state: _State, mqtt_client: aiomqtt.Client) -> None:
  206. action_by_topic: typing.Dict[str, _MQTTAction] = {}
  207. for topic_suffix, action in _MQTT_TOPIC_SUFFIX_ACTION_MAPPING.items():
  208. topic = state.mqtt_topic_prefix + "/" + topic_suffix
  209. _LOGGER.info("subscribing to %s", topic)
  210. await mqtt_client.subscribe(topic)
  211. action_by_topic[topic] = action
  212. async for message in mqtt_client.messages:
  213. if message.retain:
  214. _LOGGER.info("ignoring retained message on topic %r", message.topic.value)
  215. else:
  216. _LOGGER.debug(
  217. "received message on topic %r: %r", message.topic.value, message.payload
  218. )
  219. action_by_topic[message.topic.value].trigger(state=state)
  220. async def _dbus_signal_loop(*, state: _State, mqtt_client: aiomqtt.Client) -> None:
  221. async with jeepney.io.asyncio.open_dbus_router(bus="SYSTEM") as router:
  222. # router: jeepney.io.asyncio.DBusRouter
  223. bus_proxy = jeepney.io.asyncio.Proxy(
  224. msggen=jeepney.bus_messages.message_bus, router=router
  225. )
  226. preparing_for_shutdown_match_rule = (
  227. # pylint: disable=protected-access
  228. systemctl_mqtt._dbus.get_login_manager_signal_match_rule(
  229. "PrepareForShutdown"
  230. )
  231. )
  232. assert await bus_proxy.AddMatch(preparing_for_shutdown_match_rule) == ()
  233. with router.filter(preparing_for_shutdown_match_rule) as queue:
  234. while True:
  235. message: jeepney.low_level.Message = await queue.get()
  236. (preparing_for_shutdown,) = message.body
  237. await state.preparing_for_shutdown_handler(
  238. active=preparing_for_shutdown, mqtt_client=mqtt_client
  239. )
  240. queue.task_done()
  241. async def _run( # pylint: disable=too-many-arguments
  242. *,
  243. mqtt_host: str,
  244. mqtt_port: int,
  245. mqtt_username: typing.Optional[str],
  246. mqtt_password: typing.Optional[str],
  247. mqtt_topic_prefix: str,
  248. homeassistant_discovery_prefix: str,
  249. homeassistant_discovery_object_id: str,
  250. poweroff_delay: datetime.timedelta,
  251. mqtt_disable_tls: bool = False,
  252. ) -> None:
  253. state = _State(
  254. mqtt_topic_prefix=mqtt_topic_prefix,
  255. homeassistant_discovery_prefix=homeassistant_discovery_prefix,
  256. homeassistant_discovery_object_id=homeassistant_discovery_object_id,
  257. poweroff_delay=poweroff_delay,
  258. )
  259. _LOGGER.info(
  260. "connecting to MQTT broker %s:%d (TLS %s)",
  261. mqtt_host,
  262. mqtt_port,
  263. "disabled" if mqtt_disable_tls else "enabled",
  264. )
  265. if mqtt_password and not mqtt_username:
  266. raise ValueError("Missing MQTT username")
  267. async with aiomqtt.Client( # raises aiomqtt.MqttError
  268. hostname=mqtt_host,
  269. port=mqtt_port,
  270. # > The settings [...] usually represent a higher security level than
  271. # > when calling the SSLContext constructor directly.
  272. # https://web.archive.org/web/20230714183106/https://docs.python.org/3/library/ssl.html
  273. tls_context=None if mqtt_disable_tls else ssl.create_default_context(),
  274. username=None if mqtt_username is None else mqtt_username,
  275. password=None if mqtt_password is None else mqtt_password,
  276. ) as mqtt_client:
  277. _LOGGER.debug("connected to MQTT broker %s:%d", mqtt_host, mqtt_port)
  278. if not state.shutdown_lock_acquired:
  279. state.acquire_shutdown_lock()
  280. await state.publish_homeassistant_device_config(mqtt_client=mqtt_client)
  281. await state.publish_preparing_for_shutdown(mqtt_client=mqtt_client)
  282. # asyncio.TaskGroup added in python3.11
  283. await asyncio.gather(
  284. _mqtt_message_loop(state=state, mqtt_client=mqtt_client),
  285. _dbus_signal_loop(state=state, mqtt_client=mqtt_client),
  286. return_exceptions=False,
  287. )
  288. def _main() -> None:
  289. logging.basicConfig(
  290. level=logging.INFO,
  291. format="%(asctime)s:%(levelname)s:%(message)s",
  292. datefmt="%Y-%m-%dT%H:%M:%S%z",
  293. )
  294. argparser = argparse.ArgumentParser(
  295. description="MQTT client triggering & reporting shutdown on systemd-based systems",
  296. )
  297. argparser.add_argument(
  298. "--log-level",
  299. choices=_ARGUMENT_LOG_LEVEL_MAPPING.keys(),
  300. default="info",
  301. help="log level (default: %(default)s)",
  302. )
  303. argparser.add_argument("--mqtt-host", type=str, required=True)
  304. argparser.add_argument(
  305. "--mqtt-port",
  306. type=int,
  307. help=f"default {_MQTT_DEFAULT_TLS_PORT} ({_MQTT_DEFAULT_PORT} with --mqtt-disable-tls)",
  308. )
  309. argparser.add_argument("--mqtt-username", type=str)
  310. argparser.add_argument("--mqtt-disable-tls", action="store_true")
  311. password_argument_group = argparser.add_mutually_exclusive_group()
  312. password_argument_group.add_argument("--mqtt-password", type=str)
  313. password_argument_group.add_argument(
  314. "--mqtt-password-file",
  315. type=pathlib.Path,
  316. metavar="PATH",
  317. dest="mqtt_password_path",
  318. help="stripping trailing newline",
  319. )
  320. argparser.add_argument(
  321. "--mqtt-topic-prefix",
  322. type=str,
  323. # pylint: disable=protected-access
  324. default="systemctl/" + systemctl_mqtt._utils.get_hostname(),
  325. help="default: %(default)s",
  326. )
  327. # https://www.home-assistant.io/docs/mqtt/discovery/#discovery_prefix
  328. argparser.add_argument(
  329. "--homeassistant-discovery-prefix",
  330. type=str,
  331. default="homeassistant",
  332. help="home assistant's prefix for discovery topics" + " (default: %(default)s)",
  333. )
  334. argparser.add_argument(
  335. "--homeassistant-discovery-object-id",
  336. type=str,
  337. # pylint: disable=protected-access
  338. default=systemctl_mqtt._homeassistant.get_default_discovery_object_id(),
  339. help="part of discovery topic (default: %(default)s)",
  340. )
  341. argparser.add_argument(
  342. "--poweroff-delay-seconds", type=float, default=4.0, help="default: %(default)s"
  343. )
  344. args = argparser.parse_args()
  345. logging.root.setLevel(_ARGUMENT_LOG_LEVEL_MAPPING[args.log_level])
  346. if args.mqtt_port:
  347. mqtt_port = args.mqtt_port
  348. elif args.mqtt_disable_tls:
  349. mqtt_port = _MQTT_DEFAULT_PORT
  350. else:
  351. mqtt_port = _MQTT_DEFAULT_TLS_PORT
  352. if args.mqtt_password_path:
  353. # .read_text() replaces \r\n with \n
  354. mqtt_password = args.mqtt_password_path.read_bytes().decode()
  355. if mqtt_password.endswith("\r\n"):
  356. mqtt_password = mqtt_password[:-2]
  357. elif mqtt_password.endswith("\n"):
  358. mqtt_password = mqtt_password[:-1]
  359. else:
  360. mqtt_password = args.mqtt_password
  361. # pylint: disable=protected-access
  362. if not systemctl_mqtt._homeassistant.validate_discovery_object_id(
  363. args.homeassistant_discovery_object_id
  364. ):
  365. raise ValueError(
  366. # pylint: disable=protected-access
  367. "invalid home assistant discovery object id"
  368. f" {args.homeassistant_discovery_object_id!r} (length >= 1"
  369. ", allowed characters:"
  370. f" {systemctl_mqtt._homeassistant.NODE_ID_ALLOWED_CHARS})"
  371. "\nchange --homeassistant-discovery-object-id"
  372. )
  373. asyncio.run(
  374. _run(
  375. mqtt_host=args.mqtt_host,
  376. mqtt_port=mqtt_port,
  377. mqtt_disable_tls=args.mqtt_disable_tls,
  378. mqtt_username=args.mqtt_username,
  379. mqtt_password=mqtt_password,
  380. mqtt_topic_prefix=args.mqtt_topic_prefix,
  381. homeassistant_discovery_prefix=args.homeassistant_discovery_prefix,
  382. homeassistant_discovery_object_id=args.homeassistant_discovery_object_id,
  383. poweroff_delay=datetime.timedelta(seconds=args.poweroff_delay_seconds),
  384. )
  385. )