Browse Source

refactor: moved invocation of `switchbot.SwitchbotDevice.update` to new method `_MQTTControlledActor._get_device` (to prepare retrieval of battery state for button automators)

https://github.com/fphammerle/switchbot-mqtt/issues/41
Fabian Peter Hammerle 2 years ago
parent
commit
592cdec215

+ 57 - 44
switchbot_mqtt/__init__.py

@@ -110,6 +110,48 @@ class _MQTTControlledActor(abc.ABC):
     ) -> None:
         raise NotImplementedError()
 
+    @abc.abstractmethod
+    def _get_device(self) -> switchbot.SwitchbotDevice:
+        raise NotImplementedError()
+
+    def _update_device_info(self) -> None:
+        log_queue: queue.Queue[logging.LogRecord] = queue.Queue(maxsize=0)
+        logging.getLogger("switchbot").addHandler(_QueueLogHandler(log_queue))
+        try:
+            self._get_device().update()
+            # pySwitchbot>=v0.10.1 catches bluepy.btle.BTLEManagementError :(
+            # https://github.com/Danielhiversen/pySwitchbot/blob/0.10.1/switchbot/__init__.py#L141
+            while not log_queue.empty():
+                log_record = log_queue.get()
+                if log_record.exc_info:
+                    exc: typing.Optional[BaseException] = log_record.exc_info[1]
+                    if (
+                        isinstance(exc, bluepy.btle.BTLEManagementError)
+                        and exc.emsg == "Permission Denied"
+                    ):
+                        raise exc
+        except bluepy.btle.BTLEManagementError as exc:
+            if (
+                exc.emsg == "Permission Denied"
+                and exc.message == "Failed to execute management command 'le on'"
+            ):
+                raise PermissionError(
+                    "bluepy-helper failed to enable low energy mode"
+                    " due to insufficient permissions."
+                    "\nSee https://github.com/IanHarvey/bluepy/issues/313#issuecomment-428324639"
+                    ", https://github.com/fphammerle/switchbot-mqtt/pull/31#issuecomment-846383603"
+                    ", and https://github.com/IanHarvey/bluepy/blob/v/1.3.0/bluepy"
+                    "/bluepy-helper.c#L1260."
+                    "\nInsecure workaround:"
+                    "\n1. sudo apt-get install --no-install-recommends libcap2-bin"
+                    f"\n2. sudo setcap cap_net_admin+ep {shlex.quote(bluepy.btle.helperExe)}"
+                    "\n3. restart switchbot-mqtt"
+                    "\nIn docker-based setups, you could use"
+                    " `sudo docker run --cap-drop ALL --cap-add NET_ADMIN --user 0 …`"
+                    " (seriously insecure)."
+                ) from exc
+            raise
+
     @classmethod
     def _mqtt_command_callback(
         cls,
@@ -216,13 +258,16 @@ class _ButtonAutomator(_MQTTControlledActor):
     def __init__(
         self, *, mac_address: str, retry_count: int, password: typing.Optional[str]
     ) -> None:
-        self._device = switchbot.Switchbot(
+        self.__device = switchbot.Switchbot(
             mac=mac_address, password=password, retry_count=retry_count
         )
         super().__init__(
             mac_address=mac_address, retry_count=retry_count, password=password
         )
 
+    def _get_device(self) -> switchbot.SwitchbotDevice:
+        return self.__device
+
     def execute_command(
         self,
         mqtt_message_payload: bytes,
@@ -231,7 +276,7 @@ class _ButtonAutomator(_MQTTControlledActor):
     ) -> None:
         # https://www.home-assistant.io/integrations/switch.mqtt/#payload_on
         if mqtt_message_payload.lower() == b"on":
-            if not self._device.turn_on():
+            if not self.__device.turn_on():
                 _LOGGER.error("failed to turn on switchbot %s", self._mac_address)
             else:
                 _LOGGER.info("switchbot %s turned on", self._mac_address)
@@ -239,7 +284,7 @@ class _ButtonAutomator(_MQTTControlledActor):
                 self.report_state(mqtt_client=mqtt_client, state=b"ON")
         # https://www.home-assistant.io/integrations/switch.mqtt/#payload_off
         elif mqtt_message_payload.lower() == b"off":
-            if not self._device.turn_off():
+            if not self.__device.turn_off():
                 _LOGGER.error("failed to turn off switchbot %s", self._mac_address)
             else:
                 _LOGGER.info("switchbot %s turned off", self._mac_address)
@@ -285,7 +330,7 @@ class _CurtainMotor(_MQTTControlledActor):
     ) -> None:
         # > The position of the curtain is saved in self._pos with 0 = open and 100 = closed.
         # https://github.com/Danielhiversen/pySwitchbot/blob/0.10.0/switchbot/__init__.py#L150
-        self._device = switchbot.SwitchbotCurtain(
+        self.__device = switchbot.SwitchbotCurtain(
             mac=mac_address,
             password=password,
             retry_count=retry_count,
@@ -295,6 +340,9 @@ class _CurtainMotor(_MQTTControlledActor):
             mac_address=mac_address, retry_count=retry_count, password=password
         )
 
+    def _get_device(self) -> switchbot.SwitchbotDevice:
+        return self.__device
+
     def _report_position(self, mqtt_client: paho.mqtt.client.Client) -> None:
         # > position_closed integer (Optional, default: 0)
         # > position_open integer (Optional, default: 100)
@@ -305,47 +353,12 @@ class _CurtainMotor(_MQTTControlledActor):
         # https://github.com/Danielhiversen/pySwitchbot/blob/0.10.0/switchbot/__init__.py#L202
         self._mqtt_publish(
             topic_levels=self._MQTT_POSITION_TOPIC_LEVELS,
-            payload=str(int(self._device.get_position())).encode(),
+            payload=str(int(self.__device.get_position())).encode(),
             mqtt_client=mqtt_client,
         )
 
     def _update_position(self, mqtt_client: paho.mqtt.client.Client) -> None:
-        log_queue: queue.Queue[logging.LogRecord] = queue.Queue(maxsize=0)
-        logging.getLogger("switchbot").addHandler(_QueueLogHandler(log_queue))
-        try:
-            self._device.update()
-            # pySwitchbot>=v0.10.1 catches bluepy.btle.BTLEManagementError :(
-            # https://github.com/Danielhiversen/pySwitchbot/blob/0.10.1/switchbot/__init__.py#L141
-            while not log_queue.empty():
-                log_record = log_queue.get()
-                if log_record.exc_info:
-                    exc: typing.Optional[BaseException] = log_record.exc_info[1]
-                    if (
-                        isinstance(exc, bluepy.btle.BTLEManagementError)
-                        and exc.emsg == "Permission Denied"
-                    ):
-                        raise exc
-        except bluepy.btle.BTLEManagementError as exc:
-            if (
-                exc.emsg == "Permission Denied"
-                and exc.message == "Failed to execute management command 'le on'"
-            ):
-                raise PermissionError(
-                    "bluepy-helper failed to enable low energy mode"
-                    " due to insufficient permissions."
-                    "\nSee https://github.com/IanHarvey/bluepy/issues/313#issuecomment-428324639"
-                    ", https://github.com/fphammerle/switchbot-mqtt/pull/31#issuecomment-846383603"
-                    ", and https://github.com/IanHarvey/bluepy/blob/v/1.3.0/bluepy"
-                    "/bluepy-helper.c#L1260."
-                    "\nInsecure workaround:"
-                    "\n1. sudo apt-get install --no-install-recommends libcap2-bin"
-                    f"\n2. sudo setcap cap_net_admin+ep {shlex.quote(bluepy.btle.helperExe)}"
-                    "\n3. restart switchbot-mqtt"
-                    "\nIn docker-based setups, you could use"
-                    " `sudo docker run --cap-drop ALL --cap-add NET_ADMIN --user 0 …`"
-                    " (seriously insecure)."
-                ) from exc
-            raise
+        self._update_device_info()
         self._report_position(mqtt_client=mqtt_client)
 
     def execute_command(
@@ -356,7 +369,7 @@ class _CurtainMotor(_MQTTControlledActor):
     ) -> None:
         # https://www.home-assistant.io/integrations/cover.mqtt/#payload_open
         if mqtt_message_payload.lower() == b"open":
-            if not self._device.open():
+            if not self.__device.open():
                 _LOGGER.error("failed to open switchbot curtain %s", self._mac_address)
             else:
                 _LOGGER.info("switchbot curtain %s opening", self._mac_address)
@@ -364,14 +377,14 @@ class _CurtainMotor(_MQTTControlledActor):
                 # https://www.home-assistant.io/integrations/cover.mqtt/#state_opening
                 self.report_state(mqtt_client=mqtt_client, state=b"opening")
         elif mqtt_message_payload.lower() == b"close":
-            if not self._device.close():
+            if not self.__device.close():
                 _LOGGER.error("failed to close switchbot curtain %s", self._mac_address)
             else:
                 _LOGGER.info("switchbot curtain %s closing", self._mac_address)
                 # https://www.home-assistant.io/integrations/cover.mqtt/#state_closing
                 self.report_state(mqtt_client=mqtt_client, state=b"closing")
         elif mqtt_message_payload.lower() == b"stop":
-            if not self._device.stop():
+            if not self.__device.stop():
                 _LOGGER.error("failed to stop switchbot curtain %s", self._mac_address)
             else:
                 _LOGGER.info("switchbot curtain %s stopped", self._mac_address)

+ 5 - 0
tests/test_actor_base.py

@@ -55,8 +55,13 @@ def test_execute_command_abstract():
                 update_device_info=update_device_info,
             )
 
+        def _get_device(self):
+            return super()._get_device() or 42
+
     actor = _ActorMock(mac_address=None, retry_count=42, password=None)
     with pytest.raises(NotImplementedError):
         actor.execute_command(
             mqtt_message_payload=b"dummy", mqtt_client="dummy", update_device_info=True
         )
+    with pytest.raises(NotImplementedError):
+        actor._get_device()

+ 30 - 37
tests/test_switchbot_curtain_motor_device_info.py → tests/test_actor_base_device_info.py

@@ -37,39 +37,36 @@ _LE_ON_PERMISSION_DENIED_ERROR = bluepy.btle.BTLEManagementError(
 )
 
 
-def test__update_position_le_on_permission_denied_log():  # pySwitchbot>=v0.10.0
-    actor = switchbot_mqtt._CurtainMotor(
-        mac_address="dummy", retry_count=21, password=None
-    )
+@pytest.mark.parametrize(
+    "actor_class", [switchbot_mqtt._CurtainMotor, switchbot_mqtt._ButtonAutomator]
+)
+def test__update_device_info_le_on_permission_denied_log(
+    actor_class,
+):  # pySwitchbot>=v0.10.0
+    actor = actor_class(mac_address="dummy", retry_count=21, password=None)
     with unittest.mock.patch(
         "bluepy.btle.Scanner.scan",
         side_effect=_LE_ON_PERMISSION_DENIED_ERROR,
-    ), unittest.mock.patch.object(
-        actor, "_report_position"
-    ) as report_position_mock, pytest.raises(
-        PermissionError
-    ) as exc_info:
-        actor._update_position(mqtt_client="client")
-    report_position_mock.assert_not_called()
+    ), pytest.raises(PermissionError) as exc_info:
+        actor._update_device_info()
     assert "sudo setcap cap_net_admin+ep /" in exc_info.exconly()
     assert exc_info.value.__cause__ == _LE_ON_PERMISSION_DENIED_ERROR
 
 
