test_dbus.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. # systemctl-mqtt - MQTT client triggering & reporting shutdown on systemd-based systems
  2. #
  3. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import datetime
  18. import logging
  19. import typing
  20. import unittest.mock
  21. import jeepney
  22. import jeepney.low_level
  23. import jeepney.wrappers
  24. import pytest
  25. import systemctl_mqtt._dbus
  26. # pylint: disable=protected-access
  def test_get_login_manager_proxy():
      """Verify the proxy returned by ``get_login_manager_proxy``.

      The proxy must be a blocking jeepney proxy bound to logind's Manager
      interface. Nothing is mocked in this test and ``CanPowerOff`` is invoked
      on the returned proxy, so it presumably needs a reachable logind.
      """
      proxy = systemctl_mqtt._dbus.get_login_manager_proxy()
      assert isinstance(proxy, jeepney.io.blocking.Proxy)
      bound_interface = proxy._msggen.interface
      assert bound_interface == "org.freedesktop.login1.Manager"
      # https://freedesktop.org/wiki/Software/systemd/logind/
      power_off_reply = proxy.CanPowerOff()
      accepted_replies = {("yes",), ("challenge",)}
      assert power_off_reply in accepted_replies
  def test__log_shutdown_inhibitors_some(caplog):
      """Two inhibitor entries reported by the manager yield two log records.

      Only the first record's level and message are inspected in detail.
      """
      developer_entry = (
          "shutdown:sleep",
          "Developer",
          "Haven't pushed my commits yet",
          "delay",
          1000,
          1234,
      )
      editor_entry = ("shutdown", "Editor", "", "Unsafed files open", 0, 42)
      login_manager = unittest.mock.MagicMock()
      # ListInhibitors replies with a one-element tuple wrapping the entry list.
      login_manager.ListInhibitors.return_value = ([developer_entry, editor_entry],)
      with caplog.at_level(logging.DEBUG):
          systemctl_mqtt._dbus._log_shutdown_inhibitors(login_manager)
      captured = caplog.records
      assert len(captured) == 2
      first_record = captured[0]
      assert first_record.levelno == logging.DEBUG
      expected_message = (
          "detected shutdown inhibitor Developer (pid=1234, uid=1000, mode=delay): "
          "Haven't pushed my commits yet"
      )
      assert first_record.message == expected_message
  def test__log_shutdown_inhibitors_none(caplog):
      """An empty inhibitor list produces a single debug-level notice."""
      manager_stub = unittest.mock.MagicMock()
      no_inhibitors = []
      manager_stub.ListInhibitors.return_value = (no_inhibitors,)
      with caplog.at_level(logging.DEBUG):
          systemctl_mqtt._dbus._log_shutdown_inhibitors(manager_stub)
      captured = caplog.records
      assert len(captured) == 1
      sole_record = captured[0]
      assert sole_record.levelno == logging.DEBUG
      assert sole_record.message == "no shutdown inhibitor locks found"
  def test__log_shutdown_inhibitors_fail(caplog):
      """A D-Bus error raised by ``ListInhibitors`` is logged as one warning."""
      mocked_error = DBusErrorResponseMock("error", "mocked")
      manager_stub = unittest.mock.MagicMock()
      # Make the mocked call raise instead of returning a reply.
      manager_stub.ListInhibitors.side_effect = mocked_error
      with caplog.at_level(logging.DEBUG):
          systemctl_mqtt._dbus._log_shutdown_inhibitors(manager_stub)
      captured = caplog.records
      assert len(captured) == 1
      warning_record = captured[0]
      assert warning_record.levelno == logging.WARNING
      expected_message = "failed to fetch shutdown inhibitors: [error] mocked"
      assert warning_record.message == expected_message
  @pytest.mark.parametrize("action", ["poweroff", "reboot"])
  @pytest.mark.parametrize(
      "delay",
      [
          datetime.timedelta(0),
          datetime.timedelta(hours=1),
      ],
  )
  def test__schedule_shutdown(action, delay):
      """``schedule_shutdown`` forwards action and target time as keywords.

      The login manager proxy is replaced by a mock; the test checks that
      ``ScheduleShutdown`` is called exactly once, without positional
      arguments, and with only the ``action`` and ``time`` keywords.
      """
      proxy_mock = unittest.mock.MagicMock()
      proxy_patch = unittest.mock.patch(
          "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=proxy_mock
      )
      with proxy_patch:
          systemctl_mqtt._dbus.schedule_shutdown(action=action, delay=delay)
      schedule_method = proxy_mock.ScheduleShutdown
      schedule_method.assert_called_once()
      positional, keywords = schedule_method.call_args
      assert not positional
      assert keywords.pop("action") == action
      # Compare the scheduled point in time against "now" with some tolerance.
      remaining = keywords.pop("time") - datetime.datetime.now()
      expected_seconds = pytest.approx(delay.total_seconds(), abs=0.1)
      assert remaining.total_seconds() == expected_seconds
      # Both known keywords were popped above; nothing else may be left.
      assert not keywords
  class DBusErrorResponseMock(jeepney.wrappers.DBusErrorResponse):
      """Test double for ``DBusErrorResponse`` built from a name and data.

      The parent initializer is deliberately not called; only the ``name``
      and ``data`` attributes are set.
      """

      # pylint: disable=missing-class-docstring,super-init-not-called
      def __init__(self, name: str, data: typing.Any):
          self.name, self.data = name, data
  @pytest.mark.parametrize("action", ["poweroff"])
  @pytest.mark.parametrize(
      ("error_name", "error_message", "log_message"),
      [
          ("test error", "test message", "[test error] ('test message',)"),
          (
              "org.freedesktop.DBus.Error.InteractiveAuthorizationRequired",
              "Interactive authentication required.",
              "unauthorized; missing polkit authorization rules?",
          ),
      ],
  )
  def test__schedule_shutdown_fail(
      caplog, action, error_name, error_message, log_message
  ):
      """A D-Bus error from ``ScheduleShutdown`` is logged, not propagated.

      Expects three log records: the info-level scheduling announcement, the
      error describing the failure, and a message mentioning inhibitors.
      """
      proxy_mock = unittest.mock.MagicMock()
      proxy_mock.ListInhibitors.return_value = ([],)
      proxy_mock.ScheduleShutdown.side_effect = DBusErrorResponseMock(
          name=error_name, data=(error_message,)
      )
      proxy_patch = unittest.mock.patch(
          "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=proxy_mock
      )
      requested_delay = datetime.timedelta(seconds=21)
      with proxy_patch:
          with caplog.at_level(logging.DEBUG):
              systemctl_mqtt._dbus.schedule_shutdown(
                  action=action, delay=requested_delay
              )
      proxy_mock.ScheduleShutdown.assert_called_once()
      captured = caplog.records
      assert len(captured) == 3
      announcement, failure, inhibitor_note = captured[0], captured[1], captured[2]
      assert announcement.levelno == logging.INFO
      assert announcement.message.startswith(f"scheduling {action} for ")
      assert failure.levelno == logging.ERROR
      assert failure.message == f"failed to schedule {action}: {log_message}"
      assert "inhibitor" in inhibitor_note.message
  def test_lock_all_sessions(caplog):
      """``lock_all_sessions`` calls ``LockSessions`` once and logs one info line."""
      proxy_mock = unittest.mock.MagicMock()
      proxy_patch = unittest.mock.patch(
          "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=proxy_mock
      )
      with proxy_patch:
          with caplog.at_level(logging.INFO):
              systemctl_mqtt._dbus.lock_all_sessions()
      # The D-Bus method must be invoked exactly once and without arguments.
      proxy_mock.LockSessions.assert_called_once_with()
      captured = caplog.records
      assert len(captured) == 1
      info_record = captured[0]
      assert info_record.levelno == logging.INFO
      assert info_record.message == "instruct all sessions to activate screen locks"
  def test__run_signal_loop():
      """Drive ``systemctl_mqtt._run`` with mocked MQTT and D-Bus endpoints.

      Three messages with bodies ``(False,)``, ``(True,)``, ``(False,)`` are
      fed through the mocked connection. The test then checks that a single
      AddMatch request for the PrepareForShutdown signal was sent and that
      the matching payloads were published, retained, to the
      preparing-for-shutdown topic.
      """
      # pylint: disable=too-many-locals,too-many-arguments
      manager_mock = unittest.mock.MagicMock()
      manager_mock.Inhibit.return_value = (jeepney.fds.FileDescriptor(-1),)
      connection_mock = unittest.mock.MagicMock()
      # Reply to the AddMatch request carries an empty body.
      connection_mock.send_and_get_reply.return_value = unittest.mock.Mock(body=())
      # Only three messages are queued; the StopIteration expected below
      # presumably stems from this list running out on the next receive.
      connection_mock.recv_until_filtered.side_effect = [
          jeepney.low_level.Message(header=None, body=(preparing,))
          for preparing in (False, True, False)
      ]
      with unittest.mock.patch("paho.mqtt.client.Client") as mqtt_client_mock:
          with unittest.mock.patch(
              "systemctl_mqtt._dbus.get_login_manager_proxy",
              return_value=manager_mock,
          ):
              with unittest.mock.patch(
                  "jeepney.io.blocking.open_dbus_connection",
                  return_value=connection_mock,
              ) as open_dbus_connection_mock:
                  with pytest.raises(StopIteration):
                      systemctl_mqtt._run(
                          mqtt_host="localhost",
                          mqtt_port=1833,
                          mqtt_username=None,
                          mqtt_password=None,
                          mqtt_topic_prefix="systemctl/host",
                          homeassistant_discovery_prefix="homeassistant",
                          homeassistant_discovery_object_id="test",
                          poweroff_delay=datetime.timedelta(),
                      )
      open_dbus_connection_mock.assert_called_once_with(bus="SYSTEM")
      request_method = connection_mock.send_and_get_reply
      request_method.assert_called_once()
      # First positional argument of the recorded call is the request message.
      match_request = request_method.call_args[0][0]
      member_field = jeepney.low_level.HeaderFields.member
      assert match_request.header.fields[member_field] == "AddMatch"
      expected_rule = (
          "interface='org.freedesktop.login1.Manager',member='PrepareForShutdown'"
          ",path='/org/freedesktop/login1',type='signal'"
      )
      assert match_request.body == (expected_rule,)
      expected_publishes = [
          unittest.mock.call(
              topic="systemctl/host/preparing-for-shutdown",
              payload=payload,
              retain=True,
          )
          for payload in ("false", "true", "false")
      ]
      assert mqtt_client_mock().publish.call_args_list == expected_publishes
  195. ]