Browse Source

prepare / prototype private methods for receiving data (for testing only, unstable api)

Fabian Peter Hammerle 3 years ago
parent
commit
97b404ba8a
3 changed files with 135 additions and 0 deletions
  1. 72 0
      cc1101/__init__.py
  2. 30 0
      tests/test_receive.py
  3. 33 0
      tests/test_received_packet.py

+ 72 - 0
cc1101/__init__.py

@@ -67,6 +67,45 @@ class MainRadioControlStateMachineState(enum.IntEnum):
     # TXFIFO_UNDERFLOW = 0x16
 
 
+class _ReceivedPacket:  # unstable
+
+    # "Table 31: Typical RSSI_offset Values"
+    _RSSI_OFFSET_dB = 74
+
+    def __init__(
+        self,
+        # *,
+        data: bytes,
+        rssi_index: int,  # byte
+        checksum_valid: bool,
+        link_quality_indicator: int,  # 7bit
+    ):
+        self.data = data
+        self._rssi_index = rssi_index
+        assert 0 <= rssi_index < (1 << 8), rssi_index
+        self.checksum_valid = checksum_valid
+        self.link_quality_indicator = link_quality_indicator
+        assert 0 <= link_quality_indicator < (1 << 7), link_quality_indicator
+
+    @property
+    def rssi_dbm(self) -> float:
+        """
+        Estimated Received Signal Strength Indicator (RSSI) in dBm
+
+        see section "17.3 RSSI"
+        """
+        if self._rssi_index >= 128:
+            return (self._rssi_index - 256) / 2 - self._RSSI_OFFSET_dB
+        return self._rssi_index / 2 - self._RSSI_OFFSET_dB
+
+    def __str__(self) -> str:
+        return "{}(RSSI {:.0f}dBm, 0x{})".format(
+            type(self).__name__,
+            self.rssi_dbm,
+            "".join("{:02x}".format(b) for b in self.data),
+        )
+
+
 class CC1101:
 
     # pylint: disable=too-many-public-methods
@@ -194,6 +233,20 @@ class CC1101:
             exponent=mdmcfg4 >> 6, mantissa=(mdmcfg4 >> 4) & 0b11
         )
 
+    def _set_filter_bandwidth(self, *, mantissa: int, exponent: int) -> None:
+        """
+        MDMCFG4.CHANBW_E & MDMCFG4.CHANBW_M
+        """
+        mdmcfg4 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4)
+        mdmcfg4 &= 0b00001111
+        assert 0 <= exponent <= 0b11, exponent
+        mdmcfg4 |= exponent << 6
+        assert 0 <= mantissa <= 0b11, mantissa
+        mdmcfg4 |= mantissa << 4
+        self._write_burst(
+            start_register=ConfigurationRegisterAddress.MDMCFG4, values=[mdmcfg4]
+        )
+
     def _get_symbol_rate_exponent(self) -> int:
         """
         MDMCFG4.DRATE_E
@@ -681,3 +734,22 @@ class CC1101:
         finally:
             self._command_strobe(StrobeAddress.SIDLE)
             self._set_transceive_mode(_TransceiveMode.FIFO)
+
+    def _enable_receive_mode(self) -> None:  # unstable
+        self._command_strobe(StrobeAddress.SRX)
+
+    def _get_received_packet(self) -> typing.Optional[_ReceivedPacket]:  # unstable
+        """
+        see section "20 Data FIFO"
+        """
+        rxbytes = self._read_status_register(StatusRegisterAddress.RXBYTES)
+        # PKTCTRL1.APPEND_STATUS is enabled by default
+        if rxbytes < 2:
+            return None
+        buffer = self._read_burst(start_register=FIFORegisterAddress.RX, length=rxbytes)
+        return _ReceivedPacket(
+            data=bytes(buffer[:-2]),
+            rssi_index=buffer[-2],
+            checksum_valid=bool(buffer[-1] >> 7),
+            link_quality_indicator=buffer[-1] & 0b0111111,
+        )

+ 30 - 0
tests/test_receive.py

@@ -0,0 +1,30 @@
+import unittest.mock
+
+import pytest
+
+# pylint: disable=protected-access
+
+
+def test__enable_receive_mode(transceiver):
+    transceiver._spi.xfer.return_value = [15]
+    transceiver._enable_receive_mode()
+    transceiver._spi.xfer.assert_called_once_with([0x34 | 0x00])
+
+
+@pytest.mark.parametrize("data", [b"\0", b"\x12\x45\x56"])
+def test__get_received_packet(transceiver, data):
+    fifo_buffer = list(data) + [128, (1 << 7) | 42]
+    with unittest.mock.patch.object(
+        transceiver, "_read_status_register", return_value=len(fifo_buffer)
+    ) as read_status_register, unittest.mock.patch.object(
+        transceiver, "_read_burst", return_value=fifo_buffer
+    ) as read_burst_mock:
+        received_packet = transceiver._get_received_packet()
+    read_status_register.assert_called_once_with(0x3B)
+    read_burst_mock.assert_called_once_with(
+        start_register=0x3F, length=len(fifo_buffer)
+    )
+    assert received_packet.data == data
+    assert received_packet._rssi_index == 128
+    assert received_packet.checksum_valid
+    assert received_packet.link_quality_indicator == 42

+ 33 - 0
tests/test_received_packet.py

@@ -0,0 +1,33 @@
+import pytest
+
+import cc1101
+
+# pylint: disable=protected-access
+
+
+@pytest.mark.parametrize(
+    ("rssi_index", "rssi_dbm"),
+    [
+        (128, -64 - 74),
+        (204, -100),
+        (255, -0.5 - 74),
+        (0, -74),
+        (64, 32 - 74),
+        (127, 63.5 - 74),
+    ],
+)
+def test_rss_dbm(rssi_index, rssi_dbm):
+    packet = cc1101._ReceivedPacket(
+        data=b"\0", rssi_index=rssi_index, checksum_valid=True, link_quality_indicator=0
+    )
+    assert packet.rssi_dbm == pytest.approx(rssi_dbm)
+
+
+def test___str__():
+    packet = cc1101._ReceivedPacket(
+        data=b"\0\x12\x34",
+        rssi_index=204,
+        checksum_valid=True,
+        link_quality_indicator=0,
+    )
+    assert str(packet) == "_ReceivedPacket(RSSI -100dBm, 0x001234)"