| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 | # switchbot-mqtt - MQTT client controlling SwitchBot button & curtain automators,# compatible with home-assistant.io's MQTT Switch & Cover platform## Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>## This program is free software: you can redistribute it and/or modify# it under the terms of the GNU General Public License as published by# the Free Software Foundation, either version 3 of the License, or# any later version.## This program is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the# GNU General Public License for more details.## You should have received a copy of the GNU General Public License# along with this program.  If not, see <https://www.gnu.org/licenses/>.import loggingimport socketimport sslimport typingimport aiomqttfrom switchbot_mqtt._actors import _ButtonAutomator, _CurtainMotor_LOGGER = logging.getLogger(__name__)_MQTT_AVAILABILITY_TOPIC = "switchbot-mqtt/status"# "online" and "offline" to match home assistant's default settings# https://www.home-assistant.io/integrations/switch.mqtt/#payload_available_MQTT_BIRTH_PAYLOAD = "online"_MQTT_LAST_WILL_PAYLOAD = "offline"async def _listen(    *,    mqtt_client: aiomqtt.Client,    topic_callbacks: typing.Iterable[typing.Tuple[str, typing.Callable]],    mqtt_topic_prefix: str,    retry_count: int,    device_passwords: typing.Dict[str, str],    fetch_device_info: bool,) -> None:    async with mqtt_client.messages() as messages:        await mqtt_client.publish(            topic=mqtt_topic_prefix + _MQTT_AVAILABILITY_TOPIC,            payload=_MQTT_BIRTH_PAYLOAD,            retain=True,        )        async for message in messages:            for topic, callback in topic_callbacks:                if message.topic.matches(topic):                    await callback(                        mqtt_client=mqtt_client,                        message=message,                        mqtt_topic_prefix=mqtt_topic_prefix,                        retry_count=retry_count,                        device_passwords=device_passwords,                        fetch_device_info=fetch_device_info,                    )def _log_mqtt_connected(mqtt_client: aiomqtt.Client) -> None:    if _LOGGER.getEffectiveLevel() <= logging.DEBUG:        mqtt_socket = (            # aiomqtt neither exposes instance of paho.mqtt.client.Client nor socket publicly.            # level condition to avoid accessing protected `mqtt_client._client` in production.            # pylint: disable=protected-access            mqtt_client._client.socket()        )        (mqtt_broker_host, mqtt_broker_port, *_) = mqtt_socket.getpeername()        # https://github.com/sbtinstruments/aiomqtt/blob/v1.2.1/aiomqtt/client.py#L1089        _LOGGER.debug(            "connected to MQTT broker %s:%d",            f"[{mqtt_broker_host}]"            if mqtt_socket.family == socket.AF_INET6            else mqtt_broker_host,            mqtt_broker_port,        )async def _run(  # pylint: disable=too-many-arguments    *,    mqtt_host: str,    mqtt_port: int,    mqtt_disable_tls: bool,    mqtt_username: typing.Optional[str],    mqtt_password: typing.Optional[str],    mqtt_topic_prefix: str,    retry_count: int,    device_passwords: typing.Dict[str, str],    fetch_device_info: bool,) -> None:    _LOGGER.info(        "connecting to MQTT broker %s:%d (TLS %s)",        mqtt_host,        mqtt_port,        "disabled" if mqtt_disable_tls else "enabled",    )    if mqtt_password is not None and mqtt_username is None:        raise ValueError("Missing MQTT username")    async with aiomqtt.Client(  # raises aiomqtt.MqttError        hostname=mqtt_host,        port=mqtt_port,        # > The settings [...] usually represent a higher security level than        # > when calling the SSLContext constructor directly.        # https://web.archive.org/web/20230714183106/https://docs.python.org/3/library/ssl.html        tls_context=None if mqtt_disable_tls else ssl.create_default_context(),        username=None if mqtt_username is None else mqtt_username,        password=None if mqtt_password is None else mqtt_password,        will=aiomqtt.Will(            topic=mqtt_topic_prefix + _MQTT_AVAILABILITY_TOPIC,            payload=_MQTT_LAST_WILL_PAYLOAD,            retain=True,        ),    ) as mqtt_client:        _log_mqtt_connected(mqtt_client=mqtt_client)        topic_callbacks: typing.List[typing.Tuple[str, typing.Callable]] = []        for actor_class in (_ButtonAutomator, _CurtainMotor):            async for topic, callback in actor_class.mqtt_subscribe(                mqtt_client=mqtt_client,                mqtt_topic_prefix=mqtt_topic_prefix,                fetch_device_info=fetch_device_info,            ):                topic_callbacks.append((topic, callback))        await _listen(            mqtt_client=mqtt_client,            topic_callbacks=topic_callbacks,            mqtt_topic_prefix=mqtt_topic_prefix,            retry_count=retry_count,            device_passwords=device_passwords,            fetch_device_info=fetch_device_info,        )
 |