|
@@ -0,0 +1,113 @@
|
|
|
+import argparse
|
|
|
+import logging
|
|
|
+import pathlib
|
|
|
+import typing
|
|
|
+
|
|
|
+import paho.mqtt.client
|
|
|
+import intertechno_cc1101
|
|
|
+
|
|
|
+_LOGGER = logging.getLogger(__name__)
|
|
|
+
|
|
|
+
|
|
|
+def _mqtt_on_message(
|
|
|
+ mqtt_client: paho.mqtt.client.Client,
|
|
|
+ userdata: None,
|
|
|
+ message: paho.mqtt.client.MQTTMessage,
|
|
|
+):
|
|
|
+
|
|
|
+
|
|
|
+ _LOGGER.debug("received topic=%s payload=%r", message.topic, message.payload)
|
|
|
+ if message.retain:
|
|
|
+ _LOGGER.warning("ignoring retained message")
|
|
|
+ return
|
|
|
+ topic_split = message.topic.split("/")
|
|
|
+ address = int(topic_split[1])
|
|
|
+ button_index = int(topic_split[2])
|
|
|
+ remote_control = intertechno_cc1101.RemoteControl(address=address)
|
|
|
+
|
|
|
+ if message.payload.upper() == b"ON":
|
|
|
+ remote_control.turn_on(button_index=button_index)
|
|
|
+ elif message.payload.upper() == b"OFF":
|
|
|
+ remote_control.turn_off(button_index=button_index)
|
|
|
+ else:
|
|
|
+ _LOGGER.warning(
|
|
|
+ "unexpected payload %r; expected 'ON' or 'OFF'", message.payload
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+def _mqtt_on_connect(
|
|
|
+ mqtt_client: paho.mqtt.client.Client,
|
|
|
+ user_data: typing.Any,
|
|
|
+ flags: typing.Dict,
|
|
|
+ return_code: int,
|
|
|
+) -> None:
|
|
|
+
|
|
|
+
|
|
|
+ assert return_code == 0, return_code
|
|
|
+ mqtt_broker_host, mqtt_broker_port = mqtt_client.socket().getpeername()
|
|
|
+ _LOGGER.debug("connected to MQTT broker %s:%d", mqtt_broker_host, mqtt_broker_port)
|
|
|
+ set_topic = "intertechno-cc1101/+/+/set"
|
|
|
+ _LOGGER.info("subscribing to MQTT topic %r", set_topic)
|
|
|
+
|
|
|
+ mqtt_client.on_message = _mqtt_on_message
|
|
|
+ mqtt_client.subscribe(set_topic)
|
|
|
+
|
|
|
+
|
|
|
+def _run(
|
|
|
+ mqtt_host: str,
|
|
|
+ mqtt_port: int,
|
|
|
+ mqtt_username: typing.Optional[str],
|
|
|
+ mqtt_password: typing.Optional[str],
|
|
|
+) -> None:
|
|
|
+
|
|
|
+ mqtt_client = paho.mqtt.client.Client()
|
|
|
+ mqtt_client.on_connect = _mqtt_on_connect
|
|
|
+ _LOGGER.info("connecting to MQTT broker %s:%d", mqtt_host, mqtt_port)
|
|
|
+ if mqtt_username:
|
|
|
+ mqtt_client.username_pw_set(username=mqtt_username, password=mqtt_password)
|
|
|
+ elif mqtt_password:
|
|
|
+ raise ValueError("Missing MQTT username")
|
|
|
+ mqtt_client.connect(host=mqtt_host, port=mqtt_port)
|
|
|
+ mqtt_client.loop_forever()
|
|
|
+
|
|
|
+
|
|
|
+def _main() -> None:
|
|
|
+ logging.basicConfig(
|
|
|
+ level=logging.DEBUG,
|
|
|
+ format="%(asctime)s:%(levelname)s:%(name)s:%(message)s",
|
|
|
+ datefmt="%Y-%m-%dT%H:%M:%S%z",
|
|
|
+ )
|
|
|
+ logging.getLogger("cc1101").setLevel(logging.INFO)
|
|
|
+ argparser = argparse.ArgumentParser(
|
|
|
+ description="MQTT client controlling Intertechno smart outlets via a CC1101 transceiver, "
|
|
|
+ "compatible with home-assistant.io's MQTT Switch platform",
|
|
|
+ allow_abbrev=False,
|
|
|
+ )
|
|
|
+ argparser.add_argument("--mqtt-host", type=str, required=True)
|
|
|
+ argparser.add_argument("--mqtt-port", type=int, default=1883)
|
|
|
+ argparser.add_argument("--mqtt-username", type=str)
|
|
|
+ password_argument_group = argparser.add_mutually_exclusive_group()
|
|
|
+ password_argument_group.add_argument("--mqtt-password", type=str)
|
|
|
+ password_argument_group.add_argument(
|
|
|
+ "--mqtt-password-file",
|
|
|
+ type=pathlib.Path,
|
|
|
+ metavar="PATH",
|
|
|
+ dest="mqtt_password_path",
|
|
|
+ help="stripping trailing newline",
|
|
|
+ )
|
|
|
+ args = argparser.parse_args()
|
|
|
+ if args.mqtt_password_path:
|
|
|
+
|
|
|
+ mqtt_password = args.mqtt_password_path.read_bytes().decode()
|
|
|
+ if mqtt_password.endswith("\r\n"):
|
|
|
+ mqtt_password = mqtt_password[:-2]
|
|
|
+ elif mqtt_password.endswith("\n"):
|
|
|
+ mqtt_password = mqtt_password[:-1]
|
|
|
+ else:
|
|
|
+ mqtt_password = args.mqtt_password
|
|
|
+ _run(
|
|
|
+ mqtt_host=args.mqtt_host,
|
|
|
+ mqtt_port=args.mqtt_port,
|
|
|
+ mqtt_username=args.mqtt_username,
|
|
|
+ mqtt_password=mqtt_password,
|
|
|
+ )
|