123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- import logging
- import socket
- import typing
- import paho.mqtt.client
- from switchbot_mqtt._actors import _ButtonAutomator, _CurtainMotor
- from switchbot_mqtt._actors.base import _MQTTCallbackUserdata
- _LOGGER = logging.getLogger(__name__)
- _MQTT_AVAILABILITY_TOPIC = "switchbot-mqtt/status"
- _MQTT_BIRTH_PAYLOAD = "online"
- _MQTT_LAST_WILL_PAYLOAD = "offline"
- def _mqtt_on_connect(
- mqtt_client: paho.mqtt.client.Client,
- userdata: _MQTTCallbackUserdata,
- flags: typing.Dict[str, int],
- return_code: int,
- ) -> None:
-
-
- assert return_code == 0, return_code
- mqtt_broker_host, mqtt_broker_port, *_ = mqtt_client.socket().getpeername()
-
- _LOGGER.debug(
- "connected to MQTT broker %s:%d",
- f"[{mqtt_broker_host}]"
- if mqtt_client.socket().family == socket.AF_INET6
- else mqtt_broker_host,
- mqtt_broker_port,
- )
- mqtt_client.publish(
- topic=userdata.mqtt_topic_prefix + _MQTT_AVAILABILITY_TOPIC,
- payload=_MQTT_BIRTH_PAYLOAD,
- retain=True,
- )
- _ButtonAutomator.mqtt_subscribe(mqtt_client=mqtt_client, settings=userdata)
- _CurtainMotor.mqtt_subscribe(mqtt_client=mqtt_client, settings=userdata)
- def _run(
- *,
- mqtt_host: str,
- mqtt_port: int,
- mqtt_disable_tls: bool,
- mqtt_username: typing.Optional[str],
- mqtt_password: typing.Optional[str],
- mqtt_topic_prefix: str,
- retry_count: int,
- device_passwords: typing.Dict[str, str],
- fetch_device_info: bool,
- ) -> None:
-
- mqtt_client = paho.mqtt.client.Client(
- userdata=_MQTTCallbackUserdata(
- retry_count=retry_count,
- device_passwords=device_passwords,
- fetch_device_info=fetch_device_info,
- mqtt_topic_prefix=mqtt_topic_prefix,
- )
- )
- mqtt_client.on_connect = _mqtt_on_connect
- _LOGGER.info(
- "connecting to MQTT broker %s:%d (TLS %s)",
- mqtt_host,
- mqtt_port,
- "disabled" if mqtt_disable_tls else "enabled",
- )
- if not mqtt_disable_tls:
- mqtt_client.tls_set(ca_certs=None)
- if mqtt_username:
- mqtt_client.username_pw_set(username=mqtt_username, password=mqtt_password)
- elif mqtt_password:
- raise ValueError("Missing MQTT username")
- mqtt_client.will_set(
- topic=mqtt_topic_prefix + _MQTT_AVAILABILITY_TOPIC,
- payload=_MQTT_LAST_WILL_PAYLOAD,
- retain=True,
- )
- mqtt_client.connect(host=mqtt_host, port=mqtt_port)
-
- mqtt_client.loop_forever()
|