|
@@ -15,13 +15,16 @@
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
|
|
+import contextlib
|
|
|
import datetime
|
|
|
import logging
|
|
|
import threading
|
|
|
import time
|
|
|
+import typing
|
|
|
import unittest.mock
|
|
|
|
|
|
-import dbus
|
|
|
+import jeepney.fds
|
|
|
+import jeepney.low_level
|
|
|
import paho.mqtt.client
|
|
|
import pytest
|
|
|
from paho.mqtt.client import MQTTMessage
|
|
@@ -31,6 +34,16 @@ import systemctl_mqtt
|
|
|
# pylint: disable=protected-access,too-many-positional-arguments
|
|
|
|
|
|
|
|
|
+@contextlib.contextmanager
|
|
|
+def mock_open_dbus_connection() -> typing.Iterator[unittest.mock.MagicMock]:
|
|
|
+ with unittest.mock.patch("jeepney.io.blocking.open_dbus_connection") as mock:
|
|
|
+ add_match_reply = unittest.mock.Mock()
|
|
|
+ add_match_reply.body = ()
|
|
|
+ mock.return_value.send_and_get_reply.return_value = add_match_reply
|
|
|
+ mock.return_value.recv_until_filtered.side_effect = []
|
|
|
+ yield mock
|
|
|
+
|
|
|
+
|
|
|
@pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
|
|
|
@pytest.mark.parametrize("mqtt_port", [1833])
|
|
|
@pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
|
|
@@ -46,6 +59,7 @@ def test__run(
|
|
|
):
|
|
|
# pylint: disable=too-many-locals,too-many-arguments
|
|
|
caplog.set_level(logging.DEBUG)
|
|
|
+ login_manager_mock = unittest.mock.MagicMock()
|
|
|
with unittest.mock.patch(
|
|
|
"socket.create_connection"
|
|
|
) as create_socket_mock, unittest.mock.patch(
|
|
@@ -53,22 +67,22 @@ def test__run(
|
|
|
) as ssl_wrap_socket_mock, unittest.mock.patch(
|
|
|
"paho.mqtt.client.Client.loop_forever", autospec=True
|
|
|
) as mqtt_loop_forever_mock, unittest.mock.patch(
|
|
|
- "gi.repository.GLib.MainLoop.run"
|
|
|
- ) as glib_loop_mock, unittest.mock.patch(
|
|
|
- "systemctl_mqtt._dbus.get_login_manager"
|
|
|
- ) as get_login_manager_mock:
|
|
|
+ "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock
|
|
|
+ ), mock_open_dbus_connection() as open_dbus_connection_mock:
|
|
|
ssl_wrap_socket_mock.return_value.send = len
|
|
|
- get_login_manager_mock.return_value.Get.return_value = dbus.Boolean(False)
|
|
|
- systemctl_mqtt._run(
|
|
|
- mqtt_host=mqtt_host,
|
|
|
- mqtt_port=mqtt_port,
|
|
|
- mqtt_username=None,
|
|
|
- mqtt_password=None,
|
|
|
- mqtt_topic_prefix=mqtt_topic_prefix,
|
|
|
- homeassistant_discovery_prefix=homeassistant_discovery_prefix,
|
|
|
- homeassistant_discovery_object_id=homeassistant_discovery_object_id,
|
|
|
- poweroff_delay=datetime.timedelta(),
|
|
|
- )
|
|
|
+ login_manager_mock.Inhibit.return_value = (jeepney.fds.FileDescriptor(-1),)
|
|
|
+ login_manager_mock.Get.return_value = (("b", False),)
|
|
|
+ with pytest.raises(StopIteration):
|
|
|
+ systemctl_mqtt._run(
|
|
|
+ mqtt_host=mqtt_host,
|
|
|
+ mqtt_port=mqtt_port,
|
|
|
+ mqtt_username=None,
|
|
|
+ mqtt_password=None,
|
|
|
+ mqtt_topic_prefix=mqtt_topic_prefix,
|
|
|
+ homeassistant_discovery_prefix=homeassistant_discovery_prefix,
|
|
|
+ homeassistant_discovery_object_id=homeassistant_discovery_object_id,
|
|
|
+ poweroff_delay=datetime.timedelta(),
|
|
|
+ )
|
|
|
assert caplog.records[0].levelno == logging.INFO
|
|
|
assert caplog.records[0].message == (
|
|
|
f"connecting to MQTT broker {mqtt_host}:{mqtt_port} (TLS enabled)"
|
|
@@ -98,11 +112,15 @@ def test__run(
|
|
|
"paho.mqtt.client.Client.subscribe"
|
|
|
) as mqtt_subscribe_mock:
|
|
|
mqtt_client.on_connect(mqtt_client, mqtt_client._userdata, {}, 0)
|
|
|
- state = mqtt_client._userdata
|
|
|
- assert (
|
|
|
- state._login_manager.connect_to_signal.call_args[1]["signal_name"]
|
|
|
- == "PrepareForShutdown"
|
|
|
+ open_dbus_connection_mock.assert_called_once_with(bus="SYSTEM")
|
|
|
+ login_manager_mock.Inhibit.assert_called_once_with(
|
|
|
+ what="shutdown",
|
|
|
+ who="systemctl-mqtt",
|
|
|
+ why="Report shutdown via MQTT",
|
|
|
+ mode="delay",
|
|
|
)
|
|
|
+ login_manager_mock.Get.assert_called_once_with("PreparingForShutdown")
|
|
|
+ open_dbus_connection_mock.return_value.send_and_get_reply.assert_called_once()
|
|
|
assert sorted(mqtt_subscribe_mock.call_args_list) == [
|
|
|
unittest.mock.call(mqtt_topic_prefix + "/lock-all-sessions"),
|
|
|
unittest.mock.call(mqtt_topic_prefix + "/poweroff"),
|
|
@@ -146,8 +164,7 @@ def test__run(
|
|
|
f" triggering {systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[s]}"
|
|
|
for s in ("poweroff", "lock-all-sessions")
|
|
|
}
|
|
|
- # dbus loop started?
|
|
|
- glib_loop_mock.assert_called_once_with()
|
|
|
+ open_dbus_connection_mock.return_value.filter.assert_called_once()
|
|
|
# waited for mqtt loop to stop?
|
|
|
assert mqtt_client._thread_terminate
|
|
|
assert mqtt_client._thread is None
|
|
@@ -160,7 +177,7 @@ def test__run_tls(caplog, mqtt_host, mqtt_port, mqtt_disable_tls):
|
|
|
caplog.set_level(logging.INFO)
|
|
|
with unittest.mock.patch(
|
|
|
"paho.mqtt.client.Client"
|
|
|
- ) as mqtt_client_class, unittest.mock.patch("gi.repository.GLib.MainLoop.run"):
|
|
|
+ ) as mqtt_client_class, mock_open_dbus_connection(), pytest.raises(StopIteration):
|
|
|
systemctl_mqtt._run(
|
|
|
mqtt_host=mqtt_host,
|
|
|
mqtt_port=mqtt_port,
|
|
@@ -186,7 +203,7 @@ def test__run_tls(caplog, mqtt_host, mqtt_port, mqtt_disable_tls):
|
|
|
def test__run_tls_default():
|
|
|
with unittest.mock.patch(
|
|
|
"paho.mqtt.client.Client"
|
|
|
- ) as mqtt_client_class, unittest.mock.patch("gi.repository.GLib.MainLoop.run"):
|
|
|
+ ) as mqtt_client_class, mock_open_dbus_connection(), pytest.raises(StopIteration):
|
|
|
systemctl_mqtt._run(
|
|
|
mqtt_host="mqtt-broker.local",
|
|
|
mqtt_port=1833,
|
|
@@ -215,9 +232,9 @@ def test__run_authentication(
|
|
|
) as ssl_wrap_socket_mock, unittest.mock.patch(
|
|
|
"paho.mqtt.client.Client.loop_forever", autospec=True
|
|
|
) as mqtt_loop_forever_mock, unittest.mock.patch(
|
|
|
- "gi.repository.GLib.MainLoop.run"
|
|
|
- ), unittest.mock.patch(
|
|
|
- "systemctl_mqtt._dbus.get_login_manager"
|
|
|
+ "systemctl_mqtt._dbus.get_login_manager_proxy"
|
|
|
+ ), mock_open_dbus_connection(), pytest.raises(
|
|
|
+ StopIteration
|
|
|
):
|
|
|
ssl_wrap_socket_mock.return_value.send = len
|
|
|
systemctl_mqtt._run(
|
|
@@ -247,12 +264,15 @@ def _initialize_mqtt_client(
|
|
|
) as ssl_wrap_socket_mock, unittest.mock.patch(
|
|
|
"paho.mqtt.client.Client.loop_forever", autospec=True
|
|
|
) as mqtt_loop_forever_mock, unittest.mock.patch(
|
|
|
- "gi.repository.GLib.MainLoop.run"
|
|
|
- ), unittest.mock.patch(
|
|
|
- "systemctl_mqtt._dbus.get_login_manager"
|
|
|
- ) as get_login_manager_mock:
|
|
|
+ "systemctl_mqtt._dbus.get_login_manager_proxy"
|
|
|
+ ) as get_login_manager_mock, mock_open_dbus_connection(), pytest.raises(
|
|
|
+ StopIteration
|
|
|
+ ):
|
|
|
ssl_wrap_socket_mock.return_value.send = len
|
|
|
- get_login_manager_mock.return_value.Get.return_value = dbus.Boolean(False)
|
|
|
+ get_login_manager_mock.return_value.Inhibit.return_value = (
|
|
|
+ jeepney.fds.FileDescriptor(-1),
|
|
|
+ )
|
|
|
+ get_login_manager_mock.return_value.Get.return_value = (("b", True),)
|
|
|
systemctl_mqtt._run(
|
|
|
mqtt_host=mqtt_host,
|
|
|
mqtt_port=mqtt_port,
|
|
@@ -301,8 +321,8 @@ def test__client_handle_message(caplog, mqtt_host, mqtt_port, mqtt_topic_prefix)
|
|
|
@pytest.mark.parametrize("mqtt_password", ["secret"])
|
|
|
def test__run_authentication_missing_username(mqtt_host, mqtt_port, mqtt_password):
|
|
|
with unittest.mock.patch("paho.mqtt.client.Client"), unittest.mock.patch(
|
|
|
- "systemctl_mqtt._dbus.get_login_manager"
|
|
|
- ):
|
|
|
+ "systemctl_mqtt._dbus.get_login_manager_proxy"
|
|
|
+ ), mock_open_dbus_connection():
|
|
|
with pytest.raises(ValueError, match=r"^Missing MQTT username$"):
|
|
|
systemctl_mqtt._run(
|
|
|
mqtt_host=mqtt_host,
|
|
@@ -365,3 +385,33 @@ def test_mqtt_message_callback_poweroff_retained(
|
|
|
)
|
|
|
assert caplog.records[1].levelno == logging.INFO
|
|
|
assert caplog.records[1].message == "ignoring retained message"
|
|
|
+
|
|
|
+
|
|
|
+@pytest.mark.parametrize("active", [True, False])
|
|
|
+@pytest.mark.parametrize("block", [True, False])
|
|
|
+def test__publish_preparing_for_shutdown_blocking(active: bool, block: bool) -> None:
|
|
|
+ login_manager_mock = unittest.mock.MagicMock()
|
|
|
+ login_manager_mock.Get.return_value = (("b", active),)
|
|
|
+ with unittest.mock.patch(
|
|
|
+ "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock
|
|
|
+ ):
|
|
|
+ state = systemctl_mqtt._State(
|
|
|
+ mqtt_topic_prefix="prefix",
|
|
|
+ homeassistant_discovery_prefix="prefix",
|
|
|
+ homeassistant_discovery_object_id="object-id",
|
|
|
+ poweroff_delay=datetime.timedelta(),
|
|
|
+ )
|
|
|
+ mqtt_client_mock = unittest.mock.MagicMock()
|
|
|
+ state._publish_preparing_for_shutdown(
|
|
|
+ mqtt_client=mqtt_client_mock, active=active, block=block
|
|
|
+ )
|
|
|
+ mqtt_client_mock.publish.assert_called_once_with(
|
|
|
+ topic="prefix/preparing-for-shutdown",
|
|
|
+ payload="true" if active else "false",
|
|
|
+ retain=True,
|
|
|
+ )
|
|
|
+ msg_info = mqtt_client_mock.publish.return_value
|
|
|
+ if block:
|
|
|
+ msg_info.wait_for_publish.assert_called_once()
|
|
|
+ else:
|
|
|
+ msg_info.wait_for_publish.assert_not_called()
|