1
0

test_dbus.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. # systemctl-mqtt - MQTT client triggering & reporting shutdown on systemd-based systems
  2. #
  3. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import asyncio
  18. import datetime
  19. import getpass
  20. import logging
  21. import typing
  22. import unittest.mock
  23. import jeepney
  24. import jeepney.low_level
  25. import jeepney.wrappers
  26. import pytest
  27. import systemctl_mqtt._dbus.login_manager
  28. # pylint: disable=protected-access
  29. def test_get_login_manager_proxy():
  30. login_manager = systemctl_mqtt._dbus.login_manager.get_login_manager_proxy()
  31. assert isinstance(login_manager, jeepney.io.blocking.Proxy)
  32. assert login_manager._msggen.interface == "org.freedesktop.login1.Manager"
  33. # https://freedesktop.org/wiki/Software/systemd/logind/
  34. assert login_manager.CanPowerOff() in {("yes",), ("challenge",)}
  35. def test_get_service_manager_proxy():
  36. service_manager = systemctl_mqtt._dbus.service_manager.get_service_manager_proxy()
  37. assert isinstance(service_manager, jeepney.io.blocking.Proxy)
  38. assert service_manager._msggen.interface == "org.freedesktop.systemd1.Manager"
  39. def test__log_shutdown_inhibitors_some(caplog):
  40. login_manager = unittest.mock.MagicMock()
  41. login_manager.ListInhibitors.return_value = (
  42. [
  43. (
  44. "shutdown:sleep",
  45. "Developer",
  46. "Haven't pushed my commits yet",
  47. "delay",
  48. 1000,
  49. 1234,
  50. ),
  51. ("shutdown", "Editor", "", "Unsafed files open", 0, 42),
  52. ],
  53. )
  54. with caplog.at_level(logging.DEBUG):
  55. systemctl_mqtt._dbus.login_manager._log_shutdown_inhibitors(login_manager)
  56. assert len(caplog.records) == 2
  57. assert caplog.records[0].levelno == logging.DEBUG
  58. assert (
  59. caplog.records[0].message
  60. == "detected shutdown inhibitor Developer (pid=1234, uid=1000, mode=delay): "
  61. + "Haven't pushed my commits yet"
  62. )
  63. def test__log_shutdown_inhibitors_none(caplog):
  64. login_manager = unittest.mock.MagicMock()
  65. login_manager.ListInhibitors.return_value = ([],)
  66. with caplog.at_level(logging.DEBUG):
  67. systemctl_mqtt._dbus.login_manager._log_shutdown_inhibitors(login_manager)
  68. assert len(caplog.records) == 1
  69. assert caplog.records[0].levelno == logging.DEBUG
  70. assert caplog.records[0].message == "no shutdown inhibitor locks found"
  71. def test__log_shutdown_inhibitors_fail(caplog):
  72. login_manager = unittest.mock.MagicMock()
  73. login_manager.ListInhibitors.side_effect = DBusErrorResponseMock("error", "mocked")
  74. with caplog.at_level(logging.DEBUG):
  75. systemctl_mqtt._dbus.login_manager._log_shutdown_inhibitors(login_manager)
  76. assert len(caplog.records) == 1
  77. assert caplog.records[0].levelno == logging.WARNING
  78. assert (
  79. caplog.records[0].message
  80. == "failed to fetch shutdown inhibitors: [error] mocked"
  81. )
  82. @pytest.mark.parametrize("action", ["poweroff", "reboot"])
  83. @pytest.mark.parametrize("delay", [datetime.timedelta(0), datetime.timedelta(hours=1)])
  84. def test__schedule_shutdown(action, delay):
  85. login_manager_mock = unittest.mock.MagicMock()
  86. with unittest.mock.patch(
  87. "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy",
  88. return_value=login_manager_mock,
  89. ):
  90. login_manager_mock.ListInhibitors.return_value = ([],)
  91. systemctl_mqtt._dbus.login_manager.schedule_shutdown(action=action, delay=delay)
  92. login_manager_mock.ScheduleShutdown.assert_called_once()
  93. schedule_args, schedule_kwargs = login_manager_mock.ScheduleShutdown.call_args
  94. assert not schedule_args
  95. assert schedule_kwargs.pop("action") == action
  96. actual_delay = schedule_kwargs.pop("time") - datetime.datetime.now()
  97. assert actual_delay.total_seconds() == pytest.approx(delay.total_seconds(), abs=0.1)
  98. assert not schedule_kwargs
  99. class DBusErrorResponseMock(jeepney.wrappers.DBusErrorResponse):
  100. # pylint: disable=missing-class-docstring,super-init-not-called
  101. def __init__(self, name: str, data: typing.Any):
  102. self.name = name
  103. self.data = data
