test_relay_switch.py 13 KB


  1. from unittest.mock import AsyncMock, MagicMock, patch
  2. import pytest
  3. from bleak.backends.device import BLEDevice
  4. from switchbot import SwitchBotAdvertisement, SwitchbotEncryptedDevice, SwitchbotModel
  5. from switchbot.devices import relay_switch
  6. from .test_adv_parser import generate_ble_device
  7. common_params = [
  8. (b";\x00\x00\x00", SwitchbotModel.RELAY_SWITCH_1),
  9. (b"<\x00\x00\x00", SwitchbotModel.RELAY_SWITCH_1PM),
  10. (b'>\x00\x00\x00', SwitchbotModel.GARAGE_DOOR_OPENER),
  11. ]
  12. @pytest.fixture
  13. def common_parametrize_2pm():
  14. """Provide common test data."""
  15. return {
  16. "rawAdvData": b"\x00\x00\x00\x00\x00\x00",
  17. "model": SwitchbotModel.RELAY_SWITCH_2PM,
  18. }
  19. def create_device_for_command_testing(
  20. rawAdvData: bytes, model: str, init_data: dict | None = None):
  21. """Create a device for command testing."""
  22. ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
  23. device_class = (
  24. relay_switch.SwitchbotRelaySwitch2PM
  25. if model == SwitchbotModel.RELAY_SWITCH_2PM
  26. else relay_switch.SwitchbotRelaySwitch
  27. )
  28. device = device_class(
  29. ble_device, "ff", "ffffffffffffffffffffffffffffffff", model=model
  30. )
  31. device.update_from_advertisement(
  32. make_advertisement_data(ble_device, rawAdvData, model, init_data)
  33. )
  34. device._send_command = AsyncMock()
  35. device._check_command_result = MagicMock()
  36. device.update = AsyncMock()
  37. return device
  38. def make_advertisement_data(
  39. ble_device: BLEDevice, rawAdvData: bytes, model: str, init_data: dict | None = None
  40. ):
  41. """Set advertisement data with defaults."""
  42. if init_data is None:
  43. init_data = {}
  44. if model == SwitchbotModel.RELAY_SWITCH_2PM:
  45. return SwitchBotAdvertisement(
  46. address="aa:bb:cc:dd:ee:ff",
  47. data={
  48. "rawAdvData": rawAdvData,
  49. "data": {
  50. 1: {
  51. "switchMode": True,
  52. "sequence_number": 99,
  53. "isOn": True,
  54. },
  55. 2: {
  56. "switchMode": True,
  57. "sequence_number": 99,
  58. "isOn": False,
  59. },
  60. }
  61. | init_data,
  62. "isEncrypted": False,
  63. },
  64. device=ble_device,
  65. rssi=-80,
  66. active=True,
  67. )
  68. if model == SwitchbotModel.GARAGE_DOOR_OPENER:
  69. return SwitchBotAdvertisement(
  70. address="aa:bb:cc:dd:ee:ff",
  71. data={
  72. "rawAdvData": rawAdvData,
  73. "data": {
  74. "switchMode": True,
  75. "sequence_number": 96,
  76. "isOn": True,
  77. "door_open": False,
  78. } | init_data,
  79. "isEncrypted": False,
  80. },
  81. device=ble_device,
  82. rssi=-80,
  83. active=True,
  84. )
  85. return SwitchBotAdvertisement(
  86. address="aa:bb:cc:dd:ee:ff",
  87. data={
  88. "rawAdvData": rawAdvData,
  89. "data": {
  90. "switchMode": True,
  91. "sequence_number": 96,
  92. "isOn": True,
  93. }
  94. | init_data,
  95. "isEncrypted": False,
  96. },
  97. device=ble_device,
  98. rssi=-80,
  99. active=True,
  100. )
  101. @pytest.mark.asyncio
  102. @pytest.mark.parametrize(
  103. "init_data",
  104. [
  105. {1: {"isOn": True}, 2: {"isOn": True}},
  106. ],
  107. )
  108. async def test_turn_on_2PM(common_parametrize_2pm, init_data):
  109. """Test turn on command."""
  110. device = create_device_for_command_testing(common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"], init_data)
  111. await device.turn_on(1)
  112. device._send_command.assert_called_with(
  113. relay_switch.MULTI_CHANNEL_COMMANDS_TURN_ON[common_parametrize_2pm["model"]][1]
  114. )
  115. assert device.is_on(1) is True
  116. await device.turn_on(2)
  117. device._send_command.assert_called_with(
  118. relay_switch.MULTI_CHANNEL_COMMANDS_TURN_ON[common_parametrize_2pm["model"]][2]
  119. )
  120. assert device.is_on(2) is True
  121. @pytest.mark.asyncio
  122. @pytest.mark.parametrize(
  123. "init_data",
  124. [
  125. {1: {"isOn": False}, 2: {"isOn": False}},
  126. ],
  127. )
  128. async def test_turn_off_2PM(common_parametrize_2pm, init_data):
  129. """Test turn off command."""
  130. device = create_device_for_command_testing(common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"], init_data)
  131. await device.turn_off(1)
  132. device._send_command.assert_called_with(
  133. relay_switch.MULTI_CHANNEL_COMMANDS_TURN_OFF[common_parametrize_2pm["model"]][1]
  134. )
  135. assert device.is_on(1) is False
  136. await device.turn_off(2)
  137. device._send_command.assert_called_with(
  138. relay_switch.MULTI_CHANNEL_COMMANDS_TURN_OFF[common_parametrize_2pm["model"]][2]
  139. )
  140. assert device.is_on(2) is False
  141. @pytest.mark.asyncio
  142. async def test_turn_toggle_2PM(common_parametrize_2pm):
  143. """Test toggle command."""
  144. device = create_device_for_command_testing(common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"])
  145. await device.async_toggle(1)
  146. device._send_command.assert_called_with(
  147. relay_switch.MULTI_CHANNEL_COMMANDS_TOGGLE[common_parametrize_2pm["model"]][1]
  148. )
  149. assert device.is_on(1) is True
  150. await device.async_toggle(2)
  151. device._send_command.assert_called_with(
  152. relay_switch.MULTI_CHANNEL_COMMANDS_TOGGLE[common_parametrize_2pm["model"]][2]
  153. )
  154. assert device.is_on(2) is False
  155. @pytest.mark.asyncio
  156. async def test_get_switch_mode_2PM(common_parametrize_2pm):
  157. """Test get switch mode."""
  158. device = create_device_for_command_testing(common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"])
  159. assert device.switch_mode(1) is True
  160. assert device.switch_mode(2) is True
  161. @pytest.mark.asyncio
  162. @pytest.mark.parametrize(
  163. ("info_data", "result"),
  164. [
  165. (
  166. {
  167. "basic_info": b'\x01\x98A\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10',
  168. "channel1_info": b'\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00',
  169. "channel2_info": b"\x01\x00\x055\x00'<\x02\x9f\x00\xe9\x01,\x00F",
  170. },
  171. [False, 0, 0, 0, 0, True, 0.02, 23, 0.3, 7.0],
  172. ),
  173. (
  174. {
  175. "basic_info": b'\x01\x9e\x81\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10',
  176. "channel1_info": b'\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00',
  177. "channel2_info": b"\x01\x00\x05\xbc\x00'<\x02\xb1\x00\xea\x01-\x00F",
  178. },
  179. [True, 0, 23, 0.1, 0.0, False, 0.02, 0, 0, 0],
  180. )
  181. ],
  182. )
  183. async def test_get_basic_info_2PM(common_parametrize_2pm, info_data, result):
  184. """Test get_basic_info for 2PM devices."""
  185. device = create_device_for_command_testing(common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"])
  186. assert device.channel == 2
  187. device.get_current_time_and_start_time = MagicMock(return_value=("683074d6", "682fba80"))
  188. async def mock_get_basic_info(arg):
  189. if arg == relay_switch.COMMAND_GET_BASIC_INFO:
  190. return info_data["basic_info"]
  191. if arg == relay_switch.COMMAND_GET_CHANNEL1_INFO.format("683074d6", "682fba80"):
  192. return info_data["channel1_info"]
  193. if arg == relay_switch.COMMAND_GET_CHANNEL2_INFO.format("683074d6", "682fba80"):
  194. return info_data["channel2_info"]
  195. return None
  196. device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
  197. info = await device.get_basic_info()
  198. assert info is not None
  199. assert 1 in info
  200. assert 2 in info
  201. assert info[1]["isOn"] == result[0]
  202. assert info[1]["energy"] == result[1]
  203. assert info[1]["voltage"] == result[2]
  204. assert info[1]["current"] == result[3]
  205. assert info[1]["power"] == result[4]
  206. assert info[2]["isOn"] == result[5]
  207. assert info[2]["energy"] == result[6]
  208. assert info[2]["voltage"] == result[7]
  209. assert info[2]["current"] == result[8]
  210. assert info[2]["power"] == result[9]
  211. @pytest.mark.asyncio
  212. @pytest.mark.parametrize(
  213. "info_data",
  214. [
  215. {
  216. "basic_info": None,
  217. "channel1_info": b'\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00',
  218. "channel2_info": b"\x01\x00\x055\x00'<\x02\x9f\x00\xe9\x01,\x00F",
  219. },
  220. {
  221. "basic_info": b'\x01\x98A\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10',
  222. "channel1_info": None,
  223. "channel2_info": b"\x01\x00\x055\x00'<\x02\x9f\x00\xe9\x01,\x00F",
  224. },
  225. {
  226. "basic_info": b'\x01\x98A\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10',
  227. "channel1_info": b'\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00',
  228. "channel2_info": None,
  229. },
  230. ],
  231. )
  232. async def test_basic_info_exceptions_2PM(common_parametrize_2pm, info_data):
  233. """Test get_basic_info exceptions."""
  234. device = create_device_for_command_testing(common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"])
  235. device.get_current_time_and_start_time = MagicMock(return_value=("683074d6", "682fba80"))
  236. async def mock_get_basic_info(arg):
  237. if arg == relay_switch.COMMAND_GET_BASIC_INFO:
  238. return info_data["basic_info"]
  239. if arg == relay_switch.COMMAND_GET_CHANNEL1_INFO.format("683074d6", "682fba80"):
  240. return info_data["channel1_info"]
  241. if arg == relay_switch.COMMAND_GET_CHANNEL2_INFO.format("683074d6", "682fba80"):
  242. return info_data["channel2_info"]
  243. return None
  244. device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
  245. info = await device.get_basic_info()
  246. assert info is None
  247. @pytest.mark.asyncio
  248. async def test_get_parsed_data_2PM(common_parametrize_2pm):
  249. """Test get_parsed_data for 2PM devices."""
  250. device = create_device_for_command_testing(common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"])
  251. info = device.get_parsed_data(1)
  252. assert info["isOn"] is True
  253. info = device.get_parsed_data(2)
  254. assert info["isOn"] is False
  255. @pytest.mark.asyncio
  256. @pytest.mark.parametrize(
  257. ("rawAdvData", "model"),
  258. common_params,
  259. )
  260. async def test_turn_on(rawAdvData, model):
  261. """Test turn on command."""
  262. device = create_device_for_command_testing(rawAdvData, model)
  263. await device.turn_on()
  264. device._send_command.assert_awaited_once_with(
  265. relay_switch.COMMAND_TURN_ON
  266. )
  267. assert device.is_on() is True
  268. @pytest.mark.asyncio
  269. @pytest.mark.parametrize(
  270. ("rawAdvData", "model"),
  271. common_params,
  272. )
  273. async def test_turn_off(rawAdvData, model):
  274. """Test turn off command."""
  275. device = create_device_for_command_testing(rawAdvData, model, {"isOn": False})
  276. await device.turn_off()
  277. device._send_command.assert_awaited_once_with(
  278. relay_switch.COMMAND_TURN_OFF
  279. )
  280. assert device.is_on() is False
  281. @pytest.mark.asyncio
  282. @pytest.mark.parametrize(
  283. ("rawAdvData", "model"),
  284. common_params,
  285. )
  286. async def test_toggle(rawAdvData, model):
  287. """Test toggle command."""
  288. device = create_device_for_command_testing(rawAdvData, model)
  289. await device.async_toggle()
  290. device._send_command.assert_awaited_once_with(
  291. relay_switch.COMMAND_TOGGLE
  292. )
  293. assert device.is_on() is True
  294. @pytest.mark.asyncio
  295. @pytest.mark.parametrize(
  296. ("rawAdvData", "model", "info_data"),
  297. [
  298. (
  299. b'>\x00\x00\x00',
  300. SwitchbotModel.GARAGE_DOOR_OPENER,
  301. {
  302. "basic_info": b'\x01>\x80\x0c\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x10',
  303. "channel1_info": b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
  304. },
  305. )
  306. ]
  307. )
  308. async def test_get_basic_info_garage_door_opener(rawAdvData, model, info_data):
  309. """Test get_basic_info for garage door opener."""
  310. device = create_device_for_command_testing(rawAdvData, model)
  311. device.get_current_time_and_start_time = MagicMock(return_value=("683074d6", "682fba80"))
  312. async def mock_get_basic_info(arg):
  313. if arg == relay_switch.COMMAND_GET_BASIC_INFO:
  314. return info_data["basic_info"]
  315. if arg == relay_switch.COMMAND_GET_CHANNEL1_INFO.format("683074d6", "682fba80"):
  316. return info_data["channel1_info"]
  317. return None
  318. device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
  319. info = await device.get_basic_info()
  320. assert info is not None
  321. assert info["isOn"] is True
  322. assert info["door_open"] is True
  323. @pytest.mark.asyncio
  324. @pytest.mark.parametrize(
  325. "model",
  326. [
  327. SwitchbotModel.RELAY_SWITCH_1,
  328. SwitchbotModel.RELAY_SWITCH_1PM,
  329. SwitchbotModel.GARAGE_DOOR_OPENER,
  330. SwitchbotModel.RELAY_SWITCH_2PM,
  331. ],
  332. )
  333. @patch.object(SwitchbotEncryptedDevice, "verify_encryption_key", new_callable=AsyncMock)
  334. async def test_verify_encryption_key(mock_parent_verify, model):
  335. ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
  336. key_id = "ff"
  337. encryption_key = "ffffffffffffffffffffffffffffffff"
  338. mock_parent_verify.return_value = True
  339. result = await relay_switch.SwitchbotRelaySwitch.verify_encryption_key(
  340. device=ble_device,
  341. key_id=key_id,
  342. encryption_key=encryption_key,
  343. model=model,
  344. )
  345. mock_parent_verify.assert_awaited_once_with(
  346. ble_device,
  347. key_id,
  348. encryption_key,
  349. model,
  350. )
  351. assert result is True