| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 | # systemctl-mqtt - MQTT client triggering shutdown on systemd-based systems## Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>## This program is free software: you can redistribute it and/or modify# it under the terms of the GNU General Public License as published by# the Free Software Foundation, either version 3 of the License, or# any later version.## This program is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the# GNU General Public License for more details.## You should have received a copy of the GNU General Public License# along with this program.  If not, see <https://www.gnu.org/licenses/>.import loggingimport unittest.mockimport dbus.typesimport pytestimport systemctl_mqtt# pylint: disable=protected-accessdef test_shutdown_lock():    lock_fd = unittest.mock.MagicMock()    with unittest.mock.patch("systemctl_mqtt._get_login_manager"):        state = systemctl_mqtt._State(mqtt_topic_prefix="any")        state._login_manager.Inhibit.return_value = lock_fd        state.acquire_shutdown_lock()    state._login_manager.Inhibit.assert_called_once_with(        "shutdown", "systemctl-mqtt", "Report shutdown via MQTT", "delay",    )    assert state._shutdown_lock == lock_fd    # https://dbus.freedesktop.org/doc/dbus-python/dbus.types.html#dbus.types.UnixFd.take    lock_fd.take.return_value = "fdnum"    with unittest.mock.patch("os.close") as close_mock:        state.release_shutdown_lock()    close_mock.assert_called_once_with("fdnum")@pytest.mark.parametrize("active", [True, False])def test_prepare_for_shutdown_handler(caplog, active):    with unittest.mock.patch("systemctl_mqtt._get_login_manager"):        state = systemctl_mqtt._State(mqtt_topic_prefix="any")    mqtt_client_mock = unittest.mock.MagicMock()    state.register_prepare_for_shutdown_handler(mqtt_client=mqtt_client_mock)    # pylint: disable=no-member,comparison-with-callable    connect_to_signal_kwargs = state._login_manager.connect_to_signal.call_args[1]    assert connect_to_signal_kwargs["signal_name"] == "PrepareForShutdown"    handler_function = connect_to_signal_kwargs["handler_function"]    assert handler_function.func == state._prepare_for_shutdown_handler    with unittest.mock.patch.object(        state, "acquire_shutdown_lock"    ) as acquire_lock_mock, unittest.mock.patch.object(        state, "release_shutdown_lock"    ) as release_lock_mock:        handler_function(dbus.types.Boolean(active))    if active:        acquire_lock_mock.assert_not_called()        release_lock_mock.assert_called_once_with()    else:        acquire_lock_mock.assert_called_once_with()        release_lock_mock.assert_not_called()    mqtt_client_mock.publish.assert_called_once_with(        topic="any/preparing-for-shutdown", payload="true" if active else "false",    )    assert len(caplog.records) == 1    assert caplog.records[0].levelno == logging.ERROR    assert caplog.records[0].message.startswith(        "failed to publish on any/preparing-for-shutdown"    )
 |