@pytest.mark.parametrize(
    ("action", "error_name", "error_message", "log_message"),
    [
        # Generic D-Bus error: the expected log text embeds name and data tuple.
        (
            "poweroff",
            "test error",
            "test message",
            "[test error] ('test message',)",
        ),
        # Authorization error for poweroff: expected log text is a polkit hint.
        # NOTE(review): whitespace inside the triple-quoted literals below is
        # part of the compared value -- verify indentation/blank lines against
        # the message actually emitted by systemctl_mqtt.
        (
            "poweroff",
            "org.freedesktop.DBus.Error.InteractiveAuthorizationRequired",
            "Interactive authentication required.",
            """interactive authorization required
create /etc/polkit-1/rules.d/50-systemctl-mqtt.rules and insert the following rule:
polkit.addRule(function(action, subject) {
if(action.id === "org.freedesktop.login1.power-off" && subject.user === "{{username}}") {
return polkit.Result.YES;
}
});
""".replace(
                "{{username}}", getpass.getuser()
            ),
        ),
        # Same authorization error for reboot: the hint names the reboot action.
        (
            "reboot",
            "org.freedesktop.DBus.Error.InteractiveAuthorizationRequired",
            "Interactive authentication required.",
            """interactive authorization required
create /etc/polkit-1/rules.d/50-systemctl-mqtt.rules and insert the following rule:
polkit.addRule(function(action, subject) {
if(action.id === "org.freedesktop.login1.reboot" && subject.user === "{{username}}") {
return polkit.Result.YES;
}
});
""".replace(
                "{{username}}", getpass.getuser()
            ),
        ),
    ],
)
def test__schedule_shutdown_fail(
    caplog, action: str, error_name: str, error_message: str, log_message: str
) -> None:
    """Check the log output when ScheduleShutdown raises a D-Bus error.

    ScheduleShutdown on the mocked login manager raises DBusErrorResponseMock
    built from error_name and a 1-tuple holding error_message. Three log
    records are expected: an INFO record starting with "scheduling <action>
    for ", an ERROR record equal to "failed to schedule <action>:
    <log_message>", and a third record whose message mentions "inhibitor".
    """
    login_manager_mock = unittest.mock.MagicMock()
    login_manager_mock.ScheduleShutdown.side_effect = DBusErrorResponseMock(
        name=error_name,
        data=(error_message,),
    )
    # No inhibitors are reported by the mocked login manager.
    login_manager_mock.ListInhibitors.return_value = ([],)
    with unittest.mock.patch(
        "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy",
        return_value=login_manager_mock,
    ), caplog.at_level(logging.DEBUG):
        systemctl_mqtt._dbus.login_manager.schedule_shutdown(
            action=action, delay=datetime.timedelta(seconds=21)
        )
    login_manager_mock.ScheduleShutdown.assert_called_once()
    assert len(caplog.records) == 3
    assert caplog.records[0].levelno == logging.INFO
    assert caplog.records[0].message.startswith(f"scheduling {action} for ")
    assert caplog.records[1].levelno == logging.ERROR
    assert caplog.records[1].message == f"failed to schedule {action}: {log_message}"
    assert "inhibitor" in caplog.records[2].message
