Browse Source

Add basic support for the mini plugs (#53)

J. Nick Koston 1 year ago
parent
commit
464e72c052
1 changed files with 55 additions and 5 deletions
  1. 55 5
      switchbot/__init__.py

+ 55 - 5
switchbot/__init__.py

@@ -22,6 +22,10 @@ DEVICE_GET_BASIC_SETTINGS_KEY = "5702"
 DEVICE_SET_MODE_KEY = "5703"
 DEVICE_SET_EXTENDED_KEY = "570f"
 
+# Plug Mini keys
+PLUG_ON_KEY = "570f50010180"
+PLUG_OFF_KEY = "570f50010100"
+
 # Bot keys
 PRESS_KEY = "570100"
 ON_KEY = "570101"
@@ -56,7 +60,7 @@ def _sb_uuid(comms_type: str = "service") -> UUID | str:
     return "Incorrect type, choose between: tx, rx or service"
 
 
-def _process_wohand(data: bytes) -> dict[str, bool | int]:
+def _process_wohand(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
     """Process woHand/Bot services data."""
     _switch_mode = bool(data[1] & 0b10000000)
 
@@ -69,7 +73,9 @@ def _process_wohand(data: bytes) -> dict[str, bool | int]:
     return _bot_data
 
 
-def _process_wocurtain(data: bytes, reverse: bool = True) -> dict[str, bool | int]:
+def _process_wocurtain(
+    data: bytes, mfr_data: bytes | None, reverse: bool = True
+) -> dict[str, bool | int]:
     """Process woCurtain/Curtain services data."""
 
     _position = max(min(data[3] & 0b01111111, 100), 0)
@@ -86,7 +92,7 @@ def _process_wocurtain(data: bytes, reverse: bool = True) -> dict[str, bool | in
     return _curtain_data
 
 
-def _process_wosensorth(data: bytes) -> dict[str, object]:
+def _process_wosensorth(data: bytes, mfr_data: bytes | None) -> dict[str, object]:
     """Process woSensorTH/Temp sensor services data."""
 
     _temp_sign = 1 if data[4] & 0b10000000 else -1
@@ -104,7 +110,7 @@ def _process_wosensorth(data: bytes) -> dict[str, object]:
     return _wosensorth_data
 
 
-def _process_wocontact(data: bytes) -> dict[str, bool | int]:
+def _process_wocontact(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
     """Process woContact Sensor services data."""
     return {
         "tested": bool(data[1] & 0b10000000),
@@ -117,6 +123,15 @@ def _process_wocontact(data: bytes) -> dict[str, bool | int]:
     }
 
 
+def _process_woplugmini(data: bytes, mfr_data: bytes | None) -> dict[str, bool | int]:
+    """Process plug mini."""
+    return {
+        "switchMode": True,
+        "isOn": mfr_data[7] == 0x80,
+        "wifi_rssi": mfr_data[9],
+    }
+
+
 @dataclass
 class SwitchBotAdvertisement:
     """Switchbot advertisement."""
@@ -131,9 +146,12 @@ def parse_advertisement_data(
 ) -> SwitchBotAdvertisement | None:
     """Parse advertisement data."""
     _services = list(advertisement_data.service_data.values())
+    _mgr_datas = list(advertisement_data.manufacturer_data.values())
+
     if not _services:
         return
     _service_data = _services[0]
+    _mfr_data = _mgr_datas[0] if _mgr_datas else None
     _model = chr(_service_data[0] & 0b01111111)
 
     supported_types: dict[str, dict[str, Any]] = {
@@ -142,6 +160,7 @@ def parse_advertisement_data(
         "c": {"modelName": "WoCurtain", "func": _process_wocurtain},
         "T": {"modelName": "WoSensorTH", "func": _process_wosensorth},
         "i": {"modelName": "WoSensorTH", "func": _process_wosensorth},
+        "g": {"modelName": "WoPlug", "func": _process_woplugmini},
     }
 
     data = {
@@ -159,7 +178,7 @@ def parse_advertisement_data(
                 "isEncrypted": bool(_service_data[0] & 0b10000000),
                 "model": _model,
                 "modelName": supported_types[_model]["modelName"],
-                "data": supported_types[_model]["func"](_service_data),
+                "data": supported_types[_model]["func"](_service_data, _mfr_data),
             }
         )
 
@@ -722,3 +741,34 @@ class SwitchbotCurtain(SwitchbotDevice):
         """Return True curtain is calibrated."""
         # To get actual light level call update() first.
         return self._get_adv_value("calibration")
+
+
+class SwitchbotPlugMini(SwitchbotDevice):
+    """Representation of a Switchbot plug mini."""
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        """Switchbot plug mini constructor."""
+        super().__init__(*args, **kwargs)
+        self._settings: dict[str, Any] = {}
+
+    async def update(self, interface: int | None = None) -> None:
+        """Update state of device."""
+        await self.get_device_data(retry=self._retry_count, interface=interface)
+
+    async def turn_on(self) -> bool:
+        """Turn device on."""
+        result = await self._sendcommand(PLUG_ON_KEY, self._retry_count)
+        return result[1] == 0x80
+
+    async def turn_off(self) -> bool:
+        """Turn device off."""
+        result = await self._sendcommand(PLUG_OFF_KEY, self._retry_count)
+        return result[1] == 0x00
+
+    def is_on(self) -> Any:
+        """Return switch state from cache."""
+        # To get actual position call update() first.
+        value = self._get_adv_value("isOn")
+        if value is None:
+            return None
+        return value