| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 | 
							- # systemctl-mqtt - MQTT client triggering shutdown on systemd-based systems
 
- #
 
- # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
 
- #
 
- # This program is free software: you can redistribute it and/or modify
 
- # it under the terms of the GNU General Public License as published by
 
- # the Free Software Foundation, either version 3 of the License, or
 
- # any later version.
 
- #
 
- # This program is distributed in the hope that it will be useful,
 
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
 
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
- # GNU General Public License for more details.
 
- #
 
- # You should have received a copy of the GNU General Public License
 
- # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
- import logging
 
- import threading
 
- import time
 
- import unittest.mock
 
- import paho.mqtt.client
 
- import pytest
 
- from paho.mqtt.client import MQTTMessage
 
- import systemctl_mqtt
 
- # pylint: disable=protected-access
 
- @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
 
- @pytest.mark.parametrize("mqtt_port", [1833])
 
- @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
 
- def test__run(caplog, mqtt_host, mqtt_port, mqtt_topic_prefix):
 
-     caplog.set_level(logging.DEBUG)
 
-     with unittest.mock.patch(
 
-         "socket.create_connection"
 
-     ) as create_socket_mock, unittest.mock.patch(
 
-         "ssl.SSLContext.wrap_socket", autospec=True,
 
-     ) as ssl_wrap_socket_mock, unittest.mock.patch(
 
-         "paho.mqtt.client.Client.loop_forever", autospec=True,
 
-     ) as mqtt_loop_forever_mock, unittest.mock.patch(
 
-         "gi.repository.GLib.MainLoop.run"
 
-     ) as glib_loop_mock, unittest.mock.patch(
 
-         "systemctl_mqtt._get_login_manager"
 
-     ):
 
-         ssl_wrap_socket_mock.return_value.send = len
 
-         systemctl_mqtt._run(
 
-             mqtt_host=mqtt_host,
 
-             mqtt_port=mqtt_port,
 
-             mqtt_username=None,
 
-             mqtt_password=None,
 
-             mqtt_topic_prefix=mqtt_topic_prefix,
 
-         )
 
-     assert caplog.records[0].levelno == logging.INFO
 
-     assert caplog.records[0].message == "connecting to MQTT broker {}:{}".format(
 
-         mqtt_host, mqtt_port
 
-     )
 
-     # correct remote?
 
-     assert create_socket_mock.call_count == 1
 
-     create_socket_args, _ = create_socket_mock.call_args
 
-     assert create_socket_args[0] == (mqtt_host, mqtt_port)
 
-     # ssl enabled?
 
-     assert ssl_wrap_socket_mock.call_count == 1
 
-     ssl_context = ssl_wrap_socket_mock.call_args[0][0]  # self
 
-     assert ssl_context.check_hostname is True
 
-     assert ssl_wrap_socket_mock.call_args[1]["server_hostname"] == mqtt_host
 
-     # loop started?
 
-     while threading.active_count() > 1:
 
-         time.sleep(0.01)
 
-     assert mqtt_loop_forever_mock.call_count == 1
 
-     (mqtt_client,) = mqtt_loop_forever_mock.call_args[0]
 
-     assert mqtt_client._tls_insecure is False
 
-     # credentials
 
-     assert mqtt_client._username is None
 
-     assert mqtt_client._password is None
 
-     # connect callback
 
-     caplog.clear()
 
-     mqtt_client.socket().getpeername.return_value = (mqtt_host, mqtt_port)
 
-     with unittest.mock.patch(
 
-         "paho.mqtt.client.Client.subscribe"
 
-     ) as mqtt_subscribe_mock:
 
-         mqtt_client.on_connect(mqtt_client, mqtt_client._userdata, {}, 0)
 
-     state = mqtt_client._userdata
 
-     assert (
 
-         state._login_manager.connect_to_signal.call_args[1]["signal_name"]
 
-         == "PrepareForShutdown"
 
-     )
 
-     mqtt_subscribe_mock.assert_called_once_with(mqtt_topic_prefix + "/poweroff")
 
-     assert mqtt_client.on_message is None
 
-     assert (  # pylint: disable=comparison-with-callable
 
-         mqtt_client._on_message_filtered[mqtt_topic_prefix + "/poweroff"]
 
-         == systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
 
-             "poweroff"
 
-         ].mqtt_message_callback
 
-     )
 
-     assert caplog.records[0].levelno == logging.DEBUG
 
