"""Tests for the SwitchBot relay switch devices (test_relay_switch.py)."""

  1. from unittest.mock import AsyncMock, MagicMock, patch
  2. import pytest
  3. from bleak.backends.device import BLEDevice
  4. from switchbot import SwitchBotAdvertisement, SwitchbotEncryptedDevice, SwitchbotModel
  5. from switchbot.devices import relay_switch
  6. from switchbot.devices.device import _merge_data as merge_data
  7. from .test_adv_parser import generate_ble_device
  8. common_params = [
  9. (b";\x00\x00\x00", SwitchbotModel.RELAY_SWITCH_1),
  10. (b"<\x00\x00\x00", SwitchbotModel.RELAY_SWITCH_1PM),
  11. (b">\x00\x00\x00", SwitchbotModel.GARAGE_DOOR_OPENER),
  12. ]
  13. @pytest.fixture
  14. def common_parametrize_2pm():
  15. """Provide common test data."""
  16. return {
  17. "rawAdvData": b"\x00\x00\x00\x00\x00\x00",
  18. "model": SwitchbotModel.RELAY_SWITCH_2PM,
  19. }
  20. def create_device_for_command_testing(
  21. rawAdvData: bytes, model: str, init_data: dict | None = None
  22. ):
  23. """Create a device for command testing."""
  24. ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
  25. device_class = (
  26. relay_switch.SwitchbotRelaySwitch2PM
  27. if model == SwitchbotModel.RELAY_SWITCH_2PM
  28. else relay_switch.SwitchbotRelaySwitch
  29. )
  30. device = device_class(
  31. ble_device, "ff", "ffffffffffffffffffffffffffffffff", model=model
  32. )
  33. device.update_from_advertisement(
  34. make_advertisement_data(ble_device, rawAdvData, model, init_data)
  35. )
  36. device._send_command = AsyncMock()
  37. device._check_command_result = MagicMock()
  38. device.update = AsyncMock()
  39. return device
  40. def make_advertisement_data(
  41. ble_device: BLEDevice, rawAdvData: bytes, model: str, init_data: dict | None = None
  42. ):
  43. """Set advertisement data with defaults."""
  44. if init_data is None:
  45. init_data = {}
  46. if model == SwitchbotModel.RELAY_SWITCH_2PM:
  47. return SwitchBotAdvertisement(
  48. address="aa:bb:cc:dd:ee:ff",
  49. data={
  50. "rawAdvData": rawAdvData,
  51. "data": {
  52. 1: {
  53. "switchMode": True,
  54. "sequence_number": 99,
  55. "isOn": True,
  56. },
  57. 2: {
  58. "switchMode": True,
  59. "sequence_number": 99,
  60. "isOn": False,
  61. },
  62. }
  63. | init_data,
  64. "isEncrypted": False,
  65. },
  66. device=ble_device,
  67. rssi=-80,
  68. active=True,
  69. )
  70. if model == SwitchbotModel.GARAGE_DOOR_OPENER:
  71. return SwitchBotAdvertisement(
  72. address="aa:bb:cc:dd:ee:ff",
  73. data={
  74. "rawAdvData": rawAdvData,
  75. "data": {
  76. "switchMode": True,
  77. "sequence_number": 96,
  78. "isOn": True,
  79. "door_open": False,
  80. }
  81. | init_data,
  82. "isEncrypted": False,
  83. },
  84. device=ble_device,
  85. rssi=-80,
  86. active=True,
  87. )
  88. return SwitchBotAdvertisement(
  89. address="aa:bb:cc:dd:ee:ff",
  90. data={
  91. "rawAdvData": rawAdvData,
  92. "data": {
  93. "switchMode": True,
  94. "sequence_number": 96,
  95. "isOn": True,
  96. }
  97. | init_data,
  98. "isEncrypted": False,
  99. },
  100. device=ble_device,
  101. rssi=-80,
  102. active=True,
  103. )
  104. @pytest.mark.asyncio
  105. @pytest.mark.parametrize(
  106. "init_data",
  107. [
  108. {1: {"isOn": True}, 2: {"isOn": True}},
  109. ],
  110. )
  111. async def test_turn_on_2PM(common_parametrize_2pm, init_data):
  112. """Test turn on command."""
  113. device = create_device_for_command_testing(
  114. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"], init_data
  115. )
  116. await device.turn_on(1)
  117. device._send_command.assert_called_with(
  118. relay_switch.MULTI_CHANNEL_COMMANDS_TURN_ON[common_parametrize_2pm["model"]][1]
  119. )
  120. assert device.is_on(1) is True
  121. await device.turn_on(2)
  122. device._send_command.assert_called_with(
  123. relay_switch.MULTI_CHANNEL_COMMANDS_TURN_ON[common_parametrize_2pm["model"]][2]
  124. )
  125. assert device.is_on(2) is True
  126. @pytest.mark.asyncio
  127. @pytest.mark.parametrize(
  128. "init_data",
  129. [
  130. {1: {"isOn": False}, 2: {"isOn": False}},
  131. ],
  132. )
  133. async def test_turn_off_2PM(common_parametrize_2pm, init_data):
  134. """Test turn off command."""
  135. device = create_device_for_command_testing(
  136. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"], init_data
  137. )
  138. await device.turn_off(1)
  139. device._send_command.assert_called_with(
  140. relay_switch.MULTI_CHANNEL_COMMANDS_TURN_OFF[common_parametrize_2pm["model"]][1]
  141. )
  142. assert device.is_on(1) is False
  143. await device.turn_off(2)
  144. device._send_command.assert_called_with(
  145. relay_switch.MULTI_CHANNEL_COMMANDS_TURN_OFF[common_parametrize_2pm["model"]][2]
  146. )
  147. assert device.is_on(2) is False
  148. @pytest.mark.asyncio
  149. async def test_turn_toggle_2PM(common_parametrize_2pm):
  150. """Test toggle command."""
  151. device = create_device_for_command_testing(
  152. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
  153. )
  154. await device.async_toggle(1)
  155. device._send_command.assert_called_with(
  156. relay_switch.MULTI_CHANNEL_COMMANDS_TOGGLE[common_parametrize_2pm["model"]][1]
  157. )
  158. assert device.is_on(1) is True
  159. await device.async_toggle(2)
  160. device._send_command.assert_called_with(
  161. relay_switch.MULTI_CHANNEL_COMMANDS_TOGGLE[common_parametrize_2pm["model"]][2]
  162. )
  163. assert device.is_on(2) is False
  164. @pytest.mark.asyncio
  165. async def test_get_switch_mode_2PM(common_parametrize_2pm):
  166. """Test get switch mode."""
  167. device = create_device_for_command_testing(
  168. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
  169. )
  170. assert device.switch_mode(1) is True
  171. assert device.switch_mode(2) is True
  172. @pytest.mark.asyncio
  173. @pytest.mark.parametrize(
  174. ("info_data", "result"),
  175. [
  176. (
  177. {
  178. "basic_info": b"\x01\x98A\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10",
  179. "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00",
  180. "channel2_info": b"\x01\x00\x055\x00'<\x02\x9f\x00\xe9\x01,\x00F",
  181. },
  182. [False, 0, 0, 0, 0, True, 0.02, 23, 0.3, 7.0],
  183. ),
  184. (
  185. {
  186. "basic_info": b"\x01\x9e\x81\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10",
  187. "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00",
  188. "channel2_info": b"\x01\x00\x05\xbc\x00'<\x02\xb1\x00\xea\x01-\x00F",
  189. },
  190. [True, 0, 23, 0.1, 0.0, False, 0.02, 0, 0, 0],
  191. ),
  192. ],
  193. )
  194. async def test_get_basic_info_2PM(common_parametrize_2pm, info_data, result):
  195. """Test get_basic_info for 2PM devices."""
  196. device = create_device_for_command_testing(
  197. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
  198. )
  199. assert device.channel == 2
  200. device.get_current_time_and_start_time = MagicMock(
  201. return_value=("683074d6", "682fba80")
  202. )
  203. async def mock_get_basic_info(arg):
  204. if arg == relay_switch.COMMAND_GET_BASIC_INFO:
  205. return info_data["basic_info"]
  206. if arg == relay_switch.COMMAND_GET_CHANNEL1_INFO.format("683074d6", "682fba80"):
  207. return info_data["channel1_info"]
  208. if arg == relay_switch.COMMAND_GET_CHANNEL2_INFO.format("683074d6", "682fba80"):
  209. return info_data["channel2_info"]
  210. return None
  211. device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
  212. info = await device.get_basic_info()
  213. assert info is not None
  214. assert 1 in info
  215. assert 2 in info
  216. assert info[1]["isOn"] == result[0]
  217. assert info[1]["energy"] == result[1]
  218. assert info[1]["voltage"] == result[2]
  219. assert info[1]["current"] == result[3]
  220. assert info[1]["power"] == result[4]
  221. assert info[2]["isOn"] == result[5]
  222. assert info[2]["energy"] == result[6]
  223. assert info[2]["voltage"] == result[7]
  224. assert info[2]["current"] == result[8]
  225. assert info[2]["power"] == result[9]
  226. @pytest.mark.asyncio
  227. @pytest.mark.parametrize(
  228. "info_data",
  229. [
  230. {
  231. "basic_info": None,
  232. "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00",
  233. "channel2_info": b"\x01\x00\x055\x00'<\x02\x9f\x00\xe9\x01,\x00F",
  234. },
  235. {
  236. "basic_info": b"\x01\x98A\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10",
  237. "channel1_info": None,
  238. "channel2_info": b"\x01\x00\x055\x00'<\x02\x9f\x00\xe9\x01,\x00F",
  239. },
  240. {
  241. "basic_info": b"\x01\x98A\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10",
  242. "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x02\x99\x00\xe9\x00\x03\x00\x00",
  243. "channel2_info": None,
  244. },
  245. ],
  246. )
  247. async def test_basic_info_exceptions_2PM(common_parametrize_2pm, info_data):
  248. """Test get_basic_info exceptions."""
  249. device = create_device_for_command_testing(
  250. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
  251. )
  252. device.get_current_time_and_start_time = MagicMock(
  253. return_value=("683074d6", "682fba80")
  254. )
  255. async def mock_get_basic_info(arg):
  256. if arg == relay_switch.COMMAND_GET_BASIC_INFO:
  257. return info_data["basic_info"]
  258. if arg == relay_switch.COMMAND_GET_CHANNEL1_INFO.format("683074d6", "682fba80"):
  259. return info_data["channel1_info"]
  260. if arg == relay_switch.COMMAND_GET_CHANNEL2_INFO.format("683074d6", "682fba80"):
  261. return info_data["channel2_info"]
  262. return None
  263. device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
  264. info = await device.get_basic_info()
  265. assert info is None
  266. @pytest.mark.asyncio
  267. async def test_get_parsed_data_2PM(common_parametrize_2pm):
  268. """Test get_parsed_data for 2PM devices."""
  269. device = create_device_for_command_testing(
  270. common_parametrize_2pm["rawAdvData"], common_parametrize_2pm["model"]
  271. )
  272. info = device.get_parsed_data(1)
  273. assert info["isOn"] is True
  274. info = device.get_parsed_data(2)
  275. assert info["isOn"] is False
  276. @pytest.mark.asyncio
  277. @pytest.mark.parametrize(
  278. ("rawAdvData", "model"),
  279. common_params,
  280. )
  281. async def test_turn_on(rawAdvData, model):
  282. """Test turn on command."""
  283. device = create_device_for_command_testing(rawAdvData, model)
  284. await device.turn_on()
  285. device._send_command.assert_awaited_once_with(relay_switch.COMMAND_TURN_ON)
  286. assert device.is_on() is True
  287. @pytest.mark.asyncio
  288. @pytest.mark.parametrize(
  289. ("rawAdvData", "model"),
  290. common_params,
  291. )
  292. async def test_turn_off(rawAdvData, model):
  293. """Test turn off command."""
  294. device = create_device_for_command_testing(rawAdvData, model, {"isOn": False})
  295. await device.turn_off()
  296. device._send_command.assert_awaited_once_with(relay_switch.COMMAND_TURN_OFF)
  297. assert device.is_on() is False
  298. @pytest.mark.asyncio
  299. @pytest.mark.parametrize(
  300. ("rawAdvData", "model"),
  301. common_params,
  302. )
  303. async def test_toggle(rawAdvData, model):
  304. """Test toggle command."""
  305. device = create_device_for_command_testing(rawAdvData, model)
  306. await device.async_toggle()
  307. device._send_command.assert_awaited_once_with(relay_switch.COMMAND_TOGGLE)
  308. assert device.is_on() is True
  309. @pytest.mark.asyncio
  310. @pytest.mark.parametrize(
  311. ("rawAdvData", "model", "info_data"),
  312. [
  313. (
  314. b">\x00\x00\x00",
  315. SwitchbotModel.GARAGE_DOOR_OPENER,
  316. {
  317. "basic_info": b"\x01>\x80\x0c\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x10",
  318. "channel1_info": b"\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
  319. },
  320. )
  321. ],
  322. )
  323. async def test_get_basic_info_garage_door_opener(rawAdvData, model, info_data):
  324. """Test get_basic_info for garage door opener."""
  325. device = create_device_for_command_testing(rawAdvData, model)
  326. device.get_current_time_and_start_time = MagicMock(
  327. return_value=("683074d6", "682fba80")
  328. )
  329. async def mock_get_basic_info(arg):
  330. if arg == relay_switch.COMMAND_GET_BASIC_INFO:
  331. return info_data["basic_info"]
  332. if arg == relay_switch.COMMAND_GET_CHANNEL1_INFO.format("683074d6", "682fba80"):
  333. return info_data["channel1_info"]
  334. return None
  335. device._get_basic_info = AsyncMock(side_effect=mock_get_basic_info)
  336. info = await device.get_basic_info()
  337. assert info is not None
  338. assert info["isOn"] is True
  339. assert info["door_open"] is True
  340. @pytest.mark.asyncio
  341. @pytest.mark.parametrize(
  342. "model",
  343. [
  344. SwitchbotModel.RELAY_SWITCH_1,
  345. SwitchbotModel.RELAY_SWITCH_1PM,
  346. SwitchbotModel.GARAGE_DOOR_OPENER,
  347. SwitchbotModel.RELAY_SWITCH_2PM,
  348. ],
  349. )
  350. @patch.object(SwitchbotEncryptedDevice, "verify_encryption_key", new_callable=AsyncMock)
  351. async def test_verify_encryption_key(mock_parent_verify, model):
  352. ble_device = generate_ble_device("aa:bb:cc:dd:ee:ff", "any")
  353. key_id = "ff"
  354. encryption_key = "ffffffffffffffffffffffffffffffff"
  355. mock_parent_verify.return_value = True
  356. result = await relay_switch.SwitchbotRelaySwitch.verify_encryption_key(
  357. device=ble_device,
  358. key_id=key_id,
  359. encryption_key=encryption_key,
  360. model=model,
  361. )
  362. mock_parent_verify.assert_awaited_once_with(
  363. ble_device,
  364. key_id,
  365. encryption_key,
  366. model,
  367. )
  368. assert result is True
  369. @pytest.mark.parametrize(
  370. ("old_data", "new_data", "expected_result"),
  371. [
  372. (
  373. {"isOn": True, "sequence_number": 1},
  374. {"isOn": False},
  375. {"isOn": False, "sequence_number": 1},
  376. ),
  377. (
  378. {
  379. 1: {"current": 0, "voltage": 220, "power": 0},
  380. 2: {"current": 1, "voltage": 0, "power": 10},
  381. },
  382. {1: {"current": 1, "power": 10}, 2: {"current": 0, "voltage": 220}},
  383. {
  384. 1: {"current": 1, "voltage": 220, "power": 10},
  385. 2: {"current": 0, "voltage": 220, "power": 10},
  386. },
  387. ),
  388. ],
  389. )
  390. def test_merge_data(old_data, new_data, expected_result):
  391. """Test merging of data dictionaries."""
  392. result = merge_data(old_data, new_data)
  393. assert result == expected_result