test_mqtt.py 16 KB


  1. # systemctl-mqtt - MQTT client triggering & reporting shutdown on systemd-based systems
  2. #
  3. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import datetime
  18. import logging
  19. import ssl
  20. import unittest.mock
  21. import aiomqtt
  22. import jeepney.fds
  23. import jeepney.low_level
  24. import pytest
  25. import systemctl_mqtt
  26. # pylint: disable=protected-access,too-many-positional-arguments
  27. @pytest.mark.asyncio
  28. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  29. @pytest.mark.parametrize("mqtt_port", [1833])
  30. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
  31. @pytest.mark.parametrize("homeassistant_discovery_prefix", ["homeassistant"])
  32. @pytest.mark.parametrize("homeassistant_discovery_object_id", ["host", "node"])
  33. async def test__run(
  34. caplog,
  35. mqtt_host,
  36. mqtt_port,
  37. mqtt_topic_prefix,
  38. homeassistant_discovery_prefix,
  39. homeassistant_discovery_object_id,
  40. ):
  41. # pylint: disable=too-many-locals,too-many-arguments
  42. caplog.set_level(logging.DEBUG)
  43. login_manager_mock = unittest.mock.MagicMock()
  44. with unittest.mock.patch(
  45. "aiomqtt.Client", autospec=False
  46. ) as mqtt_client_class_mock, unittest.mock.patch(
  47. "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy",
  48. return_value=login_manager_mock,
  49. ), unittest.mock.patch(
  50. "systemctl_mqtt._dbus_signal_loop"
  51. ) as dbus_signal_loop_mock:
  52. login_manager_mock.Inhibit.return_value = (jeepney.fds.FileDescriptor(-1),)
  53. login_manager_mock.Get.return_value = (("b", False),)
  54. await systemctl_mqtt._run(
  55. mqtt_host=mqtt_host,
  56. mqtt_port=mqtt_port,
  57. mqtt_username=None,
  58. mqtt_password=None,
  59. mqtt_topic_prefix=mqtt_topic_prefix,
  60. homeassistant_discovery_prefix=homeassistant_discovery_prefix,
  61. homeassistant_discovery_object_id=homeassistant_discovery_object_id,
  62. poweroff_delay=datetime.timedelta(),
  63. )
  64. assert caplog.records[0].levelno == logging.INFO
  65. assert caplog.records[0].message == (
  66. f"connecting to MQTT broker {mqtt_host}:{mqtt_port} (TLS enabled)"
  67. )
  68. mqtt_client_class_mock.assert_called_once()
  69. _, mqtt_client_init_kwargs = mqtt_client_class_mock.call_args
  70. assert mqtt_client_init_kwargs.pop("hostname") == mqtt_host
  71. assert mqtt_client_init_kwargs.pop("port") == mqtt_port
  72. assert isinstance(mqtt_client_init_kwargs.pop("tls_context"), ssl.SSLContext)
  73. assert mqtt_client_init_kwargs.pop("username") is None
  74. assert mqtt_client_init_kwargs.pop("password") is None
  75. assert mqtt_client_init_kwargs.pop("will") == aiomqtt.Will(
  76. topic=mqtt_topic_prefix + "/status",
  77. payload="offline",
  78. qos=0,
  79. retain=True,
  80. properties=None,
  81. )
  82. assert not mqtt_client_init_kwargs
  83. login_manager_mock.Inhibit.assert_called_once_with(
  84. what="shutdown",
  85. who="systemctl-mqtt",
  86. why="Report shutdown via MQTT",
  87. mode="delay",
  88. )
  89. login_manager_mock.Get.assert_called_once_with("PreparingForShutdown")
  90. async with mqtt_client_class_mock() as mqtt_client_mock:
  91. pass
  92. assert mqtt_client_mock.publish.call_count == 4
  93. assert (
  94. mqtt_client_mock.publish.call_args_list[0][1]["topic"]
  95. == f"{homeassistant_discovery_prefix}/device/{homeassistant_discovery_object_id}/config"
  96. )
  97. assert mqtt_client_mock.publish.call_args_list[1] == unittest.mock.call(
  98. topic=mqtt_topic_prefix + "/preparing-for-shutdown",
  99. payload="false",
  100. retain=False,
  101. )
  102. assert mqtt_client_mock.publish.call_args_list[2][1] == {
  103. "topic": mqtt_topic_prefix + "/status",
  104. "payload": "online",
  105. "retain": True,
  106. }
  107. assert mqtt_client_mock.publish.call_args_list[3][1] == {
  108. "topic": mqtt_topic_prefix + "/status",
  109. "payload": "offline",
  110. "retain": True,
  111. }
  112. assert sorted(mqtt_client_mock.subscribe.call_args_list) == [
  113. unittest.mock.call(mqtt_topic_prefix + "/lock-all-sessions"),
  114. unittest.mock.call(mqtt_topic_prefix + "/poweroff"),
  115. unittest.mock.call(mqtt_topic_prefix + "/suspend"),
  116. ]
  117. assert caplog.records[1].levelno == logging.DEBUG
  118. assert (
  119. caplog.records[1].message == f"connected to MQTT broker {mqtt_host}:{mqtt_port}"
  120. )
  121. assert caplog.records[2].levelno == logging.DEBUG
  122. assert caplog.records[2].message == "acquired shutdown inhibitor lock"
  123. assert caplog.records[3].levelno == logging.DEBUG
  124. assert (
  125. caplog.records[3].message
  126. == "publishing home assistant config on "
  127. + homeassistant_discovery_prefix
  128. + "/device/"
  129. + homeassistant_discovery_object_id
  130. + "/config"
  131. )
  132. assert caplog.records[4].levelno == logging.INFO
  133. assert (
  134. caplog.records[4].message
  135. == f"publishing 'false' on {mqtt_topic_prefix}/preparing-for-shutdown"
  136. )
  137. assert all(r.levelno == logging.INFO for r in caplog.records[5::2])
  138. assert {r.message for r in caplog.records[5:]} == {
  139. f"subscribing to {mqtt_topic_prefix}/{s}"
  140. for s in ("poweroff", "lock-all-sessions", "suspend")
  141. }
  142. dbus_signal_loop_mock.assert_awaited_once()
  143. @pytest.mark.asyncio
  144. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  145. @pytest.mark.parametrize("mqtt_port", [1833])
  146. @pytest.mark.parametrize("mqtt_disable_tls", [True, False])
  147. async def test__run_tls(caplog, mqtt_host, mqtt_port, mqtt_disable_tls):
  148. caplog.set_level(logging.INFO)
  149. with unittest.mock.patch(
  150. "aiomqtt.Client"
  151. ) as mqtt_client_class_mock, unittest.mock.patch(
  152. "systemctl_mqtt._dbus_signal_loop"
  153. ) as dbus_signal_loop_mock:
  154. await systemctl_mqtt._run(
  155. mqtt_host=mqtt_host,
  156. mqtt_port=mqtt_port,
  157. mqtt_disable_tls=mqtt_disable_tls,
  158. mqtt_username=None,
  159. mqtt_password=None,
  160. mqtt_topic_prefix="systemctl/hosts",
  161. homeassistant_discovery_prefix="homeassistant",
  162. homeassistant_discovery_object_id="host",
  163. poweroff_delay=datetime.timedelta(),
  164. )
  165. mqtt_client_class_mock.assert_called_once()
  166. _, mqtt_client_init_kwargs = mqtt_client_class_mock.call_args
  167. assert mqtt_client_init_kwargs.pop("hostname") == mqtt_host
  168. assert mqtt_client_init_kwargs.pop("port") == mqtt_port
  169. if mqtt_disable_tls:
  170. assert mqtt_client_init_kwargs.pop("tls_context") is None
  171. else:
  172. assert isinstance(mqtt_client_init_kwargs.pop("tls_context"), ssl.SSLContext)
  173. assert set(mqtt_client_init_kwargs.keys()) == {"username", "password", "will"}
  174. assert caplog.records[0].levelno == logging.INFO
  175. assert caplog.records[0].message == (
  176. f"connecting to MQTT broker {mqtt_host}:{mqtt_port}"
  177. f" (TLS {'disabled' if mqtt_disable_tls else 'enabled'})"
  178. )
  179. dbus_signal_loop_mock.assert_awaited_once()
  180. @pytest.mark.asyncio
  181. async def test__run_tls_default():
  182. with unittest.mock.patch(
  183. "aiomqtt.Client"
  184. ) as mqtt_client_class_mock, unittest.mock.patch(
  185. "systemctl_mqtt._dbus_signal_loop"
  186. ) as dbus_signal_loop_mock:
  187. await systemctl_mqtt._run(
  188. mqtt_host="mqtt-broker.local",
  189. mqtt_port=1833,
  190. # mqtt_disable_tls default,
  191. mqtt_username=None,
  192. mqtt_password=None,
  193. mqtt_topic_prefix="systemctl/hosts",
  194. homeassistant_discovery_prefix="homeassistant",
  195. homeassistant_discovery_object_id="host",
  196. poweroff_delay=datetime.timedelta(),
  197. )
  198. mqtt_client_class_mock.assert_called_once()
  199. # enabled by default
  200. assert isinstance(
  201. mqtt_client_class_mock.call_args[1]["tls_context"], ssl.SSLContext
  202. )
  203. dbus_signal_loop_mock.assert_awaited_once()
  204. @pytest.mark.asyncio
  205. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  206. @pytest.mark.parametrize("mqtt_port", [1833])
  207. @pytest.mark.parametrize("mqtt_username", ["me"])
  208. @pytest.mark.parametrize("mqtt_password", [None, "secret"])
  209. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host"])
  210. async def test__run_authentication(
  211. mqtt_host, mqtt_port, mqtt_username, mqtt_password, mqtt_topic_prefix
  212. ):
  213. with unittest.mock.patch(
  214. "aiomqtt.Client"
  215. ) as mqtt_client_class_mock, unittest.mock.patch(
  216. "systemctl_mqtt._dbus_signal_loop"
  217. ) as dbus_signal_loop_mock:
  218. await systemctl_mqtt._run(
  219. mqtt_host=mqtt_host,
  220. mqtt_port=mqtt_port,
  221. mqtt_username=mqtt_username,
  222. mqtt_password=mqtt_password,
  223. mqtt_topic_prefix=mqtt_topic_prefix,
  224. homeassistant_discovery_prefix="discovery-prefix",
  225. homeassistant_discovery_object_id="node-id",
  226. poweroff_delay=datetime.timedelta(),
  227. )
  228. mqtt_client_class_mock.assert_called_once()
  229. _, mqtt_client_init_kwargs = mqtt_client_class_mock.call_args
  230. assert mqtt_client_init_kwargs["username"] == mqtt_username
  231. if mqtt_password:
  232. assert mqtt_client_init_kwargs["password"] == mqtt_password
  233. else:
  234. assert mqtt_client_init_kwargs["password"] is None
  235. dbus_signal_loop_mock.assert_awaited_once()
  236. @pytest.mark.asyncio
  237. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  238. @pytest.mark.parametrize("mqtt_port", [1833])
  239. @pytest.mark.parametrize("mqtt_password", ["secret"])
  240. async def test__run_authentication_missing_username(
  241. mqtt_host: str, mqtt_port: int, mqtt_password: str
  242. ) -> None:
  243. with unittest.mock.patch("aiomqtt.Client"), unittest.mock.patch(
  244. "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy"
  245. ), unittest.mock.patch("systemctl_mqtt._dbus_signal_loop") as dbus_signal_loop_mock:
  246. with pytest.raises(ValueError, match=r"^Missing MQTT username$"):
  247. await systemctl_mqtt._run(
  248. mqtt_host=mqtt_host,
  249. mqtt_port=mqtt_port,
  250. mqtt_username=None,
  251. mqtt_password=mqtt_password,
  252. mqtt_topic_prefix="prefix",
  253. homeassistant_discovery_prefix="discovery-prefix",
  254. homeassistant_discovery_object_id="node-id",
  255. poweroff_delay=datetime.timedelta(),
  256. )
  257. dbus_signal_loop_mock.assert_not_called()
  258. @pytest.mark.asyncio
  259. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host"])
  260. async def test__run_sigint(mqtt_topic_prefix: str):
  261. login_manager_mock = unittest.mock.MagicMock()
  262. with unittest.mock.patch(
  263. "aiomqtt.Client", autospec=False
  264. ) as mqtt_client_class_mock, unittest.mock.patch(
  265. "systemctl_mqtt._dbus.login_manager.get_login_manager_proxy",
  266. return_value=login_manager_mock,
  267. ), unittest.mock.patch(
  268. "asyncio.gather", side_effect=KeyboardInterrupt
  269. ):
  270. login_manager_mock.Inhibit.return_value = (jeepney.fds.FileDescriptor(-1),)
  271. login_manager_mock.Get.return_value = (("b", False),)
  272. with pytest.raises(KeyboardInterrupt):
  273. await systemctl_mqtt._run(
  274. mqtt_host="mqtt-broker.local",
  275. mqtt_port=1883,
  276. mqtt_username=None,
  277. mqtt_password=None,
  278. mqtt_topic_prefix=mqtt_topic_prefix,
  279. homeassistant_discovery_prefix="homeassistant",
  280. homeassistant_discovery_object_id="host",
  281. poweroff_delay=datetime.timedelta(),
  282. )
  283. async with mqtt_client_class_mock() as mqtt_client_mock:
  284. pass
  285. assert mqtt_client_mock.publish.call_count == 4
  286. assert mqtt_client_mock.publish.call_args_list[0][1]["topic"].endswith("/config")
  287. assert mqtt_client_mock.publish.call_args_list[1][1]["topic"].endswith(
  288. "/preparing-for-shutdown"
  289. )
  290. assert mqtt_client_mock.publish.call_args_list[2][1] == {
  291. "topic": mqtt_topic_prefix + "/status",
  292. "payload": "online",
  293. "retain": True,
  294. }
  295. assert mqtt_client_mock.publish.call_args_list[3][1] == {
  296. "topic": mqtt_topic_prefix + "/status",
  297. "payload": "offline",
  298. "retain": True,
  299. }
  300. @pytest.mark.asyncio
  301. @pytest.mark.filterwarnings("ignore:coroutine '_dbus_signal_loop' was never awaited")
  302. @pytest.mark.filterwarnings("ignore:coroutine '_mqtt_message_loop' was never awaited")
  303. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
  304. async def test__mqtt_message_loop_trigger_poweroff(
  305. caplog: pytest.LogCaptureFixture, mqtt_topic_prefix: str
  306. ) -> None:
  307. state = systemctl_mqtt._State(
  308. mqtt_topic_prefix=mqtt_topic_prefix,
  309. homeassistant_discovery_prefix="homeassistant",
  310. homeassistant_discovery_object_id="whatever",
  311. poweroff_delay=datetime.timedelta(seconds=21),
  312. )
  313. mqtt_client_mock = unittest.mock.AsyncMock()
  314. mqtt_client_mock.messages.__aiter__.return_value = [
  315. aiomqtt.Message(
  316. topic=mqtt_topic_prefix + "/poweroff",
  317. payload=b"some-payload",
  318. qos=0,
  319. retain=False,
  320. mid=42 // 2,
  321. properties=None,
  322. )
  323. ]
  324. with unittest.mock.patch(
  325. "systemctl_mqtt._dbus.login_manager.schedule_shutdown"
  326. ) as schedule_shutdown_mock, caplog.at_level(logging.DEBUG):
  327. await systemctl_mqtt._mqtt_message_loop(
  328. state=state, mqtt_client=mqtt_client_mock
  329. )
  330. assert sorted(mqtt_client_mock.subscribe.await_args_list) == [
  331. unittest.mock.call(mqtt_topic_prefix + "/lock-all-sessions"),
  332. unittest.mock.call(mqtt_topic_prefix + "/poweroff"),
  333. unittest.mock.call(mqtt_topic_prefix + "/suspend"),
  334. ]
  335. schedule_shutdown_mock.assert_called_once_with(
  336. action="poweroff", delay=datetime.timedelta(seconds=21)
  337. )
  338. assert [
  339. t for t in caplog.record_tuples[2:] if not t[2].startswith("subscribing to ")
  340. ] == [
  341. (
  342. "systemctl_mqtt",
  343. logging.DEBUG,
  344. f"received message on topic '{mqtt_topic_prefix}/poweroff': b'some-payload'",
  345. ),
  346. ]
  347. @pytest.mark.asyncio
  348. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host"])
  349. async def test__mqtt_message_loop_retained(
  350. caplog: pytest.LogCaptureFixture, mqtt_topic_prefix: str
  351. ) -> None:
  352. state = systemctl_mqtt._State(
  353. mqtt_topic_prefix=mqtt_topic_prefix,
  354. homeassistant_discovery_prefix="homeassistant",
  355. homeassistant_discovery_object_id="whatever",
  356. poweroff_delay=datetime.timedelta(seconds=21),
  357. )
  358. mqtt_client_mock = unittest.mock.AsyncMock()
  359. mqtt_client_mock.messages.__aiter__.return_value = [
  360. aiomqtt.Message(
  361. topic=mqtt_topic_prefix + "/poweroff",
  362. payload=b"some-payload",
  363. qos=0,
  364. retain=True,
  365. mid=42 // 2,
  366. properties=None,
  367. )
  368. ]
  369. with unittest.mock.patch(
  370. "systemctl_mqtt._dbus.login_manager.schedule_shutdown"
  371. ) as schedule_shutdown_mock, caplog.at_level(logging.DEBUG):
  372. await systemctl_mqtt._mqtt_message_loop(
  373. state=state, mqtt_client=mqtt_client_mock
  374. )
  375. schedule_shutdown_mock.assert_not_called()
  376. assert [
  377. t for t in caplog.record_tuples[2:] if not t[2].startswith("subscribing to ")
  378. ] == [
  379. (
  380. "systemctl_mqtt",
  381. logging.INFO,
  382. "ignoring retained message on topic 'systemctl/host/poweroff'",
  383. ),
  384. ]