test_relay_switch.py 16 KB


  1. from unittest.mock import AsyncMock, MagicMock, patch
  2. import pytest
  3. from bleak.backends.device import BLEDevice
  4. from switchbot import SwitchBotAdvertisement, SwitchbotEncryptedDevice, SwitchbotModel
  5. from switchbot.devices import relay_switch
  6. from switchbot.devices.device import _merge_data as merge_data
  7. from .test_adv_parser import generate_ble_device
  8. common_params = [
  9. (b";\x00\x00\x00", SwitchbotModel.RELAY_SWITCH_1),
  10. (b"<\x00\x00\x00", SwitchbotModel.RELAY_SWITCH_1PM),
  11. (b">\x00\x00\x00", SwitchbotModel.GARAGE_DOOR_OPENER),
  12. (b"?\x00\x00\x00", SwitchbotModel.PLUG_MINI_EU),
  13. ]
  14. @pytest.fixture
  15. def common_parametrize_2pm():
  16. """Provide common test data."""
  17. return {
  18. "rawAdvData": b"\x00\x00\x00\x00\x00\x00",
  19. "model": SwitchbotModel.RELAY_SWITCH_2PM,
  20. }
  21. def create_device_for_command_testing(
  22. rawAdvData: bytes, model: str, init_data: dict | None = None
  23. ):
  24. """Create a device for command testing."""
  25. ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
  26. if model == SwitchbotModel.GARAGE_DOOR_OPENER:
  27. device_class = relay_switch.SwitchbotGarageDoorOpener
  28. elif model == SwitchbotModel.RELAY_SWITCH_2PM:
  29. device_class = relay_switch.SwitchbotRelaySwitch2PM
  30. else:
  31. device_class = relay_switch.SwitchbotRelaySwitch
  32. device = device_class(
  33. ble_device, "ff", "ffffffffffffffffffffffffffffffff", model=model
  34. )
  35. device.update_from_advertisement(
  36. make_advertisement_data(ble_device, rawAdvData, model, init_data)
  37. )
  38. device._send_command = AsyncMock()
  39. device._check_command_result = MagicMock()
  40. device.update = AsyncMock()
  41. return device
  42. def make_advertisement_data(
  43. ble_device: BLEDevice, rawAdvData: bytes, model: str, init_data: dict | None = None
  44. ):
  45. """Set advertisement data with defaults."""
  46. if init_data is None:
  47. init_data = {}
  48. if model == SwitchbotModel.RELAY_SWITCH_2PM:
  49. return SwitchBotAdvertisement(
  50. address="aa:bb:cc:dd:ee:ff",
  51. data={
  52. "rawAdvData": rawAdvData,
  53. "data": {
  54. 1: {
  55. "switchMode": True,
  56. "sequence_number": 99,
  57. "isOn": True,
  58. },
  59. 2: {
  60. "switchMode": True,
  61. "sequence_number": 99,
  62. "isOn": False,
  63. },
  64. }
  65. | init_data,
  66. "isEncrypted": False,
  67. },
  68. device=ble_device,
  69. rssi=-80,
  70. active=True,
  71. )
  72. if model == SwitchbotModel.GARAGE_DOOR_OPENER:
  73. return SwitchBotAdvertisement(
  74. address="aa:bb:cc:dd:ee:ff",
  75. data={
  76. "rawAdvData": rawAdvData,
  77. "data": {
  78. "switchMode": True,
  79. "sequence_number": 96,
  80. "isOn": True,
  81. "door_open": False,
  82. }
  83. | init_data,
  84. "isEncrypted": False,
  85. },
  86. device=ble_device,
  87. rssi=-80,
  88. active=True,
  89. )
  90. return SwitchBotAdvertisement(
  91. address="aa:bb:cc:dd:ee:ff",
  92. data={
  93. "rawAdvData": rawAdvData,
  94. "data": {
  95. "switchMode": True,
  96. "sequence_number": 96,
  97. "isOn": True,
  98. }
  99. | init_data,
  100. "isEncrypted": False,
  101. },
  102. device=ble_device,
  103. rssi=-80,
  104. active=True,
  105. )
  106. @pytest.mark.asyncio
  107. @pytest.mark.parametrize(
  108. "init_data",
  109. [
  110. {1: {"isOn": True}, 2: {"isOn": True}},
  111. ],
  112. )
  113. async def test_turn_on_2PM(common_parametrize_2pm, init_data):
  114. """Test turn on command."""
  115. device = create_device_for_command_testing(
  116. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"], init_data
  117. )
  118. await device.turn_on(1)
  119. device._send_command.assert_called_with(
  120. relay_switch.MULTI_CHANNEL_COMMANDS_TURN_ON[common_parametrize_2pm["model"]][1]
  121. )
  122. assert device.is_on(1) is True
  123. await device.turn_on(2)
  124. device._send_command.assert_called_with(
  125. relay_switch.MULTI_CHANNEL_COMMANDS_TURN_ON[common_parametrize_2pm["model"]][2]
  126. )
  127. assert device.is_on(2) is True
  128. @pytest.mark.asyncio
  129. @pytest.mark.parametrize(
  130. "init_data",
  131. [
  132. {1: {"isOn": False}, 2: {"isOn": False}},
  133. ],
  134. )
  135. async def test_turn_off_2PM(common_parametrize_2pm, init_data):
  136. """Test turn off command."""
  137. device = create_device_for_command_testing(
  138. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"], init_data
  139. )
  140. await device.turn_off(1)
  141. device._send_command.assert_called_with(
  142. relay_switch.MULTI_CHANNEL_COMMANDS_TURN_OFF[common_parametrize_2pm["model"]][1]
  143. )
  144. assert device.is_on(1) is False
  145. await device.turn_off(2)
  146. device._send_command.assert_called_with(
  147. relay_switch.MULTI_CHANNEL_COMMANDS_TURN_OFF[common_parametrize_2pm["model"]][2]
  148. )
  149. assert device.is_on(2) is False
  150. @pytest.mark.asyncio
  151. async def test_turn_toggle_2PM(common_parametrize_2pm):
  152. """Test toggle command."""
  153. device = create_device_for_command_testing(
  154. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
  155. )
  156. await device.async_toggle(1)
  157. device._send_command.assert_called_with(
  158. relay_switch.MULTI_CHANNEL_COMMANDS_TOGGLE[common_parametrize_2pm["model"]][1]
  159. )
  160. assert device.is_on(1) is True
  161. await device.async_toggle(2)
  162. device._send_command.assert_called_with(
  163. relay_switch.MULTI_CHANNEL_COMMANDS_TOGGLE[common_parametrize_2pm["model"]][2]
  164. )
  165. assert device.is_on(2) is False
  166. @pytest.mark.asyncio
  167. async def test_get_switch_mode_2PM(common_parametrize_2pm):
  168. """Test get switch mode."""
  169. device = create_device_for_command_testing(
  170. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
  171. )
  172. assert device.switch_mode(1) is True
  173. assert device.switch_mode(2) is True
  174. @pytest.mark.asyncio
  175. @pytest.mark.parametrize(
  176. ("info_data", "result"),
  177. [
  178. (
  179. {
  180. "basic_info": b"\x01\x98A\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10",
  181. "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00",
  182. "channel2_info": b"\x01\x00\x055\x00'<\x02\x9f\x00\xe9\x01,\x00F",
  183. },
  184. [False, 0, 0, 0, 0, True, 0.02, 23, 0.3, 7.0],
  185. ),
  186. (
  187. {
  188. "basic_info": b"\x01\x9e\x81\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10",
  189. "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00",
  190. "channel2_info": b"\x01\x00\x05\xbc\x00'<\x02\xb1\x00\xea\x01-\x00F",
  191. },
  192. [True, 0, 23, 0.1, 0.0, False, 0.02, 0, 0, 0],
  193. ),
  194. ],
  195. )
  196. async def test_get_basic_info_2PM(common_parametrize_2pm, info_data, result):
  197. """Test get_basic_info for 2PM devices."""
  198. device = create_device_for_command_testing(
  199. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
  200. )
  201. assert device.channel == 2
  202. device.get_current_time_and_start_time = MagicMock(
  203. return_value=("683074d6", "682fba80")
  204. )
  205. async def mock_get_basic_info(arg):
  206. if arg == relay_switch.COMMAND_GET_BASIC_INFO:
  207. return info_data["basic_info"]
  208. if arg == relay_switch.COMMAND_GET_CHANNEL1_INFO.format("683074d6", "682fba80"):
  209. return info_data["channel1_info"]
  210. if arg == relay_switch.COMMAND_GET_CHANNEL2_INFO.format("683074d6", "682fba80"):
  211. return info_data["channel2_info"]
  212. return None
  213. device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
  214. info = await device.get_basic_info()
  215. assert info is not None
  216. assert 1 in info
  217. assert 2 in info
  218. assert info[1]["isOn"] == result[0]
  219. assert info[1]["energy"] == result[1]
  220. assert info[1]["voltage"] == result[2]
  221. assert info[1]["current"] == result[3]
  222. assert info[1]["power"] == result[4]
  223. assert info[2]["isOn"] == result[5]
  224. assert info[2]["energy"] == result[6]
  225. assert info[2]["voltage"] == result[7]
  226. assert info[2]["current"] == result[8]
  227. assert info[2]["power"] == result[9]
  228. @pytest.mark.asyncio
  229. @pytest.mark.parametrize(
  230. "info_data",
  231. [
  232. {
  233. "basic_info": None,
  234. "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00",
  235. "channel2_info": b"\x01\x00\x055\x00'<\x02\x9f\x00\xe9\x01,\x00F",
  236. },
  237. {
  238. "basic_info": b"\x01\x98A\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10",
  239. "channel1_info": None,
  240. "channel2_info": b"\x01\x00\x055\x00'<\x02\x9f\x00\xe9\x01,\x00F",
  241. },
  242. {
  243. "basic_info": b"\x01\x98A\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10",
  244. "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00",
  245. "channel2_info": None,
  246. },
  247. ],
  248. )
  249. async def test_basic_info_exceptions_2PM(common_parametrize_2pm, info_data):
  250. """Test get_basic_info exceptions."""
  251. device = create_device_for_command_testing(
  252. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
  253. )
  254. device.get_current_time_and_start_time = MagicMock(
  255. return_value=("683074d6", "682fba80")
  256. )
  257. async def mock_get_basic_info(arg):
  258. if arg == relay_switch.COMMAND_GET_BASIC_INFO:
  259. return info_data["basic_info"]
  260. if arg == relay_switch.COMMAND_GET_CHANNEL1_INFO.format("683074d6", "682fba80"):
  261. return info_data["channel1_info"]
  262. if arg == relay_switch.COMMAND_GET_CHANNEL2_INFO.format("683074d6", "682fba80"):
  263. return info_data["channel2_info"]
  264. return None
  265. device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
  266. info = await device.get_basic_info()
  267. assert info is None
  268. @pytest.mark.asyncio
  269. async def test_get_parsed_data_2PM(common_parametrize_2pm):
  270. """Test get_parsed_data for 2PM devices."""
  271. device = create_device_for_command_testing(
  272. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
  273. )
  274. info = device.get_parsed_data(1)
  275. assert info["isOn"] is True
  276. info = device.get_parsed_data(2)
  277. assert info["isOn"] is False
  278. @pytest.mark.asyncio
  279. @pytest.mark.parametrize(
  280. ("rawAdvData", "model"),
  281. common_params,
  282. )
  283. async def test_turn_on(rawAdvData, model):
  284. """Test turn on command."""
  285. device = create_device_for_command_testing(rawAdvData, model)
  286. await device.turn_on()
  287. device._send_command.assert_awaited_once_with(device._turn_on_command)
  288. assert device.is_on() is True
  289. @pytest.mark.asyncio
  290. @pytest.mark.parametrize(
  291. ("rawAdvData", "model"),
  292. common_params,
  293. )
  294. async def test_turn_off(rawAdvData, model):
  295. """Test turn off command."""
  296. device = create_device_for_command_testing(rawAdvData, model, {"isOn": False})
  297. await device.turn_off()
  298. device._send_command.assert_awaited_once_with(device._turn_off_command)
  299. assert device.is_on() is False
  300. @pytest.mark.asyncio
  301. @pytest.mark.parametrize(
  302. ("rawAdvData", "model"),
  303. common_params,
  304. )
  305. async def test_toggle(rawAdvData, model):
  306. """Test toggle command."""
  307. device = create_device_for_command_testing(rawAdvData, model)
  308. await device.async_toggle()
  309. device._send_command.assert_awaited_once_with(relay_switch.COMMAND_TOGGLE)
  310. assert device.is_on() is True
  311. @pytest.mark.asyncio
  312. @pytest.mark.parametrize(
  313. ("rawAdvData", "model", "info_data"),
  314. [
  315. (
  316. b">\x00\x00\x00",
  317. SwitchbotModel.GARAGE_DOOR_OPENER,
  318. {
  319. "basic_info": b"\x01>\x80\x0c\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x10",
  320. "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
  321. },
  322. )
  323. ],
  324. )
  325. async def test_get_basic_info_garage_door_opener(rawAdvData, model, info_data):
  326. """Test get_basic_info for garage door opener."""
  327. device = create_device_for_command_testing(rawAdvData, model)
  328. device.get_current_time_and_start_time = MagicMock(
  329. return_value=("683074d6", "682fba80")
  330. )
  331. async def mock_get_basic_info(arg):
  332. if arg == relay_switch.COMMAND_GET_BASIC_INFO:
  333. return info_data["basic_info"]
  334. if arg == relay_switch.COMMAND_GET_CHANNEL1_INFO.format("683074d6", "682fba80"):
  335. return info_data["channel1_info"]
  336. return None
  337. device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
  338. info = await device.get_basic_info()
  339. assert info is not None
  340. assert info["isOn"] is True
  341. assert info["door_open"] is True
  342. @pytest.mark.asyncio
  343. @pytest.mark.parametrize(
  344. "model",
  345. [
  346. SwitchbotModel.RELAY_SWITCH_1,
  347. SwitchbotModel.RELAY_SWITCH_1PM,
  348. SwitchbotModel.GARAGE_DOOR_OPENER,
  349. SwitchbotModel.RELAY_SWITCH_2PM,
  350. ],
  351. )
  352. @patch.object(SwitchbotEncryptedDevice, "verify_encryption_key", new_callable=AsyncMock)
  353. async def test_verify_encryption_key(mock_parent_verify, model):
  354. ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
  355. key_id = "ff"
  356. encryption_key = "ffffffffffffffffffffffffffffffff"
  357. mock_parent_verify.return_value = True
  358. result = await relay_switch.SwitchbotRelaySwitch.verify_encryption_key(
  359. device=ble_device,
  360. key_id=key_id,
  361. encryption_key=encryption_key,
  362. model=model,
  363. )
  364. mock_parent_verify.assert_awaited_once_with(
  365. ble_device,
  366. key_id,
  367. encryption_key,
  368. model,
  369. )
  370. assert result is True
  371. @pytest.mark.parametrize(
  372. ("old_data", "new_data", "expected_result"),
  373. [
  374. (
  375. {"isOn": True, "sequence_number": 1},
  376. {"isOn": False},
  377. {"isOn": False, "sequence_number": 1},
  378. ),
  379. (
  380. {
  381. 1: {"current": 0, "voltage": 220, "power": 0},
  382. 2: {"current": 1, "voltage": 0, "power": 10},
  383. },
  384. {1: {"current": 1, "power": 10}, 2: {"current": 0, "voltage": 220}},
  385. {
  386. 1: {"current": 1, "voltage": 220, "power": 10},
  387. 2: {"current": 0, "voltage": 220, "power": 10},
  388. },
  389. ),
  390. ],
  391. )
  392. def test_merge_data(old_data, new_data, expected_result):
  393. """Test merging of data dictionaries."""
  394. result = merge_data(old_data, new_data)
  395. assert result == expected_result
  396. @pytest.mark.asyncio
  397. async def test_garage_door_opener_open():
  398. """Test open the garage door."""
  399. device = create_device_for_command_testing(
  400. b">\x00\x00\x00", SwitchbotModel.GARAGE_DOOR_OPENER
  401. )
  402. await device.open()
  403. device._send_command.assert_awaited_once_with(device._open_command)
  404. @pytest.mark.asyncio
  405. async def test_garage_door_opener_close():
  406. """Test close the garage door."""
  407. device = create_device_for_command_testing(
  408. b">\x00\x00\x00", SwitchbotModel.GARAGE_DOOR_OPENER
  409. )
  410. await device.close()
  411. device._send_command.assert_awaited_once_with(device._close_command)
  412. @pytest.mark.parametrize(
  413. "door_open",
  414. [
  415. True,
  416. False,
  417. ],
  418. )
  419. @pytest.mark.asyncio
  420. async def test_garage_door_opener_door_open(door_open):
  421. """Test get garage door state."""
  422. device = create_device_for_command_testing(
  423. b">\x00\x00\x00", SwitchbotModel.GARAGE_DOOR_OPENER, {"door_open": door_open}
  424. )
  425. assert device.door_open() is door_open
  426. @pytest.mark.asyncio
  427. async def test_press():
  428. """Test the press command for garage door opener."""
  429. device = create_device_for_command_testing(
  430. b">\x00\x00\x00", SwitchbotModel.GARAGE_DOOR_OPENER
  431. )
  432. await device.press()
  433. device._send_command.assert_awaited_once_with(device._press_command)