| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 | # switchbot-mqtt - MQTT client controlling SwitchBot button & curtain automators,# compatible with home-assistant.io's MQTT Switch & Cover platform## Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>## This program is free software: you can redistribute it and/or modify# it under the terms of the GNU General Public License as published by# the Free Software Foundation, either version 3 of the License, or# any later version.## This program is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the# GNU General Public License for more details.## You should have received a copy of the GNU General Public License# along with this program.  If not, see <https://www.gnu.org/licenses/>.import abcimport argparseimport collectionsimport enumimport jsonimport loggingimport pathlibimport reimport typingimport paho.mqtt.clientimport switchbot_LOGGER = logging.getLogger(__name__)_MAC_ADDRESS_REGEX = re.compile(r"^[0-9a-f]{2}(:[0-9a-f]{2}){5}$")class _MQTTTopicPlaceholder(enum.Enum):    MAC_ADDRESS = "MAC_ADDRESS"_MQTTTopicLevel = typing.Union[str, _MQTTTopicPlaceholder]# "homeassistant" for historic reason, may be parametrized in future_MQTT_TOPIC_LEVELS_PREFIX = ["homeassistant"]  # type: typing.List[_MQTTTopicLevel]def _mac_address_valid(mac_address: str) -> bool:    return _MAC_ADDRESS_REGEX.match(mac_address.lower()) is not Noneclass _MQTTCallbackUserdata:    # pylint: disable=too-few-public-methods; @dataclasses.dataclass when python_requires>=3.7    def __init__(        self, retry_count: int, device_passwords: typing.Dict[str, str]    ) -> None:        self.retry_count = retry_count        self.device_passwords = device_passwords    def __eq__(self, other: object) -> bool:        return isinstance(other, type(self)) and vars(self) == vars(other)class _MQTTControlledActor(abc.ABC):    MQTT_COMMAND_TOPIC_LEVELS = NotImplemented  # type: typing.List[_MQTTTopicLevel]    MQTT_STATE_TOPIC_LEVELS = NotImplemented  # type: typing.List[_MQTTTopicLevel]    @abc.abstractmethod    def __init__(        self, mac_address: str, retry_count: int, password: typing.Optional[str]    ) -> None:        self._mac_address = mac_address    @abc.abstractmethod    def execute_command(        self, mqtt_message_payload: bytes, mqtt_client: paho.mqtt.client.Client    ) -> None:        raise NotImplementedError()    @classmethod    def _mqtt_command_callback(        cls,        mqtt_client: paho.mqtt.client.Client,        userdata: _MQTTCallbackUserdata,        message: paho.mqtt.client.MQTTMessage,    ) -> None:        # pylint: disable=unused-argument; callback        # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L469        _LOGGER.debug("received topic=%s payload=%r", message.topic, message.payload)        if message.retain:            _LOGGER.info("ignoring retained message")            return        topic_split = message.topic.split("/")        if len(topic_split) != len(cls.MQTT_COMMAND_TOPIC_LEVELS):            _LOGGER.warning("unexpected topic %s", message.topic)            return        mac_address = None        for given_part, expected_part in zip(            topic_split, cls.MQTT_COMMAND_TOPIC_LEVELS        ):            if expected_part == _MQTTTopicPlaceholder.MAC_ADDRESS:                mac_address = given_part            elif expected_part != given_part:                _LOGGER.warning("unexpected topic %s", message.topic)                return        assert mac_address        if not _mac_address_valid(mac_address):            _LOGGER.warning("invalid mac address %s", mac_address)            return        actor = cls(            mac_address=mac_address,            retry_count=userdata.retry_count,            password=userdata.device_passwords.get(mac_address, None),        )        actor.execute_command(            mqtt_message_payload=message.payload, mqtt_client=mqtt_client        )    @classmethod    def mqtt_subscribe(cls, mqtt_client: paho.mqtt.client.Client) -> None:        command_topic = "/".join(            "+" if isinstance(l, _MQTTTopicPlaceholder) else l            for l in cls.MQTT_COMMAND_TOPIC_LEVELS        )        _LOGGER.info("subscribing to MQTT topic %r", command_topic)        mqtt_client.subscribe(command_topic)        mqtt_client.message_callback_add(            sub=command_topic,            callback=cls._mqtt_command_callback,        )    def _mqtt_publish(        self,        topic_levels: typing.List[_MQTTTopicLevel],        payload: bytes,        mqtt_client: paho.mqtt.client.Client,    ) -> None:        topic = "/".join(            self._mac_address            if l == _MQTTTopicPlaceholder.MAC_ADDRESS            else typing.cast(str, l)            for l in topic_levels        )        # https://pypi.org/project/paho-mqtt/#publishing        _LOGGER.debug("publishing topic=%s payload=%r", topic, payload)        message_info = mqtt_client.publish(            topic=topic, payload=payload, retain=True        )  # type: paho.mqtt.client.MQTTMessageInfo        # wait before checking status?        if message_info.rc != paho.mqtt.client.MQTT_ERR_SUCCESS:            _LOGGER.error(                "Failed to publish MQTT message on topic %s (rc=%d)",                topic,                message_info.rc,            )    def report_state(self, state: bytes, mqtt_client: paho.mqtt.client.Client) -> None:        self._mqtt_publish(            topic_levels=self.MQTT_STATE_TOPIC_LEVELS,            payload=state,            mqtt_client=mqtt_client,        )class _ButtonAutomator(_MQTTControlledActor):    # https://www.home-assistant.io/integrations/switch.mqtt/    MQTT_COMMAND_TOPIC_LEVELS = _MQTT_TOPIC_LEVELS_PREFIX + [        "switch",        "switchbot",        _MQTTTopicPlaceholder.MAC_ADDRESS,        "set",    ]    MQTT_STATE_TOPIC_LEVELS = _MQTT_TOPIC_LEVELS_PREFIX + [        "switch",        "switchbot",        _MQTTTopicPlaceholder.MAC_ADDRESS,        "state",    ]    def __init__(        self, mac_address: str, retry_count: int, password: typing.Optional[None]    ) -> None:        self._device = switchbot.Switchbot(            mac=mac_address, password=password, retry_count=retry_count        )        super().__init__(            mac_address=mac_address, retry_count=retry_count, password=password        )    def execute_command(        self, mqtt_message_payload: bytes, mqtt_client: paho.mqtt.client.Client    ) -> None:        # https://www.home-assistant.io/integrations/switch.mqtt/#payload_on        if mqtt_message_payload.lower() == b"on":            if not self._device.turn_on():                _LOGGER.error("failed to turn on switchbot %s", self._mac_address)            else:                _LOGGER.info("switchbot %s turned on", self._mac_address)                # https://www.home-assistant.io/integrations/switch.mqtt/#state_on                self.report_state(mqtt_client=mqtt_client, state=b"ON")        # https://www.home-assistant.io/integrations/switch.mqtt/#payload_off        elif mqtt_message_payload.lower() == b"off":            if not self._device.turn_off():                _LOGGER.error("failed to turn off switchbot %s", self._mac_address)            else:                _LOGGER.info("switchbot %s turned off", self._mac_address)                self.report_state(mqtt_client=mqtt_client, state=b"OFF")        else:            _LOGGER.warning(                "unexpected payload %r (expected 'ON' or 'OFF')", mqtt_message_payload            )class _CurtainMotor(_MQTTControlledActor):    # https://www.home-assistant.io/integrations/cover.mqtt/    MQTT_COMMAND_TOPIC_LEVELS = _MQTT_TOPIC_LEVELS_PREFIX + [        "cover",        "switchbot-curtain",        _MQTTTopicPlaceholder.MAC_ADDRESS,        "set",    ]    MQTT_STATE_TOPIC_LEVELS = _MQTT_TOPIC_LEVELS_PREFIX + [        "cover",        "switchbot-curtain",        _MQTTTopicPlaceholder.MAC_ADDRESS,        "state",    ]    def __init__(        self, mac_address: str, retry_count: int, password: typing.Optional[None]    ) -> None:        self._device = switchbot.SwitchbotCurtain(            mac=mac_address, password=password, retry_count=retry_count        )        super().__init__(            mac_address=mac_address, retry_count=retry_count, password=password        )    def execute_command(        self, mqtt_message_payload: bytes, mqtt_client: paho.mqtt.client.Client    ) -> None:        # https://www.home-assistant.io/integrations/cover.mqtt/#payload_open        if mqtt_message_payload.lower() == b"open":            if not self._device.open():                _LOGGER.error("failed to open switchbot curtain %s", self._mac_address)            else:                _LOGGER.info("switchbot curtain %s opening", self._mac_address)                # > state_opening string (Optional, default: opening)                # https://www.home-assistant.io/integrations/cover.mqtt/#state_opening                self.report_state(mqtt_client=mqtt_client, state=b"opening")        elif mqtt_message_payload.lower() == b"close":            if not self._device.close():                _LOGGER.error("failed to close switchbot curtain %s", self._mac_address)            else:                _LOGGER.info("switchbot curtain %s closing", self._mac_address)                # https://www.home-assistant.io/integrations/cover.mqtt/#state_closing                self.report_state(mqtt_client=mqtt_client, state=b"closing")        elif mqtt_message_payload.lower() == b"stop":            if not self._device.stop():                _LOGGER.error("failed to stop switchbot curtain %s", self._mac_address)            else:                _LOGGER.info("switchbot curtain %s stopped", self._mac_address)                # no "stopped" state mentioned at                # https://www.home-assistant.io/integrations/cover.mqtt/#configuration-variables                # https://community.home-assistant.io/t/mqtt-how-to-remove-retained-messages/79029/2                self.report_state(mqtt_client=mqtt_client, state=b"")        else:            _LOGGER.warning(                "unexpected payload %r (expected 'OPEN', 'CLOSE', or 'STOP')",                mqtt_message_payload,            )def _mqtt_on_connect(    mqtt_client: paho.mqtt.client.Client,    userdata: _MQTTCallbackUserdata,    flags: typing.Dict,    return_code: int,) -> None:    # pylint: disable=unused-argument; callback    # https://github.com/eclipse/paho.mqtt.python/blob/v1.5.0/src/paho/mqtt/client.py#L441    assert return_code == 0, return_code  # connection accepted    mqtt_broker_host, mqtt_broker_port = mqtt_client.socket().getpeername()    _LOGGER.debug("connected to MQTT broker %s:%d", mqtt_broker_host, mqtt_broker_port)    _ButtonAutomator.mqtt_subscribe(mqtt_client=mqtt_client)    _CurtainMotor.mqtt_subscribe(mqtt_client=mqtt_client)def _run(    mqtt_host: str,    mqtt_port: int,    mqtt_username: typing.Optional[str],    mqtt_password: typing.Optional[str],    retry_count: int,    device_passwords: typing.Dict[str, str],) -> None:    # https://pypi.org/project/paho-mqtt/    mqtt_client = paho.mqtt.client.Client(        userdata=_MQTTCallbackUserdata(            retry_count=retry_count, device_passwords=device_passwords        )    )    mqtt_client.on_connect = _mqtt_on_connect    _LOGGER.info("connecting to MQTT broker %s:%d", mqtt_host, mqtt_port)    if mqtt_username:        mqtt_client.username_pw_set(username=mqtt_username, password=mqtt_password)    elif mqtt_password:        raise ValueError("Missing MQTT username")    mqtt_client.connect(host=mqtt_host, port=mqtt_port)    # https://github.com/eclipse/paho.mqtt.python/blob/master/src/paho/mqtt/client.py#L1740    mqtt_client.loop_forever()def _main() -> None:    logging.basicConfig(        level=logging.DEBUG,        format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",        datefmt="%Y-%m-%dT%H:%M:%S%z",    )    argparser = argparse.ArgumentParser(        description="MQTT client controlling SwitchBot button automators, "        "compatible with home-assistant.io's MQTT Switch platform"    )    argparser.add_argument("--mqtt-host", type=str, required=True)    argparser.add_argument("--mqtt-port", type=int, default=1883)    argparser.add_argument("--mqtt-username", type=str)    password_argument_group = argparser.add_mutually_exclusive_group()    password_argument_group.add_argument("--mqtt-password", type=str)    password_argument_group.add_argument(        "--mqtt-password-file",        type=pathlib.Path,        metavar="PATH",        dest="mqtt_password_path",        help="stripping trailing newline",    )    argparser.add_argument(        "--device-password-file",        type=pathlib.Path,        metavar="PATH",        dest="device_password_path",        help="path to json file mapping mac addresses of switchbot devices to passwords, e.g. "        + json.dumps({"11:22:33:44:55:66": "password", "aa:bb:cc:dd:ee:ff": "secret"}),    )    argparser.add_argument(        "--retries",        dest="retry_count",        type=int,        default=switchbot.DEFAULT_RETRY_COUNT,        help="Maximum number of attempts to send a command to a SwitchBot device"        " (default: %(default)d)",    )    args = argparser.parse_args()    if args.mqtt_password_path:        # .read_text() replaces \r\n with \n        mqtt_password = args.mqtt_password_path.read_bytes().decode()        if mqtt_password.endswith("\r\n"):            mqtt_password = mqtt_password[:-2]        elif mqtt_password.endswith("\n"):            mqtt_password = mqtt_password[:-1]    else:        mqtt_password = args.mqtt_password    if args.device_password_path:        device_passwords = json.loads(args.device_password_path.read_text())    else:        device_passwords = {}    _run(        mqtt_host=args.mqtt_host,        mqtt_port=args.mqtt_port,        mqtt_username=args.mqtt_username,        mqtt_password=mqtt_password,        retry_count=args.retry_count,        device_passwords=device_passwords,    )
 |