1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- import logging
- import typing
- import paho.mqtt.client
- from switchbot_mqtt._actors import _ButtonAutomator, _CurtainMotor
- from switchbot_mqtt._actors._base import _MQTTCallbackUserdata
- _LOGGER = logging.getLogger(__name__)
def _mqtt_on_connect(
    mqtt_client: paho.mqtt.client.Client,
    userdata: _MQTTCallbackUserdata,
    flags: typing.Dict[str, int],
    return_code: int,
) -> None:
    """Subscribe the actor classes to their MQTT topics after connecting.

    Has the signature of a paho-mqtt ``on_connect`` callback.

    Args:
        mqtt_client: Client whose connection attempt just completed.
        userdata: Callback userdata; only ``fetch_device_info`` is read here,
            to decide whether the device-info update topics are enabled.
        flags: Unused; present to satisfy the callback signature.
        return_code: Connection result; anything other than ``0`` is treated
            as a failure.

    Raises:
        AssertionError: If ``return_code`` is not ``0``.
    """
    if return_code != 0:
        # Raised explicitly instead of via an `assert` statement so the check
        # still runs under `python -O`; the exception type is kept for
        # backward compatibility.
        raise AssertionError(return_code)
    # getpeername() yields (host, port) for IPv4 but
    # (host, port, flowinfo, scope_id) for IPv6, so only take the first two.
    mqtt_broker_host, mqtt_broker_port = mqtt_client.socket().getpeername()[:2]
    _LOGGER.debug("connected to MQTT broker %s:%d", mqtt_broker_host, mqtt_broker_port)
    _ButtonAutomator.mqtt_subscribe(
        mqtt_client=mqtt_client,
        enable_device_info_update_topic=userdata.fetch_device_info,
    )
    _CurtainMotor.mqtt_subscribe(
        mqtt_client=mqtt_client,
        enable_device_info_update_topic=userdata.fetch_device_info,
    )
def _run(
    *,
    mqtt_host: str,
    mqtt_port: int,
    mqtt_username: typing.Optional[str],
    mqtt_password: typing.Optional[str],
    retry_count: int,
    device_passwords: typing.Dict[str, str],
    fetch_device_info: bool,
) -> None:
    """Connect to the MQTT broker and run the client's network loop.

    Builds a paho-mqtt client carrying a ``_MQTTCallbackUserdata`` instance,
    registers ``_mqtt_on_connect`` as its connect callback, optionally sets
    credentials, connects, and then calls ``loop_forever()``.

    Args:
        mqtt_host: Broker host passed to ``connect``.
        mqtt_port: Broker port passed to ``connect``.
        mqtt_username: Username for authentication; when falsy, no
            credentials are configured.
        mqtt_password: Password for authentication; only valid together
            with a username.
        retry_count: Stored in the callback userdata.
        device_passwords: Stored in the callback userdata.
        fetch_device_info: Stored in the callback userdata.

    Raises:
        ValueError: If a password is given without a username. This is
            raised after the client has been created and the "connecting"
            message has been logged, but before any connection attempt.
    """
    callback_userdata = _MQTTCallbackUserdata(
        retry_count=retry_count,
        device_passwords=device_passwords,
        fetch_device_info=fetch_device_info,
    )
    client = paho.mqtt.client.Client(userdata=callback_userdata)
    client.on_connect = _mqtt_on_connect
    _LOGGER.info("connecting to MQTT broker %s:%d", mqtt_host, mqtt_port)
    if not mqtt_username:
        # A password on its own cannot be used for authentication.
        if mqtt_password:
            raise ValueError("Missing MQTT username")
    else:
        client.username_pw_set(username=mqtt_username, password=mqtt_password)
    client.connect(host=mqtt_host, port=mqtt_port)
    client.loop_forever()
|