-def test__update_position_le_on_permission_denied_exc():  # pySwitchbot<v0.10.1
-    actor = switchbot_mqtt._CurtainMotor(
-        mac_address="dummy", retry_count=21, password=None
-    )
-    with unittest.mock.patch(
-        "switchbot.SwitchbotCurtain.update",
+@pytest.mark.parametrize(
+    "actor_class", [switchbot_mqtt._CurtainMotor, switchbot_mqtt._ButtonAutomator]
+)
+def test__update_device_info_le_on_permission_denied_exc(
+    actor_class,
+):  # pySwitchbot<v0.10.1
+    actor = actor_class(mac_address="dummy", retry_count=21, password=None)
+    with unittest.mock.patch.object(
+        actor._get_device(),
+        "update",
         side_effect=_LE_ON_PERMISSION_DENIED_ERROR,
-    ) as update_mock, unittest.mock.patch.object(
-        actor, "_report_position"
-    ) as report_position_mock, pytest.raises(
-        PermissionError
-    ) as exc_info:
-        actor._update_position(mqtt_client="client")
+    ) as update_mock, pytest.raises(PermissionError) as exc_info:
+        actor._update_device_info()
     update_mock.assert_called_once_with()
-    report_position_mock.assert_not_called()
     assert os.path.isfile(
         re.search(
             r"sudo setcap cap_net_admin\+ep (\S+/bluepy-helper)\b",
@@ -79,19 +76,15 @@ def test__update_position_le_on_permission_denied_exc():  # pySwitchbot<v0.10.1
     assert exc_info.value.__cause__ == _LE_ON_PERMISSION_DENIED_ERROR
 
 
-def test__update_position_other_error():
-    actor = switchbot_mqtt._CurtainMotor(
-        mac_address="dummy", retry_count=21, password=None
-    )
+@pytest.mark.parametrize(
+    "actor_class", [switchbot_mqtt._CurtainMotor, switchbot_mqtt._ButtonAutomator]
+)
+def test__update_device_info_other_error(actor_class):
+    actor = actor_class(mac_address="dummy", retry_count=21, password=None)
     side_effect = bluepy.btle.BTLEManagementError("test")
-    with unittest.mock.patch(
-        "switchbot.SwitchbotCurtain.update", side_effect=side_effect
-    ) as update_mock, unittest.mock.patch.object(
-        actor, "_report_position"
-    ) as report_position_mock, pytest.raises(
-        type(side_effect)
-    ) as exc_info:
-        actor._update_position(mqtt_client="client")
+    with unittest.mock.patch.object(
+        actor._get_device(), "update", side_effect=side_effect
+    ) as update_mock, pytest.raises(type(side_effect)) as exc_info:
+        actor._update_device_info()
     update_mock.assert_called_once_with()
-    report_position_mock.assert_not_called()
     assert exc_info.value == side_effect

+ 6 - 0
tests/test_mqtt.py

@@ -164,6 +164,9 @@ def _mock_actor_class(
         ):
             pass
 
+        def _get_device(self):
+            return None
+
     return _ActorMock
 
 
@@ -464,6 +467,9 @@ def test__report_state(
         ):
             pass
 
+        def _get_device(self):
+            return None
+
     mqtt_client_mock = unittest.mock.MagicMock()
     mqtt_client_mock.publish.return_value.rc = return_code
     with caplog.at_level(logging.DEBUG):

+ 22 - 0
tests/test_switchbot_curtain_motor.py

@@ -271,3 +271,25 @@ def test_execute_command_bluetooth_error(caplog, mac_address, message_payload):
             f"failed to {message_payload.decode().lower()} switchbot curtain {mac_address}",
         ),
     ]
+
+
+@pytest.mark.parametrize(
+    "exception",
+    [
+        PermissionError("bluepy-helper failed to enable low energy mode..."),
+        bluepy.btle.BTLEManagementError("test"),
+    ],
+)
+def test__update_position_update_error(exception):
+    actor = switchbot_mqtt._CurtainMotor(
+        mac_address="dummy", retry_count=21, password=None
+    )
+    with unittest.mock.patch.object(
+        actor._get_device(), "update", side_effect=exception
+    ), unittest.mock.patch.object(
+        actor, "_report_position"
+    ) as report_position_mock, pytest.raises(
+        type(exception)
+    ):
+        actor._update_position(mqtt_client="client")
+    report_position_mock.assert_not_called()