瀏覽代碼

refactor: new class _MQTTActionSchedulePoweroff

Fabian Peter Hammerle 3 年之前
父節點
當前提交
8817fd95e8
共有 3 個文件被更改,包括 38 次插入39 次删除
  1. 23 20
      systemctl_mqtt/__init__.py
  2. 1 1
      tests/test_dbus.py
  3. 14 18
      tests/test_mqtt.py

+ 23 - 20
systemctl_mqtt/__init__.py

@@ -15,6 +15,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
+import abc
 import argparse
 import datetime
 import functools
@@ -192,13 +193,10 @@ class _State:
         )
 
 
-class _MQTTAction:
-
-    # pylint: disable=too-few-public-methods
-
-    def __init__(self, name: str, action: typing.Callable) -> None:
-        self.name = name
-        self.action = action
+class _MQTTAction(metaclass=abc.ABCMeta):
+    @abc.abstractmethod
+    def trigger(self) -> None:
+        pass
 
     def mqtt_message_callback(
         self,
@@ -213,21 +211,26 @@ class _MQTTAction:
         if message.retain:
             _LOGGER.info("ignoring retained message")
             return
-        _LOGGER.debug("executing action %s (%r)", self.name, self.action)
-        self.action()
-        _LOGGER.debug("completed action %s (%r)", self.name, self.action)
+        _LOGGER.debug("executing action %s", self)
+        self.trigger()
+        _LOGGER.debug("completed action %s", self)
+
+
+class _MQTTActionSchedulePoweroff(_MQTTAction):
+    def __init__(self, delay: datetime.timedelta) -> None:
+        super().__init__()
+        self._delay = delay
+
+    def trigger(self) -> None:
+        # pylint: disable=protected-access
+        systemctl_mqtt._dbus.schedule_shutdown(action="poweroff", delay=self._delay)
+
+    def __str__(self) -> str:
+        return type(self).__name__
 
 
 _MQTT_TOPIC_SUFFIX_ACTION_MAPPING = {
-    "poweroff": _MQTTAction(
-        name="poweroff",
-        action=functools.partial(
-            # pylint: disable=protected-access
-            systemctl_mqtt._dbus.schedule_shutdown,
-            action="poweroff",
-            delay=datetime.timedelta(seconds=4),
-        ),
-    ),
+    "poweroff": _MQTTActionSchedulePoweroff(delay=datetime.timedelta(seconds=4))
 }
 
 
@@ -255,7 +258,7 @@ def _mqtt_on_connect(
             sub=topic, callback=action.mqtt_message_callback
         )
         _LOGGER.debug(
-            "registered MQTT callback for topic %s triggering %r", topic, action.action,
+            "registered MQTT callback for topic %s triggering %s", topic, action
         )
 
 

+ 1 - 1
tests/test_dbus.py

@@ -160,7 +160,7 @@ def test_mqtt_topic_suffix_action_mapping(topic_suffix, expected_action_arg):
     with unittest.mock.patch(
         "systemctl_mqtt._dbus.get_login_manager", return_value=login_manager_mock,
     ):
-        mqtt_action.action()
+        mqtt_action.trigger()
     assert login_manager_mock.ScheduleShutdown.call_count == 1
     schedule_args, schedule_kwargs = login_manager_mock.ScheduleShutdown.call_args
     assert len(schedule_args) == 2

+ 14 - 18
tests/test_mqtt.py

@@ -136,7 +136,7 @@ def test__run(
     assert caplog.records[5].message == "registered MQTT callback for topic {}".format(
         mqtt_topic_prefix + "/poweroff"
     ) + " triggering {}".format(
-        systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"].action
+        systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"]
     )
     # dbus loop started?
     glib_loop_mock.assert_called_once_with()
@@ -272,16 +272,16 @@ def test__client_handle_message(caplog, mqtt_host, mqtt_port, mqtt_topic_prefix)
     caplog.set_level(logging.DEBUG)
     poweroff_message = MQTTMessage(topic=mqtt_topic_prefix.encode() + b"/poweroff")
     with unittest.mock.patch.object(
-        systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "action",
-    ) as poweroff_action_mock:
+        systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "trigger"
+    ) as poweroff_trigger_mock:
         mqtt_client._handle_on_message(poweroff_message)
-    poweroff_action_mock.assert_called_once_with()
+    poweroff_trigger_mock.assert_called_once_with()
     assert all(r.levelno == logging.DEBUG for r in caplog.records)
     assert caplog.records[0].message == "received topic={} payload=b''".format(
         poweroff_message.topic
     )
-    assert caplog.records[1].message.startswith("executing action poweroff")
-    assert caplog.records[2].message.startswith("completed action poweroff")
+    assert caplog.records[1].message == "executing action _MQTTActionSchedulePoweroff"
+    assert caplog.records[2].message == "completed action _MQTTActionSchedulePoweroff"
 
 
 @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
@@ -309,27 +309,23 @@ def test_mqtt_message_callback_poweroff(caplog, mqtt_topic: str, payload: bytes)
     message = MQTTMessage(topic=mqtt_topic.encode())
     message.payload = payload
     with unittest.mock.patch.object(
-        systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "action",
-    ) as action_mock, caplog.at_level(logging.DEBUG):
+        systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "trigger"
+    ) as trigger_mock, caplog.at_level(logging.DEBUG):
         systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
             "poweroff"
         ].mqtt_message_callback(
             None, None, message  # type: ignore
         )
-    action_mock.assert_called_once_with()
+    trigger_mock.assert_called_once_with()
     assert len(caplog.records) == 3
     assert caplog.records[0].levelno == logging.DEBUG
     assert caplog.records[0].message == (
         "received topic={} payload={!r}".format(mqtt_topic, payload)
     )
     assert caplog.records[1].levelno == logging.DEBUG
-    assert caplog.records[1].message.startswith(
-        "executing action {} ({!r})".format("poweroff", action_mock)
-    )
+    assert caplog.records[1].message == "executing action _MQTTActionSchedulePoweroff"
     assert caplog.records[2].levelno == logging.DEBUG
-    assert caplog.records[2].message.startswith(
-        "completed action {} ({!r})".format("poweroff", action_mock)
-    )
+    assert caplog.records[2].message == "completed action _MQTTActionSchedulePoweroff"
 
 
 @pytest.mark.parametrize("mqtt_topic", ["system/command/poweroff"])
@@ -341,14 +337,14 @@ def test_mqtt_message_callback_poweroff_retained(
     message.payload = payload
     message.retain = True
     with unittest.mock.patch.object(
-        systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "action",
-    ) as action_mock, caplog.at_level(logging.DEBUG):
+        systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "trigger"
+    ) as trigger_mock, caplog.at_level(logging.DEBUG):
         systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
             "poweroff"
         ].mqtt_message_callback(
             None, None, message  # type: ignore
         )
-    action_mock.assert_not_called()
+    trigger_mock.assert_not_called()
     assert len(caplog.records) == 2
     assert caplog.records[0].levelno == logging.DEBUG
     assert caplog.records[0].message == (