-     assert caplog.records[0].message == "connected to MQTT broker {}:{}".format(
 
-         mqtt_host, mqtt_port
 
-     )
 
-     assert caplog.records[1].levelno == logging.DEBUG
 
-     assert caplog.records[1].message == "acquired shutdown inhibitor lock"
 
-     assert caplog.records[2].levelno == logging.INFO
 
-     assert caplog.records[2].message == "subscribing to {}".format(
 
-         mqtt_topic_prefix + "/poweroff"
 
-     )
 
-     assert caplog.records[3].levelno == logging.DEBUG
 
-     assert caplog.records[3].message == "registered MQTT callback for topic {}".format(
 
-         mqtt_topic_prefix + "/poweroff"
 
-     ) + " triggering {}".format(
 
-         systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"].action
 
-     )
 
-     # dbus loop started?
 
-     glib_loop_mock.assert_called_once_with()
 
-     # waited for mqtt loop to stop?
 
-     assert mqtt_client._thread_terminate
 
-     assert mqtt_client._thread is None
 
- @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
 
- @pytest.mark.parametrize("mqtt_port", [1833])
 
- @pytest.mark.parametrize("mqtt_username", ["me"])
 
- @pytest.mark.parametrize("mqtt_password", [None, "secret"])
 
- @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host"])
 
- def test__run_authentication(
 
-     mqtt_host, mqtt_port, mqtt_username, mqtt_password, mqtt_topic_prefix
 
- ):
 
-     with unittest.mock.patch("socket.create_connection"), unittest.mock.patch(
 
-         "ssl.SSLContext.wrap_socket"
 
-     ) as ssl_wrap_socket_mock, unittest.mock.patch(
 
-         "paho.mqtt.client.Client.loop_forever", autospec=True,
 
-     ) as mqtt_loop_forever_mock, unittest.mock.patch(
 
-         "gi.repository.GLib.MainLoop.run"
 
-     ), unittest.mock.patch(
 
-         "systemctl_mqtt._get_login_manager"
 
-     ):
 
-         ssl_wrap_socket_mock.return_value.send = len
 
-         systemctl_mqtt._run(
 
-             mqtt_host=mqtt_host,
 
-             mqtt_port=mqtt_port,
 
-             mqtt_username=mqtt_username,
 
-             mqtt_password=mqtt_password,
 
-             mqtt_topic_prefix=mqtt_topic_prefix,
 
-         )
 
-     assert mqtt_loop_forever_mock.call_count == 1
 
-     (mqtt_client,) = mqtt_loop_forever_mock.call_args[0]
 
-     assert mqtt_client._username.decode() == mqtt_username
 
-     if mqtt_password:
 
-         assert mqtt_client._password.decode() == mqtt_password
 
-     else:
 
-         assert mqtt_client._password is None
 
- def _initialize_mqtt_client(
 
-     mqtt_host, mqtt_port, mqtt_topic_prefix
 
- ) -> paho.mqtt.client.Client:
 
-     with unittest.mock.patch("socket.create_connection"), unittest.mock.patch(
 
-         "ssl.SSLContext.wrap_socket",
 
-     ) as ssl_wrap_socket_mock, unittest.mock.patch(
 
-         "paho.mqtt.client.Client.loop_forever", autospec=True,
 
-     ) as mqtt_loop_forever_mock, unittest.mock.patch(
 
-         "gi.repository.GLib.MainLoop.run"
 
-     ), unittest.mock.patch(
 
-         "systemctl_mqtt._get_login_manager"
 
-     ):
 
-         ssl_wrap_socket_mock.return_value.send = len
 
-         systemctl_mqtt._run(
 
-             mqtt_host=mqtt_host,
 
-             mqtt_port=mqtt_port,
 
-             mqtt_username=None,
 
-             mqtt_password=None,
 
-             mqtt_topic_prefix=mqtt_topic_prefix,
 
-         )
 
-     while threading.active_count() > 1:
 
-         time.sleep(0.01)
 
-     assert mqtt_loop_forever_mock.call_count == 1
 
-     (mqtt_client,) = mqtt_loop_forever_mock.call_args[0]
 
-     mqtt_client.socket().getpeername.return_value = (mqtt_host, mqtt_port)
 
-     mqtt_client.on_connect(mqtt_client, mqtt_client._userdata, {}, 0)
 
-     return mqtt_client
 
- @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
 
- @pytest.mark.parametrize("mqtt_port", [1833])
 
- @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
 
