Browse Source

Ability to isolate systemd targets (#241)

* add isolate control for systemd units

- expose an MQTT topic to isolate `unit/system/<unit>/isolate`.
- publish HA isolate buttons only for units that allow isolation.

* docs: mention the new isolate action.

* changelog: mention the isolate action addition.

https://github.com/fphammerle/systemctl-mqtt/pull/241
frantzju 5 days ago
parent
commit
6cec4837f8

+ 4 - 0
CHANGELOG.md

@@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
+### Added
+- ability to isolate system units using `--control-system-unit <target_name>`
+  ([#241](https://github.com/fphammerle/systemctl-mqtt/pull/241)
+  by Julien Frantz (julien.frantz@gmail.com))
 ### Removed
 - compatibility with `python3.9`
 

+ 9 - 2
README.md

@@ -100,8 +100,8 @@ enables reports on topic
 ```
 $ systemctl-mqtt  --control-system-unit <unit_name>
 ```
-enables that a system unit can be started, stopped, and restarted by a message on topic
-`systemctl/[hostname]/unit/system/[unit_name]/start`, `…/stop`, `…/restart`.
+enables that a system unit can be started, stopped, restarted and isolated by a message on topic
+`systemctl/[hostname]/unit/system/[unit_name]/start`, `…/stop`, `…/restart`, `…/isolate`.
 
 ## Home Assistant 🏡
 
@@ -114,8 +114,15 @@ added automatically:
 - `button.[hostname]_logind_suspend`
 - `sensor.[hostname]_unit_system_[unit_name]_active_state`
   for `--monitor-system-unit [unit_name]`
+- `button.[hostname]_unit_system_[unit_name]_start`
+  for `--control-system-unit [unit_name]`
+- `button.[hostname]_unit_system_[unit_name]_stop`
+  for `--control-system-unit [unit_name]`
 - `button.[hostname]_unit_system_[unit_name]_restart`
   for `--control-system-unit [unit_name]`
+- `button.[hostname]_unit_system_[unit_name]_isolate`
+  for `--control-system-unit [unit_name]`, when the unit allows isolation (e.g., targets).
+
 
 ![homeassistant entities_over_auto_discovery](docs/homeassistant/entities-after-auto-discovery.png)
 

+ 40 - 35
systemctl_mqtt/__init__.py

@@ -93,14 +93,10 @@ class _State:
     def get_system_unit_active_state_mqtt_topic(self, *, unit_name: str) -> str:
         return self._mqtt_topic_prefix + "/unit/system/" + unit_name + "/active-state"
 
-    def get_system_unit_start_mqtt_topic(self, *, unit_name: str) -> str:
-        return self._mqtt_topic_prefix + "/unit/system/" + unit_name + "/start"
-
-    def get_system_unit_stop_mqtt_topic(self, *, unit_name: str) -> str:
-        return self._mqtt_topic_prefix + "/unit/system/" + unit_name + "/stop"
-
-    def get_system_unit_restart_mqtt_topic(self, *, unit_name: str) -> str:
-        return self._mqtt_topic_prefix + "/unit/system/" + unit_name + "/restart"
+    def get_system_unit_action_mqtt_topic(
+        self, *, unit_name: str, action_name: str
+    ) -> str:
+        return self._mqtt_topic_prefix + "/unit/system/" + unit_name + "/" + action_name
 
     @property
     def monitored_system_unit_names(self) -> list[str]:
@@ -238,33 +234,24 @@ class _State:
                 ),
             }
         for unit_name in self._controlled_system_unit_names:
-            config["components"]["unit/system/" + unit_name + "/start"] = {  # type: ignore
-                "unique_id": f"{unique_id_prefix}-unit-system-{unit_name}-start",
-                "object_id": f"{hostname}_unit_system_{unit_name}_start",
-                "name": f"{unit_name} start",
-                "platform": "button",
-                "command_topic": self.get_system_unit_start_mqtt_topic(
-                    unit_name=unit_name
-                ),
-            }
-            config["components"]["unit/system/" + unit_name + "/stop"] = {  # type: ignore
-                "unique_id": f"{unique_id_prefix}-unit-system-{unit_name}-stop",
-                "object_id": f"{hostname}_unit_system_{unit_name}_stop",
-                "name": f"{unit_name} stop",
-                "platform": "button",
-                "command_topic": self.get_system_unit_stop_mqtt_topic(
-                    unit_name=unit_name
-                ),
-            }
-            config["components"]["unit/system/" + unit_name + "/restart"] = {  # type: ignore
-                "unique_id": f"{unique_id_prefix}-unit-system-{unit_name}-restart",
-                "object_id": f"{hostname}_unit_system_{unit_name}_restart",
-                "name": f"{unit_name} restart",
-                "platform": "button",
-                "command_topic": self.get_system_unit_restart_mqtt_topic(
-                    unit_name=unit_name
-                ),
-            }
+            component_prefix = "unit/system/" + unit_name
+            for action_name, action_class in [
+                ("start", _MQTTActionStartUnit),
+                ("stop", _MQTTActionStopUnit),
+                ("restart", _MQTTActionRestartUnit),
+                ("isolate", _MQTTActionIsolateUnit),
+            ]:
+                if action_class(unit_name).is_allowed():
+                    config["components"][component_prefix + "/" + action_name] = {  # type: ignore
+                        "unique_id": f"{unique_id_prefix}-unit-system-{unit_name}-{action_name}",
+                        "object_id": f"{hostname}_unit_system_{unit_name}_{action_name}",
+                        "name": f"{unit_name} {action_name}",
+                        "platform": "button",
+                        "command_topic": self.get_system_unit_action_mqtt_topic(
+                            unit_name=unit_name, action_name=action_name
+                        ),
+                    }
+
         _LOGGER.debug("publishing home assistant config on %s", discovery_topic)
         await mqtt_client.publish(
             topic=discovery_topic, payload=json.dumps(config), retain=False
@@ -272,6 +259,9 @@ class _State:
 
 
 class _MQTTAction(metaclass=abc.ABCMeta):
+    def is_allowed(self) -> bool:
+        return True
+
     @abc.abstractmethod
     def trigger(self, state: _State) -> None:
         pass  # pragma: no cover
@@ -316,6 +306,20 @@ class _MQTTActionRestartUnit(_MQTTAction):
         systemctl_mqtt._dbus.service_manager.restart_unit(unit_name=self._unit_name)
 
 
+class _MQTTActionIsolateUnit(_MQTTAction):
+    # pylint: disable=protected-access,too-few-public-methods
+    def __init__(self, unit_name: str):
+        self._unit_name = unit_name
+
+    def is_allowed(self) -> bool:
+        return systemctl_mqtt._dbus.service_manager.is_isolate_unit_allowed(
+            unit_name=self._unit_name
+        )
+
+    def trigger(self, state: _State) -> None:
+        systemctl_mqtt._dbus.service_manager.isolate_unit(unit_name=self._unit_name)
+
+
 class _MQTTActionLockAllSessions(_MQTTAction):
     # pylint: disable=too-few-public-methods
     def trigger(self, state: _State) -> None:
@@ -350,6 +354,7 @@ async def _mqtt_message_loop(*, state: _State, mqtt_client: aiomqtt.Client) -> N
             ("start", _MQTTActionStartUnit),
             ("stop", _MQTTActionStopUnit),
             ("restart", _MQTTActionRestartUnit),
+            ("isolate", _MQTTActionIsolateUnit),
         ]:
             topic = (
                 state.mqtt_topic_prefix

+ 52 - 4
systemctl_mqtt/_dbus/service_manager.py

@@ -43,6 +43,11 @@ class ServiceManager(jeepney.MessageGenerator):
             remote_obj=self, method="GetUnit", signature="s", body=(name,)
         )
 
