test_mqtt.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
# systemctl-mqtt - MQTT client triggering shutdown on systemd-based systems
#
# Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import logging
  18. import unittest.mock
  19. import pytest
  20. from paho.mqtt.client import MQTTMessage
  21. import systemctl_mqtt
  22. # pylint: disable=protected-access
  23. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  24. @pytest.mark.parametrize("mqtt_port", [1833])
  25. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
  26. def test__run(mqtt_host, mqtt_port, mqtt_topic_prefix):
  27. with unittest.mock.patch(
  28. "paho.mqtt.client.Client"
  29. ) as mqtt_client_mock, unittest.mock.patch(
  30. "systemctl_mqtt._mqtt_on_message"
  31. ) as message_handler_mock:
  32. systemctl_mqtt._run(
  33. mqtt_host=mqtt_host,
  34. mqtt_port=mqtt_port,
  35. mqtt_username=None,
  36. mqtt_password=None,
  37. mqtt_topic_prefix=mqtt_topic_prefix,
  38. )
  39. assert mqtt_client_mock.call_count == 1 # .assert_called_once requires python>=v3.6
  40. init_args, init_kwargs = mqtt_client_mock.call_args
  41. assert not init_args
  42. assert len(init_kwargs) == 1
  43. settings = init_kwargs["userdata"]
  44. assert isinstance(settings, systemctl_mqtt._Settings)
  45. assert mqtt_topic_prefix + "/poweroff" in settings.mqtt_topic_action_mapping
  46. assert not mqtt_client_mock().username_pw_set.called
  47. mqtt_client_mock().tls_set.assert_called_once_with(ca_certs=None)
  48. mqtt_client_mock().connect.assert_called_once_with(host=mqtt_host, port=mqtt_port)
  49. mqtt_client_mock().socket().getpeername.return_value = (mqtt_host, mqtt_port)
  50. mqtt_client_mock().on_connect(mqtt_client_mock(), settings, {}, 0)
  51. mqtt_client_mock().subscribe.assert_called_once_with(
  52. mqtt_topic_prefix + "/poweroff"
  53. )
  54. mqtt_client_mock().on_message(mqtt_client_mock(), settings, "message")
  55. assert message_handler_mock.call_count == 1
  56. mqtt_client_mock().loop_forever.assert_called_once_with()
  57. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  58. @pytest.mark.parametrize("mqtt_port", [1833])
  59. @pytest.mark.parametrize("mqtt_username", ["me"])
  60. @pytest.mark.parametrize("mqtt_password", [None, "secret"])
  61. @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host"])
  62. def test__run_authentication(
  63. mqtt_host, mqtt_port, mqtt_username, mqtt_password, mqtt_topic_prefix
  64. ):
  65. with unittest.mock.patch("paho.mqtt.client.Client") as mqtt_client_mock:
  66. systemctl_mqtt._run(
  67. mqtt_host=mqtt_host,
  68. mqtt_port=mqtt_port,
  69. mqtt_username=mqtt_username,
  70. mqtt_password=mqtt_password,
  71. mqtt_topic_prefix=mqtt_topic_prefix,
  72. )
  73. assert mqtt_client_mock.call_count == 1
  74. init_args, init_kwargs = mqtt_client_mock.call_args
  75. assert not init_args
  76. assert set(init_kwargs.keys()) == {"userdata"}
  77. mqtt_client_mock().username_pw_set.assert_called_once_with(
  78. username=mqtt_username, password=mqtt_password,
  79. )
  80. @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  81. @pytest.mark.parametrize("mqtt_port", [1833])
  82. @pytest.mark.parametrize("mqtt_password", ["secret"])
  83. def test__run_authentication_missing_username(mqtt_host, mqtt_port, mqtt_password):
  84. with unittest.mock.patch("paho.mqtt.client.Client"):
  85. with pytest.raises(ValueError):
  86. systemctl_mqtt._run(
  87. mqtt_host=mqtt_host,
  88. mqtt_port=mqtt_port,
  89. mqtt_username=None,
  90. mqtt_password=mqtt_password,
  91. mqtt_topic_prefix="prefix",
  92. )
  93. @pytest.mark.parametrize("mqtt_topic_prefix", ["system/command"])
  94. @pytest.mark.parametrize("payload", [b"", b"junk"])
  95. def test__mqtt_on_message_poweroff(caplog, mqtt_topic_prefix: str, payload: bytes):
  96. mqtt_topic = mqtt_topic_prefix + "/poweroff"
  97. message = MQTTMessage(topic=mqtt_topic.encode())
  98. message.payload = payload
  99. settings = systemctl_mqtt._Settings(mqtt_topic_prefix=mqtt_topic_prefix)
  100. action_mock = unittest.mock.MagicMock()
  101. settings.mqtt_topic_action_mapping[mqtt_topic] = action_mock # functools.partial
  102. with caplog.at_level(logging.DEBUG):
  103. systemctl_mqtt._mqtt_on_message(
  104. None, settings, message,
  105. )
  106. assert len(caplog.records) == 3
  107. assert caplog.records[0].levelno == logging.DEBUG
  108. assert caplog.records[0].message == (
  109. "received topic={} payload={!r}".format(mqtt_topic, payload)
  110. )
  111. assert caplog.records[1].levelno == logging.DEBUG
  112. assert caplog.records[1].message.startswith(
  113. "executing action {!r}".format(action_mock)
  114. )
  115. assert caplog.records[2].levelno == logging.DEBUG
  116. assert caplog.records[2].message.startswith(
  117. "completed action {!r}".format(action_mock)
  118. )
  119. action_mock.assert_called_once_with()
  120. @pytest.mark.parametrize(
  121. ("topic", "payload"), [("system/poweroff", b""), ("system/poweroff", "payload"),],
  122. )
  123. def test__mqtt_on_message_ignored(
  124. caplog, topic: str, payload: bytes,
  125. ):
  126. message = MQTTMessage(topic=topic.encode())
  127. message.payload = payload
  128. settings = systemctl_mqtt._Settings(mqtt_topic_prefix="system/command")
  129. settings.mqtt_topic_action_mapping = {} # provoke KeyError on access
  130. with caplog.at_level(logging.DEBUG):
  131. systemctl_mqtt._mqtt_on_message(
  132. None, settings, message,
  133. )
  134. assert len(caplog.records) == 2
  135. assert caplog.records[0].levelno == logging.DEBUG
  136. assert caplog.records[0].message == (
  137. "received topic={} payload={!r}".format(topic, payload)
  138. )
  139. assert caplog.records[1].levelno == logging.WARNING
  140. assert caplog.records[1].message == "unexpected topic {}".format(topic)
  141. @pytest.mark.parametrize(
  142. ("topic", "payload"), [("system/command/poweroff", b"")],
  143. )
  144. def test__mqtt_on_message_ignored_retained(
  145. caplog, topic: str, payload: bytes,
  146. ):
  147. message = MQTTMessage(topic=topic.encode())
  148. message.payload = payload
  149. message.retain = True
  150. settings = systemctl_mqtt._Settings(mqtt_topic_prefix="system/command")
  151. settings.mqtt_topic_action_mapping = {} # provoke KeyError on access
  152. with caplog.at_level(logging.DEBUG):
  153. systemctl_mqtt._mqtt_on_message(
  154. None, settings, message,
  155. )
  156. assert len(caplog.records) == 2
  157. assert caplog.records[0].levelno == logging.DEBUG
  158. assert caplog.records[0].message == (
  159. "received topic={} payload={!r}".format(topic, payload)
  160. )
  161. assert caplog.records[1].levelno == logging.INFO
  162. assert caplog.records[1].message == "ignoring retained message"