Browse Source

Add retrieve_encryption_key for lock (#170)

Co-authored-by: J. Nick Koston <nick@koston.org>
Damian Sypniewski 1 year ago
parent
commit
5b634d285b
4 changed files with 93 additions and 76 deletions
  1. 6 75
      scripts/get_encryption_key.py
  2. 8 1
      setup.py
  3. 13 0
      switchbot/api_config.py
  4. 66 0
      switchbot/devices/lock.py

+ 6 - 75
scripts/get_encryption_key.py

@@ -1,28 +1,8 @@
 #!/usr/bin/env python3
-import base64
 import getpass
-import hashlib
-import hmac
-import json
 import sys
 
-import boto3
-import requests
-
-# Those values have been obtained from the following files in SwitchBot Android app
-# That's how you can verify them yourself
-# /assets/switchbot_config.json
-# /res/raw/amplifyconfiguration.json
-# /res/raw/awsconfiguration.json
-SWITCHBOT_INTERNAL_API_BASE_URL = (
-    "https://l9ren7efdj.execute-api.us-east-1.amazonaws.com"
-)
-SWITCHBOT_COGNITO_POOL = {
-    "PoolId": "us-east-1_x1fixo5LC",
-    "AppClientId": "66r90hdllaj4nnlne4qna0muls",
-    "AppClientSecret": "1v3v7vfjsiggiupkeuqvsovg084e3msbefpj9rgh611u30uug6t8",
-    "Region": "us-east-1",
-}
+from switchbot import SwitchbotLock
 
 
 def main():
@@ -30,68 +10,19 @@ def main():
         print(f"Usage: {sys.argv[0]} <device_mac> <username> [<password>]")
         exit(1)
 
-    device_mac = sys.argv[1].replace(":", "").replace("-", "").upper()
-    username = sys.argv[2]
     if len(sys.argv) == 3:
         password = getpass.getpass()
     else:
         password = sys.argv[3]
 
-    msg = bytes(username + SWITCHBOT_COGNITO_POOL["AppClientId"], "utf-8")
-    secret_hash = base64.b64encode(
-        hmac.new(
-            SWITCHBOT_COGNITO_POOL["AppClientSecret"].encode(),
-            msg,
-            digestmod=hashlib.sha256,
-        ).digest()
-    ).decode()
-
-    cognito_idp_client = boto3.client(
-        "cognito-idp", region_name=SWITCHBOT_COGNITO_POOL["Region"]
-    )
-    auth_response = None
     try:
-        auth_response = cognito_idp_client.initiate_auth(
-            ClientId=SWITCHBOT_COGNITO_POOL["AppClientId"],
-            AuthFlow="USER_PASSWORD_AUTH",
-            AuthParameters={
-                "USERNAME": username,
-                "PASSWORD": password,
-                "SECRET_HASH": secret_hash,
-            },
-        )
-    except cognito_idp_client.exceptions.NotAuthorizedException as e:
-        print(f"Error: Failed to authenticate - {e}")
-        exit(1)
-    except BaseException as e:
-        print(f"Error: Unexpected error during authentication - {e}")
-        exit(1)
-
-    if (
-        auth_response is None
-        or "AuthenticationResult" not in auth_response
-        or "AccessToken" not in auth_response["AuthenticationResult"]
-    ):
-        print(f"Error: unexpected authentication result")
-        exit(1)
-
-    access_token = auth_response["AuthenticationResult"]["AccessToken"]
-    key_response = requests.post(
-        url=SWITCHBOT_INTERNAL_API_BASE_URL + "/developStage/keys/v1/communicate",
-        headers={"authorization": access_token},
-        json={"device_mac": device_mac, "keyType": "user"},
-    )
-    key_response_content = json.loads(key_response.content)
-    if key_response_content["statusCode"] != 100:
-        print(
-            "Error: {} ({})".format(
-                key_response_content["message"], key_response_content["statusCode"]
-            )
-        )
+        result = SwitchbotLock.retrieve_encryption_key(sys.argv[1], sys.argv[2], password)
+    except RuntimeError as e:
+        print(e)
         exit(1)
 
-    print("Key ID: " + key_response_content["body"]["communicationKey"]["keyId"])
-    print("Encryption key: " + key_response_content["body"]["communicationKey"]["key"])
+    print("Key ID: " + result["key_id"])
+    print("Encryption key: " + result["encryption_key"])
 
 
 if __name__ == "__main__":

+ 8 - 1
setup.py

@@ -3,7 +3,14 @@ from setuptools import setup
 setup(
     name="PySwitchbot",
     packages=["switchbot", "switchbot.devices", "switchbot.adv_parsers"],
-    install_requires=["async_timeout>=4.0.1", "bleak>=0.17.0", "bleak-retry-connector>=2.9.0", "cryptography>=38.0.3"],
+    install_requires=[
+        "async_timeout>=4.0.1",
+        "bleak>=0.17.0",
+        "bleak-retry-connector>=2.9.0",
+        "cryptography>=38.0.3",
+        "boto3>=1.20.24",
+        "requests>=2.28.1",
+    ],
     version="0.33.0",
     description="A library to communicate with Switchbot",
     author="Daniel Hjelseth Hoyer",

+ 13 - 0
switchbot/api_config.py

@@ -0,0 +1,13 @@
+# Those values have been obtained from the following files in SwitchBot Android app
+# That's how you can verify them yourself
+# /assets/switchbot_config.json
+# /res/raw/amplifyconfiguration.json
+# /res/raw/awsconfiguration.json
+
+SWITCHBOT_APP_API_BASE_URL = "https://l9ren7efdj.execute-api.us-east-1.amazonaws.com"
+SWITCHBOT_APP_COGNITO_POOL = {
+    "PoolId": "us-east-1_x1fixo5LC",
+    "AppClientId": "66r90hdllaj4nnlne4qna0muls",
+    "AppClientSecret": "1v3v7vfjsiggiupkeuqvsovg084e3msbefpj9rgh611u30uug6t8",
+    "Region": "us-east-1",
+}

+ 66 - 0
switchbot/devices/lock.py

@@ -2,12 +2,19 @@
 from __future__ import annotations
 
 import asyncio
+import base64
+import hashlib
+import hmac
+import json
 import logging
 from typing import Any
 
+import boto3
+import requests
 from bleak.backends.device import BLEDevice
 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
 
+from ..api_config import SWITCHBOT_APP_API_BASE_URL, SWITCHBOT_APP_COGNITO_POOL
 from ..const import LockStatus
 from .device import SwitchbotDevice, SwitchbotOperationError
 
@@ -69,6 +76,65 @@ class SwitchbotLock(SwitchbotDevice):
 
         return lock_info is not None
 
+    @staticmethod
+    def retrieve_encryption_key(device_mac: str, username: str, password: str):
+        """Retrieve lock key from internal SwitchBot API."""
+        device_mac = device_mac.replace(":", "").replace("-", "").upper()
+        msg = bytes(username + SWITCHBOT_APP_COGNITO_POOL["AppClientId"], "utf-8")
+        secret_hash = base64.b64encode(
+            hmac.new(
+                SWITCHBOT_APP_COGNITO_POOL["AppClientSecret"].encode(),
+                msg,
+                digestmod=hashlib.sha256,
+            ).digest()
+        ).decode()
+
+        cognito_idp_client = boto3.client(
+            "cognito-idp", region_name=SWITCHBOT_APP_COGNITO_POOL["Region"]
+        )
+        try:
+            auth_response = cognito_idp_client.initiate_auth(
+                ClientId=SWITCHBOT_APP_COGNITO_POOL["AppClientId"],
+                AuthFlow="USER_PASSWORD_AUTH",
+                AuthParameters={
+                    "USERNAME": username,
+                    "PASSWORD": password,
+                    "SECRET_HASH": secret_hash,
+                },
+            )
+        except cognito_idp_client.exceptions.NotAuthorizedException as err:
+            raise RuntimeError("Failed to authenticate") from err
+        except BaseException as err:
+            raise RuntimeError("Unexpected error during authentication") from err
+
+        if (
+            auth_response is None
+            or "AuthenticationResult" not in auth_response
+            or "AccessToken" not in auth_response["AuthenticationResult"]
+        ):
+            raise RuntimeError("Unexpected authentication response")
+
+        access_token = auth_response["AuthenticationResult"]["AccessToken"]
+        key_response = requests.post(
+            url=SWITCHBOT_APP_API_BASE_URL + "/developStage/keys/v1/communicate",
+            headers={"authorization": access_token},
+            json={
+                "device_mac": device_mac,
+                "keyType": "user",
+            },
+            timeout=10,
+        )
+        key_response_content = json.loads(key_response.content)
+        if key_response_content["statusCode"] != 100:
+            raise RuntimeError(
+                f"Unexpected status code returned by SwitchBot API: {key_response_content['statusCode']}"
+            )
+
+        return {
+            "key_id": key_response_content["body"]["communicationKey"]["keyId"],
+            "encryption_key": key_response_content["body"]["communicationKey"]["key"],
+        }
+
     async def lock(self) -> bool:
         """Send lock command."""
         return await self._lock_unlock(