+    def LoadUnit(self, name: str) -> jeepney.low_level.Message:
+        return jeepney.new_method_call(
+            remote_obj=self, method="LoadUnit", signature="s", body=(name,)
+        )
+
     def StartUnit(self, name: str, mode: str) -> jeepney.low_level.Message:
         return jeepney.new_method_call(
             remote_obj=self,
@@ -122,12 +127,55 @@ def restart_unit(unit_name: str):
         _LOGGER.error("Failed to restart unit: %s because %s ", unit_name, exc.name)
 
 
+def isolate_unit(unit_name: str):
+    proxy = get_service_manager_proxy()
+    try:
+        proxy.StartUnit(unit_name, "isolate")
+        _LOGGER.debug("Isolating unit: %s", unit_name)
+    # pylint: disable=broad-exception-caught
+    except jeepney.wrappers.DBusErrorResponse as exc:
+        _LOGGER.error("Failed to isolate unit: %s because %s ", unit_name, exc.name)
+
+
+def is_isolate_unit_allowed(unit_name: str) -> bool:
+    if (unit_proxy := _get_unit_proxy(unit_name=unit_name)) is None:
+        return False
+
+    try:
+        ((_, allowed),) = unit_proxy.Get("AllowIsolate")
+        _LOGGER.debug("AllowIsolate for %s = %s", unit_name, allowed)
+        return bool(allowed)
+    # pylint: disable=broad-exception-caught
+    except jeepney.wrappers.DBusErrorResponse as exc:
+        _LOGGER.error(
+            "Failed to get AllowIsolate property of unit %s because %s",
+            unit_name,
+            exc.name,
+        )
+        return False
+
+
+def _get_connection() -> jeepney.io.blocking.DBusConnection:
+    return jeepney.io.blocking.open_dbus_connection(bus="SYSTEM")
+
+
+def _get_unit_proxy(unit_name: str) -> jeepney.io.blocking.Proxy | None:
+    connection = _get_connection()
+    proxy = jeepney.io.blocking.Proxy(msggen=ServiceManager(), connection=connection)
+    try:
+        (unit_path,) = proxy.LoadUnit(name=unit_name)
+    # pylint: disable=broad-exception-caught
+    except jeepney.wrappers.DBusErrorResponse as exc:
+        _LOGGER.error("Failed to load unit: %s because %s", unit_name, exc.name)
+        return None
+    return jeepney.io.blocking.Proxy(
+        msggen=Unit(object_path=unit_path), connection=connection
+    )
+
+
 def get_service_manager_proxy() -> jeepney.io.blocking.Proxy:
     # https://jeepney.readthedocs.io/en/latest/integrate.html
     # https://gitlab.com/takluyver/jeepney/-/blob/master/examples/aio_notify.py
     return jeepney.io.blocking.Proxy(
-        msggen=ServiceManager(),
-        connection=jeepney.io.blocking.open_dbus_connection(
-            bus="SYSTEM",
-        ),
+        msggen=ServiceManager(), connection=_get_connection()
     )

+ 94 - 6
tests/dbus/message-generators/test_service_manager.py

@@ -65,15 +65,102 @@ async def test__get_unit_path() -> None:
     assert not send_kwargs
 
 
+def test__get_unit_proxy():
+    unit_proxy = unittest.mock.MagicMock()
+    manager_proxy = unittest.mock.MagicMock()
+    manager_proxy.LoadUnit.return_value = ("/unit/foo",)
+
+    with unittest.mock.patch(
+        "systemctl_mqtt._dbus.service_manager._get_connection", return_value=object()
+    ), unittest.mock.patch(
+        "jeepney.io.blocking.Proxy", side_effect=(manager_proxy, unit_proxy)
+    ):
+        assert (
+            systemctl_mqtt._dbus.service_manager._get_unit_proxy("foo.service")
+            is unit_proxy
+        )
+
+    manager_proxy.LoadUnit.assert_called_once_with(name="foo.service")
+
+
 @pytest.mark.parametrize(
-    "action,method",
+    ("function_name", "property_name", "propery_value", "return_value"),
     [
-        ("start", "StartUnit"),
-        ("stop", "StopUnit"),
-        ("restart", "RestartUnit"),
+        ("is_isolate_unit_allowed", "AllowIsolate", True, True),
+        ("is_isolate_unit_allowed", "AllowIsolate", False, False),
+    ],
+)
+def test__unit_property(function_name, property_name, propery_value, return_value):
+    mock_unit_proxy = unittest.mock.MagicMock()
+    mock_unit_proxy.Get.return_value = ((None, propery_value),)
+    with unittest.mock.patch(
+        "systemctl_mqtt._dbus.service_manager._get_unit_proxy",
+        return_value=mock_unit_proxy,
+    ):
+        # call the wrapper function dynamically
+        assert (
+            getattr(systemctl_mqtt._dbus.service_manager, function_name)("foo.service")
+            is return_value
+        )
+        mock_unit_proxy.Get.assert_called_once_with(property_name)
+
+
+@pytest.mark.parametrize(
+    "function_name",
+    ["is_isolate_unit_allowed"],
+)
+def test__unit_property_with_exception_on_load_unit(function_name):
+    with unittest.mock.patch(
+        "systemctl_mqtt._dbus.service_manager.ServiceManager.LoadUnit",
+        side_effect=DBusErrorResponseMock("DBus error", ("mocked",)),
+    ), unittest.mock.patch(
+        "systemctl_mqtt._dbus.service_manager._LOGGER"
+    ) as mock_logger:
+        assert (
+            getattr(systemctl_mqtt._dbus.service_manager, function_name)("foo.service")
+            is False
+        )
+        mock_logger.error.assert_called_once_with(
+            "Failed to load unit: %s because %s",
+            "foo.service",
+            "DBus error",
+        )
+
+
+@pytest.mark.parametrize(
+    ("function_name", "property_name"),
+    [("is_isolate_unit_allowed", "AllowIsolate")],
+)
+def test__unit_property_with_exception_on_get(function_name, property_name):
+    mock_proxy = unittest.mock.MagicMock()
+    mock_proxy.Get.side_effect = DBusErrorResponseMock("DBus error", ("mocked",))
+    with unittest.mock.patch(
+        "systemctl_mqtt._dbus.service_manager._get_unit_proxy",
+        return_value=mock_proxy,
+    ), unittest.mock.patch(
+        "systemctl_mqtt._dbus.service_manager._LOGGER"
+    ) as mock_logger:
+        assert (
+            getattr(systemctl_mqtt._dbus.service_manager, function_name)("foo.service")
+            is False
+        )
+        mock_logger.error.assert_called_once_with(
+            f"Failed to get {property_name} property of unit %s because %s",
+            "foo.service",
+            "DBus error",
+        )
+
+
+@pytest.mark.parametrize(
+    "action,method,mode",
+    [
+        ("start", "StartUnit", "replace"),
+        ("stop", "StopUnit", "replace"),
+        ("restart", "RestartUnit", "replace"),
+        ("isolate", "StartUnit", "isolate"),
     ],
 )
-def test__unit_proxy(action, method):
+def test__unit_proxy(action, method, mode):
     mock_proxy = unittest.mock.MagicMock()
     with unittest.mock.patch(
         "systemctl_mqtt._dbus.service_manager.get_service_manager_proxy",
@@ -81,7 +168,7 @@ def test__unit_proxy(action, method):
     ):
         # call the wrapper function dynamically
         getattr(systemctl_mqtt._dbus.service_manager, f"{action}_unit")("foo.service")
-        getattr(mock_proxy, method).assert_called_once_with("foo.service", "replace")
+        getattr(mock_proxy, method).assert_called_once_with("foo.service", mode)
 
 
 @pytest.mark.parametrize(
@@ -112,6 +199,7 @@ def test__unit_method_call(method):
         ("start", "StartUnit"),
         ("stop", "StopUnit"),
         ("restart", "RestartUnit"),
+        ("isolate", "StartUnit"),
     ],
 )
 def test__unit_with_exception(action, method):

