test_mqtt.py 19 KB


  1. # systemctl-mqtt - MQTT client triggering & reporting shutdown on systemd-based systems
  2. #
  3. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import datetime
  18. import logging
  19. import ssl
  20. import unittest.mock
  21. import aiomqtt
  22. import jeepney.fds
  23. import jeepney.low_level
  24. import pytest
  25. import systemctl_mqtt
  26. # pylint: disable=protected-access,too-many-positional-arguments
  27. @pytest.mark.asyncio
  28. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  29. @pytest.mark.parametrize("mqtt_port", [1883])
  30. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
  31. @pytest.mark.parametrize("homeassistant_discovery_prefix", ["homeassistant"])
  32. @pytest.mark.parametrize("homeassistant_discovery_object_id", ["host", "node"])
  33. async def test__run(
  34. caplog,
  35. mqtt_host,
  36. mqtt_port,
  37. mqtt_topic_prefix,
  38. homeassistant_discovery_prefix,
  39. homeassistant_discovery_object_id,
  40. ):
  41. # pylint: disable=too-many-locals,too-many-arguments
  42. caplog.set_level(logging.DEBUG)
  43. login_manager_mock = unittest.mock.MagicMock()
  44. with unittest.mock.patch(
  45. "aiomqtt.Client", autospec=False
  46. ) as mqtt_client_class_mock, unittest.mock.patch(
  47. "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy",
  48. return_value=login_manager_mock,
  49. ), unittest.mock.patch(
  50. "systemctl_mqtt._dbus_signal_loop"
  51. ) as dbus_signal_loop_mock:
  52. login_manager_mock.Inhibit.return_value = (jeepney.fds.FileDescriptor(-1),)
  53. login_manager_mock.Get.return_value = (("b", False),)
  54. await systemctl_mqtt._run(
  55. mqtt_host=mqtt_host,
  56. mqtt_port=mqtt_port,
  57. mqtt_username=None,
  58. mqtt_password=None,
  59. mqtt_topic_prefix=mqtt_topic_prefix,
  60. homeassistant_discovery_prefix=homeassistant_discovery_prefix,
  61. homeassistant_discovery_object_id=homeassistant_discovery_object_id,
  62. poweroff_delay=datetime.timedelta(),
  63. monitored_system_unit_names=[],
  64. controlled_system_unit_names=[],
  65. )
  66. assert caplog.records[0].levelno == logging.INFO
  67. assert caplog.records[0].message == (
  68. f"connecting to MQTT broker {mqtt_host}:{mqtt_port} (TLS enabled)"
  69. )
  70. mqtt_client_class_mock.assert_called_once()
  71. _, mqtt_client_init_kwargs = mqtt_client_class_mock.call_args
  72. assert mqtt_client_init_kwargs.pop("hostname") == mqtt_host
  73. assert mqtt_client_init_kwargs.pop("port") == mqtt_port
  74. assert isinstance(mqtt_client_init_kwargs.pop("tls_context"), ssl.SSLContext)
  75. assert mqtt_client_init_kwargs.pop("username") is None
  76. assert mqtt_client_init_kwargs.pop("password") is None
  77. assert mqtt_client_init_kwargs.pop("will") == aiomqtt.Will(
  78. topic=mqtt_topic_prefix + "/status",
  79. payload="offline",
  80. qos=0,
  81. retain=True,
  82. properties=None,
  83. )
  84. assert not mqtt_client_init_kwargs
  85. login_manager_mock.Inhibit.assert_called_once_with(
  86. what="shutdown",
  87. who="systemctl-mqtt",
  88. why="Report shutdown via MQTT",
  89. mode="delay",
  90. )
  91. login_manager_mock.Get.assert_called_once_with("PreparingForShutdown")
  92. async with mqtt_client_class_mock() as mqtt_client_mock:
  93. pass
  94. assert mqtt_client_mock.publish.call_count == 4
  95. assert (
  96. mqtt_client_mock.publish.call_args_list[0][1]["topic"]
  97. == f"{homeassistant_discovery_prefix}/device/{homeassistant_discovery_object_id}/config"
  98. )
  99. assert mqtt_client_mock.publish.call_args_list[1] == unittest.mock.call(
  100. topic=mqtt_topic_prefix + "/preparing-for-shutdown",
  101. payload="false",
  102. retain=False,
  103. )
  104. assert mqtt_client_mock.publish.call_args_list[2][1] == {
  105. "topic": mqtt_topic_prefix + "/status",
  106. "payload": "online",
  107. "retain": True,
  108. }
  109. assert mqtt_client_mock.publish.call_args_list[3][1] == {
  110. "topic": mqtt_topic_prefix + "/status",
  111. "payload": "offline",
  112. "retain": True,
  113. }
  114. assert sorted(mqtt_client_mock.subscribe.call_args_list) == [
  115. unittest.mock.call(mqtt_topic_prefix + "/lock-all-sessions"),
  116. unittest.mock.call(mqtt_topic_prefix + "/poweroff"),
  117. unittest.mock.call(mqtt_topic_prefix + "/suspend"),
  118. ]
  119. assert caplog.records[1].levelno == logging.DEBUG
  120. assert (
  121. caplog.records[1].message == f"connected to MQTT broker {mqtt_host}:{mqtt_port}"
  122. )
  123. assert caplog.records[2].levelno == logging.DEBUG
  124. assert caplog.records[2].message == "acquired shutdown inhibitor lock"
  125. assert caplog.records[3].levelno == logging.DEBUG
  126. assert (
  127. caplog.records[3].message
  128. == "publishing home assistant config on "
  129. + homeassistant_discovery_prefix
  130. + "/device/"
  131. + homeassistant_discovery_object_id
  132. + "/config"
  133. )
  134. assert caplog.records[4].levelno == logging.INFO
  135. assert (
  136. caplog.records[4].message
  137. == f"publishing 'false' on {mqtt_topic_prefix}/preparing-for-shutdown"
  138. )
  139. assert all(r.levelno == logging.INFO for r in caplog.records[5::2])
  140. assert {r.message for r in caplog.records[5:]} == {
  141. f"subscribing to {mqtt_topic_prefix}/{s}"
  142. for s in ("poweroff", "lock-all-sessions", "suspend")
  143. }
  144. dbus_signal_loop_mock.assert_awaited_once()
  145. @pytest.mark.asyncio
  146. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  147. @pytest.mark.parametrize("mqtt_port", [1833])
  148. @pytest.mark.parametrize("mqtt_disable_tls", [True, False])
  149. async def test__run_tls(caplog, mqtt_host, mqtt_port, mqtt_disable_tls):
  150. caplog.set_level(logging.INFO)
  151. with unittest.mock.patch(
  152. "aiomqtt.Client"
  153. ) as mqtt_client_class_mock, unittest.mock.patch(
  154. "systemctl_mqtt._dbus_signal_loop"
  155. ) as dbus_signal_loop_mock:
  156. await systemctl_mqtt._run(
  157. mqtt_host=mqtt_host,
  158. mqtt_port=mqtt_port,
  159. mqtt_disable_tls=mqtt_disable_tls,
  160. mqtt_username=None,
  161. mqtt_password=None,
  162. mqtt_topic_prefix="systemctl/hosts",
  163. homeassistant_discovery_prefix="homeassistant",
  164. homeassistant_discovery_object_id="host",
  165. poweroff_delay=datetime.timedelta(),
  166. monitored_system_unit_names=[],
  167. controlled_system_unit_names=[],
  168. )
  169. mqtt_client_class_mock.assert_called_once()
  170. _, mqtt_client_init_kwargs = mqtt_client_class_mock.call_args
  171. assert mqtt_client_init_kwargs.pop("hostname") == mqtt_host
  172. assert mqtt_client_init_kwargs.pop("port") == mqtt_port
  173. if mqtt_disable_tls:
  174. assert mqtt_client_init_kwargs.pop("tls_context") is None
  175. else:
  176. assert isinstance(mqtt_client_init_kwargs.pop("tls_context"), ssl.SSLContext)
  177. assert set(mqtt_client_init_kwargs.keys()) == {"username", "password", "will"}
  178. assert caplog.records[0].levelno == logging.INFO
  179. assert caplog.records[0].message == (
  180. f"connecting to MQTT broker {mqtt_host}:{mqtt_port}"
  181. f" (TLS {'disabled' if mqtt_disable_tls else 'enabled'})"
  182. )
  183. dbus_signal_loop_mock.assert_awaited_once()
  184. @pytest.mark.asyncio
  185. async def test__run_tls_default():
  186. with unittest.mock.patch(
  187. "aiomqtt.Client"
  188. ) as mqtt_client_class_mock, unittest.mock.patch(
  189. "systemctl_mqtt._dbus_signal_loop"
  190. ) as dbus_signal_loop_mock:
  191. await systemctl_mqtt._run(
  192. mqtt_host="mqtt-broker.local",
  193. mqtt_port=1883,
  194. # mqtt_disable_tls default,
  195. mqtt_username=None,
  196. mqtt_password=None,
  197. mqtt_topic_prefix="systemctl/hosts",
  198. homeassistant_discovery_prefix="homeassistant",
  199. homeassistant_discovery_object_id="host",
  200. poweroff_delay=datetime.timedelta(),
  201. monitored_system_unit_names=[],
  202. controlled_system_unit_names=[],
  203. )
  204. mqtt_client_class_mock.assert_called_once()
  205. # enabled by default
  206. assert isinstance(
  207. mqtt_client_class_mock.call_args[1]["tls_context"], ssl.SSLContext
  208. )
  209. dbus_signal_loop_mock.assert_awaited_once()
  210. @pytest.mark.asyncio
  211. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  212. @pytest.mark.parametrize("mqtt_port", [1883])
  213. @pytest.mark.parametrize("mqtt_username", ["me"])
  214. @pytest.mark.parametrize("mqtt_password", [None, "secret"])
  215. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host"])
  216. async def test__run_authentication(
  217. mqtt_host, mqtt_port, mqtt_username, mqtt_password, mqtt_topic_prefix
  218. ):
  219. with unittest.mock.patch(
  220. "aiomqtt.Client"
  221. ) as mqtt_client_class_mock, unittest.mock.patch(
  222. "systemctl_mqtt._dbus_signal_loop"
  223. ) as dbus_signal_loop_mock:
  224. await systemctl_mqtt._run(
  225. mqtt_host=mqtt_host,
  226. mqtt_port=mqtt_port,
  227. mqtt_username=mqtt_username,
  228. mqtt_password=mqtt_password,
  229. mqtt_topic_prefix=mqtt_topic_prefix,
  230. homeassistant_discovery_prefix="discovery-prefix",
  231. homeassistant_discovery_object_id="node-id",
  232. poweroff_delay=datetime.timedelta(),
  233. monitored_system_unit_names=[],
  234. controlled_system_unit_names=[],
  235. )
  236. mqtt_client_class_mock.assert_called_once()
  237. _, mqtt_client_init_kwargs = mqtt_client_class_mock.call_args
  238. assert mqtt_client_init_kwargs["username"] == mqtt_username
  239. if mqtt_password:
  240. assert mqtt_client_init_kwargs["password"] == mqtt_password
  241. else:
  242. assert mqtt_client_init_kwargs["password"] is None
  243. dbus_signal_loop_mock.assert_awaited_once()
  244. @pytest.mark.asyncio
  245. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  246. @pytest.mark.parametrize("mqtt_port", [1883])
  247. @pytest.mark.parametrize("mqtt_password", ["secret"])
  248. async def test__run_authentication_missing_username(
  249. mqtt_host: str, mqtt_port: int, mqtt_password: str
  250. ) -> None:
  251. with unittest.mock.patch("aiomqtt.Client"), unittest.mock.patch(
  252. "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy"
  253. ), unittest.mock.patch("systemctl_mqtt._dbus_signal_loop") as dbus_signal_loop_mock:
  254. with pytest.raises(ValueError, match=r"^Missing MQTT username$"):
  255. await systemctl_mqtt._run(
  256. mqtt_host=mqtt_host,
  257. mqtt_port=mqtt_port,
  258. mqtt_username=None,
  259. mqtt_password=mqtt_password,
  260. mqtt_topic_prefix="prefix",
  261. homeassistant_discovery_prefix="discovery-prefix",
  262. homeassistant_discovery_object_id="node-id",
  263. poweroff_delay=datetime.timedelta(),
  264. monitored_system_unit_names=[],
  265. controlled_system_unit_names=[],
  266. )
  267. dbus_signal_loop_mock.assert_not_called()
  268. @pytest.mark.asyncio
  269. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host"])
  270. async def test__run_sigint(mqtt_topic_prefix: str):
  271. login_manager_mock = unittest.mock.MagicMock()
  272. with unittest.mock.patch(
  273. "aiomqtt.Client", autospec=False
  274. ) as mqtt_client_class_mock, unittest.mock.patch(
  275. "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy",
  276. return_value=login_manager_mock,
  277. ), unittest.mock.patch(
  278. "asyncio.gather", side_effect=KeyboardInterrupt
  279. ):
  280. login_manager_mock.Inhibit.return_value = (jeepney.fds.FileDescriptor(-1),)
  281. login_manager_mock.Get.return_value = (("b", False),)
  282. with pytest.raises(KeyboardInterrupt):
  283. await systemctl_mqtt._run(
  284. mqtt_host="mqtt-broker.local",
  285. mqtt_port=1883,
  286. mqtt_username=None,
  287. mqtt_password=None,
  288. mqtt_topic_prefix=mqtt_topic_prefix,
  289. homeassistant_discovery_prefix="homeassistant",
  290. homeassistant_discovery_object_id="host",
  291. poweroff_delay=datetime.timedelta(),
  292. monitored_system_unit_names=[],
  293. controlled_system_unit_names=[],
  294. )
  295. async with mqtt_client_class_mock() as mqtt_client_mock:
  296. pass
  297. assert mqtt_client_mock.publish.call_count == 4
  298. assert mqtt_client_mock.publish.call_args_list[0][1]["topic"].endswith("/config")
  299. assert mqtt_client_mock.publish.call_args_list[1][1]["topic"].endswith(
  300. "/preparing-for-shutdown"
  301. )
  302. assert mqtt_client_mock.publish.call_args_list[2][1] == {
  303. "topic": mqtt_topic_prefix + "/status",
  304. "payload": "online",
  305. "retain": True,
  306. }
  307. assert mqtt_client_mock.publish.call_args_list[3][1] == {
  308. "topic": mqtt_topic_prefix + "/status",
  309. "payload": "offline",
  310. "retain": True,
  311. }
  312. @pytest.mark.asyncio
  313. @pytest.mark.filterwarnings("ignore:coroutine '_dbus_signal_loop' was never awaited")
  314. @pytest.mark.filterwarnings("ignore:coroutine '_mqtt_message_loop' was never awaited")
  315. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
  316. async def test__mqtt_message_loop_trigger_poweroff(
  317. caplog: pytest.LogCaptureFixture, mqtt_topic_prefix: str
  318. ) -> None:
  319. state = systemctl_mqtt._State(
  320. mqtt_topic_prefix=mqtt_topic_prefix,
  321. homeassistant_discovery_prefix="homeassistant",
  322. homeassistant_discovery_object_id="whatever",
  323. poweroff_delay=datetime.timedelta(seconds=21),
  324. monitored_system_unit_names=[],
  325. controlled_system_unit_names=[],
  326. )
  327. mqtt_client_mock = unittest.mock.AsyncMock()
  328. mqtt_client_mock.messages.__aiter__.return_value = [
  329. aiomqtt.Message(
  330. topic=mqtt_topic_prefix + "/poweroff",
  331. payload=b"some-payload",
  332. qos=0,
  333. retain=False,
  334. mid=42 // 2,
  335. properties=None,
  336. )
  337. ]
  338. with unittest.mock.patch(
  339. "systemctl_mqtt._dbus.login_manager.schedule_shutdown"
  340. ) as schedule_shutdown_mock, caplog.at_level(logging.DEBUG):
  341. await systemctl_mqtt._mqtt_message_loop(
  342. state=state, mqtt_client=mqtt_client_mock
  343. )
  344. assert sorted(mqtt_client_mock.subscribe.await_args_list) == [
  345. unittest.mock.call(mqtt_topic_prefix + "/lock-all-sessions"),
  346. unittest.mock.call(mqtt_topic_prefix + "/poweroff"),
  347. unittest.mock.call(mqtt_topic_prefix + "/suspend"),
  348. ]
  349. schedule_shutdown_mock.assert_called_once_with(
  350. action="poweroff", delay=datetime.timedelta(seconds=21)
  351. )
  352. assert [
  353. t for t in caplog.record_tuples[2:] if not t[2].startswith("subscribing to ")
  354. ] == [
  355. (
  356. "systemctl_mqtt",
  357. logging.DEBUG,
  358. f"received message on topic '{mqtt_topic_prefix}/poweroff': b'some-payload'",
  359. ),
  360. ]
  361. @pytest.mark.asyncio
  362. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host"])
  363. async def test__mqtt_message_loop_retained(
  364. caplog: pytest.LogCaptureFixture, mqtt_topic_prefix: str
  365. ) -> None:
  366. state = systemctl_mqtt._State(
  367. mqtt_topic_prefix=mqtt_topic_prefix,
  368. homeassistant_discovery_prefix="homeassistant",
  369. homeassistant_discovery_object_id="whatever",
  370. poweroff_delay=datetime.timedelta(seconds=21),
  371. monitored_system_unit_names=[],
  372. controlled_system_unit_names=[],
  373. )
  374. mqtt_client_mock = unittest.mock.AsyncMock()
  375. mqtt_client_mock.messages.__aiter__.return_value = [
  376. aiomqtt.Message(
  377. topic=mqtt_topic_prefix + "/poweroff",
  378. payload=b"some-payload",
  379. qos=0,
  380. retain=True,
  381. mid=42 // 2,
  382. properties=None,
  383. )
  384. ]
  385. with unittest.mock.patch(
  386. "systemctl_mqtt._dbus.login_manager.schedule_shutdown"
  387. ) as schedule_shutdown_mock, caplog.at_level(logging.DEBUG):
  388. await systemctl_mqtt._mqtt_message_loop(
  389. state=state, mqtt_client=mqtt_client_mock
  390. )
  391. schedule_shutdown_mock.assert_not_called()
  392. assert [
  393. t for t in caplog.record_tuples[2:] if not t[2].startswith("subscribing to ")
  394. ] == [
  395. (
  396. "systemctl_mqtt",
  397. logging.INFO,
  398. "ignoring retained message on topic 'systemctl/host/poweroff'",
  399. ),
  400. ]
  401. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "systemd/raspberrypi"])
  402. @pytest.mark.parametrize("unit_name", ["foo.service", "bar.service"])
  403. def test_state_get_system_unit_active_state_mqtt_topic(
  404. mqtt_topic_prefix: str, unit_name: str
  405. ) -> None:
  406. state = systemctl_mqtt._State(
  407. mqtt_topic_prefix=mqtt_topic_prefix,
  408. homeassistant_discovery_prefix="homeassistant",
  409. homeassistant_discovery_object_id="whatever",
  410. poweroff_delay=datetime.timedelta(seconds=21),
  411. monitored_system_unit_names=[],
  412. controlled_system_unit_names=[],
  413. )
  414. assert (
  415. state.get_system_unit_active_state_mqtt_topic(unit_name=unit_name)
  416. == f"{mqtt_topic_prefix}/unit/system/{unit_name}/active-state"
  417. )
  418. @pytest.mark.asyncio
  419. @pytest.mark.filterwarnings("ignore:coroutine '_dbus_signal_loop' was never awaited")
  420. @pytest.mark.filterwarnings("ignore:coroutine '_mqtt_message_loop' was never awaited")
  421. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host"])
  422. @pytest.mark.parametrize("unit_name", ["foo.service", "bar.service"])
  423. async def test__mqtt_message_loop_trigger_restart(
  424. caplog: pytest.LogCaptureFixture, mqtt_topic_prefix: str, unit_name: str
  425. ) -> None:
  426. state = systemctl_mqtt._State(
  427. mqtt_topic_prefix=mqtt_topic_prefix,
  428. homeassistant_discovery_prefix="homeassistant",
  429. homeassistant_discovery_object_id="whatever",
  430. poweroff_delay=datetime.timedelta(seconds=21),
  431. monitored_system_unit_names=[],
  432. controlled_system_unit_names=[unit_name],
  433. )
  434. mqtt_client_mock = unittest.mock.AsyncMock()
  435. topic = f"{mqtt_topic_prefix}/unit/system/{unit_name}/restart"
  436. mqtt_client_mock.messages.__aiter__.return_value = [
  437. aiomqtt.Message(
  438. topic=topic,
  439. payload=b"some-payload",
  440. qos=0,
  441. retain=False,
  442. mid=42 // 2,
  443. properties=None,
  444. )
  445. ]
  446. with unittest.mock.patch(
  447. "systemctl_mqtt._dbus.service_manager.restart_unit"
  448. ) as trigger_service_restart_mock, caplog.at_level(logging.DEBUG):
  449. await systemctl_mqtt._mqtt_message_loop(
  450. state=state, mqtt_client=mqtt_client_mock
  451. )
  452. assert unittest.mock.call(topic) in mqtt_client_mock.subscribe.await_args_list
  453. trigger_service_restart_mock.assert_called_once_with(unit_name=unit_name)
  454. assert [
  455. t for t in caplog.record_tuples[2:] if not t[2].startswith("subscribing to ")
  456. ] == [
  457. (
  458. "systemctl_mqtt",
  459. logging.DEBUG,
  460. f"received message on topic '{topic}': b'some-payload'",
  461. ),
  462. ]