test_mqtt.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. # systemctl-mqtt - MQTT client triggering shutdown on systemd-based systems
  2. #
  3. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import logging
  18. import unittest.mock
  19. import pytest
  20. from paho.mqtt.client import MQTTMessage
  21. import systemctl_mqtt
  22. # pylint: disable=protected-access
  @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  @pytest.mark.parametrize("mqtt_port", [1833])
  @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"])
  def test__run(mqtt_host, mqtt_port, mqtt_topic_prefix):
      """Exercise ``systemctl_mqtt._run`` without credentials against a mocked client.

      Checks that the paho client is constructed once with only a ``userdata``
      keyword holding a ``_Settings`` instance, that no credentials are set,
      that ``connect`` receives the given host and port, that invoking the
      client's ``on_connect`` subscribes to the ``<prefix>/poweroff`` topic,
      that invoking ``on_message`` reaches the patched message handler, and
      that ``loop_forever`` is called.
      """
      poweroff_topic = mqtt_topic_prefix + "/poweroff"
      client_patcher = unittest.mock.patch("paho.mqtt.client.Client")
      handler_patcher = unittest.mock.patch("systemctl_mqtt._mqtt_on_message")
      with client_patcher as mqtt_client_mock, handler_patcher as message_handler_mock:
          systemctl_mqtt._run(
              mqtt_host=mqtt_host,
              mqtt_port=mqtt_port,
              mqtt_username=None,
              mqtt_password=None,
              mqtt_topic_prefix=mqtt_topic_prefix,
          )
      # The mocks keep their recorded calls after the patches are undone,
      # so all verification happens outside the ``with`` block.
      mqtt_client_mock.assert_called_once()
      init_args, init_kwargs = mqtt_client_mock.call_args
      assert not init_args
      assert len(init_kwargs) == 1
      settings = init_kwargs["userdata"]
      assert isinstance(settings, systemctl_mqtt._Settings)
      assert poweroff_topic in settings.mqtt_topic_action_mapping
      # ``return_value`` is the object every ``Client(...)`` call yields.
      client = mqtt_client_mock.return_value
      assert not client.username_pw_set.called
      client.connect.assert_called_once_with(host=mqtt_host, port=mqtt_port)
      # Prepare the peer name before triggering the connect callback.
      client.socket().getpeername.return_value = (mqtt_host, mqtt_port)
      client.on_connect(client, settings, {}, 0)
      client.subscribe.assert_called_once_with(poweroff_topic)
      client.on_message(client, settings, "message")
      message_handler_mock.assert_called_once()
      client.loop_forever.assert_called_once_with()
  @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  @pytest.mark.parametrize("mqtt_port", [1833])
  @pytest.mark.parametrize("mqtt_username", ["me"])
  @pytest.mark.parametrize("mqtt_password", [None, "secret"])
  @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host"])
  def test__run_authentication(
      mqtt_host, mqtt_port, mqtt_username, mqtt_password, mqtt_topic_prefix
  ):
      """Check that ``_run`` hands the given credentials to the mocked client.

      Covers a username with and without a password; in both cases
      ``username_pw_set`` must be called exactly once with the values given.
      """
      run_kwargs = dict(
          mqtt_host=mqtt_host,
          mqtt_port=mqtt_port,
          mqtt_username=mqtt_username,
          mqtt_password=mqtt_password,
          mqtt_topic_prefix=mqtt_topic_prefix,
      )
      with unittest.mock.patch("paho.mqtt.client.Client") as mqtt_client_mock:
          systemctl_mqtt._run(**run_kwargs)
      mqtt_client_mock.assert_called_once()
      init_args, init_kwargs = mqtt_client_mock.call_args
      assert not init_args
      # The client must be constructed with the userdata keyword only.
      assert set(init_kwargs.keys()) == {"userdata"}
      client = mqtt_client_mock.return_value
      client.username_pw_set.assert_called_once_with(
          username=mqtt_username, password=mqtt_password
      )
  @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"])
  @pytest.mark.parametrize("mqtt_port", [1833])
  @pytest.mark.parametrize("mqtt_password", ["secret"])
  def test__run_authentication_missing_username(mqtt_host, mqtt_port, mqtt_password):
      """Expect ``_run`` to raise ``ValueError`` for a password without a username."""
      client_patcher = unittest.mock.patch("paho.mqtt.client.Client")
      with client_patcher, pytest.raises(ValueError):
          systemctl_mqtt._run(
              mqtt_host=mqtt_host,
              mqtt_port=mqtt_port,
              mqtt_username=None,
              mqtt_password=mqtt_password,
              mqtt_topic_prefix="prefix",
          )
  @pytest.mark.parametrize("mqtt_topic_prefix", ["system/command"])
  @pytest.mark.parametrize("payload", [b"", b"junk"])
  def test__mqtt_on_message_poweroff(caplog, mqtt_topic_prefix: str, payload: bytes):
      """Check that a message on the poweroff topic triggers the mapped action.

      The action registered for ``<prefix>/poweroff`` is replaced by a mock;
      the handler must log one DEBUG record describing the message and call
      the action once without arguments, whatever the payload.
      """
      mqtt_topic = mqtt_topic_prefix + "/poweroff"
      settings = systemctl_mqtt._Settings(mqtt_topic_prefix=mqtt_topic_prefix)
      # Stand-in for the action normally stored in the mapping (functools.partial).
      action_mock = unittest.mock.MagicMock()
      settings.mqtt_topic_action_mapping[mqtt_topic] = action_mock
      message = MQTTMessage(topic=mqtt_topic.encode())
      message.payload = payload
      with caplog.at_level(logging.DEBUG):
          systemctl_mqtt._mqtt_on_message(None, settings, message)
      assert len(caplog.records) == 1
      (record,) = caplog.records
      assert record.levelno == logging.DEBUG
      expected_message = "received topic={} payload={!r}".format(mqtt_topic, payload)
      assert record.message == expected_message
      action_mock.assert_called_once_with()
  @pytest.mark.parametrize(
      ("topic", "payload"),
      # Payloads are bytes, matching the ``payload: bytes`` annotation below.
      [("system/poweroff", b""), ("system/poweroff", b"payload")],
  )
  def test__mqtt_on_message_ignored(
      caplog, topic: str, payload: bytes,
  ):
      """Check that a message on an unmapped topic is logged and ignored.

      With an empty topic/action mapping the handler must emit exactly two
      records: the DEBUG "received" line and a WARNING naming the unexpected
      topic.
      """
      message = MQTTMessage(topic=topic.encode())
      message.payload = payload
      settings = systemctl_mqtt._Settings(mqtt_topic_prefix="system/command")
      settings.mqtt_topic_action_mapping = {}  # provoke KeyError on access
      with caplog.at_level(logging.DEBUG):
          systemctl_mqtt._mqtt_on_message(
              None, settings, message,
          )
      assert len(caplog.records) == 2
      received_record, warning_record = caplog.records
      assert received_record.levelno == logging.DEBUG
      assert received_record.message == (
          "received topic={} payload={!r}".format(topic, payload)
      )
      assert warning_record.levelno == logging.WARNING
      assert warning_record.message == "unexpected topic {}".format(topic)
  @pytest.mark.parametrize(
      ("topic", "payload"), [("system/command/poweroff", b"")],
  )
  def test__mqtt_on_message_ignored_retained(
      caplog, topic: str, payload: bytes,
  ):
      """Check that a retained message is logged at INFO level and skipped.

      The message carries ``retain = True``; the handler must emit the DEBUG
      "received" line followed by an INFO record saying the retained message
      is ignored.
      """
      retained_message = MQTTMessage(topic=topic.encode())
      retained_message.payload = payload
      retained_message.retain = True
      settings = systemctl_mqtt._Settings(mqtt_topic_prefix="system/command")
      # An empty mapping would provoke a KeyError if the handler looked up the topic.
      settings.mqtt_topic_action_mapping = {}
      with caplog.at_level(logging.DEBUG):
          systemctl_mqtt._mqtt_on_message(None, settings, retained_message)
      assert len(caplog.records) == 2
      received_record, info_record = caplog.records
      assert received_record.levelno == logging.DEBUG
      expected_received = "received topic={} payload={!r}".format(topic, payload)
      assert received_record.message == expected_received
      assert info_record.levelno == logging.INFO
      assert info_record.message == "ignoring retained message"