test_mqtt.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. # systemctl-mqtt - MQTT client triggering shutdown on systemd-based systems
  2. #
  3. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import logging
  18. import unittest.mock
  19. import pytest
  20. from paho.mqtt.client import MQTTMessage
  21. import systemctl_mqtt
  22. # pylint: disable=protected-access
  23. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  24. @pytest.mark.parametrize("mqtt_port", [1833])
  25. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
  26. def test__run(caplog, mqtt_host, mqtt_port, mqtt_topic_prefix):
  27. caplog.set_level(logging.DEBUG)
  28. with unittest.mock.patch(
  29. "socket.create_connection"
  30. ) as create_socket_mock, unittest.mock.patch(
  31. "ssl.SSLContext.wrap_socket", autospec=True,
  32. ) as ssl_wrap_socket_mock, unittest.mock.patch(
  33. "paho.mqtt.client.Client.loop_forever", autospec=True,
  34. ) as mqtt_loop_forever_mock:
  35. ssl_wrap_socket_mock.return_value.send = len
  36. systemctl_mqtt._run(
  37. mqtt_host=mqtt_host,
  38. mqtt_port=mqtt_port,
  39. mqtt_username=None,
  40. mqtt_password=None,
  41. mqtt_topic_prefix=mqtt_topic_prefix,
  42. )
  43. assert caplog.records[0].levelno == logging.INFO
  44. assert caplog.records[0].message == "connecting to MQTT broker {}:{}".format(
  45. mqtt_host, mqtt_port
  46. )
  47. # correct remote?
  48. assert create_socket_mock.call_count == 1
  49. create_socket_args, _ = create_socket_mock.call_args
  50. assert create_socket_args[0] == (mqtt_host, mqtt_port)
  51. # ssl enabled?
  52. assert ssl_wrap_socket_mock.call_count == 1
  53. ssl_context = ssl_wrap_socket_mock.call_args[0][0] # self
  54. assert ssl_context.check_hostname is True
  55. assert ssl_wrap_socket_mock.call_args[1]["server_hostname"] == mqtt_host
  56. # loop started?
  57. assert mqtt_loop_forever_mock.call_count == 1
  58. (mqtt_client,) = mqtt_loop_forever_mock.call_args[0]
  59. assert mqtt_client._tls_insecure is False
  60. # credentials
  61. assert mqtt_client._username is None
  62. assert mqtt_client._password is None
  63. # connect callback
  64. caplog.clear()
  65. mqtt_client.socket().getpeername.return_value = (mqtt_host, mqtt_port)
  66. with unittest.mock.patch(
  67. "paho.mqtt.client.Client.subscribe"
  68. ) as mqtt_subscribe_mock:
  69. mqtt_client.on_connect(mqtt_client, mqtt_client._userdata, {}, 0)
  70. mqtt_subscribe_mock.assert_called_once_with(mqtt_topic_prefix + "/poweroff")
  71. assert mqtt_client.on_message is None
  72. assert ( # pylint: disable=comparison-with-callable
  73. mqtt_client._on_message_filtered[mqtt_topic_prefix + "/poweroff"]
  74. == systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
  75. "poweroff"
  76. ].mqtt_message_callback
  77. )
  78. assert caplog.records[0].levelno == logging.DEBUG
  79. assert caplog.records[0].message == "connected to MQTT broker {}:{}".format(
  80. mqtt_host, mqtt_port
  81. )
  82. assert caplog.records[1].levelno == logging.INFO
  83. assert caplog.records[1].message == "subscribing to {}".format(
  84. mqtt_topic_prefix + "/poweroff"
  85. )
  86. assert caplog.records[2].levelno == logging.DEBUG
  87. assert caplog.records[2].message == "registered MQTT callback for topic {}".format(
  88. mqtt_topic_prefix + "/poweroff"
  89. ) + " triggering {}".format(
  90. systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"].action
  91. )
  92. # message callback
  93. caplog.clear()
  94. poweroff_message = MQTTMessage(topic=mqtt_topic_prefix.encode() + b"/poweroff")
  95. with unittest.mock.patch.object(
  96. systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "action",
  97. ) as poweroff_action_mock:
  98. mqtt_client._handle_on_message(poweroff_message)
  99. poweroff_action_mock.assert_called_once_with()
  100. assert all(r.levelno == logging.DEBUG for r in caplog.records)
  101. assert caplog.records[0].message == "received topic={} payload=b''".format(
  102. poweroff_message.topic
  103. )
  104. assert caplog.records[1].message.startswith("executing action poweroff")
  105. assert caplog.records[2].message.startswith("completed action poweroff")
  106. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  107. @pytest.mark.parametrize("mqtt_port", [1833])
  108. @pytest.mark.parametrize("mqtt_username", ["me"])
  109. @pytest.mark.parametrize("mqtt_password", [None, "secret"])
  110. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host"])
  111. def test__run_authentication(
  112. mqtt_host, mqtt_port, mqtt_username, mqtt_password, mqtt_topic_prefix
  113. ):
  114. with unittest.mock.patch("socket.create_connection"), unittest.mock.patch(
  115. "ssl.SSLContext.wrap_socket"
  116. ) as ssl_wrap_socket_mock, unittest.mock.patch(
  117. "paho.mqtt.client.Client.loop_forever", autospec=True,
  118. ) as mqtt_loop_forever_mock:
  119. ssl_wrap_socket_mock.return_value.send = len
  120. systemctl_mqtt._run(
  121. mqtt_host=mqtt_host,
  122. mqtt_port=mqtt_port,
  123. mqtt_username=mqtt_username,
  124. mqtt_password=mqtt_password,
  125. mqtt_topic_prefix=mqtt_topic_prefix,
  126. )
  127. assert mqtt_loop_forever_mock.call_count == 1
  128. (mqtt_client,) = mqtt_loop_forever_mock.call_args[0]
  129. assert mqtt_client._username.decode() == mqtt_username
  130. if mqtt_password:
  131. assert mqtt_client._password.decode() == mqtt_password
  132. else:
  133. assert mqtt_client._password is None
  134. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  135. @pytest.mark.parametrize("mqtt_port", [1833])
  136. @pytest.mark.parametrize("mqtt_password", ["secret"])
  137. def test__run_authentication_missing_username(mqtt_host, mqtt_port, mqtt_password):
  138. with unittest.mock.patch("paho.mqtt.client.Client"):
  139. with pytest.raises(ValueError):
  140. systemctl_mqtt._run(
  141. mqtt_host=mqtt_host,
  142. mqtt_port=mqtt_port,
  143. mqtt_username=None,
  144. mqtt_password=mqtt_password,
  145. mqtt_topic_prefix="prefix",
  146. )
  147. @pytest.mark.parametrize("mqtt_topic", ["system/command/poweroff"])
  148. @pytest.mark.parametrize("payload", [b"", b"junk"])
  149. def test_mqtt_message_callback_poweroff(caplog, mqtt_topic: str, payload: bytes):
  150. message = MQTTMessage(topic=mqtt_topic.encode())
  151. message.payload = payload
  152. with unittest.mock.patch.object(
  153. systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "action",
  154. ) as action_mock, caplog.at_level(logging.DEBUG):
  155. systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
  156. "poweroff"
  157. ].mqtt_message_callback(
  158. None, None, message # type: ignore
  159. )
  160. action_mock.assert_called_once_with()
  161. assert len(caplog.records) == 3
  162. assert caplog.records[0].levelno == logging.DEBUG
  163. assert caplog.records[0].message == (
  164. "received topic={} payload={!r}".format(mqtt_topic, payload)
  165. )
  166. assert caplog.records[1].levelno == logging.DEBUG
  167. assert caplog.records[1].message.startswith(
  168. "executing action {} ({!r})".format("poweroff", action_mock)
  169. )
  170. assert caplog.records[2].levelno == logging.DEBUG
  171. assert caplog.records[2].message.startswith(
  172. "completed action {} ({!r})".format("poweroff", action_mock)
  173. )
  174. @pytest.mark.parametrize("mqtt_topic", ["system/command/poweroff"])
  175. @pytest.mark.parametrize("payload", [b"", b"junk"])
  176. def test_mqtt_message_callback_poweroff_retained(
  177. caplog, mqtt_topic: str, payload: bytes
  178. ):
  179. message = MQTTMessage(topic=mqtt_topic.encode())
  180. message.payload = payload
  181. message.retain = True
  182. with unittest.mock.patch.object(
  183. systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["poweroff"], "action",
  184. ) as action_mock, caplog.at_level(logging.DEBUG):
  185. systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[
  186. "poweroff"
  187. ].mqtt_message_callback(
  188. None, None, message # type: ignore
  189. )
  190. action_mock.assert_not_called()
  191. assert len(caplog.records) == 2
  192. assert caplog.records[0].levelno == logging.DEBUG
  193. assert caplog.records[0].message == (
  194. "received topic={} payload={!r}".format(mqtt_topic, payload)
  195. )
  196. assert caplog.records[1].levelno == logging.INFO
  197. assert caplog.records[1].message == "ignoring retained message"