- def test__client_handle_message(caplog, mqtt_host, mqtt_port, mqtt_topic_prefix):
 
-     mqtt_client = _initialize_mqtt_client(
 
-         mqtt_host=mqtt_host, mqtt_port=mqtt_port, mqtt_topic_prefix=mqtt_topic_prefix
 
-     )
 
-     caplog.clear()
 
-     caplog.set_level(logging.DEBUG)
 
-     poweroff_message = MQTTMessage(topic=mqtt_topic_prefix.encode() + b"/poweroff")
 
-     with unittest.mock.patch.object(
 
-         systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "action",
 
-     ) as poweroff_action_mock:
 
-         mqtt_client._handle_on_message(poweroff_message)
 
-     poweroff_action_mock.assert_called_once_with()
 
-     assert all(r.levelno == logging.DEBUG for r in caplog.records)
 
-     assert caplog.records[0].message == "received topic={} payload=b''".format(
 
-         poweroff_message.topic
 
-     )
 
-     assert caplog.records[1].message.startswith("executing action poweroff")
 
-     assert caplog.records[2].message.startswith("completed action poweroff")
 
- @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
 
- @pytest.mark.parametrize("mqtt_port", [1833])
 
- @pytest.mark.parametrize("mqtt_password", ["secret"])
 
- def test__run_authentication_missing_username(mqtt_host, mqtt_port, mqtt_password):
 
-     with unittest.mock.patch("paho.mqtt.client.Client"), unittest.mock.patch(
 
-         "systemctl_mqtt._get_login_manager"
 
-     ):
 
-         with pytest.raises(ValueError):
 
-             systemctl_mqtt._run(
 
-                 mqtt_host=mqtt_host,
 
-                 mqtt_port=mqtt_port,
 
-                 mqtt_username=None,
 
-                 mqtt_password=mqtt_password,
 
-                 mqtt_topic_prefix="prefix",
 
-             )
 
- @pytest.mark.parametrize("mqtt_topic", ["system/command/poweroff"])
 
- @pytest.mark.parametrize("payload", [b"", b"junk"])
 
- def test_mqtt_message_callback_poweroff(caplog, mqtt_topic: str, payload: bytes):
 
-     message = MQTTMessage(topic=mqtt_topic.encode())
 
-     message.payload = payload
 
-     with unittest.mock.patch.object(
 
-         systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "action",
 
-     ) as action_mock, caplog.at_level(logging.DEBUG):
 
-         systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
 
-             "poweroff"
 
-         ].mqtt_message_callback(
 
-             None, None, message  # type: ignore
 
-         )
 
-     action_mock.assert_called_once_with()
 
-     assert len(caplog.records) == 3
 
-     assert caplog.records[0].levelno == logging.DEBUG
 
-     assert caplog.records[0].message == (
 
-         "received topic={} payload={!r}".format(mqtt_topic, payload)
 
-     )
 
-     assert caplog.records[1].levelno == logging.DEBUG
 
-     assert caplog.records[1].message.startswith(
 
-         "executing action {} ({!r})".format("poweroff", action_mock)
 
-     )
 
-     assert caplog.records[2].levelno == logging.DEBUG
 
-     assert caplog.records[2].message.startswith(
 
-         "completed action {} ({!r})".format("poweroff", action_mock)
 
-     )
 
- @pytest.mark.parametrize("mqtt_topic", ["system/command/poweroff"])
 
- @pytest.mark.parametrize("payload", [b"", b"junk"])
 
- def test_mqtt_message_callback_poweroff_retained(
 
-     caplog, mqtt_topic: str, payload: bytes
 
- ):
 
-     message = MQTTMessage(topic=mqtt_topic.encode())
 
-     message.payload = payload
 
-     message.retain = True
 
-     with unittest.mock.patch.object(
 
-         systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "action",
 
-     ) as action_mock, caplog.at_level(logging.DEBUG):
 
-         systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
 
-             "poweroff"
 
-         ].mqtt_message_callback(
 
-             None, None, message  # type: ignore
 
-         )
 
-     action_mock.assert_not_called()
 
-     assert len(caplog.records) == 2
 
-     assert caplog.records[0].levelno == logging.DEBUG
 
-     assert caplog.records[0].message == (
 
-         "received topic={} payload={!r}".format(mqtt_topic, payload)
 
-     )
 
-     assert caplog.records[1].levelno == logging.INFO
 
-     assert caplog.records[1].message == "ignoring retained message"
 
 
  |