test_mqtt.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. # systemctl-mqtt - MQTT client triggering shutdown on systemd-based systems
  2. #
  3. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import logging
  18. import threading
  19. import time
  20. import unittest.mock
  21. import pytest
  22. from paho.mqtt.client import MQTTMessage
  23. import systemctl_mqtt
  24. # pylint: disable=protected-access
  25. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  26. @pytest.mark.parametrize("mqtt_port", [1833])
  27. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
  28. def test__run(caplog, mqtt_host, mqtt_port, mqtt_topic_prefix):
  29. caplog.set_level(logging.DEBUG)
  30. with unittest.mock.patch(
  31. "socket.create_connection"
  32. ) as create_socket_mock, unittest.mock.patch(
  33. "ssl.SSLContext.wrap_socket", autospec=True,
  34. ) as ssl_wrap_socket_mock, unittest.mock.patch(
  35. "paho.mqtt.client.Client.loop_forever", autospec=True,
  36. ) as mqtt_loop_forever_mock, unittest.mock.patch(
  37. "gi.repository.GLib.MainLoop.run"
  38. ) as glib_loop_mock:
  39. ssl_wrap_socket_mock.return_value.send = len
  40. systemctl_mqtt._run(
  41. mqtt_host=mqtt_host,
  42. mqtt_port=mqtt_port,
  43. mqtt_username=None,
  44. mqtt_password=None,
  45. mqtt_topic_prefix=mqtt_topic_prefix,
  46. )
  47. assert caplog.records[0].levelno == logging.INFO
  48. assert caplog.records[0].message == "connecting to MQTT broker {}:{}".format(
  49. mqtt_host, mqtt_port
  50. )
  51. # correct remote?
  52. assert create_socket_mock.call_count == 1
  53. create_socket_args, _ = create_socket_mock.call_args
  54. assert create_socket_args[0] == (mqtt_host, mqtt_port)
  55. # ssl enabled?
  56. assert ssl_wrap_socket_mock.call_count == 1
  57. ssl_context = ssl_wrap_socket_mock.call_args[0][0] # self
  58. assert ssl_context.check_hostname is True
  59. assert ssl_wrap_socket_mock.call_args[1]["server_hostname"] == mqtt_host
  60. # loop started?
  61. while threading.active_count() > 1:
  62. time.sleep(0.01)
  63. assert mqtt_loop_forever_mock.call_count == 1
  64. (mqtt_client,) = mqtt_loop_forever_mock.call_args[0]
  65. assert mqtt_client._tls_insecure is False
  66. # credentials
  67. assert mqtt_client._username is None
  68. assert mqtt_client._password is None
  69. # connect callback
  70. caplog.clear()
  71. mqtt_client.socket().getpeername.return_value = (mqtt_host, mqtt_port)
  72. with unittest.mock.patch(
  73. "paho.mqtt.client.Client.subscribe"
  74. ) as mqtt_subscribe_mock:
  75. mqtt_client.on_connect(mqtt_client, mqtt_client._userdata, {}, 0)
  76. mqtt_subscribe_mock.assert_called_once_with(mqtt_topic_prefix + "/poweroff")
  77. assert mqtt_client.on_message is None
  78. assert ( # pylint: disable=comparison-with-callable
  79. mqtt_client._on_message_filtered[mqtt_topic_prefix + "/poweroff"]
  80. == systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
  81. "poweroff"
  82. ].mqtt_message_callback
  83. )
  84. assert caplog.records[0].levelno == logging.DEBUG
  85. assert caplog.records[0].message == "connected to MQTT broker {}:{}".format(
  86. mqtt_host, mqtt_port
  87. )
  88. assert caplog.records[1].levelno == logging.INFO
  89. assert caplog.records[1].message == "subscribing to {}".format(
  90. mqtt_topic_prefix + "/poweroff"
  91. )
  92. assert caplog.records[2].levelno == logging.DEBUG
  93. assert caplog.records[2].message == "registered MQTT callback for topic {}".format(
  94. mqtt_topic_prefix + "/poweroff"
  95. ) + " triggering {}".format(
  96. systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"].action
  97. )
  98. # message callback
  99. caplog.clear()
  100. poweroff_message = MQTTMessage(topic=mqtt_topic_prefix.encode() + b"/poweroff")
  101. with unittest.mock.patch.object(
  102. systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "action",
  103. ) as poweroff_action_mock:
  104. mqtt_client._handle_on_message(poweroff_message)
  105. poweroff_action_mock.assert_called_once_with()
  106. assert all(r.levelno == logging.DEBUG for r in caplog.records)
  107. assert caplog.records[0].message == "received topic={} payload=b''".format(
  108. poweroff_message.topic
  109. )
  110. assert caplog.records[1].message.startswith("executing action poweroff")
  111. assert caplog.records[2].message.startswith("completed action poweroff")
  112. # dbus loop started?
  113. glib_loop_mock.assert_called_once_with()
  114. # waited for mqtt loop to stop?
  115. assert mqtt_client._thread_terminate
  116. assert mqtt_client._thread is None
  117. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  118. @pytest.mark.parametrize("mqtt_port", [1833])
  119. @pytest.mark.parametrize("mqtt_username", ["me"])
  120. @pytest.mark.parametrize("mqtt_password", [None, "secret"])
  121. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host"])
  122. def test__run_authentication(
  123. mqtt_host, mqtt_port, mqtt_username, mqtt_password, mqtt_topic_prefix
  124. ):
  125. with unittest.mock.patch("socket.create_connection"), unittest.mock.patch(
  126. "ssl.SSLContext.wrap_socket"
  127. ) as ssl_wrap_socket_mock, unittest.mock.patch(
  128. "paho.mqtt.client.Client.loop_forever", autospec=True,
  129. ) as mqtt_loop_forever_mock, unittest.mock.patch(
  130. "gi.repository.GLib.MainLoop.run"
  131. ):
  132. ssl_wrap_socket_mock.return_value.send = len
  133. systemctl_mqtt._run(
  134. mqtt_host=mqtt_host,
  135. mqtt_port=mqtt_port,
  136. mqtt_username=mqtt_username,
  137. mqtt_password=mqtt_password,
  138. mqtt_topic_prefix=mqtt_topic_prefix,
  139. )
  140. assert mqtt_loop_forever_mock.call_count == 1
  141. (mqtt_client,) = mqtt_loop_forever_mock.call_args[0]
  142. assert mqtt_client._username.decode() == mqtt_username
  143. if mqtt_password:
  144. assert mqtt_client._password.decode() == mqtt_password
  145. else:
  146. assert mqtt_client._password is None
  147. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  148. @pytest.mark.parametrize("mqtt_port", [1833])
  149. @pytest.mark.parametrize("mqtt_password", ["secret"])
  150. def test__run_authentication_missing_username(mqtt_host, mqtt_port, mqtt_password):
  151. with unittest.mock.patch("paho.mqtt.client.Client"):
  152. with pytest.raises(ValueError):
  153. systemctl_mqtt._run(
  154. mqtt_host=mqtt_host,
  155. mqtt_port=mqtt_port,
  156. mqtt_username=None,
  157. mqtt_password=mqtt_password,
  158. mqtt_topic_prefix="prefix",
  159. )
  160. @pytest.mark.parametrize("mqtt_topic", ["system/command/poweroff"])
  161. @pytest.mark.parametrize("payload", [b"", b"junk"])
  162. def test_mqtt_message_callback_poweroff(caplog, mqtt_topic: str, payload: bytes):
  163. message = MQTTMessage(topic=mqtt_topic.encode())
  164. message.payload = payload
  165. with unittest.mock.patch.object(
  166. systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "action",
  167. ) as action_mock, caplog.at_level(logging.DEBUG):
  168. systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
  169. "poweroff"
  170. ].mqtt_message_callback(
  171. None, None, message # type: ignore
  172. )
  173. action_mock.assert_called_once_with()
  174. assert len(caplog.records) == 3
  175. assert caplog.records[0].levelno == logging.DEBUG
  176. assert caplog.records[0].message == (
  177. "received topic={} payload={!r}".format(mqtt_topic, payload)
  178. )
  179. assert caplog.records[1].levelno == logging.DEBUG
  180. assert caplog.records[1].message.startswith(
  181. "executing action {} ({!r})".format("poweroff", action_mock)
  182. )
  183. assert caplog.records[2].levelno == logging.DEBUG
  184. assert caplog.records[2].message.startswith(
  185. "completed action {} ({!r})".format("poweroff", action_mock)
  186. )
  187. @pytest.mark.parametrize("mqtt_topic", ["system/command/poweroff"])
  188. @pytest.mark.parametrize("payload", [b"", b"junk"])
  189. def test_mqtt_message_callback_poweroff_retained(
  190. caplog, mqtt_topic: str, payload: bytes
  191. ):
  192. message = MQTTMessage(topic=mqtt_topic.encode())
  193. message.payload = payload
  194. message.retain = True
  195. with unittest.mock.patch.object(
  196. systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "action",
  197. ) as action_mock, caplog.at_level(logging.DEBUG):
  198. systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
  199. "poweroff"
  200. ].mqtt_message_callback(
  201. None, None, message # type: ignore
  202. )
  203. action_mock.assert_not_called()
  204. assert len(caplog.records) == 2
  205. assert caplog.records[0].levelno == logging.DEBUG
  206. assert caplog.records[0].message == (
  207. "received topic={} payload={!r}".format(mqtt_topic, payload)
  208. )
  209. assert caplog.records[1].levelno == logging.INFO
  210. assert caplog.records[1].message == "ignoring retained message"