refactor: split `_MQTTControlledActor` into new switchbot_mqtt/_actors/_base.py

Fabian Peter Hammerle 3 년 전
+ 7 - 0

@@ -3,6 +3,13 @@
+# avoid marking param list of implementation as duplicate of abstract declaration
+# https://github.com/PyCQA/pylint/issues/3239
+# https://github.com/PyCQA/pylint/issues/214

+ 2 - 5

@@ -21,11 +21,8 @@ import typing
 import paho.mqtt.client
-from switchbot_mqtt._actors import (
-    _ButtonAutomator,
-    _CurtainMotor,
-    _MQTTCallbackUserdata,
+from switchbot_mqtt._actors import _ButtonAutomator, _CurtainMotor
+from switchbot_mqtt._actors._base import _MQTTCallbackUserdata
 _LOGGER = logging.getLogger(__name__)

+ 1 - 193
switchbot_mqtt/_actors.py → switchbot_mqtt/_actors/__init__.py

@@ -16,22 +16,18 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
-import abc
 import logging
-import queue
-import shlex
 import typing
 import bluepy.btle
 import paho.mqtt.client
 import switchbot
+from switchbot_mqtt._actors._base import _MQTTControlledActor
 from switchbot_mqtt._utils import (
-    _mac_address_valid,
-    _QueueLogHandler,
 _LOGGER = logging.getLogger(__name__)
@@ -40,194 +36,6 @@ _LOGGER = logging.getLogger(__name__)
 _MQTT_TOPIC_LEVELS_PREFIX: typing.List[_MQTTTopicLevel] = ["homeassistant"]
-class _MQTTCallbackUserdata:
-    # pylint: disable=too-few-public-methods; @dataclasses.dataclass when python_requires>=3.7
-    def __init__(
-        self,
-        *,
-        retry_count: int,
-        device_passwords: typing.Dict[str, str],
-        fetch_device_info: bool,
-    ) -> None:
-        self.retry_count = retry_count
-        self.device_passwords = device_passwords
-        self.fetch_device_info = fetch_device_info
-    def __eq__(self, other: object) -> bool:
-        return isinstance(other, type(self)) and vars(self) == vars(other)
-class _MQTTControlledActor(abc.ABC):
-    MQTT_COMMAND_TOPIC_LEVELS: typing.List[_MQTTTopicLevel] = NotImplemented
-    MQTT_STATE_TOPIC_LEVELS: typing.List[_MQTTTopicLevel] = NotImplemented
-    _MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS: typing.List[_MQTTTopicLevel] = NotImplemented
-    @classmethod
-    def get_mqtt_battery_percentage_topic(cls, mac_address: str) -> str:
-        return _join_mqtt_topic_levels(
-            topic_levels=cls._MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS,
-            mac_address=mac_address,
-        )
-    @abc.abstractmethod
-    def __init__(
-        self, *, mac_address: str, retry_count: int, password: typing.Optional[str]
-    ) -> None:
-        # alternative: pySwitchbot >=0.10.0 provides SwitchbotDevice.get_mac()
-        self._mac_address = mac_address
-    @abc.abstractmethod
-    def _get_device(self) -> switchbot.SwitchbotDevice:
-        raise NotImplementedError()
-    def _update_device_info(self) -> None:
-        log_queue: queue.Queue[logging.LogRecord] = queue.Queue(maxsize=0)
-        logging.getLogger("switchbot").addHandler(_QueueLogHandler(log_queue))
-        try:
-            self._get_device().update()
-            # pySwitchbot>=v0.10.1 catches bluepy.btle.BTLEManagementError :(
-            # https://github.com/Danielhiversen/pySwitchbot/blob/0.10.1/switchbot/__init__.py#L141
-            while not log_queue.empty():
-                log_record = log_queue.get()
-                if log_record.exc_info:
-                    exc: typing.Optional[BaseException] = log_record.exc_info[1]
-                    if (
-                        isinstance(exc, bluepy.btle.BTLEManagementError)
-                        and exc.emsg == "Permission Denied"
-                    ):
-                        raise exc
-        except bluepy.btle.BTLEManagementError as exc:
-            if (
-                exc.emsg == "Permission Denied"
-                and exc.message == "Failed to execute management command 'le on'"
-            ):
-                raise PermissionError(
-                    "bluepy-helper failed to enable low energy mode"
-                    " due to insufficient permissions."
-                    "\nSee https://github.com/IanHarvey/bluepy/issues/313#issuecomment-428324639"
-                    ", https://github.com/fphammerle/switchbot-mqtt/pull/31#issuecomment-846383603"
-                    ", and https://github.com/IanHarvey/bluepy/blob/v/1.3.0/bluepy"
-                    "/bluepy-helper.c#L1260."
-                    "\nInsecure workaround:"
-                    "\n1. sudo apt-get install --no-install-recommends libcap2-bin"
-                    f"\n2. sudo setcap cap_net_admin+ep {shlex.quote(bluepy.btle.helperExe)}"
-                    "\n3. restart switchbot-mqtt"
-                    "\nIn docker-based setups, you could use"
-                    " `sudo docker run --cap-drop ALL --cap-add NET_ADMIN --user 0 …`"
-                    " (seriously insecure)."
-                ) from exc
-            raise
-    def _report_battery_level(self, mqtt_client: paho.mqtt.client.Client) -> None:
-        # > battery: Percentage of battery that is left.
-        # https://www.home-assistant.io/integrations/sensor/#device-class
-        self._mqtt_publish(
-            topic_levels=self._MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS,
-            payload=str(self._get_device().get_battery_percent()).encode(),
-            mqtt_client=mqtt_client,
-        )
-    def _update_and_report_device_info(
-        self, mqtt_client: paho.mqtt.client.Client
-    ) -> None:
-        self._update_device_info()
-        self._report_battery_level(mqtt_client=mqtt_client)
-    @abc.abstractmethod
-    def execute_command(
-        self,
-        mqtt_message_payload: bytes,
-        mqtt_client: paho.mqtt.client.Client,
-        update_device_info: bool,
-    ) -> None:
-        raise NotImplementedError()
-    @classmethod
-    def _mqtt_command_callback(
-        cls,
-        mqtt_client: paho.mqtt.client.Client,
-        userdata: _MQTTCallbackUserdata,
-        message: paho.mqtt.client.MQTTMessage,
-    ) -> None:
-        # pylint: disable=unused-argument; callback
-        # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L469
-        _LOGGER.debug("received topic=%s payload=%r", message.topic, message.payload)
-        if message.retain:
-            _LOGGER.info("ignoring retained message")
-            return
-        topic_split = message.topic.split("/")
-        if len(topic_split) != len(cls.MQTT_COMMAND_TOPIC_LEVELS):
-            _LOGGER.warning("unexpected topic %s", message.topic)
-            return
-        mac_address = None
-        for given_part, expected_part in zip(
-            topic_split, cls.MQTT_COMMAND_TOPIC_LEVELS
-        ):
-            if expected_part == _MQTTTopicPlaceholder.MAC_ADDRESS:
-                mac_address = given_part
-            elif expected_part != given_part:
-                _LOGGER.warning("unexpected topic %s", message.topic)
-                return
-        assert mac_address
-        if not _mac_address_valid(mac_address):
-            _LOGGER.warning("invalid mac address %s", mac_address)
-            return
-        actor = cls(
-            mac_address=mac_address,
-            retry_count=userdata.retry_count,
-            password=userdata.device_passwords.get(mac_address, None),
-        )
-        actor.execute_command(
-            mqtt_message_payload=message.payload,
-            mqtt_client=mqtt_client,
-            # consider calling update+report method directly when adding support for battery levels
-            update_device_info=userdata.fetch_device_info,
-        )
-    @classmethod
-    def mqtt_subscribe(cls, mqtt_client: paho.mqtt.client.Client) -> None:
-        command_topic = "/".join(
-            "+" if isinstance(l, _MQTTTopicPlaceholder) else l
-            for l in cls.MQTT_COMMAND_TOPIC_LEVELS
-        )
-        _LOGGER.info("subscribing to MQTT topic %r", command_topic)
-        mqtt_client.subscribe(command_topic)
-        mqtt_client.message_callback_add(
-            sub=command_topic,
-            callback=cls._mqtt_command_callback,
-        )
-    def _mqtt_publish(
-        self,
-        *,
-        topic_levels: typing.List[_MQTTTopicLevel],
-        payload: bytes,
-        mqtt_client: paho.mqtt.client.Client,
-    ) -> None:
-        topic = _join_mqtt_topic_levels(
-            topic_levels=topic_levels, mac_address=self._mac_address
-        )
-        # https://pypi.org/project/paho-mqtt/#publishing
-        _LOGGER.debug("publishing topic=%s payload=%r", topic, payload)
-        message_info: paho.mqtt.client.MQTTMessageInfo = mqtt_client.publish(
-            topic=topic, payload=payload, retain=True
-        )
-        # wait before checking status?
-        if message_info.rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
-            _LOGGER.error(
-                "Failed to publish MQTT message on topic %s (rc=%d)",
-                topic,
-                message_info.rc,
-            )
-    def report_state(self, state: bytes, mqtt_client: paho.mqtt.client.Client) -> None:
-        self._mqtt_publish(
-            topic_levels=self.MQTT_STATE_TOPIC_LEVELS,
-            payload=state,
-            mqtt_client=mqtt_client,
-        )
 class _ButtonAutomator(_MQTTControlledActor):
     # https://www.home-assistant.io/integrations/switch.mqtt/

+ 224 - 0

@@ -0,0 +1,224 @@
+# switchbot-mqtt - MQTT client controlling SwitchBot button & curtain automators,
+# compatible with home-assistant.io's MQTT Switch & Cover platform
+# Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+import abc
+import logging
+import queue
+import shlex
+import typing
+import bluepy.btle
+import paho.mqtt.client
+import switchbot
+from switchbot_mqtt._utils import (
+    _join_mqtt_topic_levels,
+    _mac_address_valid,
+    _MQTTTopicLevel,
+    _MQTTTopicPlaceholder,
+    _QueueLogHandler,
+_LOGGER = logging.getLogger(__name__)
+class _MQTTCallbackUserdata:
+    # pylint: disable=too-few-public-methods; @dataclasses.dataclass when python_requires>=3.7
+    def __init__(
+        self,
+        *,
+        retry_count: int,
+        device_passwords: typing.Dict[str, str],
+        fetch_device_info: bool,
+    ) -> None:
+        self.retry_count = retry_count
+        self.device_passwords = device_passwords
+        self.fetch_device_info = fetch_device_info
+    def __eq__(self, other: object) -> bool:
+        return isinstance(other, type(self)) and vars(self) == vars(other)
+class _MQTTControlledActor(abc.ABC):
+    MQTT_COMMAND_TOPIC_LEVELS: typing.List[_MQTTTopicLevel] = NotImplemented
+    MQTT_STATE_TOPIC_LEVELS: typing.List[_MQTTTopicLevel] = NotImplemented
+    _MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS: typing.List[_MQTTTopicLevel] = NotImplemented
+    @classmethod
+    def get_mqtt_battery_percentage_topic(cls, mac_address: str) -> str:
+        return _join_mqtt_topic_levels(
+            topic_levels=cls._MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS,
+            mac_address=mac_address,
+        )
+    @abc.abstractmethod
+    def __init__(
+        self, *, mac_address: str, retry_count: int, password: typing.Optional[str]
+    ) -> None:
+        # alternative: pySwitchbot >=0.10.0 provides SwitchbotDevice.get_mac()
+        self._mac_address = mac_address
+    @abc.abstractmethod
+    def _get_device(self) -> switchbot.SwitchbotDevice:
+        raise NotImplementedError()
+    def _update_device_info(self) -> None:
+        log_queue: queue.Queue[logging.LogRecord] = queue.Queue(maxsize=0)
+        logging.getLogger("switchbot").addHandler(_QueueLogHandler(log_queue))
+        try:
+            self._get_device().update()
+            # pySwitchbot>=v0.10.1 catches bluepy.btle.BTLEManagementError :(
+            # https://github.com/Danielhiversen/pySwitchbot/blob/0.10.1/switchbot/__init__.py#L141
+            while not log_queue.empty():
+                log_record = log_queue.get()
+                if log_record.exc_info:
+                    exc: typing.Optional[BaseException] = log_record.exc_info[1]
+                    if (
+                        isinstance(exc, bluepy.btle.BTLEManagementError)
+                        and exc.emsg == "Permission Denied"
+                    ):
+                        raise exc
+        except bluepy.btle.BTLEManagementError as exc:
+            if (
+                exc.emsg == "Permission Denied"
+                and exc.message == "Failed to execute management command 'le on'"
+            ):
+                raise PermissionError(
+                    "bluepy-helper failed to enable low energy mode"
+                    " due to insufficient permissions."
+                    "\nSee https://github.com/IanHarvey/bluepy/issues/313#issuecomment-428324639"
+                    ", https://github.com/fphammerle/switchbot-mqtt/pull/31#issuecomment-846383603"
+                    ", and https://github.com/IanHarvey/bluepy/blob/v/1.3.0/bluepy"
+                    "/bluepy-helper.c#L1260."
+                    "\nInsecure workaround:"
+                    "\n1. sudo apt-get install --no-install-recommends libcap2-bin"
+                    f"\n2. sudo setcap cap_net_admin+ep {shlex.quote(bluepy.btle.helperExe)}"
+                    "\n3. restart switchbot-mqtt"
+                    "\nIn docker-based setups, you could use"
+                    " `sudo docker run --cap-drop ALL --cap-add NET_ADMIN --user 0 …`"
+                    " (seriously insecure)."
+                ) from exc
+            raise
+    def _report_battery_level(self, mqtt_client: paho.mqtt.client.Client) -> None:
+        # > battery: Percentage of battery that is left.
+        # https://www.home-assistant.io/integrations/sensor/#device-class
+        self._mqtt_publish(
+            topic_levels=self._MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS,
+            payload=str(self._get_device().get_battery_percent()).encode(),
+            mqtt_client=mqtt_client,
+        )
+    def _update_and_report_device_info(
+        self, mqtt_client: paho.mqtt.client.Client
+    ) -> None:
+        self._update_device_info()
+        self._report_battery_level(mqtt_client=mqtt_client)
+    @abc.abstractmethod
+    def execute_command(
+        self,
+        mqtt_message_payload: bytes,
+        mqtt_client: paho.mqtt.client.Client,
+        update_device_info: bool,
+    ) -> None:
+        raise NotImplementedError()
+    @classmethod
+    def _mqtt_command_callback(
+        cls,
+        mqtt_client: paho.mqtt.client.Client,
+        userdata: _MQTTCallbackUserdata,
+        message: paho.mqtt.client.MQTTMessage,
+    ) -> None:
+        # pylint: disable=unused-argument; callback
+        # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L469
+        _LOGGER.debug("received topic=%s payload=%r", message.topic, message.payload)
+        if message.retain:
+            _LOGGER.info("ignoring retained message")
+            return
+        topic_split = message.topic.split("/")
+        if len(topic_split) != len(cls.MQTT_COMMAND_TOPIC_LEVELS):
+            _LOGGER.warning("unexpected topic %s", message.topic)
+            return
+        mac_address = None
+        for given_part, expected_part in zip(
+            topic_split, cls.MQTT_COMMAND_TOPIC_LEVELS
+        ):
+            if expected_part == _MQTTTopicPlaceholder.MAC_ADDRESS:
+                mac_address = given_part
+            elif expected_part != given_part:
+                _LOGGER.warning("unexpected topic %s", message.topic)
+                return
+        assert mac_address
+        if not _mac_address_valid(mac_address):
+            _LOGGER.warning("invalid mac address %s", mac_address)
+            return
+        actor = cls(
+            mac_address=mac_address,
+            retry_count=userdata.retry_count,
+            password=userdata.device_passwords.get(mac_address, None),
+        )
+        actor.execute_command(
+            mqtt_message_payload=message.payload,
+            mqtt_client=mqtt_client,
+            # consider calling update+report method directly when adding support for battery levels
+            update_device_info=userdata.fetch_device_info,
+        )
+    @classmethod
+    def mqtt_subscribe(cls, mqtt_client: paho.mqtt.client.Client) -> None:
+        command_topic = "/".join(
+            "+" if isinstance(l, _MQTTTopicPlaceholder) else l
+            for l in cls.MQTT_COMMAND_TOPIC_LEVELS
+        )
+        _LOGGER.info("subscribing to MQTT topic %r", command_topic)
+        mqtt_client.subscribe(command_topic)
+        mqtt_client.message_callback_add(
+            sub=command_topic,
+            callback=cls._mqtt_command_callback,
+        )
+    def _mqtt_publish(
+        self,
+        *,
+        topic_levels: typing.List[_MQTTTopicLevel],
+        payload: bytes,
+        mqtt_client: paho.mqtt.client.Client,
+    ) -> None:
+        topic = _join_mqtt_topic_levels(
+            topic_levels=topic_levels, mac_address=self._mac_address
+        )
+        # https://pypi.org/project/paho-mqtt/#publishing
+        _LOGGER.debug("publishing topic=%s payload=%r", topic, payload)
+        message_info: paho.mqtt.client.MQTTMessageInfo = mqtt_client.publish(
+            topic=topic, payload=payload, retain=True
+        )
+        # wait before checking status?
+        if message_info.rc != paho.mqtt.client.MQTT_ERR_SUCCESS:
+            _LOGGER.error(
+                "Failed to publish MQTT message on topic %s (rc=%d)",
+                topic,
+                message_info.rc,
+            )
+    def report_state(self, state: bytes, mqtt_client: paho.mqtt.client.Client) -> None:
+        self._mqtt_publish(
+            topic_levels=self.MQTT_STATE_TOPIC_LEVELS,
+            payload=state,
+            mqtt_client=mqtt_client,
+        )

+ 11 - 11

@@ -93,12 +93,12 @@ def test__run(
             f"connected to MQTT broker {mqtt_host}:{mqtt_port}",
-            "switchbot_mqtt._actors",
+            "switchbot_mqtt._actors._base",
             "subscribing to MQTT topic 'homeassistant/switch/switchbot/+/set'",
-            "switchbot_mqtt._actors",
+            "switchbot_mqtt._actors._base",
             "subscribing to MQTT topic 'homeassistant/cover/switchbot-curtain/+/set'",
@@ -256,7 +256,7 @@ def test__mqtt_command_callback(
     assert caplog.record_tuples == [
-            "switchbot_mqtt._actors",
+            "switchbot_mqtt._actors._base",
             f"received topic={topic.decode()} payload={payload!r}",
@@ -332,12 +332,12 @@ def test__mqtt_command_callback_unexpected_topic(caplog, topic: bytes, payload:
     assert caplog.record_tuples == [
-            "switchbot_mqtt._actors",
+            "switchbot_mqtt._actors._base",
             f"received topic={topic.decode()} payload={payload!r}",
-            "switchbot_mqtt._actors",
+            "switchbot_mqtt._actors._base",
             f"unexpected topic {topic.decode()}",
@@ -372,12 +372,12 @@ def test__mqtt_command_callback_invalid_mac_address(
     assert caplog.record_tuples == [
-            "switchbot_mqtt._actors",
+            "switchbot_mqtt._actors._base",
             f"received topic={topic.decode()} payload={payload!r}",
-            "switchbot_mqtt._actors",
+            "switchbot_mqtt._actors._base",
             f"invalid mac address {mac_address}",
@@ -413,11 +413,11 @@ def test__mqtt_command_callback_ignore_retained(caplog, topic: bytes, payload: b
     assert caplog.record_tuples == [
-            "switchbot_mqtt._actors",
+            "switchbot_mqtt._actors._base",
             f"received topic={topic.decode()} payload={payload!r}",
-        ("switchbot_mqtt._actors", logging.INFO, "ignoring retained message"),
+        ("switchbot_mqtt._actors._base", logging.INFO, "ignoring retained message"),
@@ -476,7 +476,7 @@ def test__report_state(
         topic=expected_topic, payload=state, retain=True
     assert caplog.record_tuples[0] == (
-        "switchbot_mqtt._actors",
+        "switchbot_mqtt._actors._base",
         f"publishing topic={expected_topic} payload={state!r}",
@@ -485,7 +485,7 @@ def test__report_state(
         assert caplog.record_tuples[1:] == [
-                "switchbot_mqtt._actors",
+                "switchbot_mqtt._actors._base",
                 f"Failed to publish MQTT message on topic {expected_topic} (rc={return_code})",