@pytest.mark.parametrize("action", ["poweroff"])
@pytest.mark.parametrize(
    ("error_name", "error_message", "log_message"),
    [
        # NOTE(review): whitespace inside the triple-quoted literal below is
        # part of the compared value -- verify indentation/blank lines against
        # the message actually emitted by systemctl_mqtt.
        (
            "org.freedesktop.DBus.Error.InteractiveAuthorizationRequired",
            "Interactive authentication required.",
            """interactive authorization required
create /etc/polkit-1/rules.d/50-systemctl-mqtt.rules and insert the following rule:
polkit.addRule(function(action, subject) {
if(action.id === "org.freedesktop.login1.power-off" && subject.user === "USERNAME") {
return polkit.Result.YES;
}
});
""",
        ),
    ],
)
def test__schedule_shutdown_fail_no_username(
    caplog, action, error_name, error_message, log_message
):
    """Check the authorization hint when the current user name is unavailable.

    getpass.getuser is patched to raise OSError while ScheduleShutdown raises
    the authorization error; the single ERROR record captured is expected to
    contain the literal placeholder "USERNAME" in the polkit rule.
    """
    login_manager_mock = unittest.mock.MagicMock()
    login_manager_mock.ScheduleShutdown.side_effect = DBusErrorResponseMock(
        name=error_name,
        data=(error_message,),
    )
    login_manager_mock.ListInhibitors.return_value = ([],)
    # Capture at ERROR level only, so lower-level records are not counted below.
    with unittest.mock.patch(
        "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy",
        return_value=login_manager_mock,
    ), unittest.mock.patch(
        "getpass.getuser", side_effect=OSError("No username set in the environment")
    ), caplog.at_level(
        logging.ERROR
    ):
        systemctl_mqtt._dbus.login_manager.schedule_shutdown(
            action=action, delay=datetime.timedelta(seconds=21)
        )
    login_manager_mock.ScheduleShutdown.assert_called_once()
    assert len(caplog.records) == 1
    assert caplog.records[0].levelno == logging.ERROR
    assert caplog.records[0].message == f"failed to schedule {action}: {log_message}"
  210. def test_suspend(caplog):
  211. login_manager_mock = unittest.mock.MagicMock()
  212. with unittest.mock.patch(
  213. "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy",
  214. return_value=login_manager_mock,
  215. ), caplog.at_level(logging.INFO):
  216. systemctl_mqtt._dbus.login_manager.suspend()
  217. login_manager_mock.Suspend.assert_called_once_with(interactive=False)
  218. assert len(caplog.records) == 1
  219. assert caplog.records[0].levelno == logging.INFO
  220. assert caplog.records[0].message == "suspending system"
  221. def test_lock_all_sessions(caplog):
  222. login_manager_mock = unittest.mock.MagicMock()
  223. with unittest.mock.patch(
  224. "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy",
  225. return_value=login_manager_mock,
  226. ), caplog.at_level(logging.INFO):
  227. systemctl_mqtt._dbus.login_manager.lock_all_sessions()
  228. login_manager_mock.LockSessions.assert_called_once_with()
  229. assert len(caplog.records) == 1
  230. assert caplog.records[0].levelno == logging.INFO
  231. assert caplog.records[0].message == "instruct all sessions to activate screen locks"