+ 1 - 1
tests/test_mqtt.py

@@ -444,7 +444,7 @@ def test_state_get_system_unit_active_state_mqtt_topic(
 @pytest.mark.filterwarnings("ignore:coroutine '_mqtt_message_loop' was never awaited")
 @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host"])
 @pytest.mark.parametrize("unit_name", ["foo.service", "bar.service"])
-@pytest.mark.parametrize("action", ["restart", "start", "stop"])
+@pytest.mark.parametrize("action", ["restart", "start", "stop", "isolate"])
 async def test__mqtt_message_loop_triggers_unit_action(
     caplog: pytest.LogCaptureFixture,
     mqtt_topic_prefix: str,

+ 40 - 0
tests/test_state_dbus.py

@@ -263,3 +263,43 @@ async def test_publish_homeassistant_device_config(
             for action in ["restart", "start", "stop"]
         },
     }
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize("is_allowed", [True, False])
+@pytest.mark.parametrize(
+    ("action_cls", "action_name"),
+    [
+        (systemctl_mqtt._MQTTActionStartUnit, "start"),
+        (systemctl_mqtt._MQTTActionStopUnit, "stop"),
+        (systemctl_mqtt._MQTTActionRestartUnit, "restart"),
+        (systemctl_mqtt._MQTTActionIsolateUnit, "isolate"),
+    ],
+)
+async def test_publish_homeassistant_device_config_filter(
+    action_cls: systemctl_mqtt._MQTTAction,
+    action_name: str,
+    is_allowed: bool,
+) -> None:
+    mqtt_client = unittest.mock.AsyncMock()
+    with unittest.mock.patch("jeepney.io.blocking.open_dbus_connection"):
+        state = systemctl_mqtt._State(
+            mqtt_topic_prefix="topic",
+            homeassistant_discovery_prefix="homeassistant",
+            homeassistant_discovery_object_id="obj",
+            poweroff_delay=datetime.timedelta(),
+            monitored_system_unit_names=[],
+            controlled_system_unit_names=["foo.service"],
+        )
+    with unittest.mock.patch(
+        "systemctl_mqtt._utils.get_hostname", return_value="host"
+    ), unittest.mock.patch.object(action_cls, "is_allowed", return_value=is_allowed):
+        await state.publish_homeassistant_device_config(mqtt_client=mqtt_client)
+
+    mqtt_client.publish.assert_awaited_once()
+    _, publish_kwargs = mqtt_client.publish.call_args
+    config = json.loads(publish_kwargs["payload"])
+
+    assert (
+        f"unit/system/foo.service/{action_name}" in config["components"]
+    ) == is_allowed