123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- import logging
- import socket
- import ssl
- import typing
- import aiomqtt
- from switchbot_mqtt._actors import _ButtonAutomator, _CurtainMotor
# Module-level logger, named after this module.
- _LOGGER = logging.getLogger(__name__)
# Topic suffix, appended to the configured topic prefix, on which the
# availability status is published.
- _MQTT_AVAILABILITY_TOPIC = "switchbot-mqtt/status"
# Retained payload published on the availability topic once listening starts.
- _MQTT_BIRTH_PAYLOAD = "online"
# Retained payload registered as the MQTT last will on the availability topic.
- _MQTT_LAST_WILL_PAYLOAD = "offline"
async def _listen(
    *,
    mqtt_client: aiomqtt.Client,
    topic_callbacks: typing.Iterable[typing.Tuple[str, typing.Callable]],
    mqtt_topic_prefix: str,
    retry_count: int,
    device_passwords: typing.Dict[str, str],
    fetch_device_info: bool,
) -> None:
    """Publish the birth message, then dispatch incoming MQTT messages.

    Opens the client's message stream, publishes the retained birth payload
    on the availability topic and afterwards awaits, for every received
    message, each callback whose topic filter matches the message topic.
    Callbacks are awaited one after another; a message matching several
    filters triggers all of the corresponding callbacks.

    Args:
        mqtt_client: Connected client used for receiving and publishing.
        topic_callbacks: Pairs of (topic filter, async callback).
        mqtt_topic_prefix: Prefix prepended to the availability topic and
            forwarded to every callback.
        retry_count: Forwarded unchanged to every callback.
        device_passwords: Forwarded unchanged to every callback.
        fetch_device_info: Forwarded unchanged to every callback.

    Returns only when the message iterator ends.
    """
    # Snapshot once: the pairs are re-scanned for every incoming message, so
    # a single-pass iterable (e.g. a generator) would otherwise be exhausted
    # after the first message and all later messages silently ignored.
    subscriptions = tuple(topic_callbacks)
    async with mqtt_client.messages() as messages:
        await mqtt_client.publish(
            topic=mqtt_topic_prefix + _MQTT_AVAILABILITY_TOPIC,
            payload=_MQTT_BIRTH_PAYLOAD,
            retain=True,
        )
        async for message in messages:
            for topic, callback in subscriptions:
                if not message.topic.matches(topic):
                    continue
                await callback(
                    mqtt_client=mqtt_client,
                    message=message,
                    mqtt_topic_prefix=mqtt_topic_prefix,
                    retry_count=retry_count,
                    device_passwords=device_passwords,
                    fetch_device_info=fetch_device_info,
                )
def _log_mqtt_connected(mqtt_client: aiomqtt.Client) -> None:
    """Emit a debug record naming the broker address the client is connected to.

    Does nothing unless the module logger's effective level is DEBUG or
    lower, so the socket is only inspected when the record would be wanted.
    IPv6 hosts are wrapped in square brackets in the logged address.
    """
    if _LOGGER.getEffectiveLevel() > logging.DEBUG:
        return
    # Reaches into the client's private ``_client`` attribute to obtain the
    # underlying socket.
    broker_socket = mqtt_client._client.socket()
    # The peer address may carry more than two fields; only the first two
    # (host and port) are used.
    broker_host, broker_port, *_ = broker_socket.getpeername()
    if broker_socket.family == socket.AF_INET6:
        host_label = f"[{broker_host}]"
    else:
        host_label = broker_host
    _LOGGER.debug("connected to MQTT broker %s:%d", host_label, broker_port)
async def _run(
    *,
    mqtt_host: str,
    mqtt_port: int,
    mqtt_disable_tls: bool,
    mqtt_username: typing.Optional[str],
    mqtt_password: typing.Optional[str],
    mqtt_topic_prefix: str,
    retry_count: int,
    device_passwords: typing.Dict[str, str],
    fetch_device_info: bool,
) -> None:
    """Connect to the MQTT broker, subscribe the actors and process messages.

    Logs the connection attempt, validates the credentials, opens the client
    (registering a retained last-will message on the availability topic),
    collects the (topic, callback) pairs yielded by each actor class's
    ``mqtt_subscribe`` and finally hands control to ``_listen``.

    Raises:
        ValueError: If a password is given without a username.
    """
    tls_state = "disabled" if mqtt_disable_tls else "enabled"
    _LOGGER.info(
        "connecting to MQTT broker %s:%d (TLS %s)", mqtt_host, mqtt_port, tls_state
    )
    if mqtt_username is None and mqtt_password is not None:
        raise ValueError("Missing MQTT username")
    # With TLS enabled, use the standard library's default SSL context.
    tls_context = None if mqtt_disable_tls else ssl.create_default_context()
    last_will = aiomqtt.Will(
        topic=mqtt_topic_prefix + _MQTT_AVAILABILITY_TOPIC,
        payload=_MQTT_LAST_WILL_PAYLOAD,
        retain=True,
    )
    async with aiomqtt.Client(
        hostname=mqtt_host,
        port=mqtt_port,
        tls_context=tls_context,
        username=mqtt_username,
        password=mqtt_password,
        will=last_will,
    ) as mqtt_client:
        _log_mqtt_connected(mqtt_client=mqtt_client)
        # Actor classes are subscribed in order; each yields its
        # (topic, callback) pairs.
        topic_callbacks: typing.List[typing.Tuple[str, typing.Callable]] = [
            (topic, callback)
            for actor_class in (_ButtonAutomator, _CurtainMotor)
            async for topic, callback in actor_class.mqtt_subscribe(
                mqtt_client=mqtt_client,
                mqtt_topic_prefix=mqtt_topic_prefix,
                fetch_device_info=fetch_device_info,
            )
        ]
        await _listen(
            mqtt_client=mqtt_client,
            topic_callbacks=topic_callbacks,
            mqtt_topic_prefix=mqtt_topic_prefix,
            retry_count=retry_count,
            device_passwords=device_passwords,
            fetch_device_info=fetch_device_info,
        )
|