|
@@ -93,6 +93,14 @@ class _MQTTCallbackUserdata:
|
|
|
class _MQTTControlledActor(abc.ABC):
|
|
|
MQTT_COMMAND_TOPIC_LEVELS: typing.List[_MQTTTopicLevel] = NotImplemented
|
|
|
MQTT_STATE_TOPIC_LEVELS: typing.List[_MQTTTopicLevel] = NotImplemented
|
|
|
+ _MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS: typing.List[_MQTTTopicLevel] = NotImplemented
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def get_mqtt_battery_percentage_topic(cls, mac_address: str) -> str:
|
|
|
+ return _join_mqtt_topic_levels(
|
|
|
+ topic_levels=cls._MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS,
|
|
|
+ mac_address=mac_address,
|
|
|
+ )
|
|
|
|
|
|
@abc.abstractmethod
|
|
|
def __init__(
|
|
@@ -101,15 +109,6 @@ class _MQTTControlledActor(abc.ABC):
|
|
|
|
|
|
self._mac_address = mac_address
|
|
|
|
|
|
- @abc.abstractmethod
|
|
|
- def execute_command(
|
|
|
- self,
|
|
|
- mqtt_message_payload: bytes,
|
|
|
- mqtt_client: paho.mqtt.client.Client,
|
|
|
- update_device_info: bool,
|
|
|
- ) -> None:
|
|
|
- raise NotImplementedError()
|
|
|
-
|
|
|
@abc.abstractmethod
|
|
|
def _get_device(self) -> switchbot.SwitchbotDevice:
|
|
|
raise NotImplementedError()
|
|
@@ -152,6 +151,30 @@ class _MQTTControlledActor(abc.ABC):
|
|
|
) from exc
|
|
|
raise
|
|
|
|
|
|
+ def _report_battery_level(self, mqtt_client: paho.mqtt.client.Client) -> None:
|
|
|
+
|
|
|
+
|
|
|
+ self._mqtt_publish(
|
|
|
+ topic_levels=self._MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS,
|
|
|
+ payload=str(self._get_device().get_battery_percent()).encode(),
|
|
|
+ mqtt_client=mqtt_client,
|
|
|
+ )
|
|
|
+
|
|
|
+ def _update_and_report_device_info(
|
|
|
+ self, mqtt_client: paho.mqtt.client.Client
|
|
|
+ ) -> None:
|
|
|
+ self._update_device_info()
|
|
|
+ self._report_battery_level(mqtt_client=mqtt_client)
|
|
|
+
|
|
|
+ @abc.abstractmethod
|
|
|
+ def execute_command(
|
|
|
+ self,
|
|
|
+ mqtt_message_payload: bytes,
|
|
|
+ mqtt_client: paho.mqtt.client.Client,
|
|
|
+ update_device_info: bool,
|
|
|
+ ) -> None:
|
|
|
+ raise NotImplementedError()
|
|
|
+
|
|
|
@classmethod
|
|
|
def _mqtt_command_callback(
|
|
|
cls,
|
|
@@ -247,13 +270,18 @@ class _ButtonAutomator(_MQTTControlledActor):
|
|
|
_MQTTTopicPlaceholder.MAC_ADDRESS,
|
|
|
"set",
|
|
|
]
|
|
|
-
|
|
|
MQTT_STATE_TOPIC_LEVELS = _MQTT_TOPIC_LEVELS_PREFIX + [
|
|
|
"switch",
|
|
|
"switchbot",
|
|
|
_MQTTTopicPlaceholder.MAC_ADDRESS,
|
|
|
"state",
|
|
|
]
|
|
|
+ _MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS = _MQTT_TOPIC_LEVELS_PREFIX + [
|
|
|
+ "cover",
|
|
|
+ "switchbot",
|
|
|
+ _MQTTTopicPlaceholder.MAC_ADDRESS,
|
|
|
+ "battery-percentage",
|
|
|
+ ]
|
|
|
|
|
|
def __init__(
|
|
|
self, *, mac_address: str, retry_count: int, password: typing.Optional[str]
|
|
@@ -282,6 +310,8 @@ class _ButtonAutomator(_MQTTControlledActor):
|
|
|
_LOGGER.info("switchbot %s turned on", self._mac_address)
|
|
|
|
|
|
self.report_state(mqtt_client=mqtt_client, state=b"ON")
|
|
|
+ if update_device_info:
|
|
|
+ self._update_and_report_device_info(mqtt_client)
|
|
|
|
|
|
elif mqtt_message_payload.lower() == b"off":
|
|
|
if not self.__device.turn_off():
|
|
@@ -289,6 +319,8 @@ class _ButtonAutomator(_MQTTControlledActor):
|
|
|
else:
|
|
|
_LOGGER.info("switchbot %s turned off", self._mac_address)
|
|
|
self.report_state(mqtt_client=mqtt_client, state=b"OFF")
|
|
|
+ if update_device_info:
|
|
|
+ self._update_and_report_device_info(mqtt_client)
|
|
|
else:
|
|
|
_LOGGER.warning(
|
|
|
"unexpected payload %r (expected 'ON' or 'OFF')", mqtt_message_payload
|
|
@@ -323,13 +355,6 @@ class _CurtainMotor(_MQTTControlledActor):
|
|
|
"position",
|
|
|
]
|
|
|
|
|
|
- @classmethod
|
|
|
- def get_mqtt_battery_percentage_topic(cls, mac_address: str) -> str:
|
|
|
- return _join_mqtt_topic_levels(
|
|
|
- topic_levels=cls._MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS,
|
|
|
- mac_address=mac_address,
|
|
|
- )
|
|
|
-
|
|
|
@classmethod
|
|
|
def get_mqtt_position_topic(cls, mac_address: str) -> str:
|
|
|
return _join_mqtt_topic_levels(
|
|
@@ -354,15 +379,6 @@ class _CurtainMotor(_MQTTControlledActor):
|
|
|
def _get_device(self) -> switchbot.SwitchbotDevice:
|
|
|
return self.__device
|
|
|
|
|
|
- def _report_battery_level(self, mqtt_client: paho.mqtt.client.Client) -> None:
|
|
|
-
|
|
|
-
|
|
|
- self._mqtt_publish(
|
|
|
- topic_levels=self._MQTT_BATTERY_PERCENTAGE_TOPIC_LEVELS,
|
|
|
- payload=str(self.__device.get_battery_percent()).encode(),
|
|
|
- mqtt_client=mqtt_client,
|
|
|
- )
|
|
|
-
|
|
|
def _report_position(self, mqtt_client: paho.mqtt.client.Client) -> None:
|
|
|
|
|
|
|
|
@@ -377,11 +393,10 @@ class _CurtainMotor(_MQTTControlledActor):
|
|
|
mqtt_client=mqtt_client,
|
|
|
)
|
|
|
|
|
|
- def _update_and_report_device_info(
|
|
|
- self, mqtt_client: paho.mqtt.client.Client, *, report_position: bool
|
|
|
+ def _update_and_report_device_info(
|
|
|
+ self, mqtt_client: paho.mqtt.client.Client, *, report_position: bool = True
|
|
|
) -> None:
|
|
|
- self._update_device_info()
|
|
|
- self._report_battery_level(mqtt_client=mqtt_client)
|
|
|
+ super()._update_and_report_device_info(mqtt_client)
|
|
|
if report_position:
|
|
|
self._report_position(mqtt_client=mqtt_client)
|
|
|
|
|
@@ -517,11 +532,13 @@ def _main() -> None:
|
|
|
argparser.add_argument(
|
|
|
"--fetch-device-info",
|
|
|
action="store_true",
|
|
|
- help="Report curtain motors' position on"
|
|
|
- f" topic {_CurtainMotor.get_mqtt_position_topic(mac_address='MAC_ADDRESS')}"
|
|
|
- " after sending stop command and battery level on topic"
|
|
|
+ help="Report devices' battery level on topic"
|
|
|
+ f" {_ButtonAutomator.get_mqtt_battery_percentage_topic(mac_address='MAC_ADDRESS')}"
|
|
|
+ " or, respectively,"
|
|
|
f" {_CurtainMotor.get_mqtt_battery_percentage_topic(mac_address='MAC_ADDRESS')}"
|
|
|
- " after every commands.",
|
|
|
+ " after every command. Additionally report curtain motors' position on"
|
|
|
+ f" topic {_CurtainMotor.get_mqtt_position_topic(mac_address='MAC_ADDRESS')}"
|
|
|
+ " after executing stop commands.",
|
|
|
)
|
|
|
args = argparser.parse_args()
|
|
|
if args.mqtt_password_path:
|