|
@@ -506,87 +506,3 @@ def _run(
|
|
|
mqtt_client.connect(host=mqtt_host, port=mqtt_port)
|
|
|
|
|
|
mqtt_client.loop_forever()
|
|
|
-
|
|
|
-
|
|
|
-def _main() -> None:
|
|
|
- argparser = argparse.ArgumentParser(
|
|
|
- description="MQTT client controlling SwitchBot button automators, "
|
|
|
- "compatible with home-assistant.io's MQTT Switch platform"
|
|
|
- )
|
|
|
- argparser.add_argument("--mqtt-host", type=str, required=True)
|
|
|
- argparser.add_argument("--mqtt-port", type=int, default=1883)
|
|
|
- argparser.add_argument("--mqtt-username", type=str)
|
|
|
- password_argument_group = argparser.add_mutually_exclusive_group()
|
|
|
- password_argument_group.add_argument("--mqtt-password", type=str)
|
|
|
- password_argument_group.add_argument(
|
|
|
- "--mqtt-password-file",
|
|
|
- type=pathlib.Path,
|
|
|
- metavar="PATH",
|
|
|
- dest="mqtt_password_path",
|
|
|
- help="stripping trailing newline",
|
|
|
- )
|
|
|
- argparser.add_argument(
|
|
|
- "--device-password-file",
|
|
|
- type=pathlib.Path,
|
|
|
- metavar="PATH",
|
|
|
- dest="device_password_path",
|
|
|
- help="path to json file mapping mac addresses of switchbot devices to passwords, e.g. "
|
|
|
- + json.dumps({"11:22:33:44:55:66": "password", "aa:bb:cc:dd:ee:ff": "secret"}),
|
|
|
- )
|
|
|
- argparser.add_argument(
|
|
|
- "--retries",
|
|
|
- dest="retry_count",
|
|
|
- type=int,
|
|
|
- default=switchbot.DEFAULT_RETRY_COUNT,
|
|
|
- help="Maximum number of attempts to send a command to a SwitchBot device"
|
|
|
- " (default: %(default)d)",
|
|
|
- )
|
|
|
- argparser.add_argument(
|
|
|
- "--fetch-device-info",
|
|
|
- action="store_true",
|
|
|
- help="Report devices' battery level on topic"
|
|
|
- f" {_ButtonAutomator.get_mqtt_battery_percentage_topic(mac_address='MAC_ADDRESS')}"
|
|
|
- " or, respectively,"
|
|
|
- f" {_CurtainMotor.get_mqtt_battery_percentage_topic(mac_address='MAC_ADDRESS')}"
|
|
|
- " after every command. Additionally report curtain motors' position on"
|
|
|
- f" topic {_CurtainMotor.get_mqtt_position_topic(mac_address='MAC_ADDRESS')}"
|
|
|
- " after executing stop commands."
|
|
|
- " This option can also be enabled by assigning a non-empty value to the"
|
|
|
- " environment variable FETCH_DEVICE_INFO.",
|
|
|
- )
|
|
|
- argparser.add_argument("--debug", action="store_true")
|
|
|
- args = argparser.parse_args()
|
|
|
-
|
|
|
- logging.basicConfig(
|
|
|
- level=logging.DEBUG if args.debug else logging.INFO,
|
|
|
- format="%(asctime)s:%(levelname)s:%(name)s:%(funcName)s:%(message)s"
|
|
|
- if args.debug
|
|
|
- else "%(message)s",
|
|
|
- datefmt="%Y-%m-%dT%H:%M:%S%z",
|
|
|
- )
|
|
|
- _LOGGER.debug("args=%r", args)
|
|
|
- if args.mqtt_password_path:
|
|
|
-
|
|
|
- mqtt_password = args.mqtt_password_path.read_bytes().decode()
|
|
|
- if mqtt_password.endswith("\r\n"):
|
|
|
- mqtt_password = mqtt_password[:-2]
|
|
|
- elif mqtt_password.endswith("\n"):
|
|
|
- mqtt_password = mqtt_password[:-1]
|
|
|
- else:
|
|
|
- mqtt_password = args.mqtt_password
|
|
|
- if args.device_password_path:
|
|
|
- device_passwords = json.loads(args.device_password_path.read_text())
|
|
|
- else:
|
|
|
- device_passwords = {}
|
|
|
- _run(
|
|
|
- mqtt_host=args.mqtt_host,
|
|
|
- mqtt_port=args.mqtt_port,
|
|
|
- mqtt_username=args.mqtt_username,
|
|
|
- mqtt_password=mqtt_password,
|
|
|
- retry_count=args.retry_count,
|
|
|
- device_passwords=device_passwords,
|
|
|
- fetch_device_info=args.fetch_device_info
|
|
|
-
|
|
|
-
|
|
|
- or bool(os.environ.get("FETCH_DEVICE_INFO")),
|
|
|
- )
|