1
0

test_relay_switch.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. from unittest.mock import AsyncMock, MagicMock, patch
  2. import pytest
  3. from bleak.backends.device import BLEDevice
  4. from switchbot import SwitchBotAdvertisement, SwitchbotEncryptedDevice, SwitchbotModel
  5. from switchbot.devices import relay_switch
  6. from switchbot.devices.device import _merge_data as merge_data
  7. from .test_adv_parser import generate_ble_device
  8. common_params = [
  9. (b";\x00\x00\x00", SwitchbotModel.RELAY_SWITCH_1),
  10. (b"<\x00\x00\x00", SwitchbotModel.RELAY_SWITCH_1PM),
  11. (b">\x00\x00\x00", SwitchbotModel.GARAGE_DOOR_OPENER),
  12. ]
  13. @pytest.fixture
  14. def common_parametrize_2pm():
  15. """Provide common test data."""
  16. return {
  17. "rawAdvData": b"\x00\x00\x00\x00\x00\x00",
  18. "model": SwitchbotModel.RELAY_SWITCH_2PM,
  19. }
  20. def create_device_for_command_testing(
  21. rawAdvData: bytes, model: str, init_data: dict | None = None
  22. ):
  23. """Create a device for command testing."""
  24. ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
  25. if model == SwitchbotModel.GARAGE_DOOR_OPENER:
  26. device_class = relay_switch.SwitchbotGarageDoorOpener
  27. elif model == SwitchbotModel.RELAY_SWITCH_2PM:
  28. device_class = relay_switch.SwitchbotRelaySwitch2PM
  29. else:
  30. device_class = relay_switch.SwitchbotRelaySwitch
  31. device = device_class(
  32. ble_device, "ff", "ffffffffffffffffffffffffffffffff", model=model
  33. )
  34. device.update_from_advertisement(
  35. make_advertisement_data(ble_device, rawAdvData, model, init_data)
  36. )
  37. device._send_command = AsyncMock()
  38. device._check_command_result = MagicMock()
  39. device.update = AsyncMock()
  40. return device
  41. def make_advertisement_data(
  42. ble_device: BLEDevice, rawAdvData: bytes, model: str, init_data: dict | None = None
  43. ):
  44. """Set advertisement data with defaults."""
  45. if init_data is None:
  46. init_data = {}
  47. if model == SwitchbotModel.RELAY_SWITCH_2PM:
  48. return SwitchBotAdvertisement(
  49. address="aa:bb:cc:dd:ee:ff",
  50. data={
  51. "rawAdvData": rawAdvData,
  52. "data": {
  53. 1: {
  54. "switchMode": True,
  55. "sequence_number": 99,
  56. "isOn": True,
  57. },
  58. 2: {
  59. "switchMode": True,
  60. "sequence_number": 99,
  61. "isOn": False,
  62. },
  63. }
  64. | init_data,
  65. "isEncrypted": False,
  66. },
  67. device=ble_device,
  68. rssi=-80,
  69. active=True,
  70. )
  71. if model == SwitchbotModel.GARAGE_DOOR_OPENER:
  72. return SwitchBotAdvertisement(
  73. address="aa:bb:cc:dd:ee:ff",
  74. data={
  75. "rawAdvData": rawAdvData,
  76. "data": {
  77. "switchMode": True,
  78. "sequence_number": 96,
  79. "isOn": True,
  80. "door_open": False,
  81. }
  82. | init_data,
  83. "isEncrypted": False,
  84. },
  85. device=ble_device,
  86. rssi=-80,
  87. active=True,
  88. )
  89. return SwitchBotAdvertisement(
  90. address="aa:bb:cc:dd:ee:ff",
  91. data={
  92. "rawAdvData": rawAdvData,
  93. "data": {
  94. "switchMode": True,
  95. "sequence_number": 96,
  96. "isOn": True,
  97. }
  98. | init_data,
  99. "isEncrypted": False,
  100. },
  101. device=ble_device,
  102. rssi=-80,
  103. active=True,
  104. )
  105. @pytest.mark.asyncio
  106. @pytest.mark.parametrize(
  107. "init_data",
  108. [
  109. {1: {"isOn": True}, 2: {"isOn": True}},
  110. ],
  111. )
  112. async def test_turn_on_2PM(common_parametrize_2pm, init_data):
  113. """Test turn on command."""
  114. device = create_device_for_command_testing(
  115. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"], init_data
  116. )
  117. await device.turn_on(1)
  118. device._send_command.assert_called_with(
  119. relay_switch.MULTI_CHANNEL_COMMANDS_TURN_ON[common_parametrize_2pm["model"]][1]
  120. )
  121. assert device.is_on(1) is True
  122. await device.turn_on(2)
  123. device._send_command.assert_called_with(
  124. relay_switch.MULTI_CHANNEL_COMMANDS_TURN_ON[common_parametrize_2pm["model"]][2]
  125. )
  126. assert device.is_on(2) is True
  127. @pytest.mark.asyncio
  128. @pytest.mark.parametrize(
  129. "init_data",
  130. [
  131. {1: {"isOn": False}, 2: {"isOn": False}},
  132. ],
  133. )
  134. async def test_turn_off_2PM(common_parametrize_2pm, init_data):
  135. """Test turn off command."""
  136. device = create_device_for_command_testing(
  137. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"], init_data
  138. )
  139. await device.turn_off(1)
  140. device._send_command.assert_called_with(
  141. relay_switch.MULTI_CHANNEL_COMMANDS_TURN_OFF[common_parametrize_2pm["model"]][1]
  142. )
  143. assert device.is_on(1) is False
  144. await device.turn_off(2)
  145. device._send_command.assert_called_with(
  146. relay_switch.MULTI_CHANNEL_COMMANDS_TURN_OFF[common_parametrize_2pm["model"]][2]
  147. )
  148. assert device.is_on(2) is False
  149. @pytest.mark.asyncio
  150. async def test_turn_toggle_2PM(common_parametrize_2pm):
  151. """Test toggle command."""
  152. device = create_device_for_command_testing(
  153. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
  154. )
  155. await device.async_toggle(1)
  156. device._send_command.assert_called_with(
  157. relay_switch.MULTI_CHANNEL_COMMANDS_TOGGLE[common_parametrize_2pm["model"]][1]
  158. )
  159. assert device.is_on(1) is True
  160. await device.async_toggle(2)
  161. device._send_command.assert_called_with(
  162. relay_switch.MULTI_CHANNEL_COMMANDS_TOGGLE[common_parametrize_2pm["model"]][2]
  163. )
  164. assert device.is_on(2) is False
  165. @pytest.mark.asyncio
  166. async def test_get_switch_mode_2PM(common_parametrize_2pm):
  167. """Test get switch mode."""
  168. device = create_device_for_command_testing(
  169. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
  170. )
  171. assert device.switch_mode(1) is True
  172. assert device.switch_mode(2) is True
  173. @pytest.mark.asyncio
  174. @pytest.mark.parametrize(
  175. ("info_data", "result"),
  176. [
  177. (
  178. {
  179. "basic_info": b"\x01\x98A\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10",
  180. "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00",
  181. "channel2_info": b"\x01\x00\x055\x00'<\x02\x9f\x00\xe9\x01,\x00F",
  182. },
  183. [False, 0, 0, 0, 0, True, 0.02, 23, 0.3, 7.0],
  184. ),
  185. (
  186. {
  187. "basic_info": b"\x01\x9e\x81\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10",
  188. "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00",
  189. "channel2_info": b"\x01\x00\x05\xbc\x00'<\x02\xb1\x00\xea\x01-\x00F",
  190. },
  191. [True, 0, 23, 0.1, 0.0, False, 0.02, 0, 0, 0],
  192. ),
  193. ],
  194. )
  195. async def test_get_basic_info_2PM(common_parametrize_2pm, info_data, result):
  196. """Test get_basic_info for 2PM devices."""
  197. device = create_device_for_command_testing(
  198. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
  199. )
  200. assert device.channel == 2
  201. device.get_current_time_and_start_time = MagicMock(
  202. return_value=("683074d6", "682fba80")
  203. )
  204. async def mock_get_basic_info(arg):
  205. if arg == relay_switch.COMMAND_GET_BASIC_INFO:
  206. return info_data["basic_info"]
  207. if arg == relay_switch.COMMAND_GET_CHANNEL1_INFO.format("683074d6", "682fba80"):
  208. return info_data["channel1_info"]
  209. if arg == relay_switch.COMMAND_GET_CHANNEL2_INFO.format("683074d6", "682fba80"):
  210. return info_data["channel2_info"]
  211. return None
  212. device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
  213. info = await device.get_basic_info()
  214. assert info is not None
  215. assert 1 in info
  216. assert 2 in info
  217. assert info[1]["isOn"] == result[0]
  218. assert info[1]["energy"] == result[1]
  219. assert info[1]["voltage"] == result[2]
  220. assert info[1]["current"] == result[3]
  221. assert info[1]["power"] == result[4]
  222. assert info[2]["isOn"] == result[5]
  223. assert info[2]["energy"] == result[6]
  224. assert info[2]["voltage"] == result[7]
  225. assert info[2]["current"] == result[8]
  226. assert info[2]["power"] == result[9]
  227. @pytest.mark.asyncio
  228. @pytest.mark.parametrize(
  229. "info_data",
  230. [
  231. {
  232. "basic_info": None,
  233. "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00",
  234. "channel2_info": b"\x01\x00\x055\x00'<\x02\x9f\x00\xe9\x01,\x00F",
  235. },
  236. {
  237. "basic_info": b"\x01\x98A\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10",
  238. "channel1_info": None,
  239. "channel2_info": b"\x01\x00\x055\x00'<\x02\x9f\x00\xe9\x01,\x00F",
  240. },
  241. {
  242. "basic_info": b"\x01\x98A\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10",
  243. "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00",
  244. "channel2_info": None,
  245. },
  246. ],
  247. )
  248. async def test_basic_info_exceptions_2PM(common_parametrize_2pm, info_data):
  249. """Test get_basic_info exceptions."""
  250. device = create_device_for_command_testing(
  251. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
  252. )
  253. device.get_current_time_and_start_time = MagicMock(
  254. return_value=("683074d6", "682fba80")
  255. )
  256. async def mock_get_basic_info(arg):
  257. if arg == relay_switch.COMMAND_GET_BASIC_INFO:
  258. return info_data["basic_info"]
  259. if arg == relay_switch.COMMAND_GET_CHANNEL1_INFO.format("683074d6", "682fba80"):
  260. return info_data["channel1_info"]
  261. if arg == relay_switch.COMMAND_GET_CHANNEL2_INFO.format("683074d6", "682fba80"):
  262. return info_data["channel2_info"]
  263. return None
  264. device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
  265. info = await device.get_basic_info()
  266. assert info is None
  267. @pytest.mark.asyncio
  268. async def test_get_parsed_data_2PM(common_parametrize_2pm):
  269. """Test get_parsed_data for 2PM devices."""
  270. device = create_device_for_command_testing(
  271. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
  272. )
  273. info = device.get_parsed_data(1)
  274. assert info["isOn"] is True
  275. info = device.get_parsed_data(2)
  276. assert info["isOn"] is False
  277. @pytest.mark.asyncio
  278. @pytest.mark.parametrize(
  279. ("rawAdvData", "model"),
  280. common_params,
  281. )
  282. async def test_turn_on(rawAdvData, model):
  283. """Test turn on command."""
  284. device = create_device_for_command_testing(rawAdvData, model)
  285. await device.turn_on()
  286. device._send_command.assert_awaited_once_with(device._turn_on_command)
  287. assert device.is_on() is True
  288. @pytest.mark.asyncio
  289. @pytest.mark.parametrize(
  290. ("rawAdvData", "model"),
  291. common_params,
  292. )
  293. async def test_turn_off(rawAdvData, model):
  294. """Test turn off command."""
  295. device = create_device_for_command_testing(rawAdvData, model, {"isOn": False})
  296. await device.turn_off()
  297. device._send_command.assert_awaited_once_with(device._turn_off_command)
  298. assert device.is_on() is False
  299. @pytest.mark.asyncio
  300. @pytest.mark.parametrize(
  301. ("rawAdvData", "model"),
  302. common_params,
  303. )
  304. async def test_toggle(rawAdvData, model):
  305. """Test toggle command."""
  306. device = create_device_for_command_testing(rawAdvData, model)
  307. await device.async_toggle()
  308. device._send_command.assert_awaited_once_with(relay_switch.COMMAND_TOGGLE)
  309. assert device.is_on() is True
  310. @pytest.mark.asyncio
  311. @pytest.mark.parametrize(
  312. ("rawAdvData", "model", "info_data"),
  313. [
  314. (
  315. b">\x00\x00\x00",
  316. SwitchbotModel.GARAGE_DOOR_OPENER,
  317. {
  318. "basic_info": b"\x01>\x80\x0c\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x10",
  319. "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
  320. },
  321. )
  322. ],
  323. )
  324. async def test_get_basic_info_garage_door_opener(rawAdvData, model, info_data):
  325. """Test get_basic_info for garage door opener."""
  326. device = create_device_for_command_testing(rawAdvData, model)
  327. device.get_current_time_and_start_time = MagicMock(
  328. return_value=("683074d6", "682fba80")
  329. )
  330. async def mock_get_basic_info(arg):
  331. if arg == relay_switch.COMMAND_GET_BASIC_INFO:
  332. return info_data["basic_info"]
  333. if arg == relay_switch.COMMAND_GET_CHANNEL1_INFO.format("683074d6", "682fba80"):
  334. return info_data["channel1_info"]
  335. return None
  336. device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
  337. info = await device.get_basic_info()
  338. assert info is not None
  339. assert info["isOn"] is True
  340. assert info["door_open"] is True
  341. @pytest.mark.asyncio
  342. @pytest.mark.parametrize(
  343. "model",
  344. [
  345. SwitchbotModel.RELAY_SWITCH_1,
  346. SwitchbotModel.RELAY_SWITCH_1PM,
  347. SwitchbotModel.GARAGE_DOOR_OPENER,
  348. SwitchbotModel.RELAY_SWITCH_2PM,
  349. ],
  350. )
  351. @patch.object(SwitchbotEncryptedDevice, "verify_encryption_key", new_callable=AsyncMock)
  352. async def test_verify_encryption_key(mock_parent_verify, model):
  353. ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
  354. key_id = "ff"
  355. encryption_key = "ffffffffffffffffffffffffffffffff"
  356. mock_parent_verify.return_value = True
  357. result = await relay_switch.SwitchbotRelaySwitch.verify_encryption_key(
  358. device=ble_device,
  359. key_id=key_id,
  360. encryption_key=encryption_key,
  361. model=model,
  362. )
  363. mock_parent_verify.assert_awaited_once_with(
  364. ble_device,
  365. key_id,
  366. encryption_key,
  367. model,
  368. )
  369. assert result is True
  370. @pytest.mark.parametrize(
  371. ("old_data", "new_data", "expected_result"),
  372. [
  373. (
  374. {"isOn": True, "sequence_number": 1},
  375. {"isOn": False},
  376. {"isOn": False, "sequence_number": 1},
  377. ),
  378. (
  379. {
  380. 1: {"current": 0, "voltage": 220, "power": 0},
  381. 2: {"current": 1, "voltage": 0, "power": 10},
  382. },
  383. {1: {"current": 1, "power": 10}, 2: {"current": 0, "voltage": 220}},
  384. {
  385. 1: {"current": 1, "voltage": 220, "power": 10},
  386. 2: {"current": 0, "voltage": 220, "power": 10},
  387. },
  388. ),
  389. ],
  390. )
  391. def test_merge_data(old_data, new_data, expected_result):
  392. """Test merging of data dictionaries."""
  393. result = merge_data(old_data, new_data)
  394. assert result == expected_result
  395. @pytest.mark.asyncio
  396. async def test_garage_door_opener_open():
  397. """Test open the garage door."""
  398. device = create_device_for_command_testing(
  399. b">\x00\x00\x00", SwitchbotModel.GARAGE_DOOR_OPENER
  400. )
  401. await device.open()
  402. device._send_command.assert_awaited_once_with(device._open_command)
  403. @pytest.mark.asyncio
  404. async def test_garage_door_opener_close():
  405. """Test close the garage door."""
  406. device = create_device_for_command_testing(
  407. b">\x00\x00\x00", SwitchbotModel.GARAGE_DOOR_OPENER
  408. )
  409. await device.close()
  410. device._send_command.assert_awaited_once_with(device._close_command)
  411. @pytest.mark.parametrize(
  412. "door_open",
  413. [
  414. True,
  415. False,
  416. ],
  417. )
  418. @pytest.mark.asyncio
  419. async def test_garage_door_opener_door_open(door_open):
  420. """Test get garage door state."""
  421. device = create_device_for_command_testing(
  422. b">\x00\x00\x00", SwitchbotModel.GARAGE_DOOR_OPENER, {"door_open": door_open}
  423. )
  424. assert device.door_open() is door_open
  425. @pytest.mark.asyncio
  426. async def test_press():
  427. """Test the press command for garage door opener."""
  428. device = create_device_for_command_testing(
  429. b">\x00\x00\x00", SwitchbotModel.GARAGE_DOOR_OPENER
  430. )
  431. await device.press()
  432. device._send_command.assert_awaited_once_with(device._press_command)