test_dbus.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
# systemctl-mqtt - MQTT client triggering & reporting shutdown on systemd-based systems
#
# Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import datetime
  18. import logging
  19. import typing
  20. import unittest.mock
  21. import jeepney
  22. import jeepney.low_level
  23. import jeepney.wrappers
  24. import pytest
  25. import systemctl_mqtt._dbus
  26. # pylint: disable=protected-access
  27. def test_get_login_manager_proxy():
  28. login_manager = systemctl_mqtt._dbus.get_login_manager_proxy()
  29. assert isinstance(login_manager, jeepney.io.blocking.Proxy)
  30. assert login_manager._msggen.interface == "org.freedesktop.login1.Manager"
  31. # https://freedesktop.org/wiki/Software/systemd/logind/
  32. assert login_manager.CanPowerOff() in {("yes",), ("challenge",)}
  33. def test__log_shutdown_inhibitors_some(caplog):
  34. login_manager = unittest.mock.MagicMock()
  35. login_manager.ListInhibitors.return_value = (
  36. [
  37. (
  38. "shutdown:sleep",
  39. "Developer",
  40. "Haven't pushed my commits yet",
  41. "delay",
  42. 1000,
  43. 1234,
  44. ),
  45. ("shutdown", "Editor", "", "Unsafed files open", 0, 42),
  46. ],
  47. )
  48. with caplog.at_level(logging.DEBUG):
  49. systemctl_mqtt._dbus._log_shutdown_inhibitors(login_manager)
  50. assert len(caplog.records) == 2
  51. assert caplog.records[0].levelno == logging.DEBUG
  52. assert (
  53. caplog.records[0].message
  54. == "detected shutdown inhibitor Developer (pid=1234, uid=1000, mode=delay): "
  55. + "Haven't pushed my commits yet"
  56. )
  57. def test__log_shutdown_inhibitors_none(caplog):
  58. login_manager = unittest.mock.MagicMock()
  59. login_manager.ListInhibitors.return_value = ([],)
  60. with caplog.at_level(logging.DEBUG):
  61. systemctl_mqtt._dbus._log_shutdown_inhibitors(login_manager)
  62. assert len(caplog.records) == 1
  63. assert caplog.records[0].levelno == logging.DEBUG
  64. assert caplog.records[0].message == "no shutdown inhibitor locks found"
  65. def test__log_shutdown_inhibitors_fail(caplog):
  66. login_manager = unittest.mock.MagicMock()
  67. login_manager.ListInhibitors.side_effect = DBusErrorResponseMock("error", "mocked")
  68. with caplog.at_level(logging.DEBUG):
  69. systemctl_mqtt._dbus._log_shutdown_inhibitors(login_manager)
  70. assert len(caplog.records) == 1
  71. assert caplog.records[0].levelno == logging.WARNING
  72. assert (
  73. caplog.records[0].message
  74. == "failed to fetch shutdown inhibitors: [error] mocked"
  75. )
  76. @pytest.mark.parametrize("action", ["poweroff", "reboot"])
  77. @pytest.mark.parametrize("delay", [datetime.timedelta(0), datetime.timedelta(hours=1)])
  78. def test__schedule_shutdown(action, delay):
  79. login_manager_mock = unittest.mock.MagicMock()
  80. with unittest.mock.patch(
  81. "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock
  82. ):
  83. login_manager_mock.ListInhibitors.return_value = ([],)
  84. systemctl_mqtt._dbus.schedule_shutdown(action=action, delay=delay)
  85. login_manager_mock.ScheduleShutdown.assert_called_once()
  86. schedule_args, schedule_kwargs = login_manager_mock.ScheduleShutdown.call_args
  87. assert not schedule_args
  88. assert schedule_kwargs.pop("action") == action
  89. actual_delay = schedule_kwargs.pop("time") - datetime.datetime.now()
  90. assert actual_delay.total_seconds() == pytest.approx(delay.total_seconds(), abs=0.1)
  91. assert not schedule_kwargs
  92. class DBusErrorResponseMock(jeepney.wrappers.DBusErrorResponse):
  93. # pylint: disable=missing-class-docstring,super-init-not-called
  94. def __init__(self, name: str, data: typing.Any):
  95. self.name = name
  96. self.data = data
  97. @pytest.mark.parametrize("action", ["poweroff"])
  98. @pytest.mark.parametrize(
  99. ("error_name", "error_message", "log_message"),
  100. [
  101. (
  102. "test error",
  103. "test message",
  104. "[test error] ('test message',)",
  105. ),
  106. (
  107. "org.freedesktop.DBus.Error.InteractiveAuthorizationRequired",
  108. "Interactive authentication required.",
  109. "unauthorized; missing polkit authorization rules?",
  110. ),
  111. ],
  112. )
  113. def test__schedule_shutdown_fail(
  114. caplog, action, error_name, error_message, log_message
  115. ):
  116. login_manager_mock = unittest.mock.MagicMock()
  117. login_manager_mock.ScheduleShutdown.side_effect = DBusErrorResponseMock(
  118. name=error_name,
  119. data=(error_message,),
  120. )
  121. login_manager_mock.ListInhibitors.return_value = ([],)
  122. with unittest.mock.patch(
  123. "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock
  124. ), caplog.at_level(logging.DEBUG):
  125. systemctl_mqtt._dbus.schedule_shutdown(
  126. action=action, delay=datetime.timedelta(seconds=21)
  127. )
  128. login_manager_mock.ScheduleShutdown.assert_called_once()
  129. assert len(caplog.records) == 3
  130. assert caplog.records[0].levelno == logging.INFO
  131. assert caplog.records[0].message.startswith(f"scheduling {action} for ")
  132. assert caplog.records[1].levelno == logging.ERROR
  133. assert caplog.records[1].message == f"failed to schedule {action}: {log_message}"
  134. assert "inhibitor" in caplog.records[2].message
  135. def test_suspend(caplog):
  136. login_manager_mock = unittest.mock.MagicMock()
  137. with unittest.mock.patch(
  138. "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock
  139. ), caplog.at_level(logging.INFO):
  140. systemctl_mqtt._dbus.suspend()
  141. login_manager_mock.Suspend.assert_called_once_with(interactive=False)
  142. assert len(caplog.records) == 1
  143. assert caplog.records[0].levelno == logging.INFO
  144. assert caplog.records[0].message == "suspending system"
  145. def test_lock_all_sessions(caplog):
  146. login_manager_mock = unittest.mock.MagicMock()
  147. with unittest.mock.patch(
  148. "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock
  149. ), caplog.at_level(logging.INFO):
  150. systemctl_mqtt._dbus.lock_all_sessions()
  151. login_manager_mock.LockSessions.assert_called_once_with()
  152. assert len(caplog.records) == 1
  153. assert caplog.records[0].levelno == logging.INFO
  154. assert caplog.records[0].message == "instruct all sessions to activate screen locks"
  155. @pytest.mark.asyncio
  156. async def test__run_signal_loop():
  157. # pylint: disable=too-many-locals,too-many-arguments
  158. login_manager_mock = unittest.mock.MagicMock()
  159. dbus_connection_mock = unittest.mock.MagicMock()
  160. with unittest.mock.patch(
  161. "paho.mqtt.client.Client"
  162. ) as mqtt_client_mock, unittest.mock.patch(
  163. "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock
  164. ), unittest.mock.patch(
  165. "jeepney.io.blocking.open_dbus_connection", return_value=dbus_connection_mock
  166. ) as open_dbus_connection_mock:
  167. add_match_reply = unittest.mock.Mock()
  168. add_match_reply.body = ()
  169. dbus_connection_mock.send_and_get_reply.return_value = add_match_reply
  170. dbus_connection_mock.recv_until_filtered.side_effect = [
  171. jeepney.low_level.Message(header=None, body=(False,)),
  172. jeepney.low_level.Message(header=None, body=(True,)),
  173. jeepney.low_level.Message(header=None, body=(False,)),
  174. ]
  175. login_manager_mock.Inhibit.return_value = (jeepney.fds.FileDescriptor(-1),)
  176. with pytest.raises(RuntimeError, match=r"^coroutine raised StopIteration$"):
  177. await systemctl_mqtt._run(
  178. mqtt_host="localhost",
  179. mqtt_port=1833,
  180. mqtt_username=None,
  181. mqtt_password=None,
  182. mqtt_topic_prefix="systemctl/host",
  183. homeassistant_discovery_prefix="homeassistant",
  184. homeassistant_discovery_object_id="test",
  185. poweroff_delay=datetime.timedelta(),
  186. )
  187. open_dbus_connection_mock.assert_called_once_with(bus="SYSTEM")
  188. dbus_connection_mock.send_and_get_reply.assert_called_once()
  189. add_match_msg = dbus_connection_mock.send_and_get_reply.call_args[0][0]
  190. assert (
  191. add_match_msg.header.fields[jeepney.low_level.HeaderFields.member] == "AddMatch"
  192. )
  193. assert add_match_msg.body == (
  194. "interface='org.freedesktop.login1.Manager',member='PrepareForShutdown'"
  195. ",path='/org/freedesktop/login1',type='signal'",
  196. )
  197. assert mqtt_client_mock().publish.call_args_list == [
  198. unittest.mock.call(
  199. topic="systemctl/host/preparing-for-shutdown", payload="false", retain=True
  200. ),
  201. unittest.mock.call(
  202. topic="systemctl/host/preparing-for-shutdown", payload="true", retain=True
  203. ),
  204. unittest.mock.call(
  205. topic="systemctl/host/preparing-for-shutdown", payload="false", retain=True
  206. ),
  207. ]