|
@@ -5,6 +5,13 @@ import typing
|
|
|
|
|
|
import spidev
|
|
import spidev
|
|
|
|
|
|
|
|
+from cc1101.addresses import (
|
|
|
|
+ StrobeAddress,
|
|
|
|
+ ConfigurationRegisterAddress,
|
|
|
|
+ StatusRegisterAddress,
|
|
|
|
+ FIFORegisterAddress,
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
@@ -29,39 +36,6 @@ class CC1101:
|
|
_READ_SINGLE_BYTE = 0x80
|
|
_READ_SINGLE_BYTE = 0x80
|
|
_READ_BURST = 0xC0
|
|
_READ_BURST = 0xC0
|
|
|
|
|
|
- class _SPIAddress(enum.IntEnum):
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- PKTLEN = 0x06
|
|
|
|
- FREQ2 = 0x0D
|
|
|
|
- FREQ1 = 0x0E
|
|
|
|
- FREQ0 = 0x0F
|
|
|
|
- MDMCFG4 = 0x10
|
|
|
|
- MDMCFG3 = 0x11
|
|
|
|
- MDMCFG2 = 0x12
|
|
|
|
- MCSM0 = 0x18
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- SRES = 0x30
|
|
|
|
- STX = 0x35
|
|
|
|
- SFTX = 0x3B
|
|
|
|
- PARTNUM = 0x30
|
|
|
|
- VERSION = 0x31
|
|
|
|
- MARCSTATE = 0x35
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- TXFIFO = 0x3F
|
|
|
|
-
|
|
|
|
class ModulationFormat(enum.IntEnum):
|
|
class ModulationFormat(enum.IntEnum):
|
|
"""
|
|
"""
|
|
MDMCFG2.MOD_FORMAT
|
|
MDMCFG2.MOD_FORMAT
|
|
@@ -108,25 +82,40 @@ class CC1101:
|
|
chip_status & 0b1111,
|
|
chip_status & 0b1111,
|
|
)
|
|
)
|
|
|
|
|
|
- def _read_single_byte(self, register: _SPIAddress) -> int:
|
|
+ def _read_single_byte(
|
|
|
|
+ self, register: typing.Union[ConfigurationRegisterAddress, FIFORegisterAddress]
|
|
|
|
+ ) -> int:
|
|
response = self._spi.xfer([register | self._READ_SINGLE_BYTE, 0])
|
|
response = self._spi.xfer([register | self._READ_SINGLE_BYTE, 0])
|
|
assert len(response) == 2, response
|
|
assert len(response) == 2, response
|
|
self._log_chip_status_byte(response[0])
|
|
self._log_chip_status_byte(response[0])
|
|
return response[1]
|
|
return response[1]
|
|
|
|
|
|
- def _read_burst(self, start_register: _SPIAddress, length: int) -> typing.List[int]:
|
|
+ def _read_burst(
|
|
|
|
+ self,
|
|
|
|
+ start_register: typing.Union[ConfigurationRegisterAddress, FIFORegisterAddress],
|
|
|
|
+ length: int,
|
|
|
|
+ ) -> typing.List[int]:
|
|
response = self._spi.xfer([start_register | self._READ_BURST] + [0] * length)
|
|
response = self._spi.xfer([start_register | self._READ_BURST] + [0] * length)
|
|
assert len(response) == length + 1, response
|
|
assert len(response) == length + 1, response
|
|
self._log_chip_status_byte(response[0])
|
|
self._log_chip_status_byte(response[0])
|
|
return response[1:]
|
|
return response[1:]
|
|
|
|
|
|
- def _read_status_register(self, register: _SPIAddress) -> int:
|
|
+ def _read_status_register(self, register: StatusRegisterAddress) -> int:
|
|
- _LOGGER.debug("reading status register 0x%02x", register)
|
|
+
|
|
- values = self._read_burst(start_register=register, length=1)
|
|
+
|
|
- assert len(values) == 1, values
|
|
+
|
|
- return values[0]
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ response = self._spi.xfer([register | self._READ_BURST, 0])
|
|
|
|
+ assert len(response) == 2, response
|
|
|
|
+ self._log_chip_status_byte(response[0])
|
|
|
|
+ return response[1]
|
|
|
|
|
|
- def _command_strobe(self, register: _SPIAddress) -> None:
|
|
+ def _command_strobe(self, register: StrobeAddress) -> None:
|
|
|
|
|
|
_LOGGER.debug("sending command strobe 0x%02x", register)
|
|
_LOGGER.debug("sending command strobe 0x%02x", register)
|
|
response = self._spi.xfer([register | self._WRITE_SINGLE_BYTE])
|
|
response = self._spi.xfer([register | self._WRITE_SINGLE_BYTE])
|
|
@@ -134,7 +123,9 @@ class CC1101:
|
|
self._log_chip_status_byte(response[0])
|
|
self._log_chip_status_byte(response[0])
|
|
|
|
|
|
def _write_burst(
|
|
def _write_burst(
|
|
- self, start_register: _SPIAddress, values: typing.List[int]
|
|
+ self,
|
|
|
|
+ start_register: typing.Union[ConfigurationRegisterAddress, FIFORegisterAddress],
|
|
|
|
+ values: typing.List[int],
|
|
) -> None:
|
|
) -> None:
|
|
_LOGGER.debug(
|
|
_LOGGER.debug(
|
|
"writing burst: start_register=0x%02x values=%s", start_register, values
|
|
"writing burst: start_register=0x%02x values=%s", start_register, values
|
|
@@ -145,28 +136,32 @@ class CC1101:
|
|
assert all(v == 0x0F for v in response[1:]), response
|
|
assert all(v == 0x0F for v in response[1:]), response
|
|
|
|
|
|
def _reset(self) -> None:
|
|
def _reset(self) -> None:
|
|
- self._command_strobe(self._SPIAddress.SRES)
|
|
+ self._command_strobe(StrobeAddress.SRES)
|
|
|
|
|
|
def _get_symbol_rate_exponent(self) -> int:
|
|
def _get_symbol_rate_exponent(self) -> int:
|
|
"""
|
|
"""
|
|
MDMCFG4.DRATE_E
|
|
MDMCFG4.DRATE_E
|
|
"""
|
|
"""
|
|
- return self._read_single_byte(self._SPIAddress.MDMCFG4) & 0b00001111
|
|
+ return self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4) & 0b00001111
|
|
|
|
|
|
def _set_symbol_rate_exponent(self, exponent: int):
|
|
def _set_symbol_rate_exponent(self, exponent: int):
|
|
- mdmcfg4 = self._read_single_byte(self._SPIAddress.MDMCFG4)
|
|
+ mdmcfg4 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4)
|
|
mdmcfg4 &= 0b11110000
|
|
mdmcfg4 &= 0b11110000
|
|
mdmcfg4 |= exponent
|
|
mdmcfg4 |= exponent
|
|
- self._write_burst(start_register=self._SPIAddress.MDMCFG4, values=[mdmcfg4])
|
|
+ self._write_burst(
|
|
|
|
+ start_register=ConfigurationRegisterAddress.MDMCFG4, values=[mdmcfg4]
|
|
|
|
+ )
|
|
|
|
|
|
def _get_symbol_rate_mantissa(self) -> int:
|
|
def _get_symbol_rate_mantissa(self) -> int:
|
|
"""
|
|
"""
|
|
MDMCFG3.DRATE_M
|
|
MDMCFG3.DRATE_M
|
|
"""
|
|
"""
|
|
- return self._read_single_byte(self._SPIAddress.MDMCFG3)
|
|
+ return self._read_single_byte(ConfigurationRegisterAddress.MDMCFG3)
|
|
|
|
|
|
- def _set_symbol_rate_mantissa(self, mantissa: int) -> int:
|
|
+ def _set_symbol_rate_mantissa(self, mantissa: int) -> None:
|
|
- self._write_burst(start_register=self._SPIAddress.MDMCFG3, values=[mantissa])
|
|
+ self._write_burst(
|
|
|
|
+ start_register=ConfigurationRegisterAddress.MDMCFG3, values=[mantissa]
|
|
|
|
+ )
|
|
|
|
|
|
@classmethod
|
|
@classmethod
|
|
def _symbol_rate_floating_point_to_real(cls, mantissa: int, exponent: int) -> float:
|
|
def _symbol_rate_floating_point_to_real(cls, mantissa: int, exponent: int) -> float:
|
|
@@ -208,28 +203,28 @@ class CC1101:
|
|
self._set_symbol_rate_exponent(exponent)
|
|
self._set_symbol_rate_exponent(exponent)
|
|
|
|
|
|
def get_modulation_format(self) -> ModulationFormat:
|
|
def get_modulation_format(self) -> ModulationFormat:
|
|
- mdmcfg2 = self._read_single_byte(self._SPIAddress.MDMCFG2)
|
|
+ mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
|
|
return self.ModulationFormat((mdmcfg2 >> 4) & 0b111)
|
|
return self.ModulationFormat((mdmcfg2 >> 4) & 0b111)
|
|
|
|
|
|
def _set_modulation_format(self, modulation_format: ModulationFormat) -> None:
|
|
def _set_modulation_format(self, modulation_format: ModulationFormat) -> None:
|
|
- mdmcfg2 = self._read_single_byte(self._SPIAddress.MDMCFG2)
|
|
+ mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
|
|
mdmcfg2 &= ~(modulation_format << 4)
|
|
mdmcfg2 &= ~(modulation_format << 4)
|
|
mdmcfg2 |= modulation_format << 4
|
|
mdmcfg2 |= modulation_format << 4
|
|
- self._write_burst(self._SPIAddress.MDMCFG2, [mdmcfg2])
|
|
+ self._write_burst(ConfigurationRegisterAddress.MDMCFG2, [mdmcfg2])
|
|
|
|
|
|
def __enter__(self) -> "CC1101":
|
|
def __enter__(self) -> "CC1101":
|
|
|
|
|
|
self._spi.open(0, 0)
|
|
self._spi.open(0, 0)
|
|
self._spi.max_speed_hz = 55700
|
|
self._spi.max_speed_hz = 55700
|
|
self._reset()
|
|
self._reset()
|
|
- partnum = self._read_status_register(self._SPIAddress.PARTNUM)
|
|
+ partnum = self._read_status_register(StatusRegisterAddress.PARTNUM)
|
|
if partnum != self._SUPPORTED_PARTNUM:
|
|
if partnum != self._SUPPORTED_PARTNUM:
|
|
raise ValueError(
|
|
raise ValueError(
|
|
"unexpected chip part number {} (expected: {})".format(
|
|
"unexpected chip part number {} (expected: {})".format(
|
|
partnum, self._SUPPORTED_PARTNUM
|
|
partnum, self._SUPPORTED_PARTNUM
|
|
)
|
|
)
|
|
)
|
|
)
|
|
- version = self._read_status_register(self._SPIAddress.VERSION)
|
|
+ version = self._read_status_register(StatusRegisterAddress.VERSION)
|
|
if version != self._SUPPORTED_VERSION:
|
|
if version != self._SUPPORTED_VERSION:
|
|
raise ValueError(
|
|
raise ValueError(
|
|
"unexpected chip version number {} (expected: {})".format(
|
|
"unexpected chip version number {} (expected: {})".format(
|
|
@@ -243,7 +238,7 @@ class CC1101:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- self._write_burst(self._SPIAddress.MCSM0, [0b010100])
|
|
+ self._write_burst(ConfigurationRegisterAddress.MCSM0, [0b010100])
|
|
marcstate = self.get_main_radio_control_state_machine_state()
|
|
marcstate = self.get_main_radio_control_state_machine_state()
|
|
if marcstate != self.MainRadioControlStateMachineState.IDLE:
|
|
if marcstate != self.MainRadioControlStateMachineState.IDLE:
|
|
raise ValueError("expected marcstate idle (actual: {})".format(marcstate))
|
|
raise ValueError("expected marcstate idle (actual: {})".format(marcstate))
|
|
@@ -258,7 +253,7 @@ class CC1101:
|
|
self
|
|
self
|
|
) -> MainRadioControlStateMachineState:
|
|
) -> MainRadioControlStateMachineState:
|
|
return self.MainRadioControlStateMachineState(
|
|
return self.MainRadioControlStateMachineState(
|
|
- self._read_status_register(self._SPIAddress.MARCSTATE)
|
|
+ self._read_status_register(StatusRegisterAddress.MARCSTATE)
|
|
)
|
|
)
|
|
|
|
|
|
def get_marc_state(self) -> MainRadioControlStateMachineState:
|
|
def get_marc_state(self) -> MainRadioControlStateMachineState:
|
|
@@ -285,10 +280,14 @@ class CC1101:
|
|
def _get_base_frequency_control_word(self) -> typing.List[int]:
|
|
def _get_base_frequency_control_word(self) -> typing.List[int]:
|
|
|
|
|
|
|
|
|
|
- return self._read_burst(start_register=self._SPIAddress.FREQ2, length=3)
|
|
+ return self._read_burst(
|
|
|
|
+ start_register=ConfigurationRegisterAddress.FREQ2, length=3
|
|
|
|
+ )
|
|
|
|
|
|
def _set_base_frequency_control_word(self, control_word: typing.List[int]) -> None:
|
|
def _set_base_frequency_control_word(self, control_word: typing.List[int]) -> None:
|
|
- self._write_burst(start_register=self._SPIAddress.FREQ2, values=control_word)
|
|
+ self._write_burst(
|
|
|
|
+ start_register=ConfigurationRegisterAddress.FREQ2, values=control_word
|
|
|
|
+ )
|
|
|
|
|
|
def get_base_frequency_hertz(self) -> float:
|
|
def get_base_frequency_hertz(self) -> float:
|
|
return self._frequency_control_word_to_hertz(
|
|
return self._frequency_control_word_to_hertz(
|
|
@@ -313,12 +312,12 @@ class CC1101:
|
|
packet length in fixed packet length mode,
|
|
packet length in fixed packet length mode,
|
|
maximum packet length in variable packet length mode.
|
|
maximum packet length in variable packet length mode.
|
|
"""
|
|
"""
|
|
- return self._read_burst(start_register=self._SPIAddress.PKTLEN, length=1)[0]
|
|
+ return self._read_single_byte(ConfigurationRegisterAddress.PKTLEN)
|
|
|
|
|
|
- def _flush_tx_fifo_buffer(self) -> str:
|
|
+ def _flush_tx_fifo_buffer(self) -> None:
|
|
|
|
|
|
_LOGGER.debug("flushing tx fifo buffer")
|
|
_LOGGER.debug("flushing tx fifo buffer")
|
|
- self._command_strobe(self._SPIAddress.SFTX)
|
|
+ self._command_strobe(StrobeAddress.SFTX)
|
|
|
|
|
|
def transmit(self, payload: typing.List[int]) -> None:
|
|
def transmit(self, payload: typing.List[int]) -> None:
|
|
|
|
|
|
@@ -345,6 +344,6 @@ class CC1101:
|
|
+ "\npayload: {}".format(payload)
|
|
+ "\npayload: {}".format(payload)
|
|
)
|
|
)
|
|
self._flush_tx_fifo_buffer()
|
|
self._flush_tx_fifo_buffer()
|
|
- self._write_burst(self._SPIAddress.TXFIFO, payload)
|
|
+ self._write_burst(FIFORegisterAddress.TX, payload)
|
|
_LOGGER.info("transmitting %s", payload)
|
|
_LOGGER.info("transmitting %s", payload)
|
|
- self._command_strobe(self._SPIAddress.STX)
|
|
+ self._command_strobe(StrobeAddress.STX)
|