Browse Source

migrate from paho-mqtt to its async wrapper aiomqtt & disable `retain` flag on topic `systemctl/[hostname]/preparing-for-shutdown`

https://github.com/fphammerle/systemctl-mqtt/commit/ffac98894a5b321578dc79d5af91cd3c93f0e212
https://github.com/fphammerle/systemctl-mqtt/commit/4a4cc551d42fcc2a572ae41611edc5b980629d8d
https://github.com/fphammerle/switchbot-mqtt/commit/52764c26950db0464584cc41161c41f0c9f155d3
Fabian Peter Hammerle 2 months ago
parent
commit
8f9c415ec4
9 changed files with 288 additions and 385 deletions
  1. 11 8
      CHANGELOG.md
  2. 51 35
      Pipfile.lock
  3. 1 1
      README.md
  4. 1 1
      setup.py
  5. 56 101
      systemctl_mqtt/__init__.py
  6. 7 0
      tests/test_action.py
  7. 1 1
      tests/test_dbus.py
  8. 143 223
      tests/test_mqtt.py
  9. 17 15
      tests/test_state_dbus.py

+ 11 - 8
CHANGELOG.md

@@ -16,24 +16,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - declare compatibility with `python3.11`, `python3.12` & `python3.13`
 
 ### Changed
