123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367 |
- import datetime
- import logging
- import threading
- import time
- import unittest.mock
- import dbus
- import paho.mqtt.client
- import pytest
- from paho.mqtt.client import MQTTMessage
- import systemctl_mqtt
- @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
- @pytest.mark.parametrize("mqtt_port", [1833])
- @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
- @pytest.mark.parametrize("homeassistant_discovery_prefix", ["homeassistant"])
- @pytest.mark.parametrize("homeassistant_node_id", ["host", "node"])
- def test__run(
- caplog,
- mqtt_host,
- mqtt_port,
- mqtt_topic_prefix,
- homeassistant_discovery_prefix,
- homeassistant_node_id,
- ):
-
- caplog.set_level(logging.DEBUG)
- with unittest.mock.patch(
- "socket.create_connection"
- ) as create_socket_mock, unittest.mock.patch(
- "ssl.SSLContext.wrap_socket", autospec=True
- ) as ssl_wrap_socket_mock, unittest.mock.patch(
- "paho.mqtt.client.Client.loop_forever", autospec=True
- ) as mqtt_loop_forever_mock, unittest.mock.patch(
- "gi.repository.GLib.MainLoop.run"
- ) as glib_loop_mock, unittest.mock.patch(
- "systemctl_mqtt._dbus.get_login_manager"
- ) as get_login_manager_mock:
- ssl_wrap_socket_mock.return_value.send = len
- get_login_manager_mock.return_value.Get.return_value = dbus.Boolean(False)
- systemctl_mqtt._run(
- mqtt_host=mqtt_host,
- mqtt_port=mqtt_port,
- mqtt_username=None,
- mqtt_password=None,
- mqtt_topic_prefix=mqtt_topic_prefix,
- homeassistant_discovery_prefix=homeassistant_discovery_prefix,
- homeassistant_node_id=homeassistant_node_id,
- poweroff_delay=datetime.timedelta(),
- )
- assert caplog.records[0].levelno == logging.INFO
- assert caplog.records[0].message == (
- f"connecting to MQTT broker {mqtt_host}:{mqtt_port} (TLS enabled)"
- )
-
- create_socket_mock.assert_called_once()
- create_socket_args, _ = create_socket_mock.call_args
- assert create_socket_args[0] == (mqtt_host, mqtt_port)
-
- ssl_wrap_socket_mock.assert_called_once()
- ssl_context = ssl_wrap_socket_mock.call_args[0][0]
- assert ssl_context.check_hostname is True
- assert ssl_wrap_socket_mock.call_args[1]["server_hostname"] == mqtt_host
-
- while threading.active_count() > 1:
- time.sleep(0.01)
- mqtt_loop_forever_mock.assert_called_once()
- (mqtt_client,) = mqtt_loop_forever_mock.call_args[0]
- assert mqtt_client._tls_insecure is False
-
- assert mqtt_client._username is None
- assert mqtt_client._password is None
-
- caplog.clear()
- mqtt_client.socket().getpeername.return_value = (mqtt_host, mqtt_port)
- with unittest.mock.patch(
- "paho.mqtt.client.Client.subscribe"
- ) as mqtt_subscribe_mock:
- mqtt_client.on_connect(mqtt_client, mqtt_client._userdata, {}, 0)
- state = mqtt_client._userdata
- assert (
- state._login_manager.connect_to_signal.call_args[1]["signal_name"]
- == "PrepareForShutdown"
- )
- assert sorted(mqtt_subscribe_mock.call_args_list) == [
- unittest.mock.call(mqtt_topic_prefix + "/lock-all-sessions"),
- unittest.mock.call(mqtt_topic_prefix + "/poweroff"),
- ]
- assert mqtt_client.on_message is None
- for suffix in ("poweroff", "lock-all-sessions"):
- assert (
- mqtt_client._on_message_filtered[mqtt_topic_prefix + "/" + suffix]
- == systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
- suffix
- ].mqtt_message_callback
- )
- assert caplog.records[0].levelno == logging.DEBUG
- assert (
- caplog.records[0].message == f"connected to MQTT broker {mqtt_host}:{mqtt_port}"
- )
- assert caplog.records[1].levelno == logging.DEBUG
- assert caplog.records[1].message == "acquired shutdown inhibitor lock"
- assert caplog.records[2].levelno == logging.INFO
- assert (
- caplog.records[2].message
- == f"publishing 'false' on {mqtt_topic_prefix}/preparing-for-shutdown"
- )
- assert caplog.records[3].levelno == logging.DEBUG
- assert (
- caplog.records[3].message
- == "publishing home assistant config on "
- + homeassistant_discovery_prefix
- + "/binary_sensor/"
- + homeassistant_node_id
- + "/preparing-for-shutdown/config"
- )
- assert all(r.levelno == logging.INFO for r in caplog.records[4::2])
- assert {r.message for r in caplog.records[4::2]} == {
- f"subscribing to {mqtt_topic_prefix}/{s}"
- for s in ("poweroff", "lock-all-sessions")
- }
- assert all(r.levelno == logging.DEBUG for r in caplog.records[5::2])
- assert {r.message for r in caplog.records[5::2]} == {
- f"registered MQTT callback for topic {mqtt_topic_prefix}/{s}"
- f" triggering {systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[s]}"
- for s in ("poweroff", "lock-all-sessions")
- }
-
- glib_loop_mock.assert_called_once_with()
-
- assert mqtt_client._thread_terminate
- assert mqtt_client._thread is None
- @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
- @pytest.mark.parametrize("mqtt_port", [1833])
- @pytest.mark.parametrize("mqtt_disable_tls", [True, False])
- def test__run_tls(caplog, mqtt_host, mqtt_port, mqtt_disable_tls):
- caplog.set_level(logging.INFO)
- with unittest.mock.patch(
- "paho.mqtt.client.Client"
- ) as mqtt_client_class, unittest.mock.patch("gi.repository.GLib.MainLoop.run"):
- systemctl_mqtt._run(
- mqtt_host=mqtt_host,
- mqtt_port=mqtt_port,
- mqtt_disable_tls=mqtt_disable_tls,
- mqtt_username=None,
- mqtt_password=None,
- mqtt_topic_prefix="systemctl/hosts",
- homeassistant_discovery_prefix="homeassistant",
- homeassistant_node_id="host",
- poweroff_delay=datetime.timedelta(),
- )
- assert caplog.records[0].levelno == logging.INFO
- assert caplog.records[0].message == (
- f"connecting to MQTT broker {mqtt_host}:{mqtt_port}"
- f" (TLS {'disabled' if mqtt_disable_tls else 'enabled'})"
- )
- if mqtt_disable_tls:
- mqtt_client_class().tls_set.assert_not_called()
- else:
- mqtt_client_class().tls_set.assert_called_once_with(ca_certs=None)
- def test__run_tls_default():
- with unittest.mock.patch(
- "paho.mqtt.client.Client"
- ) as mqtt_client_class, unittest.mock.patch("gi.repository.GLib.MainLoop.run"):
- systemctl_mqtt._run(
- mqtt_host="mqtt-broker.local",
- mqtt_port=1833,
-
- mqtt_username=None,
- mqtt_password=None,
- mqtt_topic_prefix="systemctl/hosts",
- homeassistant_discovery_prefix="homeassistant",
- homeassistant_node_id="host",
- poweroff_delay=datetime.timedelta(),
- )
-
- mqtt_client_class().tls_set.assert_called_once_with(ca_certs=None)
- @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
- @pytest.mark.parametrize("mqtt_port", [1833])
- @pytest.mark.parametrize("mqtt_username", ["me"])
- @pytest.mark.parametrize("mqtt_password", [None, "secret"])
- @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host"])
- def test__run_authentication(
- mqtt_host, mqtt_port, mqtt_username, mqtt_password, mqtt_topic_prefix
- ):
- with unittest.mock.patch("socket.create_connection"), unittest.mock.patch(
- "ssl.SSLContext.wrap_socket"
- ) as ssl_wrap_socket_mock, unittest.mock.patch(
- "paho.mqtt.client.Client.loop_forever", autospec=True
- ) as mqtt_loop_forever_mock, unittest.mock.patch(
- "gi.repository.GLib.MainLoop.run"
- ), unittest.mock.patch(
- "systemctl_mqtt._dbus.get_login_manager"
- ):
- ssl_wrap_socket_mock.return_value.send = len
- systemctl_mqtt._run(
- mqtt_host=mqtt_host,
- mqtt_port=mqtt_port,
- mqtt_username=mqtt_username,
- mqtt_password=mqtt_password,
- mqtt_topic_prefix=mqtt_topic_prefix,
- homeassistant_discovery_prefix="discovery-prefix",
- homeassistant_node_id="node-id",
- poweroff_delay=datetime.timedelta(),
- )
- mqtt_loop_forever_mock.assert_called_once()
- (mqtt_client,) = mqtt_loop_forever_mock.call_args[0]
- assert mqtt_client._username.decode() == mqtt_username
- if mqtt_password:
- assert mqtt_client._password.decode() == mqtt_password
- else:
- assert mqtt_client._password is None
- def _initialize_mqtt_client(
- mqtt_host, mqtt_port, mqtt_topic_prefix
- ) -> paho.mqtt.client.Client:
- with unittest.mock.patch("socket.create_connection"), unittest.mock.patch(
- "ssl.SSLContext.wrap_socket"
- ) as ssl_wrap_socket_mock, unittest.mock.patch(
- "paho.mqtt.client.Client.loop_forever", autospec=True
- ) as mqtt_loop_forever_mock, unittest.mock.patch(
- "gi.repository.GLib.MainLoop.run"
- ), unittest.mock.patch(
- "systemctl_mqtt._dbus.get_login_manager"
- ) as get_login_manager_mock:
- ssl_wrap_socket_mock.return_value.send = len
- get_login_manager_mock.return_value.Get.return_value = dbus.Boolean(False)
- systemctl_mqtt._run(
- mqtt_host=mqtt_host,
- mqtt_port=mqtt_port,
- mqtt_username=None,
- mqtt_password=None,
- mqtt_topic_prefix=mqtt_topic_prefix,
- homeassistant_discovery_prefix="discovery-prefix",
- homeassistant_node_id="node-id",
- poweroff_delay=datetime.timedelta(),
- )
- while threading.active_count() > 1:
- time.sleep(0.01)
- mqtt_loop_forever_mock.assert_called_once()
- (mqtt_client,) = mqtt_loop_forever_mock.call_args[0]
- mqtt_client.socket().getpeername.return_value = (mqtt_host, mqtt_port)
- mqtt_client.on_connect(mqtt_client, mqtt_client._userdata, {}, 0)
- return mqtt_client
- @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
- @pytest.mark.parametrize("mqtt_port", [1833])
- @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
- def test__client_handle_message(caplog, mqtt_host, mqtt_port, mqtt_topic_prefix):
- mqtt_client = _initialize_mqtt_client(
- mqtt_host=mqtt_host, mqtt_port=mqtt_port, mqtt_topic_prefix=mqtt_topic_prefix
- )
- caplog.clear()
- caplog.set_level(logging.DEBUG)
- poweroff_message = MQTTMessage(topic=mqtt_topic_prefix.encode() + b"/poweroff")
- with unittest.mock.patch.object(
- systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "trigger"
- ) as poweroff_trigger_mock:
- mqtt_client._handle_on_message(poweroff_message)
- poweroff_trigger_mock.assert_called_once_with(state=mqtt_client._userdata)
- assert all(r.levelno == logging.DEBUG for r in caplog.records)
- assert (
- caplog.records[0].message
- == f"received topic={poweroff_message.topic} payload=b''"
- )
- assert caplog.records[1].message == "executing action _MQTTActionSchedulePoweroff"
- assert caplog.records[2].message == "completed action _MQTTActionSchedulePoweroff"
- @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
- @pytest.mark.parametrize("mqtt_port", [1833])
- @pytest.mark.parametrize("mqtt_password", ["secret"])
- def test__run_authentication_missing_username(mqtt_host, mqtt_port, mqtt_password):
- with unittest.mock.patch("paho.mqtt.client.Client"), unittest.mock.patch(
- "systemctl_mqtt._dbus.get_login_manager"
- ):
- with pytest.raises(ValueError, match=r"^Missing MQTT username$"):
- systemctl_mqtt._run(
- mqtt_host=mqtt_host,
- mqtt_port=mqtt_port,
- mqtt_username=None,
- mqtt_password=mqtt_password,
- mqtt_topic_prefix="prefix",
- homeassistant_discovery_prefix="discovery-prefix",
- homeassistant_node_id="node-id",
- poweroff_delay=datetime.timedelta(),
- )
- @pytest.mark.parametrize("mqtt_topic", ["system/command/poweroff"])
- @pytest.mark.parametrize("payload", [b"", b"junk"])
- def test_mqtt_message_callback_poweroff(caplog, mqtt_topic: str, payload: bytes):
- message = MQTTMessage(topic=mqtt_topic.encode())
- message.payload = payload
- with unittest.mock.patch.object(
- systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "trigger"
- ) as trigger_mock, caplog.at_level(logging.DEBUG):
- systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
- "poweroff"
- ].mqtt_message_callback(
- None, "state_dummy", message
- )
- trigger_mock.assert_called_once_with(state="state_dummy")
- assert len(caplog.records) == 3
- assert caplog.records[0].levelno == logging.DEBUG
- assert caplog.records[0].message == (
- f"received topic={mqtt_topic} payload={payload!r}"
- )
- assert caplog.records[1].levelno == logging.DEBUG
- assert caplog.records[1].message == "executing action _MQTTActionSchedulePoweroff"
- assert caplog.records[2].levelno == logging.DEBUG
- assert caplog.records[2].message == "completed action _MQTTActionSchedulePoweroff"
- @pytest.mark.parametrize("mqtt_topic", ["system/command/poweroff"])
- @pytest.mark.parametrize("payload", [b"", b"junk"])
- def test_mqtt_message_callback_poweroff_retained(
- caplog, mqtt_topic: str, payload: bytes
- ):
- message = MQTTMessage(topic=mqtt_topic.encode())
- message.payload = payload
- message.retain = True
- with unittest.mock.patch.object(
- systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "trigger"
- ) as trigger_mock, caplog.at_level(logging.DEBUG):
- systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
- "poweroff"
- ].mqtt_message_callback(
- None, None, message
- )
- trigger_mock.assert_not_called()
- assert len(caplog.records) == 2
- assert caplog.records[0].levelno == logging.DEBUG
- assert caplog.records[0].message == (
- f"received topic={mqtt_topic} payload={payload!r}"
- )
- assert caplog.records[1].levelno == logging.INFO
- assert caplog.records[1].message == "ignoring retained message"
|