test_dbus.py 15 KB


  1. # systemctl-mqtt - MQTT client triggering & reporting shutdown on systemd-based systems
  2. #
  3. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import asyncio
  18. import datetime
  19. import getpass
  20. import logging
  21. import typing
  22. import unittest.mock
  23. import jeepney
  24. import jeepney.low_level
  25. import jeepney.wrappers
  26. import pytest
  27. import systemctl_mqtt._dbus.login_manager
  28. # pylint: disable=protected-access
  29. def test_get_login_manager_proxy():
  30. login_manager = systemctl_mqtt._dbus.login_manager.get_login_manager_proxy()
  31. assert isinstance(login_manager, jeepney.io.blocking.Proxy)
  32. assert login_manager._msggen.interface == "org.freedesktop.login1.Manager"
  33. # https://freedesktop.org/wiki/Software/systemd/logind/
  34. assert login_manager.CanPowerOff() in {("yes",), ("challenge",)}
  35. def test__log_shutdown_inhibitors_some(caplog):
  36. login_manager = unittest.mock.MagicMock()
  37. login_manager.ListInhibitors.return_value = (
  38. [
  39. (
  40. "shutdown:sleep",
  41. "Developer",
  42. "Haven't pushed my commits yet",
  43. "delay",
  44. 1000,
  45. 1234,
  46. ),
  47. ("shutdown", "Editor", "", "Unsafed files open", 0, 42),
  48. ],
  49. )
  50. with caplog.at_level(logging.DEBUG):
  51. systemctl_mqtt._dbus.login_manager._log_shutdown_inhibitors(login_manager)
  52. assert len(caplog.records) == 2
  53. assert caplog.records[0].levelno == logging.DEBUG
  54. assert (
  55. caplog.records[0].message
  56. == "detected shutdown inhibitor Developer (pid=1234, uid=1000, mode=delay): "
  57. + "Haven't pushed my commits yet"
  58. )
  59. def test__log_shutdown_inhibitors_none(caplog):
  60. login_manager = unittest.mock.MagicMock()
  61. login_manager.ListInhibitors.return_value = ([],)
  62. with caplog.at_level(logging.DEBUG):
  63. systemctl_mqtt._dbus.login_manager._log_shutdown_inhibitors(login_manager)
  64. assert len(caplog.records) == 1
  65. assert caplog.records[0].levelno == logging.DEBUG
  66. assert caplog.records[0].message == "no shutdown inhibitor locks found"
  67. def test__log_shutdown_inhibitors_fail(caplog):
  68. login_manager = unittest.mock.MagicMock()
  69. login_manager.ListInhibitors.side_effect = DBusErrorResponseMock("error", "mocked")
  70. with caplog.at_level(logging.DEBUG):
  71. systemctl_mqtt._dbus.login_manager._log_shutdown_inhibitors(login_manager)
  72. assert len(caplog.records) == 1
  73. assert caplog.records[0].levelno == logging.WARNING
  74. assert (
  75. caplog.records[0].message
  76. == "failed to fetch shutdown inhibitors: [error] mocked"
  77. )
  78. @pytest.mark.parametrize("action", ["poweroff", "reboot"])
  79. @pytest.mark.parametrize("delay", [datetime.timedelta(0), datetime.timedelta(hours=1)])
  80. def test__schedule_shutdown(action, delay):
  81. login_manager_mock = unittest.mock.MagicMock()
  82. with unittest.mock.patch(
  83. "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy",
  84. return_value=login_manager_mock,
  85. ):
  86. login_manager_mock.ListInhibitors.return_value = ([],)
  87. systemctl_mqtt._dbus.login_manager.schedule_shutdown(action=action, delay=delay)
  88. login_manager_mock.ScheduleShutdown.assert_called_once()
  89. schedule_args, schedule_kwargs = login_manager_mock.ScheduleShutdown.call_args
  90. assert not schedule_args
  91. assert schedule_kwargs.pop("action") == action
  92. actual_delay = schedule_kwargs.pop("time") - datetime.datetime.now()
  93. assert actual_delay.total_seconds() == pytest.approx(delay.total_seconds(), abs=0.1)
  94. assert not schedule_kwargs
  95. class DBusErrorResponseMock(jeepney.wrappers.DBusErrorResponse):
  96. # pylint: disable=missing-class-docstring,super-init-not-called
  97. def __init__(self, name: str, data: typing.Any):
  98. self.name = name
  99. self.data = data
  100. @pytest.mark.parametrize("action", ["poweroff"])
  101. @pytest.mark.parametrize(
  102. ("error_name", "error_message", "log_message"),
  103. [
  104. (
  105. "test error",
  106. "test message",
  107. "[test error] ('test message',)",
  108. ),
  109. (
  110. "org.freedesktop.DBus.Error.InteractiveAuthorizationRequired",
  111. "Interactive authentication required.",
  112. """interactive authorization required
  113. create /etc/polkit-1/rules.d/50-systemctl-mqtt.rules and insert the following rule:
  114. polkit.addRule(function(action, subject) {
  115. if(action.id === "org.freedesktop.login1.power-off" && subject.user === "{{username}}") {
  116. return polkit.Result.YES;
  117. }
  118. });
  119. """.replace(
  120. "{{username}}", getpass.getuser()
  121. ),
  122. ),
  123. ],
  124. )
  125. def test__schedule_shutdown_fail(
  126. caplog, action, error_name, error_message, log_message
  127. ):
  128. login_manager_mock = unittest.mock.MagicMock()
  129. login_manager_mock.ScheduleShutdown.side_effect = DBusErrorResponseMock(
  130. name=error_name,
  131. data=(error_message,),
  132. )
  133. login_manager_mock.ListInhibitors.return_value = ([],)
  134. with unittest.mock.patch(
  135. "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy",
  136. return_value=login_manager_mock,
  137. ), caplog.at_level(logging.DEBUG):
  138. systemctl_mqtt._dbus.login_manager.schedule_shutdown(
  139. action=action, delay=datetime.timedelta(seconds=21)
  140. )
  141. login_manager_mock.ScheduleShutdown.assert_called_once()
  142. assert len(caplog.records) == 3
  143. assert caplog.records[0].levelno == logging.INFO
  144. assert caplog.records[0].message.startswith(f"scheduling {action} for ")
  145. assert caplog.records[1].levelno == logging.ERROR
  146. assert caplog.records[1].message == f"failed to schedule {action}: {log_message}"
  147. assert "inhibitor" in caplog.records[2].message
  148. @pytest.mark.parametrize("action", ["poweroff"])
  149. @pytest.mark.parametrize(
  150. ("error_name", "error_message", "log_message"),
  151. [
  152. (
  153. "org.freedesktop.DBus.Error.InteractiveAuthorizationRequired",
  154. "Interactive authentication required.",
  155. """interactive authorization required
  156. create /etc/polkit-1/rules.d/50-systemctl-mqtt.rules and insert the following rule:
  157. polkit.addRule(function(action, subject) {
  158. if(action.id === "org.freedesktop.login1.power-off" && subject.user === "USERNAME") {
  159. return polkit.Result.YES;
  160. }
  161. });
  162. """,
  163. ),
  164. ],
  165. )
  166. def test__schedule_shutdown_fail_no_username(
  167. caplog, action, error_name, error_message, log_message
  168. ):
  169. login_manager_mock = unittest.mock.MagicMock()
  170. login_manager_mock.ScheduleShutdown.side_effect = DBusErrorResponseMock(
  171. name=error_name,
  172. data=(error_message,),
  173. )
  174. login_manager_mock.ListInhibitors.return_value = ([],)
  175. with unittest.mock.patch(
  176. "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy",
  177. return_value=login_manager_mock,
  178. ), unittest.mock.patch(
  179. "getpass.getuser", side_effect=OSError("No username set in the environment")
  180. ), caplog.at_level(
  181. logging.ERROR
  182. ):
  183. systemctl_mqtt._dbus.login_manager.schedule_shutdown(
  184. action=action, delay=datetime.timedelta(seconds=21)
  185. )
  186. login_manager_mock.ScheduleShutdown.assert_called_once()
  187. assert len(caplog.records) == 1
  188. assert caplog.records[0].levelno == logging.ERROR
  189. assert caplog.records[0].message == f"failed to schedule {action}: {log_message}"
  190. def test_suspend(caplog):
  191. login_manager_mock = unittest.mock.MagicMock()
  192. with unittest.mock.patch(
  193. "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy",
  194. return_value=login_manager_mock,
  195. ), caplog.at_level(logging.INFO):
  196. systemctl_mqtt._dbus.login_manager.suspend()
  197. login_manager_mock.Suspend.assert_called_once_with(interactive=False)
  198. assert len(caplog.records) == 1
  199. assert caplog.records[0].levelno == logging.INFO
  200. assert caplog.records[0].message == "suspending system"
  201. def test_lock_all_sessions(caplog):
  202. login_manager_mock = unittest.mock.MagicMock()
  203. with unittest.mock.patch(
  204. "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy",
  205. return_value=login_manager_mock,
  206. ), caplog.at_level(logging.INFO):
  207. systemctl_mqtt._dbus.login_manager.lock_all_sessions()
  208. login_manager_mock.LockSessions.assert_called_once_with()
  209. assert len(caplog.records) == 1
  210. assert caplog.records[0].levelno == logging.INFO
  211. assert caplog.records[0].message == "instruct all sessions to activate screen locks"
  212. async def _get_unit_path_mock( # pylint: disable=unused-argument
  213. *, service_manager: jeepney.io.asyncio.Proxy, unit_name: str
  214. ) -> str:
  215. return "/org/freedesktop/systemd1/unit/" + unit_name
  216. @pytest.mark.asyncio
  217. @pytest.mark.parametrize(
  218. "monitored_system_unit_names", [[], ["foo.service", "bar.service"]]
  219. )
  220. async def test__dbus_signal_loop(monitored_system_unit_names: typing.List[str]) -> None:
  221. # pylint: disable=too-many-locals,too-many-arguments
  222. state_mock = unittest.mock.AsyncMock()
  223. with unittest.mock.patch(
  224. "jeepney.io.asyncio.open_dbus_router",
  225. ) as open_dbus_router_mock, unittest.mock.patch(
  226. "systemctl_mqtt._get_unit_path", _get_unit_path_mock
  227. ), unittest.mock.patch(
  228. "systemctl_mqtt._dbus_signal_loop_unit"
  229. ) as dbus_signal_loop_unit_mock:
  230. async with open_dbus_router_mock() as dbus_router_mock:
  231. pass
  232. add_match_reply = unittest.mock.Mock()
  233. add_match_reply.body = ()
  234. dbus_router_mock.send_and_get_reply.return_value = add_match_reply
  235. msg_queue: asyncio.Queue[jeepney.low_level.Message] = asyncio.Queue()
  236. await msg_queue.put(jeepney.low_level.Message(header=None, body=(False,)))
  237. await msg_queue.put(jeepney.low_level.Message(header=None, body=(True,)))
  238. await msg_queue.put(jeepney.low_level.Message(header=None, body=(False,)))
  239. dbus_router_mock.filter = unittest.mock.MagicMock()
  240. dbus_router_mock.filter.return_value.__enter__.return_value = msg_queue
  241. state_mock.monitored_system_unit_names = monitored_system_unit_names
  242. # asyncio.TaskGroup added in python3.11
  243. loop_task = asyncio.create_task(
  244. systemctl_mqtt._dbus_signal_loop(
  245. state=state_mock, mqtt_client=unittest.mock.MagicMock()
  246. )
  247. )
  248. async def _abort_after_msg_queue():
  249. await msg_queue.join()
  250. loop_task.cancel()
  251. with pytest.raises(asyncio.exceptions.CancelledError):
  252. await asyncio.gather(*(loop_task, _abort_after_msg_queue()))
  253. assert unittest.mock.call(bus="SYSTEM") in open_dbus_router_mock.call_args_list
  254. dbus_router_mock.filter.assert_called_once()
  255. (filter_match_rule,) = dbus_router_mock.filter.call_args[0]
  256. assert (
  257. filter_match_rule.header_fields["interface"] == "org.freedesktop.login1.Manager"
  258. )
  259. assert filter_match_rule.header_fields["member"] == "PrepareForShutdown"
  260. add_match_msg = dbus_router_mock.send_and_get_reply.call_args[0][0]
  261. assert (
  262. add_match_msg.header.fields[jeepney.low_level.HeaderFields.member] == "AddMatch"
  263. )
  264. assert add_match_msg.body == (
  265. "interface='org.freedesktop.login1.Manager',member='PrepareForShutdown'"
  266. ",path='/org/freedesktop/login1',type='signal'",
  267. )
  268. assert [
  269. c[1]["active"] for c in state_mock.preparing_for_shutdown_handler.call_args_list
  270. ] == [False, True, False]
  271. assert not any(args for args, _ in dbus_signal_loop_unit_mock.await_args_list)
  272. dbus_signal_loop_unit_kwargs = [
  273. kwargs for _, kwargs in dbus_signal_loop_unit_mock.await_args_list
  274. ]
  275. assert [(a["unit_name"], a["unit_path"]) for a in dbus_signal_loop_unit_kwargs] == [
  276. (n, f"/org/freedesktop/systemd1/unit/{n}") for n in monitored_system_unit_names
  277. ]
  278. def _mock_get_active_state_reply(state: str) -> unittest.mock.MagicMock:
  279. reply_mock = unittest.mock.MagicMock()
  280. reply_mock.body = (("s", state),)
  281. return reply_mock
  282. @pytest.mark.asyncio
  283. async def test__dbus_signal_loop_unit() -> None:
  284. state = systemctl_mqtt._State(
  285. mqtt_topic_prefix="prefix",
  286. homeassistant_discovery_prefix="unused",
  287. homeassistant_discovery_object_id="unused",
  288. poweroff_delay=datetime.timedelta(),
  289. monitored_system_unit_names=[],
  290. )
  291. mqtt_client_mock = unittest.mock.AsyncMock()
  292. dbus_router_mock = unittest.mock.AsyncMock()
  293. bus_proxy_mock = unittest.mock.AsyncMock()
  294. bus_proxy_mock.AddMatch.return_value = ()
  295. get_active_state_reply_mock = unittest.mock.MagicMock()
  296. get_active_state_reply_mock.body = (("s", "active"),)
  297. states = [
  298. "active",
  299. "deactivating",
  300. "inactive",
  301. "inactive",
  302. "activating",
  303. "active",
  304. "active",
  305. "active",
  306. "inactive",
  307. ]
  308. dbus_router_mock.send_and_get_reply.side_effect = [
  309. _mock_get_active_state_reply(s) for s in states
  310. ]
  311. msg_queue: asyncio.Queue[jeepney.low_level.Message] = asyncio.Queue()
  312. for _ in range(len(states) - 1):
  313. await msg_queue.put(jeepney.low_level.Message(header=None, body=()))
  314. dbus_router_mock.filter = unittest.mock.MagicMock()
  315. dbus_router_mock.filter.return_value.__enter__.return_value = msg_queue
  316. loop_task = asyncio.create_task(
  317. systemctl_mqtt._dbus_signal_loop_unit(
  318. state=state,
  319. mqtt_client=mqtt_client_mock,
  320. dbus_router=dbus_router_mock,
  321. bus_proxy=bus_proxy_mock,
  322. unit_name="foo.service",
  323. unit_path="/org/freedesktop/systemd1/unit/whatever.service",
  324. )
  325. )
  326. async def _abort_after_msg_queue():
  327. await msg_queue.join()
  328. loop_task.cancel()
  329. with pytest.raises(asyncio.exceptions.CancelledError):
  330. await asyncio.gather(*(loop_task, _abort_after_msg_queue()))
  331. bus_proxy_mock.AddMatch.assert_awaited_once()
  332. ((match_rule,), add_match_kwargs) = bus_proxy_mock.AddMatch.await_args
  333. assert match_rule.header_fields["interface"] == "org.freedesktop.DBus.Properties"
  334. assert match_rule.header_fields["member"] == "PropertiesChanged"
  335. assert not add_match_kwargs
  336. assert mqtt_client_mock.publish.await_args_list == [
  337. unittest.mock.call(
  338. topic="prefix/unit/system/foo.service/active-state", payload=s
  339. )
  340. for s in [ # consecutive duplicates filtered
  341. "active",
  342. "deactivating",
  343. "inactive",
  344. "activating",
  345. "active",
  346. "inactive",
  347. ]
  348. ]