# test_service_manager.py
# systemctl-mqtt - MQTT client triggering & reporting shutdown on systemd-based systems
#
# Copyright (C) 2024 Fabian Peter Hammerle <fabian@hammerle.me>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import typing
  18. import unittest.mock
  19. import pytest
  20. import jeepney.io.asyncio
  21. import jeepney.low_level
  22. import systemctl_mqtt
  23. # pylint: disable=protected-access
  24. class DBusErrorResponseMock(jeepney.wrappers.DBusErrorResponse):
  25. # pylint: disable=missing-class-docstring,super-init-not-called
  26. def __init__(self, name: str, data: typing.Any):
  27. self.name = name
  28. self.data = data
  29. @pytest.mark.asyncio
  30. async def test__get_unit_path() -> None:
  31. router_mock = unittest.mock.AsyncMock()
  32. reply_mock = unittest.mock.MagicMock()
  33. expected_path = "/org/freedesktop/systemd1/unit/ssh_2eservice"
  34. reply_mock.body = (expected_path,)
  35. router_mock.send_and_get_reply.return_value = reply_mock
  36. service_manager = jeepney.io.asyncio.Proxy(
  37. msggen=systemctl_mqtt._dbus.service_manager.ServiceManager(),
  38. router=router_mock,
  39. )
  40. assert (
  41. await systemctl_mqtt._get_unit_path(
  42. service_manager=service_manager, unit_name="ssh.service"
  43. )
  44. == expected_path
  45. )
  46. router_mock.send_and_get_reply.assert_awaited_once()
  47. (msg,), send_kwargs = router_mock.send_and_get_reply.await_args
  48. assert isinstance(msg, jeepney.low_level.Message)
  49. assert msg.header.fields == {
  50. jeepney.low_level.HeaderFields.path: "/org/freedesktop/systemd1",
  51. jeepney.low_level.HeaderFields.destination: "org.freedesktop.systemd1",
  52. jeepney.low_level.HeaderFields.interface: "org.freedesktop.systemd1.Manager",
  53. jeepney.low_level.HeaderFields.member: "GetUnit",
  54. jeepney.low_level.HeaderFields.signature: "s",
  55. }
  56. assert msg.body == ("ssh.service",)
  57. assert not send_kwargs
  58. @pytest.mark.parametrize(
  59. "action,method",
  60. [
  61. ("start", "StartUnit"),
  62. ("stop", "StopUnit"),
  63. ("restart", "RestartUnit"),
  64. ],
  65. )
  66. def test__unit_proxy(action, method):
  67. mock_proxy = unittest.mock.MagicMock()
  68. with unittest.mock.patch(
  69. "systemctl_mqtt._dbus.service_manager.get_service_manager_proxy",
  70. return_value=mock_proxy,
  71. ):
  72. # call the wrapper function dynamically
  73. getattr(systemctl_mqtt._dbus.service_manager, f"{action}_unit")("foo.service")
  74. getattr(mock_proxy, method).assert_called_once_with("foo.service", "replace")
  75. @pytest.mark.parametrize(
  76. "method",
  77. [
  78. "StartUnit",
  79. "StopUnit",
  80. "RestartUnit",
  81. ],
  82. )
  83. def test__unit_method_call(method):
  84. with unittest.mock.patch(
  85. "jeepney.new_method_call", return_value=unittest.mock.MagicMock()
  86. ) as mock_method_call:
  87. mgr = systemctl_mqtt._dbus.service_manager.ServiceManager()
  88. getattr(mgr, method)("foo.service", "replace")
  89. mock_method_call.assert_called_once_with(
  90. remote_obj=mgr,
  91. method=method,
  92. signature="ss",
  93. body=("foo.service", "replace"),
  94. )
  95. @pytest.mark.parametrize(
  96. "action,method",
  97. [
  98. ("start", "StartUnit"),
  99. ("stop", "StopUnit"),
  100. ("restart", "RestartUnit"),
  101. ],
  102. )
  103. def test__unit_with_exception(action, method):
  104. mock_proxy = unittest.mock.MagicMock()
  105. getattr(mock_proxy, method).side_effect = DBusErrorResponseMock(
  106. "DBus error", ("mocked",)
  107. )
  108. with unittest.mock.patch(
  109. "systemctl_mqtt._dbus.service_manager.get_service_manager_proxy",
  110. return_value=mock_proxy,
  111. ), unittest.mock.patch(
  112. "systemctl_mqtt._dbus.service_manager._LOGGER"
  113. ) as mock_logger:
  114. getattr(systemctl_mqtt._dbus.service_manager, f"{action}_unit")(
  115. "example.service"
  116. )
  117. mock_logger.error.assert_called_once_with(
  118. f"Failed to {action} unit: %s because %s ",
  119. "example.service",
  120. "DBus error",
  121. )