123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- # systemctl-mqtt - MQTT client triggering & reporting shutdown on systemd-based systems
- #
- # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
- #
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program. If not, see <https://www.gnu.org/licenses/>.
- import logging
- import threading
- import time
- import unittest.mock
- import dbus
- import paho.mqtt.client
- import pytest
- from paho.mqtt.client import MQTTMessage
- import systemctl_mqtt
- # pylint: disable=protected-access
@pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
@pytest.mark.parametrize("mqtt_port", [1833])
@pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
def test__run(caplog, mqtt_host, mqtt_port, mqtt_topic_prefix):
    """Exercise systemctl_mqtt._run with socket, TLS, MQTT loop and GLib mocked.

    Verifies, in this order: the "connecting" log record, the address passed
    to socket.create_connection, the TLS settings, that loop_forever was
    invoked once on a client without credentials, the effects of calling the
    client's on_connect callback (signal registration, subscription, filtered
    message callback, log records) and finally that the GLib main loop was
    run and the client's loop thread was stopped.
    """
    caplog.set_level(logging.DEBUG)
    with unittest.mock.patch(
        "socket.create_connection"
    ) as create_socket_mock, unittest.mock.patch(
        "ssl.SSLContext.wrap_socket", autospec=True,
    ) as ssl_wrap_socket_mock, unittest.mock.patch(
        "paho.mqtt.client.Client.loop_forever", autospec=True,
    ) as mqtt_loop_forever_mock, unittest.mock.patch(
        "gi.repository.GLib.MainLoop.run"
    ) as glib_loop_mock, unittest.mock.patch(
        "systemctl_mqtt._get_login_manager"
    ) as get_login_manager_mock:
        # send() on the wrapped socket returns the length of its argument
        ssl_wrap_socket_mock.return_value.send = len
        # the mocked login manager answers every Get() call with False
        get_login_manager_mock.return_value.Get.return_value = dbus.Boolean(False)
        systemctl_mqtt._run(
            mqtt_host=mqtt_host,
            mqtt_port=mqtt_port,
            mqtt_username=None,
            mqtt_password=None,
            mqtt_topic_prefix=mqtt_topic_prefix,
        )
    assert caplog.records[0].levelno == logging.INFO
    assert caplog.records[0].message == "connecting to MQTT broker {}:{}".format(
        mqtt_host, mqtt_port
    )
    # correct remote?
    assert create_socket_mock.call_count == 1
    create_socket_args, _ = create_socket_mock.call_args
    assert create_socket_args[0] == (mqtt_host, mqtt_port)
    # ssl enabled?
    assert ssl_wrap_socket_mock.call_count == 1
    ssl_context = ssl_wrap_socket_mock.call_args[0][0]  # self
    assert ssl_context.check_hostname is True
    assert ssl_wrap_socket_mock.call_args[1]["server_hostname"] == mqtt_host
    # loop started?
    # Poll until only the current thread is left, but give up after a
    # deadline so that a lingering thread fails this test instead of
    # blocking the whole test run forever.
    deadline = time.monotonic() + 10
    while threading.active_count() > 1:
        assert time.monotonic() < deadline, "background threads still running"
        time.sleep(0.01)
    assert mqtt_loop_forever_mock.call_count == 1
    (mqtt_client,) = mqtt_loop_forever_mock.call_args[0]
    assert mqtt_client._tls_insecure is False
    # credentials
    assert mqtt_client._username is None
    assert mqtt_client._password is None
    # connect callback
    caplog.clear()  # only records emitted by on_connect are inspected below
    mqtt_client.socket().getpeername.return_value = (mqtt_host, mqtt_port)
    with unittest.mock.patch(
        "paho.mqtt.client.Client.subscribe"
    ) as mqtt_subscribe_mock:
        mqtt_client.on_connect(mqtt_client, mqtt_client._userdata, {}, 0)
    state = mqtt_client._userdata
    assert (
        state._login_manager.connect_to_signal.call_args[1]["signal_name"]
        == "PrepareForShutdown"
    )
    mqtt_subscribe_mock.assert_called_once_with(mqtt_topic_prefix + "/poweroff")
    assert mqtt_client.on_message is None
    assert (  # pylint: disable=comparison-with-callable
        mqtt_client._on_message_filtered[mqtt_topic_prefix + "/poweroff"]
        == systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
            "poweroff"
        ].mqtt_message_callback
    )
    assert caplog.records[0].levelno == logging.DEBUG
    assert caplog.records[0].message == "connected to MQTT broker {}:{}".format(
        mqtt_host, mqtt_port
    )
    assert caplog.records[1].levelno == logging.DEBUG
    assert caplog.records[1].message == "acquired shutdown inhibitor lock"
    assert caplog.records[2].levelno == logging.INFO
    assert caplog.records[2].message == "publishing 'false' on {}".format(
        mqtt_topic_prefix + "/preparing-for-shutdown"
    )
    assert caplog.records[3].levelno == logging.INFO
    assert caplog.records[3].message == "subscribing to {}".format(
        mqtt_topic_prefix + "/poweroff"
    )
    assert caplog.records[4].levelno == logging.DEBUG
    assert caplog.records[4].message == "registered MQTT callback for topic {}".format(
        mqtt_topic_prefix + "/poweroff"
    ) + " triggering {}".format(
        systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"].action
    )
    # dbus loop started?
    glib_loop_mock.assert_called_once_with()
    # waited for mqtt loop to stop?
    assert mqtt_client._thread_terminate
    assert mqtt_client._thread is None
@pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
@pytest.mark.parametrize("mqtt_port", [1833])
@pytest.mark.parametrize("mqtt_username", ["me"])
@pytest.mark.parametrize("mqtt_password", [None, "secret"])
@pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host"])
def test__run_authentication(
    mqtt_host, mqtt_port, mqtt_username, mqtt_password, mqtt_topic_prefix
):
    """Check that _run hands username and password over to the MQTT client.

    The client is taken from the single recorded call of the mocked
    loop_forever; its stored credentials are compared with the parameters.
    """
    patch = unittest.mock.patch
    # patchers only take effect once entered by the with statements below
    socket_patcher = patch("socket.create_connection")
    tls_patcher = patch("ssl.SSLContext.wrap_socket")
    loop_patcher = patch("paho.mqtt.client.Client.loop_forever", autospec=True)
    glib_patcher = patch("gi.repository.GLib.MainLoop.run")
    login_manager_patcher = patch("systemctl_mqtt._get_login_manager")
    with socket_patcher, tls_patcher as wrap_socket_mock:
        with loop_patcher as loop_forever_mock, glib_patcher, login_manager_patcher:
            # send() on the wrapped socket returns the length of its argument
            wrap_socket_mock.return_value.send = len
            systemctl_mqtt._run(
                mqtt_host=mqtt_host,
                mqtt_port=mqtt_port,
                mqtt_username=mqtt_username,
                mqtt_password=mqtt_password,
                mqtt_topic_prefix=mqtt_topic_prefix,
            )
    assert loop_forever_mock.call_count == 1
    positional_args = loop_forever_mock.call_args[0]
    (client,) = positional_args
    assert client._username.decode() == mqtt_username
    if not mqtt_password:
        # no password given: the client must not hold one either
        assert client._password is None
        return
    assert client._password.decode() == mqtt_password
def _initialize_mqtt_client(
    mqtt_host, mqtt_port, mqtt_topic_prefix
) -> paho.mqtt.client.Client:
    """Return the MQTT client created by systemctl_mqtt._run after on_connect.

    _run is executed without credentials while socket creation, TLS
    wrapping, the client's loop_forever, the GLib main loop and the login
    manager lookup are mocked. The client is taken from the recorded
    loop_forever call, and its on_connect callback is invoked once with
    result code 0 before it is returned.

    Raises:
        AssertionError: if background threads are still alive after the
            deadline, or loop_forever was not called exactly once.
    """
    with unittest.mock.patch("socket.create_connection"), unittest.mock.patch(
        "ssl.SSLContext.wrap_socket",
    ) as ssl_wrap_socket_mock, unittest.mock.patch(
        "paho.mqtt.client.Client.loop_forever", autospec=True,
    ) as mqtt_loop_forever_mock, unittest.mock.patch(
        "gi.repository.GLib.MainLoop.run"
    ), unittest.mock.patch(
        "systemctl_mqtt._get_login_manager"
    ) as get_login_manager_mock:
        # send() on the wrapped socket returns the length of its argument
        ssl_wrap_socket_mock.return_value.send = len
        # the mocked login manager answers every Get() call with False
        get_login_manager_mock.return_value.Get.return_value = dbus.Boolean(False)
        systemctl_mqtt._run(
            mqtt_host=mqtt_host,
            mqtt_port=mqtt_port,
            mqtt_username=None,
            mqtt_password=None,
            mqtt_topic_prefix=mqtt_topic_prefix,
        )
    # Poll until only the current thread is left, but give up after a
    # deadline so that a lingering thread fails the calling test instead of
    # blocking the whole test run forever.
    deadline = time.monotonic() + 10
    while threading.active_count() > 1:
        assert time.monotonic() < deadline, "background threads still running"
        time.sleep(0.01)
    assert mqtt_loop_forever_mock.call_count == 1
    (mqtt_client,) = mqtt_loop_forever_mock.call_args[0]
    mqtt_client.socket().getpeername.return_value = (mqtt_host, mqtt_port)
    mqtt_client.on_connect(mqtt_client, mqtt_client._userdata, {}, 0)
    return mqtt_client
@pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
@pytest.mark.parametrize("mqtt_port", [1833])
@pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
def test__client_handle_message(caplog, mqtt_host, mqtt_port, mqtt_topic_prefix):
    """Feed a poweroff message to an initialized client and check the outcome.

    The poweroff action is replaced by a mock; it must be called exactly once
    without arguments, and only DEBUG records must have been logged.
    """
    mqtt_client = _initialize_mqtt_client(
        mqtt_host=mqtt_host, mqtt_port=mqtt_port, mqtt_topic_prefix=mqtt_topic_prefix
    )
    # drop records produced during initialization
    caplog.clear()
    caplog.set_level(logging.DEBUG)
    topic = mqtt_topic_prefix.encode() + b"/poweroff"
    message = MQTTMessage(topic=topic)
    poweroff_mapping = systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"]
    with unittest.mock.patch.object(poweroff_mapping, "action") as action_mock:
        mqtt_client._handle_on_message(message)
    action_mock.assert_called_once_with()
    for record in caplog.records:
        assert record.levelno == logging.DEBUG
    expected_first = "received topic={} payload=b''".format(message.topic)
    assert caplog.records[0].message == expected_first
    assert caplog.records[1].message.startswith("executing action poweroff")
    assert caplog.records[2].message.startswith("completed action poweroff")
@pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
@pytest.mark.parametrize("mqtt_port", [1833])
@pytest.mark.parametrize("mqtt_password", ["secret"])
def test__run_authentication_missing_username(mqtt_host, mqtt_port, mqtt_password):
    """Expect _run to raise ValueError for a password without a username."""
    client_patcher = unittest.mock.patch("paho.mqtt.client.Client")
    login_manager_patcher = unittest.mock.patch("systemctl_mqtt._get_login_manager")
    run_kwargs = {
        "mqtt_host": mqtt_host,
        "mqtt_port": mqtt_port,
        "mqtt_username": None,
        "mqtt_password": mqtt_password,
        "mqtt_topic_prefix": "prefix",
    }
    with client_patcher, login_manager_patcher, pytest.raises(ValueError):
        systemctl_mqtt._run(**run_kwargs)
@pytest.mark.parametrize("mqtt_topic", ["system/command/poweroff"])
@pytest.mark.parametrize("payload", [b"", b"junk"])
def test_mqtt_message_callback_poweroff(caplog, mqtt_topic: str, payload: bytes):
    """Call the poweroff message callback directly with a non-retained message.

    The mocked action must be called once without arguments and exactly three
    DEBUG records (received / executing / completed) must be logged.
    """
    message = MQTTMessage(topic=mqtt_topic.encode())
    message.payload = payload
    poweroff_mapping = systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"]
    action_patcher = unittest.mock.patch.object(poweroff_mapping, "action")
    with action_patcher as action_mock, caplog.at_level(logging.DEBUG):
        callback = systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
            "poweroff"
        ].mqtt_message_callback
        callback(None, None, message)  # type: ignore
    action_mock.assert_called_once_with()
    assert len(caplog.records) == 3
    received, executing, completed = caplog.records
    assert received.levelno == logging.DEBUG
    assert received.message == (
        "received topic={} payload={!r}".format(mqtt_topic, payload)
    )
    assert executing.levelno == logging.DEBUG
    executing_prefix = "executing action {} ({!r})".format("poweroff", action_mock)
    assert executing.message.startswith(executing_prefix)
    assert completed.levelno == logging.DEBUG
    completed_prefix = "completed action {} ({!r})".format("poweroff", action_mock)
    assert completed.message.startswith(completed_prefix)
@pytest.mark.parametrize("mqtt_topic", ["system/command/poweroff"])
@pytest.mark.parametrize("payload", [b"", b"junk"])
def test_mqtt_message_callback_poweroff_retained(
    caplog, mqtt_topic: str, payload: bytes
):
    """Call the poweroff message callback with a message flagged as retained.

    The mocked action must not be called; one DEBUG record for the reception
    and one INFO record stating that the message is ignored are expected.
    """
    message = MQTTMessage(topic=mqtt_topic.encode())
    message.payload = payload
    message.retain = True
    poweroff_mapping = systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"]
    action_patcher = unittest.mock.patch.object(poweroff_mapping, "action")
    with action_patcher as action_mock, caplog.at_level(logging.DEBUG):
        callback = systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
            "poweroff"
        ].mqtt_message_callback
        callback(None, None, message)  # type: ignore
    action_mock.assert_not_called()
    assert len(caplog.records) == 2
    received, ignored = caplog.records
    assert received.levelno == logging.DEBUG
    assert received.message == (
        "received topic={} payload={!r}".format(mqtt_topic, payload)
    )
    assert ignored.levelno == logging.INFO
    assert ignored.message == "ignoring retained message"
|