@pytest.mark.parametrize(
    ("error_name", "error_message", "log_message"),
    [
        # Generic D-Bus error: the expected log text embeds name and data tuple.
        (
            "test error",
            "test message",
            "[test error] ('test message',)",
        ),
        # Authorization error: expected log text is a polkit rule hint.
        # NOTE(review): whitespace inside the triple-quoted literal below is
        # part of the compared value -- verify indentation/blank lines against
        # the message actually emitted by systemctl_mqtt.
        (
            "org.freedesktop.DBus.Error.InteractiveAuthorizationRequired",
            "Interactive authentication required.",
            """interactive authorization required
create /etc/polkit-1/rules.d/50-systemctl-mqtt.rules and insert the following rule:
polkit.addRule(function(action, subject) {
if(action.id === "org.freedesktop.login1.lock-sessions" && subject.user === "{{username}}") {
return polkit.Result.YES;
}
});
""".replace(
                "{{username}}", getpass.getuser()
            ),
        ),
    ],
)
def test_lock_all_sessions_fail(
    caplog: pytest.LogCaptureFixture,
    error_name: str,
    error_message: str,
    log_message: str,
) -> None:
    """Check the single ERROR record logged when LockSessions raises.

    LockSessions on the mocked login manager raises DBusErrorResponseMock built
    from error_name and a 1-tuple holding error_message; the captured message
    must equal "failed to lock all sessions: <log_message>".
    """
    login_manager_mock = unittest.mock.MagicMock()
    login_manager_mock.LockSessions.side_effect = DBusErrorResponseMock(
        name=error_name, data=(error_message,)
    )
    # Capture at ERROR level only, so lower-level records are not counted below.
    with unittest.mock.patch(
        "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy",
        return_value=login_manager_mock,
    ), caplog.at_level(logging.ERROR):
        systemctl_mqtt._dbus.login_manager.lock_all_sessions()
    login_manager_mock.LockSessions.assert_called_once()
    assert len(caplog.records) == 1
    assert caplog.records[0].levelno == logging.ERROR
    assert caplog.records[0].message == f"failed to lock all sessions: {log_message}"
  275. async def _get_unit_path_mock( # pylint: disable=unused-argument
  276. *, service_manager: jeepney.io.asyncio.Proxy, unit_name: str
  277. ) -> str:
  278. return "/org/freedesktop/systemd1/unit/" + unit_name
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "monitored_system_unit_names", [[], ["foo.service", "bar.service"]]
)
async def test__dbus_signal_loop(monitored_system_unit_names: typing.List[str]) -> None:
    """Run _dbus_signal_loop against a mocked D-Bus router and inspect its calls.

    Three queued messages with bodies (False,), (True,), (False,) are fed
    through the mocked router filter. The test then checks the router was
    opened on the system bus, the filter and AddMatch rule target the
    login manager's PrepareForShutdown signal, the state's
    preparing_for_shutdown_handler saw the three flags in order, and
    _dbus_signal_loop_unit was awaited once per monitored unit with the
    path produced by _get_unit_path_mock.
    """
    # pylint: disable=too-many-locals,too-many-arguments
    state_mock = unittest.mock.AsyncMock()
    with unittest.mock.patch(
        "jeepney.io.asyncio.open_dbus_router",
    ) as open_dbus_router_mock, unittest.mock.patch(
        "systemctl_mqtt._get_unit_path", _get_unit_path_mock
    ), unittest.mock.patch(
        "systemctl_mqtt._dbus_signal_loop_unit"
    ) as dbus_signal_loop_unit_mock:
        # Enter the patched context manager once just to get hold of the router
        # mock; presumably the code under test receives the same object.
        async with open_dbus_router_mock() as dbus_router_mock:
            pass
        add_match_reply = unittest.mock.Mock()
        add_match_reply.body = ()
        dbus_router_mock.send_and_get_reply.return_value = add_match_reply
        msg_queue: asyncio.Queue[jeepney.low_level.Message] = asyncio.Queue()
        await msg_queue.put(jeepney.low_level.Message(header=None, body=(False,)))
        await msg_queue.put(jeepney.low_level.Message(header=None, body=(True,)))
        await msg_queue.put(jeepney.low_level.Message(header=None, body=(False,)))
        # filter() is replaced by a synchronous mock whose context manager
        # yields the pre-filled queue.
        dbus_router_mock.filter = unittest.mock.MagicMock()
        dbus_router_mock.filter.return_value.__enter__.return_value = msg_queue
        state_mock.monitored_system_unit_names = monitored_system_unit_names
        # asyncio.TaskGroup added in python3.11
        loop_task = asyncio.create_task(
            systemctl_mqtt._dbus_signal_loop(
                state=state_mock, mqtt_client=unittest.mock.MagicMock()
            )
        )

        async def _abort_after_msg_queue():
            # Wait until msg_queue.join() returns, then cancel the loop task.
            await msg_queue.join()
            loop_task.cancel()

        with pytest.raises(asyncio.exceptions.CancelledError):
            await asyncio.gather(*(loop_task, _abort_after_msg_queue()))
    # Membership check rather than assert_called_once_with: the test itself
    # also called open_dbus_router_mock() without arguments above.
    assert unittest.mock.call(bus="SYSTEM") in open_dbus_router_mock.call_args_list
    dbus_router_mock.filter.assert_called_once()
    (filter_match_rule,) = dbus_router_mock.filter.call_args[0]
    assert (
        filter_match_rule.header_fields["interface"] == "org.freedesktop.login1.Manager"
    )
    assert filter_match_rule.header_fields["member"] == "PrepareForShutdown"
    add_match_msg = dbus_router_mock.send_and_get_reply.call_args[0][0]
    assert (
        add_match_msg.header.fields[jeepney.low_level.HeaderFields.member] == "AddMatch"
    )
    assert add_match_msg.body == (
        "interface='org.freedesktop.login1.Manager',member='PrepareForShutdown'"
        ",path='/org/freedesktop/login1',type='signal'",
    )
    assert [
        c[1]["active"] for c in state_mock.preparing_for_shutdown_handler.call_args_list
    ] == [False, True, False]
    # The per-unit loop must have been awaited with keyword arguments only.
    assert not any(args for args, _ in dbus_signal_loop_unit_mock.await_args_list)
    dbus_signal_loop_unit_kwargs = [
        kwargs for _, kwargs in dbus_signal_loop_unit_mock.await_args_list
    ]
    assert [(a["unit_name"], a["unit_path"]) for a in dbus_signal_loop_unit_kwargs] == [
        (n, f"/org/freedesktop/systemd1/unit/{n}") for n in monitored_system_unit_names
    ]
  341. def _mock_get_active_state_reply(state: str) -> unittest.mock.MagicMock:
  342. reply_mock = unittest.mock.MagicMock()
  343. reply_mock.body = (("s", state),)
  344. return reply_mock
  345. @pytest.mark.asyncio
  346. async def test__dbus_signal_loop_unit() -> None:
  347. state = systemctl_mqtt._State(
  348. mqtt_topic_prefix="prefix",
  349. homeassistant_discovery_prefix="unused",
  350. homeassistant_discovery_object_id="unused",
  351. poweroff_delay=datetime.timedelta(),
  352. monitored_system_unit_names=[],
  353. controlled_system_unit_names=[],
  354. )
  355. mqtt_client_mock = unittest.mock.AsyncMock()
  356. dbus_router_mock = unittest.mock.AsyncMock()
  357. bus_proxy_mock = unittest.mock.AsyncMock()
  358. bus_proxy_mock.AddMatch.return_value = ()
  359. get_active_state_reply_mock = unittest.mock.MagicMock()
  360. get_active_state_reply_mock.body = (("s", "active"),)
  361. states = [
  362. "active",
  363. "deactivating",
  364. "inactive",
  365. "inactive",
  366. "activating",
  367. "active",
  368. "active",
  369. "active",
  370. "inactive",
  371. ]
  372. dbus_router_mock.send_and_get_reply.side_effect = [
  373. _mock_get_active_state_reply(s) for s in states
  374. ]
  375. msg_queue: asyncio.Queue[jeepney.low_level.Message] = asyncio.Queue()
  376. for _ in range(len(states) - 1):
  377. await msg_queue.put(jeepney.low_level.Message(header=None, body=()))
  378. dbus_router_mock.filter = unittest.mock.MagicMock()
  379. dbus_router_mock.filter.return_value.__enter__.return_value = msg_queue
  380. loop_task = asyncio.create_task(
  381. systemctl_mqtt._dbus_signal_loop_unit(
  382. state=state,
  383. mqtt_client=mqtt_client_mock,
  384. dbus_router=dbus_router_mock,
  385. bus_proxy=bus_proxy_mock,
  386. unit_name="foo.service",
  387. unit_path="/org/freedesktop/systemd1/unit/whatever.service",
  388. )
  389. )
  390. async def _abort_after_msg_queue():
  391. await msg_queue.join()
  392. loop_task.cancel()
  393. with pytest.raises(asyncio.exceptions.CancelledError):
  394. await asyncio.gather(*(loop_task, _abort_after_msg_queue()))
  395. bus_proxy_mock.AddMatch.assert_awaited_once()
  396. ((match_rule,), add_match_kwargs) = bus_proxy_mock.AddMatch.await_args
  397. assert match_rule.header_fields["interface"] == "org.freedesktop.DBus.Properties"
  398. assert match_rule.header_fields["member"] == "PropertiesChanged"
  399. assert not add_match_kwargs
  400. assert mqtt_client_mock.publish.await_args_list == [
  401. unittest.mock.call(
  402. topic="prefix/unit/system/foo.service/active-state", payload=s
  403. )
  404. for s in [ # consecutive duplicates filtered
  405. "active",
  406. "deactivating",
  407. "inactive",
  408. "activating",
  409. "active",
  410. "inactive",
  411. ]
  412. ]