-- migrate from [dbus-python](https://gitlab.freedesktop.org/dbus/dbus-python/)
+- disable `retain` flag on topic `systemctl/[hostname]/preparing-for-shutdown`
+- migrated from [dbus-python](https://gitlab.freedesktop.org/dbus/dbus-python/)
   to pure-python [jeepney](https://gitlab.com/takluyver/jeepney)
   (removes indirect dependency on libdbus, glib,
   [PyGObject](https://gitlab.gnome.org/GNOME/pygobject) and
   [pycairo](https://github.com/pygobject/pycairo),
   fixes https://github.com/fphammerle/systemctl-mqtt/issues/39)
+- migrate from [paho-mqtt](https://github.com/eclipse/paho.mqtt.python) to its
+  async wrapper [aiomqtt](https://github.com/sbtinstruments/aiomqtt)
 - automatic discovery in home assistant:
-  - replace component-based (topic:
+  - replaced component-based (topic:
     `<discovery_prefix>/binary_sensor/<node_id>/preparing-for-shutdown/config`)
     with device-based discovery (`<discovery_prefix>/device/<object_id>/config`)
-  - replace command-line option `--homeassistant-node-id` with
+  - replaced command-line option `--homeassistant-node-id` with
     `--homeassistant-discovery-object-id`
-  - rename entity `binary_sensor.[hostname]_preparing_for_shutdown` to
+  - renamed entity `binary_sensor.[hostname]_preparing_for_shutdown` to
     `binary_sensor.[hostname]_logind_preparing_for_shutdown`
-  - disable "retain" flag for discovery messages
+  - disabled "retain" flag for discovery messages
     (to avoid reappearing ghost devices)
 - container image / dockerfile:
-  - upgrade alpine base image from 3.13.1 to 3.21.0 including upgrade of python
+  - upgraded alpine base image from 3.13.1 to 3.21.0 including upgrade of python
     from 3.8 to 3.12
   - support build without git history by manually setting build argument
     `SETUPTOOLS_SCM_PRETEND_VERSION`
@@ -45,8 +48,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - container image / dockerfile:
   - split `pipenv install` into two stages to speed up image builds
   - `chmod` files copied from host to no longer require `o=rX` perms on host
-  - add registry to base image specifier for `podman build`
-  - add `--force` flag to `rm` invocation to avoid interactive questions while
+  - added registry to base image specifier for `podman build`
+  - added `--force` flag to `rm` invocation to avoid interactive questions while
     running `podman build`
 
 ### Removed

+ 51 - 35
Pipfile.lock

@@ -16,6 +16,14 @@
         ]
     },
     "default": {
+        "aiomqtt": {
+            "hashes": [
+                "sha256:127926717bd6b012d1630f9087f24552eb9c4af58205bc2964f09d6e304f7e63",
+                "sha256:312feebe20bc76dc7c20916663011f3bd37aa6f42f9f687a19a1c58308d80d47"
+            ],
+            "markers": "python_version >= '3.8' and python_version < '4.0'",
+            "version": "==2.3.0"
+        },
         "jeepney": {
             "hashes": [
                 "sha256:5efe48d255973902f6badc3ce55e2aa6c5c3b3bc642059ef3a91247bcfcc5806",
@@ -26,9 +34,11 @@
         },
         "paho-mqtt": {
             "hashes": [
-                "sha256:2a8291c81623aec00372b5a85558a372c747cbca8e9934dfe218638b8eefc26f"
+                "sha256:12d6e7511d4137555a3f6ea167ae846af2c7357b10bc6fa4f7c3968fc1723834",
+                "sha256:6db9ba9b34ed5bc6b6e3812718c7e06e2fd7444540df2455d2c51bd58808feee"
             ],
-            "version": "==1.6.1"
+            "markers": "python_version >= '3.7'",
+            "version": "==2.1.0"
         },
         "systemctl-mqtt": {
             "editable": true,
@@ -195,42 +205,48 @@
         },
         "mypy": {
             "hashes": [
-                "sha256:00df23b42e533e02a6f0055e54de9a6ed491cd8b7ea738647364fd3a39ea7efc",
-                "sha256:0b16738b1d80ec4334654e89e798eb705ac0c36c8a5c4798496cd3623aa02286",
-                "sha256:10065fcebb7c66df04b05fc799a854b1ae24d9963c8bb27e9064a9bdb43aa8ad",
-                "sha256:14117b9da3305b39860d0aa34b8f1ff74d209a368829a584eb77524389a9c13e",
-                "sha256:1628c5c3ce823d296e41e2984ff88c5861499041cb416a8809615d0c1f41740e",
-                "sha256:1daca283d732943731a6a9f20fdbcaa927f160bc51602b1d4ef880a6fb252015",
-                "sha256:2238d7f93fc4027ed1efc944507683df3ba406445a2b6c96e79666a045aadfab",
-                "sha256:273e70fcb2e38c5405a188425aa60b984ffdcef65d6c746ea5813024b68c73dc",
-                "sha256:342de51c48bab326bfc77ce056ba08c076d82ce4f5a86621f972ed39970f94d8",
-                "sha256:3498cb55448dc5533e438cd13d6ddd28654559c8c4d1fd4b5ca57a31b81bac01",
-                "sha256:390dfb898239c25289495500f12fa73aa7f24a4c6d90ccdc165762462b998d63",
-                "sha256:3fa76988dc760da377c1e5069200a50d9eaaccf34f4ea18428a3337034ab5a44",
-                "sha256:56b2280cedcb312c7a79f5001ae5325582d0d339bce684e4a529069d0e7ca1e7",
-                "sha256:585ed36031d0b3ee362e5107ef449a8b5dfd4e9c90ccbe36414ee405ee6b32ba",
-                "sha256:6e73c8a154eed31db3445fe28f63ad2d97b674b911c00191416cf7f6459fd49a",
-                "sha256:74e925649c1ee0a79aa7448baf2668d81cc287dc5782cff6a04ee93f40fb8d3f",
-                "sha256:7a52f26b9c9b1664a60d87675f3bae00b5c7f2806e0c2800545a32c325920bcc",
-                "sha256:7e026d55ddcd76e29e87865c08cbe2d0104e2b3153a523c529de584759379d3d",
-                "sha256:7e68047bedb04c1c25bba9901ea46ff60d5eaac2d71b1f2161f33107e2b368eb",
-                "sha256:7fadb29b77fc14a0dd81304ed73c828c3e5cde0016c7e668a86a3e0dfc9f3af3",
-                "sha256:822dbd184d4a9804df5a7d5335a68cf7662930e70b8c1bc976645d1509f9a9d6",
-                "sha256:af98c5a958f9c37404bd4eef2f920b94874507e146ed6ee559f185b8809c44cc",
-                "sha256:bf4ec4980bec1e0e24e5075f449d014011527ae0055884c7e3abc6a99cd2c7f1",
-                "sha256:c7b243408ea43755f3a21a0a08e5c5ae30eddb4c58a80f415ca6b118816e60aa",
-                "sha256:cdb5563c1726c85fb201be383168f8c866032db95e1095600806625b3a648cb7",
-                "sha256:d5326ab70a6db8e856d59ad4cb72741124950cbbf32e7b70e30166ba7bbf61dd",
-                "sha256:e86aaeaa3221a278c66d3d673b297232947d873773d61ca3ee0e28b2ff027179",
-                "sha256:e8c8387e5d9dff80e7daf961df357c80e694e942d9755f3ad77d69b0957b8e3f",
-                "sha256:e971c1c667007f9f2b397ffa80fa8e1e0adccff336e5e77e74cb5f22868bee87",
-                "sha256:e9f6f4c0b27401d14c483c622bc5105eff3911634d576bbdf6695b9a7c1ba741",
-                "sha256:f0b343a1d3989547024377c2ba0dca9c74a2428ad6ed24283c213af8dbb0710b",
-                "sha256:fbb7d683fa6bdecaa106e8368aa973ecc0ddb79a9eaeb4b821591ecd07e9e03c"
+                "sha256:07ba89fdcc9451f2ebb02853deb6aaaa3d2239a236669a63ab3801bbf923ef5c",
+                "sha256:0c911fde686394753fff899c409fd4e16e9b294c24bfd5e1ea4675deae1ac6fd",
+                "sha256:183cf0a45457d28ff9d758730cd0210419ac27d4d3f285beda038c9083363b1f",
+                "sha256:1fb545ca340537d4b45d3eecdb3def05e913299ca72c290326be19b3804b39c0",
+                "sha256:27fc248022907e72abfd8e22ab1f10e903915ff69961174784a3900a8cba9ad9",
+                "sha256:2ae753f5c9fef278bcf12e1a564351764f2a6da579d4a81347e1d5a15819997b",
+                "sha256:30ff5ef8519bbc2e18b3b54521ec319513a26f1bba19a7582e7b1f58a6e69f14",
+                "sha256:3888a1816d69f7ab92092f785a462944b3ca16d7c470d564165fe703b0970c35",
+                "sha256:44bf464499f0e3a2d14d58b54674dee25c031703b2ffc35064bd0df2e0fac319",
+                "sha256:46c756a444117c43ee984bd055db99e498bc613a70bbbc120272bd13ca579fbc",
+                "sha256:499d6a72fb7e5de92218db961f1a66d5f11783f9ae549d214617edab5d4dbdbb",
+                "sha256:52686e37cf13d559f668aa398dd7ddf1f92c5d613e4f8cb262be2fb4fedb0fcb",
+                "sha256:553c293b1fbdebb6c3c4030589dab9fafb6dfa768995a453d8a5d3b23784af2e",
+                "sha256:57961db9795eb566dc1d1b4e9139ebc4c6b0cb6e7254ecde69d1552bf7613f60",
+                "sha256:7084fb8f1128c76cd9cf68fe5971b37072598e7c31b2f9f95586b65c741a9d31",
+                "sha256:7d54bd85b925e501c555a3227f3ec0cfc54ee8b6930bd6141ec872d1c572f81f",
+                "sha256:7ec88144fe9b510e8475ec2f5f251992690fcf89ccb4500b214b4226abcd32d6",
+                "sha256:8b21525cb51671219f5307be85f7e646a153e5acc656e5cebf64bfa076c50107",
+                "sha256:8b4e3413e0bddea671012b063e27591b953d653209e7a4fa5e48759cda77ca11",
+                "sha256:8c6d94b16d62eb3e947281aa7347d78236688e21081f11de976376cf010eb31a",
+                "sha256:8edc07eeade7ebc771ff9cf6b211b9a7d93687ff892150cb5692e4f4272b0837",
+                "sha256:8f845a00b4f420f693f870eaee5f3e2692fa84cc8514496114649cfa8fd5e2c6",
+                "sha256:8fa2220e54d2946e94ab6dbb3ba0a992795bd68b16dc852db33028df2b00191b",
+                "sha256:90716d8b2d1f4cd503309788e51366f07c56635a3309b0f6a32547eaaa36a64d",
+                "sha256:92c3ed5afb06c3a8e188cb5da4984cab9ec9a77ba956ee419c68a388b4595255",
+                "sha256:ad3301ebebec9e8ee7135d8e3109ca76c23752bac1e717bc84cd3836b4bf3eae",
+                "sha256:b66a60cc4073aeb8ae00057f9c1f64d49e90f918fbcef9a977eb121da8b8f1d1",
+                "sha256:ba24549de7b89b6381b91fbc068d798192b1b5201987070319889e93038967a8",
+                "sha256:bce23c7377b43602baa0bd22ea3265c49b9ff0b76eb315d6c34721af4cdf1d9b",
+                "sha256:c99f27732c0b7dc847adb21c9d47ce57eb48fa33a17bc6d7d5c5e9f9e7ae5bac",
+                "sha256:cb9f255c18052343c70234907e2e532bc7e55a62565d64536dbc7706a20b78b9",
+                "sha256:d4b19b03fdf54f3c5b2fa474c56b4c13c9dbfb9a2db4370ede7ec11a2c5927d9",
+                "sha256:d64169ec3b8461311f8ce2fd2eb5d33e2d0f2c7b49116259c51d0d96edee48d1",
+                "sha256:dbec574648b3e25f43d23577309b16534431db4ddc09fda50841f1e34e64ed34",
+                "sha256:e0fe0f5feaafcb04505bcf439e991c6d8f1bf8b15f12b05feeed96e9e7bf1427",
+                "sha256:f2a0ecc86378f45347f586e4163d1769dd81c5a223d577fe351f26b179e148b1",
+                "sha256:f995e511de847791c3b11ed90084a7a0aafdc074ab88c5a9711622fe4751138c",
+                "sha256:fad79bfe3b65fe6a1efaed97b445c3d37f7be9fdc348bdb2d7cac75579607c89"
             ],
             "index": "pypi",
             "markers": "python_version >= '3.8'",
-            "version": "==1.14.0"
+            "version": "==1.14.1"
         },
         "mypy-extensions": {
             "hashes": [

+ 1 - 1
README.md

@@ -16,7 +16,7 @@ $ pip3 install --user --upgrade systemctl-mqtt
 $ systemctl-mqtt --mqtt-host HOSTNAME_OR_IP_ADDRESS
 ```
 
-On debian-based systems, dependencies can optionally be installed via:
+On debian-based systems, a subset of dependencies can optionally be installed via:
 ```sh
 $ sudo apt-get install --no-install-recommends python3-jeepney python3-paho-mqtt
 ```

+ 1 - 1
setup.py

@@ -75,7 +75,7 @@ setuptools.setup(
     # > implementing the protocol, and integrations for both blocking I/O and
     # > for different asynchronous frameworks.
     # https://web.archive.org/web/20241206000411/https://www.freedesktop.org/wiki/Software/DBusBindings/
-    install_requires=["jeepney>=0.8,<0.9", "paho-mqtt<2"],
+    install_requires=["aiomqtt>=2,<3", "jeepney>=0.8,<0.9"],
     setup_requires=["setuptools_scm"],
     tests_require=["pytest"],
 )

+ 56 - 101
systemctl_mqtt/__init__.py

@@ -26,13 +26,14 @@ import logging
 import os
 import pathlib
 import socket
+import ssl
 import threading
 import typing
 
+import aiomqtt
 import jeepney
 import jeepney.bus_messages
 import jeepney.io.asyncio
-import paho.mqtt.client
 
 import systemctl_mqtt._dbus
 import systemctl_mqtt._homeassistant
@@ -99,40 +100,28 @@ class _State:
     def _preparing_for_shutdown_topic(self) -> str:
         return self.mqtt_topic_prefix + "/preparing-for-shutdown"
 
-    def _publish_preparing_for_shutdown(
-        self, *, mqtt_client: paho.mqtt.client.Client, active: bool, block: bool
+    async def _publish_preparing_for_shutdown(
+        self, *, mqtt_client: aiomqtt.Client, active: bool
     ) -> None:
-        # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L1199
         topic = self._preparing_for_shutdown_topic
         # pylint: disable=protected-access
         payload = systemctl_mqtt._mqtt.encode_bool(active)
         _LOGGER.info("publishing %r on %s", payload, topic)
-        msg_info = mqtt_client.publish(
-            topic=topic, payload=payload, retain=True
-        )  # type: paho.mqtt.client.MQTTMessageInfo
-        if not block:
-            return
-        msg_info.wait_for_publish()
-        if msg_info.rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
-            _LOGGER.error(
-                "failed to publish on %s (return code %d)", topic, msg_info.rc
-            )
+        await mqtt_client.publish(topic=topic, payload=payload, retain=False)
 
-    def preparing_for_shutdown_handler(
-        self, active: bool, mqtt_client: paho.mqtt.client.Client
+    async def preparing_for_shutdown_handler(
+        self, active: bool, mqtt_client: aiomqtt.Client
     ) -> None:
         active = bool(active)
-        self._publish_preparing_for_shutdown(
-            mqtt_client=mqtt_client, active=active, block=True
+        await self._publish_preparing_for_shutdown(
+            mqtt_client=mqtt_client, active=active
         )
         if active:
             self.release_shutdown_lock()
         else:
             self.acquire_shutdown_lock()
 
-    def publish_preparing_for_shutdown(
-        self, mqtt_client: paho.mqtt.client.Client
-    ) -> None:
+    async def publish_preparing_for_shutdown(self, mqtt_client: aiomqtt.Client) -> None:
         try:
             ((return_type, active),) = self._login_manager.Get("PreparingForShutdown")
         except jeepney.wrappers.DBusErrorResponse as exc:
@@ -142,15 +131,12 @@ class _State:
             return
         assert return_type == "b", return_type
         assert isinstance(active, bool), active
-        self._publish_preparing_for_shutdown(
-            mqtt_client=mqtt_client,
-            active=active,
-            # https://github.com/eclipse/paho.mqtt.python/issues/439#issuecomment-565514393
-            block=False,
+        await self._publish_preparing_for_shutdown(
+            mqtt_client=mqtt_client, active=active
         )
 
-    def publish_homeassistant_device_config(
-        self, mqtt_client: paho.mqtt.client.Client
+    async def publish_homeassistant_device_config(
+        self, mqtt_client: aiomqtt.Client
     ) -> None:
         # <discovery_prefix>/<component>/[<node_id>/]<object_id>/config
         # https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery
@@ -201,7 +187,7 @@ class _State:
                 "command_topic": self.mqtt_topic_prefix + "/" + mqtt_topic_suffix,
             }
         _LOGGER.debug("publishing home assistant config on %s", discovery_topic)
-        mqtt_client.publish(
+        await mqtt_client.publish(
             topic=discovery_topic, payload=json.dumps(config), retain=False
         )
 
@@ -211,52 +197,32 @@ class _MQTTAction(metaclass=abc.ABCMeta):
     def trigger(self, state: _State) -> None:
         pass  # pragma: no cover
 
-    def mqtt_message_callback(
-        self,
-        mqtt_client: paho.mqtt.client.Client,
-        state: _State,
-        message: paho.mqtt.client.MQTTMessage,
-    ) -> None:
-        # pylint: disable=unused-argument; callback
-        # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L3416
-        # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L469
-        _LOGGER.debug("received topic=%s payload=%r", message.topic, message.payload)
-        if message.retain:
-            _LOGGER.info("ignoring retained message")
-            return
-        _LOGGER.debug("executing action %s", self)
-        self.trigger(state=state)
-        _LOGGER.debug("completed action %s", self)
+    def __str__(self) -> str:
+        return type(self).__name__
 
 
 class _MQTTActionSchedulePoweroff(_MQTTAction):
+    # pylint: disable=too-few-public-methods
     def trigger(self, state: _State) -> None:
         # pylint: disable=protected-access
         systemctl_mqtt._dbus.schedule_shutdown(
             action="poweroff", delay=state.poweroff_delay
         )
 
-    def __str__(self) -> str:
-        return type(self).__name__
-
 
 class _MQTTActionLockAllSessions(_MQTTAction):
+    # pylint: disable=too-few-public-methods
     def trigger(self, state: _State) -> None:
         # pylint: disable=protected-access
         systemctl_mqtt._dbus.lock_all_sessions()
 
-    def __str__(self) -> str:
-        return type(self).__name__
-
 
 class _MQTTActionSuspend(_MQTTAction):
+    # pylint: disable=too-few-public-methods
     def trigger(self, state: _State) -> None:
         # pylint: disable=protected-access
         systemctl_mqtt._dbus.suspend()
 
-    def __str__(self) -> str:
-        return type(self).__name__
-
 
 _MQTT_TOPIC_SUFFIX_ACTION_MAPPING = {
     "poweroff": _MQTTActionSchedulePoweroff(),
@@ -265,36 +231,24 @@ _MQTT_TOPIC_SUFFIX_ACTION_MAPPING = {
 }
 
 
-def _mqtt_on_connect(
-    mqtt_client: paho.mqtt.client.Client,
-    state: _State,
-    flags: typing.Dict,
-    return_code: int,
-) -> None:
-    # pylint: disable=unused-argument; callback
-    # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L441
-    assert return_code == 0, return_code  # connection accepted
-    mqtt_broker_host, mqtt_broker_port = mqtt_client.socket().getpeername()
-    _LOGGER.debug("connected to MQTT broker %s:%d", mqtt_broker_host, mqtt_broker_port)
-    if not state.shutdown_lock_acquired:
-        state.acquire_shutdown_lock()
-    state.publish_preparing_for_shutdown(mqtt_client=mqtt_client)
-    state.publish_homeassistant_device_config(mqtt_client=mqtt_client)
+async def _mqtt_message_loop(*, state: _State, mqtt_client: aiomqtt.Client) -> None:
+    action_by_topic: typing.Dict[str, _MQTTAction] = {}
     for topic_suffix, action in _MQTT_TOPIC_SUFFIX_ACTION_MAPPING.items():
         topic = state.mqtt_topic_prefix + "/" + topic_suffix
         _LOGGER.info("subscribing to %s", topic)
-        mqtt_client.subscribe(topic)
-        mqtt_client.message_callback_add(
-            sub=topic, callback=action.mqtt_message_callback
-        )
-        _LOGGER.debug(
-            "registered MQTT callback for topic %s triggering %s", topic, action
-        )
+        await mqtt_client.subscribe(topic)
+        action_by_topic[topic] = action
+    async for message in mqtt_client.messages:
+        if message.retain:
+            _LOGGER.info("ignoring retained message on topic %r", message.topic.value)
+        else:
+            _LOGGER.debug(
+                "received message on topic %r: %r", message.topic.value, message.payload
+            )
+            action_by_topic[message.topic.value].trigger(state=state)
 
 
-async def _dbus_signal_loop(
-    *, state: _State, mqtt_client: paho.mqtt.client.Client
-) -> None:
+async def _dbus_signal_loop(*, state: _State, mqtt_client: aiomqtt.Client) -> None:
     async with jeepney.io.asyncio.open_dbus_router(bus="SYSTEM") as router:
         # router: jeepney.io.asyncio.DBusRouter
         bus_proxy = jeepney.io.asyncio.Proxy(
@@ -311,7 +265,7 @@ async def _dbus_signal_loop(
             while True:
                 message: jeepney.low_level.Message = await queue.get()
                 (preparing_for_shutdown,) = message.body
-                state.preparing_for_shutdown_handler(
+                await state.preparing_for_shutdown_handler(
                     active=preparing_for_shutdown, mqtt_client=mqtt_client
                 )
                 queue.task_done()
@@ -335,34 +289,35 @@ async def _run(  # pylint: disable=too-many-arguments
         homeassistant_discovery_object_id=homeassistant_discovery_object_id,
         poweroff_delay=poweroff_delay,
     )
-    # https://pypi.org/project/paho-mqtt/
-    mqtt_client = paho.mqtt.client.Client(userdata=state)
-    mqtt_client.on_connect = _mqtt_on_connect
-    if not mqtt_disable_tls:
-        mqtt_client.tls_set(ca_certs=None)  # enable tls trusting default system certs
     _LOGGER.info(
         "connecting to MQTT broker %s:%d (TLS %s)",
         mqtt_host,
         mqtt_port,
         "disabled" if mqtt_disable_tls else "enabled",
     )
-    if mqtt_username:
-        mqtt_client.username_pw_set(username=mqtt_username, password=mqtt_password)
-    elif mqtt_password:
+    if mqtt_password and not mqtt_username:
         raise ValueError("Missing MQTT username")
-    mqtt_client.connect(host=mqtt_host, port=mqtt_port)
-    # loop_start runs loop_forever in a new thread (daemon)
-    # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L1814
-    # loop_forever attempts to reconnect if disconnected
-    # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L1744
-    mqtt_client.loop_start()
-    try:
-        await _dbus_signal_loop(state=state, mqtt_client=mqtt_client)
-    finally:
-        # blocks until loop_forever stops
-        _LOGGER.debug("waiting for MQTT loop to stop")
-        mqtt_client.loop_stop()
-        _LOGGER.debug("MQTT loop stopped")
+    async with aiomqtt.Client(  # raises aiomqtt.MqttError
+        hostname=mqtt_host,
+        port=mqtt_port,
+        # > The settings [...] usually represent a higher security level than
+        # > when calling the SSLContext constructor directly.
+        # https://web.archive.org/web/20230714183106/https://docs.python.org/3/library/ssl.html
+        tls_context=None if mqtt_disable_tls else ssl.create_default_context(),
+        username=None if mqtt_username is None else mqtt_username,
+        password=None if mqtt_password is None else mqtt_password,
+    ) as mqtt_client:
+        _LOGGER.debug("connected to MQTT broker %s:%d", mqtt_host, mqtt_port)
+        if not state.shutdown_lock_acquired:
+            state.acquire_shutdown_lock()
+        await state.publish_homeassistant_device_config(mqtt_client=mqtt_client)
+        await state.publish_preparing_for_shutdown(mqtt_client=mqtt_client)
+        # asyncio.TaskGroup added in python3.11
+        await asyncio.gather(
+            _mqtt_message_loop(state=state, mqtt_client=mqtt_client),
+            _dbus_signal_loop(state=state, mqtt_client=mqtt_client),
+            return_exceptions=False,
+        )
 
 
 def _main() -> None:

+ 7 - 0
tests/test_action.py

@@ -74,3 +74,10 @@ def test_mqtt_topic_suffix_action_mapping_suspend():
     ):
         mqtt_action.trigger(state="dummy")
     login_manager_mock.Suspend.assert_called_once_with(interactive=False)
+
+
+def test_poweroff_str():
+    assert (
+        str(systemctl_mqtt._MQTTActionSchedulePoweroff())
+        == "_MQTTActionSchedulePoweroff"
+    )

+ 1 - 1
tests/test_dbus.py

@@ -180,7 +180,7 @@ def test_lock_all_sessions(caplog):
 @pytest.mark.asyncio
 async def test__dbus_signal_loop():
     # pylint: disable=too-many-locals,too-many-arguments
-    state_mock = unittest.mock.MagicMock()
+    state_mock = unittest.mock.AsyncMock()
     with unittest.mock.patch(
         "jeepney.io.asyncio.open_dbus_router",
     ) as open_dbus_router_mock:

+ 143 - 223
tests/test_mqtt.py

@@ -17,15 +17,13 @@
 
 import datetime
 import logging
-import threading
-import time
+import ssl
 import unittest.mock
 
+import aiomqtt
 import jeepney.fds
 import jeepney.low_level
-import paho.mqtt.client
 import pytest
-from paho.mqtt.client import MQTTMessage
 
 import systemctl_mqtt
 
@@ -50,17 +48,12 @@ async def test__run(
     caplog.set_level(logging.DEBUG)
     login_manager_mock = unittest.mock.MagicMock()
     with unittest.mock.patch(
-        "socket.create_connection"
-    ) as create_socket_mock, unittest.mock.patch(
-        "ssl.SSLContext.wrap_socket", autospec=True
-    ) as ssl_wrap_socket_mock, unittest.mock.patch(
-        "paho.mqtt.client.Client.loop_forever", autospec=True
-    ) as mqtt_loop_forever_mock, unittest.mock.patch(
+        "aiomqtt.Client", autospec=False
+    ) as mqtt_client_class_mock, unittest.mock.patch(
         "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock
     ), unittest.mock.patch(
         "systemctl_mqtt._dbus_signal_loop"
     ) as dbus_signal_loop_mock:
-        ssl_wrap_socket_mock.return_value.send = len
         login_manager_mock.Inhibit.return_value = (jeepney.fds.FileDescriptor(-1),)
         login_manager_mock.Get.return_value = (("b", False),)
         await systemctl_mqtt._run(
@@ -77,31 +70,14 @@ async def test__run(
     assert caplog.records[0].message == (
         f"connecting to MQTT broker {mqtt_host}:{mqtt_port} (TLS enabled)"
     )
-    # correct remote?
-    create_socket_mock.assert_called_once()
-    create_socket_args, _ = create_socket_mock.call_args
-    assert create_socket_args[0] == (mqtt_host, mqtt_port)
-    # ssl enabled?
-    ssl_wrap_socket_mock.assert_called_once()
-    ssl_context = ssl_wrap_socket_mock.call_args[0][0]  # self
-    assert ssl_context.check_hostname is True
-    assert ssl_wrap_socket_mock.call_args[1]["server_hostname"] == mqtt_host
-    # loop started?
-    while threading.active_count() > 1:
-        time.sleep(0.01)
-    mqtt_loop_forever_mock.assert_called_once()
-    (mqtt_client,) = mqtt_loop_forever_mock.call_args[0]
-    assert mqtt_client._tls_insecure is False
-    # credentials
-    assert mqtt_client._username is None
-    assert mqtt_client._password is None
-    # connect callback
-    caplog.clear()
-    mqtt_client.socket().getpeername.return_value = (mqtt_host, mqtt_port)
-    with unittest.mock.patch(
-        "paho.mqtt.client.Client.subscribe"
-    ) as mqtt_subscribe_mock:
-        mqtt_client.on_connect(mqtt_client, mqtt_client._userdata, {}, 0)
+    mqtt_client_class_mock.assert_called_once()
+    _, mqtt_client_init_kwargs = mqtt_client_class_mock.call_args
+    assert mqtt_client_init_kwargs.pop("hostname") == mqtt_host
+    assert mqtt_client_init_kwargs.pop("port") == mqtt_port
+    assert isinstance(mqtt_client_init_kwargs.pop("tls_context"), ssl.SSLContext)
+    assert mqtt_client_init_kwargs.pop("username") is None
+    assert mqtt_client_init_kwargs.pop("password") is None
+    assert not mqtt_client_init_kwargs
     login_manager_mock.Inhibit.assert_called_once_with(
         what="shutdown",
         who="systemctl-mqtt",
@@ -109,30 +85,29 @@ async def test__run(
         mode="delay",
     )
     login_manager_mock.Get.assert_called_once_with("PreparingForShutdown")
-    assert sorted(mqtt_subscribe_mock.call_args_list) == [
+    async with mqtt_client_class_mock() as mqtt_client_mock:
+        pass
+    assert mqtt_client_mock.publish.call_count == 2
+    assert (
+        mqtt_client_mock.publish.call_args_list[0][1]["topic"]
+        == f"{homeassistant_discovery_prefix}/device/{homeassistant_discovery_object_id}/config"
+    )
+    assert mqtt_client_mock.publish.call_args_list[1] == unittest.mock.call(
+        topic=mqtt_topic_prefix + "/preparing-for-shutdown",
+        payload="false",
+        retain=False,
+    )
+    assert sorted(mqtt_client_mock.subscribe.call_args_list) == [
         unittest.mock.call(mqtt_topic_prefix + "/lock-all-sessions"),
         unittest.mock.call(mqtt_topic_prefix + "/poweroff"),
         unittest.mock.call(mqtt_topic_prefix + "/suspend"),
     ]
-    assert mqtt_client.on_message is None
-    for suffix in ("poweroff", "lock-all-sessions"):
-        assert (  # pylint: disable=comparison-with-callable
-            mqtt_client._on_message_filtered[mqtt_topic_prefix + "/" + suffix]
-            == systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
-                suffix
-            ].mqtt_message_callback
-        )
-    assert caplog.records[0].levelno == logging.DEBUG
-    assert (
-        caplog.records[0].message == f"connected to MQTT broker {mqtt_host}:{mqtt_port}"
-    )
     assert caplog.records[1].levelno == logging.DEBUG
-    assert caplog.records[1].message == "acquired shutdown inhibitor lock"
-    assert caplog.records[2].levelno == logging.INFO
     assert (
-        caplog.records[2].message
-        == f"publishing 'false' on {mqtt_topic_prefix}/preparing-for-shutdown"
+        caplog.records[1].message == f"connected to MQTT broker {mqtt_host}:{mqtt_port}"
     )
+    assert caplog.records[2].levelno == logging.DEBUG
+    assert caplog.records[2].message == "acquired shutdown inhibitor lock"
     assert caplog.records[3].levelno == logging.DEBUG
     assert (
         caplog.records[3].message
@@ -142,21 +117,17 @@ async def test__run(
         + homeassistant_discovery_object_id
         + "/config"
     )
-    assert all(r.levelno == logging.INFO for r in caplog.records[4::2])
-    assert {r.message for r in caplog.records[4::2]} == {
+    assert caplog.records[4].levelno == logging.INFO
+    assert (
+        caplog.records[4].message
+        == f"publishing 'false' on {mqtt_topic_prefix}/preparing-for-shutdown"
+    )
+    assert all(r.levelno == logging.INFO for r in caplog.records[5::2])
+    assert {r.message for r in caplog.records[5:]} == {
         f"subscribing to {mqtt_topic_prefix}/{s}"
         for s in ("poweroff", "lock-all-sessions", "suspend")
     }
-    assert all(r.levelno == logging.DEBUG for r in caplog.records[5::2])
-    assert {r.message for r in caplog.records[5::2]} == {
-        f"registered MQTT callback for topic {mqtt_topic_prefix}/{s}"
-        f" triggering {systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[s]}"
-        for s in ("poweroff", "lock-all-sessions", "suspend")
-    }
     dbus_signal_loop_mock.assert_awaited_once()
-    # waited for mqtt loop to stop?
-    assert mqtt_client._thread_terminate
-    assert mqtt_client._thread is None
 
 
 @pytest.mark.asyncio
@@ -166,8 +137,8 @@ async def test__run(
 async def test__run_tls(caplog, mqtt_host, mqtt_port, mqtt_disable_tls):
     caplog.set_level(logging.INFO)
     with unittest.mock.patch(
-        "paho.mqtt.client.Client"
-    ) as mqtt_client_class, unittest.mock.patch(
+        "aiomqtt.Client"
+    ) as mqtt_client_class_mock, unittest.mock.patch(
         "systemctl_mqtt._dbus_signal_loop"
     ) as dbus_signal_loop_mock:
         await systemctl_mqtt._run(
@@ -181,23 +152,30 @@ async def test__run_tls(caplog, mqtt_host, mqtt_port, mqtt_disable_tls):
             homeassistant_discovery_object_id="host",
             poweroff_delay=datetime.timedelta(),
         )
+    mqtt_client_class_mock.assert_called_once()
+    _, mqtt_client_init_kwargs = mqtt_client_class_mock.call_args
+    assert mqtt_client_init_kwargs.pop("hostname") == mqtt_host
+    assert mqtt_client_init_kwargs.pop("port") == mqtt_port
+    if mqtt_disable_tls:
+        assert mqtt_client_init_kwargs.pop("tls_context") is None
+    else:
+        assert isinstance(mqtt_client_init_kwargs.pop("tls_context"), ssl.SSLContext)
+    assert mqtt_client_init_kwargs.pop("username") is None
+    assert mqtt_client_init_kwargs.pop("password") is None
+    assert not mqtt_client_init_kwargs
     assert caplog.records[0].levelno == logging.INFO
     assert caplog.records[0].message == (
         f"connecting to MQTT broker {mqtt_host}:{mqtt_port}"
         f" (TLS {'disabled' if mqtt_disable_tls else 'enabled'})"
     )
-    if mqtt_disable_tls:
-        mqtt_client_class().tls_set.assert_not_called()
-    else:
-        mqtt_client_class().tls_set.assert_called_once_with(ca_certs=None)
     dbus_signal_loop_mock.assert_awaited_once()
 
 
 @pytest.mark.asyncio
 async def test__run_tls_default():
     with unittest.mock.patch(
-        "paho.mqtt.client.Client"
-    ) as mqtt_client_class, unittest.mock.patch(
+        "aiomqtt.Client"
+    ) as mqtt_client_class_mock, unittest.mock.patch(
         "systemctl_mqtt._dbus_signal_loop"
     ) as dbus_signal_loop_mock:
         await systemctl_mqtt._run(
@@ -211,8 +189,11 @@ async def test__run_tls_default():
             homeassistant_discovery_object_id="host",
             poweroff_delay=datetime.timedelta(),
         )
+    mqtt_client_class_mock.assert_called_once()
     # enabled by default
-    mqtt_client_class().tls_set.assert_called_once_with(ca_certs=None)
+    assert isinstance(
+        mqtt_client_class_mock.call_args[1]["tls_context"], ssl.SSLContext
+    )
     dbus_signal_loop_mock.assert_awaited_once()
 
 
@@ -225,16 +206,11 @@ async def test__run_tls_default():
 async def test__run_authentication(
     mqtt_host, mqtt_port, mqtt_username, mqtt_password, mqtt_topic_prefix
 ):
-    with unittest.mock.patch("socket.create_connection"), unittest.mock.patch(
-        "ssl.SSLContext.wrap_socket"
-    ) as ssl_wrap_socket_mock, unittest.mock.patch(
-        "paho.mqtt.client.Client.loop_forever", autospec=True
-    ) as mqtt_loop_forever_mock, unittest.mock.patch(
-        "systemctl_mqtt._dbus.get_login_manager_proxy"
-    ), unittest.mock.patch(
+    with unittest.mock.patch(
+        "aiomqtt.Client"
+    ) as mqtt_client_class_mock, unittest.mock.patch(
         "systemctl_mqtt._dbus_signal_loop"
     ) as dbus_signal_loop_mock:
-        ssl_wrap_socket_mock.return_value.send = len
         await systemctl_mqtt._run(
             mqtt_host=mqtt_host,
             mqtt_port=mqtt_port,
@@ -245,86 +221,24 @@ async def test__run_authentication(
             homeassistant_discovery_object_id="node-id",
             poweroff_delay=datetime.timedelta(),
         )
-    mqtt_loop_forever_mock.assert_called_once()
-    (mqtt_client,) = mqtt_loop_forever_mock.call_args[0]
-    assert mqtt_client._username.decode() == mqtt_username
+    mqtt_client_class_mock.assert_called_once()
+    _, mqtt_client_init_kwargs = mqtt_client_class_mock.call_args
+    assert mqtt_client_init_kwargs["username"] == mqtt_username
     if mqtt_password:
-        assert mqtt_client._password.decode() == mqtt_password
+        assert mqtt_client_init_kwargs["password"] == mqtt_password
     else:
-        assert mqtt_client._password is None
+        assert mqtt_client_init_kwargs["password"] is None
     dbus_signal_loop_mock.assert_awaited_once()
 
 
-@pytest.mark.asyncio
-async def _initialize_mqtt_client(
-    mqtt_host, mqtt_port, mqtt_topic_prefix
-) -> paho.mqtt.client.Client:
-    with unittest.mock.patch("socket.create_connection"), unittest.mock.patch(
-        "ssl.SSLContext.wrap_socket"
-    ) as ssl_wrap_socket_mock, unittest.mock.patch(
-        "paho.mqtt.client.Client.loop_forever", autospec=True
-    ) as mqtt_loop_forever_mock, unittest.mock.patch(
-        "systemctl_mqtt._dbus.get_login_manager_proxy"
-    ) as get_login_manager_mock, unittest.mock.patch(
-        "systemctl_mqtt._dbus_signal_loop"
-    ):
-        ssl_wrap_socket_mock.return_value.send = len
-        get_login_manager_mock.return_value.Inhibit.return_value = (
-            jeepney.fds.FileDescriptor(-1),
-        )
-        get_login_manager_mock.return_value.Get.return_value = (("b", True),)
-        await systemctl_mqtt._run(
-            mqtt_host=mqtt_host,
-            mqtt_port=mqtt_port,
-            mqtt_username=None,
-            mqtt_password=None,
-            mqtt_topic_prefix=mqtt_topic_prefix,
-            homeassistant_discovery_prefix="discovery-prefix",
-            homeassistant_discovery_object_id="node-id",
-            poweroff_delay=datetime.timedelta(),
-        )
-    while threading.active_count() > 1:
-        time.sleep(0.01)
-    mqtt_loop_forever_mock.assert_called_once()
-    (mqtt_client,) = mqtt_loop_forever_mock.call_args[0]
-    mqtt_client.socket().getpeername.return_value = (mqtt_host, mqtt_port)
-    mqtt_client.on_connect(mqtt_client, mqtt_client._userdata, {}, 0)
-    return mqtt_client
-
-
-@pytest.mark.asyncio
-@pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
-@pytest.mark.parametrize("mqtt_port", [1833])
-@pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
-async def test__client_handle_message(caplog, mqtt_host, mqtt_port, mqtt_topic_prefix):
-    mqtt_client = await _initialize_mqtt_client(
-        mqtt_host=mqtt_host, mqtt_port=mqtt_port, mqtt_topic_prefix=mqtt_topic_prefix
-    )
-    caplog.clear()
-    caplog.set_level(logging.DEBUG)
-    poweroff_message = MQTTMessage(topic=mqtt_topic_prefix.encode() + b"/poweroff")
-    with unittest.mock.patch.object(
-        systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "trigger"
-    ) as poweroff_trigger_mock:
-        mqtt_client._handle_on_message(poweroff_message)
-    poweroff_trigger_mock.assert_called_once_with(state=mqtt_client._userdata)
-    assert all(r.levelno == logging.DEBUG for r in caplog.records)
-    assert (
-        caplog.records[0].message
-        == f"received topic={poweroff_message.topic} payload=b''"
-    )
-    assert caplog.records[1].message == "executing action _MQTTActionSchedulePoweroff"
-    assert caplog.records[2].message == "completed action _MQTTActionSchedulePoweroff"
-
-
 @pytest.mark.asyncio
 @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
 @pytest.mark.parametrize("mqtt_port", [1833])
 @pytest.mark.parametrize("mqtt_password", ["secret"])
 async def test__run_authentication_missing_username(
-    mqtt_host, mqtt_port, mqtt_password
-):
-    with unittest.mock.patch("paho.mqtt.client.Client"), unittest.mock.patch(
+    mqtt_host: str, mqtt_port: int, mqtt_password: str
+) -> None:
+    with unittest.mock.patch("aiomqtt.Client"), unittest.mock.patch(
         "systemctl_mqtt._dbus.get_login_manager_proxy"
     ), unittest.mock.patch("systemctl_mqtt._dbus_signal_loop") as dbus_signal_loop_mock:
         with pytest.raises(ValueError, match=r"^Missing MQTT username$"):
@@ -341,82 +255,88 @@ async def test__run_authentication_missing_username(
     dbus_signal_loop_mock.assert_not_called()
 
 
-@pytest.mark.parametrize("mqtt_topic", ["system/command/poweroff"])
-@pytest.mark.parametrize("payload", [b"", b"junk"])
-def test_mqtt_message_callback_poweroff(caplog, mqtt_topic: str, payload: bytes):
-    message = MQTTMessage(topic=mqtt_topic.encode())
-    message.payload = payload
-    with unittest.mock.patch.object(
-        systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "trigger"
-    ) as trigger_mock, caplog.at_level(logging.DEBUG):
-        systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
-            "poweroff"
-        ].mqtt_message_callback(
-            None, "state_dummy", message  # type: ignore
-        )
-    trigger_mock.assert_called_once_with(state="state_dummy")
-    assert len(caplog.records) == 3
-    assert caplog.records[0].levelno == logging.DEBUG
-    assert caplog.records[0].message == (
-        f"received topic={mqtt_topic} payload={payload!r}"
+@pytest.mark.asyncio
+@pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
+async def test__mqtt_message_loop_trigger_poweroff(
+    caplog: pytest.LogCaptureFixture, mqtt_topic_prefix: str
+) -> None:
+    state = systemctl_mqtt._State(
+        mqtt_topic_prefix=mqtt_topic_prefix,
+        homeassistant_discovery_prefix="homeassistant",
+        homeassistant_discovery_object_id="whatever",
+        poweroff_delay=datetime.timedelta(seconds=21),
     )
-    assert caplog.records[1].levelno == logging.DEBUG
-    assert caplog.records[1].message == "executing action _MQTTActionSchedulePoweroff"
-    assert caplog.records[2].levelno == logging.DEBUG
-    assert caplog.records[2].message == "completed action _MQTTActionSchedulePoweroff"
-
-
-@pytest.mark.parametrize("mqtt_topic", ["system/command/poweroff"])
-@pytest.mark.parametrize("payload", [b"", b"junk"])
-def test_mqtt_message_callback_poweroff_retained(
-    caplog, mqtt_topic: str, payload: bytes
-):
-    message = MQTTMessage(topic=mqtt_topic.encode())
-    message.payload = payload
-    message.retain = True
-    with unittest.mock.patch.object(
-        systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "trigger"
-    ) as trigger_mock, caplog.at_level(logging.DEBUG):
-        systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
-            "poweroff"
-        ].mqtt_message_callback(
-            None, None, message  # type: ignore
+    mqtt_client_mock = unittest.mock.AsyncMock()
+    mqtt_client_mock.messages.__aiter__.return_value = [
+        aiomqtt.Message(
+            topic=mqtt_topic_prefix + "/poweroff",
+            payload=b"some-payload",
+            qos=0,
+            retain=False,
+            mid=42 // 2,
+            properties=None,
         )
-    trigger_mock.assert_not_called()
-    assert len(caplog.records) == 2
-    assert caplog.records[0].levelno == logging.DEBUG
-    assert caplog.records[0].message == (
-        f"received topic={mqtt_topic} payload={payload!r}"
+    ]
+    with unittest.mock.patch(
+        "systemctl_mqtt._dbus.schedule_shutdown"
+    ) as schedule_shutdown_mock, caplog.at_level(logging.DEBUG):
+        await systemctl_mqtt._mqtt_message_loop(
+            state=state, mqtt_client=mqtt_client_mock
+        )
+    assert sorted(mqtt_client_mock.subscribe.await_args_list) == [
+        unittest.mock.call(mqtt_topic_prefix + "/lock-all-sessions"),
+        unittest.mock.call(mqtt_topic_prefix + "/poweroff"),
+        unittest.mock.call(mqtt_topic_prefix + "/suspend"),
+    ]
+    schedule_shutdown_mock.assert_called_once_with(
+        action="poweroff", delay=datetime.timedelta(seconds=21)
     )
-    assert caplog.records[1].levelno == logging.INFO
-    assert caplog.records[1].message == "ignoring retained message"
+    assert [
+        t for t in caplog.record_tuples[2:] if not t[2].startswith("subscribing to ")
+    ] == [
+        (
+            "systemctl_mqtt",
+            logging.DEBUG,
+            f"received message on topic '{mqtt_topic_prefix}/poweroff': b'some-payload'",
+        ),
+    ]
 
 
-@pytest.mark.parametrize("active", [True, False])
-@pytest.mark.parametrize("block", [True, False])
-def test__publish_preparing_for_shutdown_blocking(active: bool, block: bool) -> None:
-    login_manager_mock = unittest.mock.MagicMock()
-    login_manager_mock.Get.return_value = (("b", active),)
+@pytest.mark.asyncio
+@pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host"])
+async def test__mqtt_message_loop_retained(
+    caplog: pytest.LogCaptureFixture, mqtt_topic_prefix: str
+) -> None:
+    state = systemctl_mqtt._State(
+        mqtt_topic_prefix=mqtt_topic_prefix,
+        homeassistant_discovery_prefix="homeassistant",
+        homeassistant_discovery_object_id="whatever",
+        poweroff_delay=datetime.timedelta(seconds=21),
+    )
+    mqtt_client_mock = unittest.mock.AsyncMock()
+    mqtt_client_mock.messages.__aiter__.return_value = [
+        aiomqtt.Message(
+            topic=mqtt_topic_prefix + "/poweroff",
+            payload=b"some-payload",
+            qos=0,
+            retain=True,
+            mid=42 // 2,
+            properties=None,
+        )
+    ]
     with unittest.mock.patch(
-        "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock
-    ):
-        state = systemctl_mqtt._State(
-            mqtt_topic_prefix="prefix",
-            homeassistant_discovery_prefix="prefix",
-            homeassistant_discovery_object_id="object-id",
-            poweroff_delay=datetime.timedelta(),
+        "systemctl_mqtt._dbus.schedule_shutdown"
+    ) as schedule_shutdown_mock, caplog.at_level(logging.DEBUG):
+        await systemctl_mqtt._mqtt_message_loop(
+            state=state, mqtt_client=mqtt_client_mock
         )
-    mqtt_client_mock = unittest.mock.MagicMock()
-    state._publish_preparing_for_shutdown(
-        mqtt_client=mqtt_client_mock, active=active, block=block
-    )
-    mqtt_client_mock.publish.assert_called_once_with(
-        topic="prefix/preparing-for-shutdown",
-        payload="true" if active else "false",
-        retain=True,
-    )
-    msg_info = mqtt_client_mock.publish.return_value
-    if block:
-        msg_info.wait_for_publish.assert_called_once()
-    else:
-        msg_info.wait_for_publish.assert_not_called()
+    schedule_shutdown_mock.assert_not_called()
+    assert [
+        t for t in caplog.record_tuples[2:] if not t[2].startswith("subscribing to ")
+    ] == [
+        (
+            "systemctl_mqtt",
+            logging.INFO,
+            "ignoring retained message on topic 'systemctl/host/poweroff'",
+        ),
+    ]

+ 17 - 15
tests/test_state_dbus.py

@@ -55,8 +55,9 @@ def test_shutdown_lock():
     lock_fd.close.assert_called_once_with()
 
 
+@pytest.mark.asyncio
 @pytest.mark.parametrize("active", [True, False])
-def test_preparing_for_shutdown_handler(active: bool) -> None:
+async def test_preparing_for_shutdown_handler(active: bool) -> None:
     with unittest.mock.patch("systemctl_mqtt._dbus.get_login_manager_proxy"):
         state = systemctl_mqtt._State(
             mqtt_topic_prefix="any",
@@ -72,12 +73,10 @@ def test_preparing_for_shutdown_handler(active: bool) -> None:
     ) as acquire_lock_mock, unittest.mock.patch.object(
         state, "release_shutdown_lock"
     ) as release_lock_mock:
-        state.preparing_for_shutdown_handler(
+        await state.preparing_for_shutdown_handler(
             active=active, mqtt_client=mqtt_client_mock
         )
-    publish_mock.assert_called_once_with(
-        mqtt_client=mqtt_client_mock, active=active, block=True
-    )
+    publish_mock.assert_awaited_once_with(mqtt_client=mqtt_client_mock, active=active)
     if active:
         acquire_lock_mock.assert_not_called()
         release_lock_mock.assert_called_once_with()
@@ -86,8 +85,9 @@ def test_preparing_for_shutdown_handler(active: bool) -> None:
         release_lock_mock.assert_not_called()
 
 
+@pytest.mark.asyncio
 @pytest.mark.parametrize("active", [True, False])
-def test_publish_preparing_for_shutdown(active: bool) -> None:
+async def test_publish_preparing_for_shutdown(active: bool) -> None:
     login_manager_mock = unittest.mock.MagicMock()
     login_manager_mock.Get.return_value = (("b", active),)[:]
     with unittest.mock.patch(
@@ -100,13 +100,13 @@ def test_publish_preparing_for_shutdown(active: bool) -> None:
             poweroff_delay=datetime.timedelta(),
         )
     assert state._login_manager == login_manager_mock
-    mqtt_client_mock = unittest.mock.MagicMock()
-    state.publish_preparing_for_shutdown(mqtt_client=mqtt_client_mock)
+    mqtt_client_mock = unittest.mock.AsyncMock()
+    await state.publish_preparing_for_shutdown(mqtt_client=mqtt_client_mock)
     login_manager_mock.Get.assert_called_once_with("PreparingForShutdown")
-    mqtt_client_mock.publish.assert_called_once_with(
+    mqtt_client_mock.publish.assert_awaited_once_with(
         topic="any/preparing-for-shutdown",
         payload="true" if active else "false",
-        retain=True,
+        retain=False,
     )
 
 
@@ -117,7 +117,8 @@ class DBusErrorResponseMock(jeepney.wrappers.DBusErrorResponse):
         self.data = data
 
 
-def test_publish_preparing_for_shutdown_get_fail(caplog):
+@pytest.mark.asyncio
+async def test_publish_preparing_for_shutdown_get_fail(caplog):
     login_manager_mock = unittest.mock.MagicMock()
     login_manager_mock.Get.side_effect = DBusErrorResponseMock("error", ("mocked",))
     with unittest.mock.patch(
@@ -130,7 +131,7 @@ def test_publish_preparing_for_shutdown_get_fail(caplog):
             poweroff_delay=datetime.timedelta(),
         )
     mqtt_client_mock = unittest.mock.MagicMock()
-    state.publish_preparing_for_shutdown(mqtt_client=None)
+    await state.publish_preparing_for_shutdown(mqtt_client=None)
     mqtt_client_mock.publish.assert_not_called()
     assert len(caplog.records) == 1
     assert caplog.records[0].levelno == logging.ERROR
@@ -140,11 +141,12 @@ def test_publish_preparing_for_shutdown_get_fail(caplog):
     )
 
 
+@pytest.mark.asyncio
 @pytest.mark.parametrize("topic_prefix", ["systemctl/hostname", "hostname/systemctl"])
 @pytest.mark.parametrize("discovery_prefix", ["homeassistant", "home/assistant"])
 @pytest.mark.parametrize("object_id", ["raspberrypi", "debian21"])
 @pytest.mark.parametrize("hostname", ["hostname", "host-name"])
-def test_publish_homeassistant_device_config(
+async def test_publish_homeassistant_device_config(
     topic_prefix, discovery_prefix, object_id, hostname
 ):
     with unittest.mock.patch("jeepney.io.blocking.open_dbus_connection"):
@@ -154,11 +156,11 @@ def test_publish_homeassistant_device_config(
             homeassistant_discovery_object_id=object_id,
             poweroff_delay=datetime.timedelta(),
         )
-    mqtt_client = unittest.mock.MagicMock()
+    mqtt_client = unittest.mock.AsyncMock()
     with unittest.mock.patch(
         "systemctl_mqtt._utils.get_hostname", return_value=hostname
     ):
-        state.publish_homeassistant_device_config(mqtt_client=mqtt_client)
+        await state.publish_homeassistant_device_config(mqtt_client=mqtt_client)
     mqtt_client.publish.assert_called_once()
     publish_args, publish_kwargs = mqtt_client.publish.call_args
     assert not publish_args