__init__.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. import enum
  2. import logging
  3. import typing
  4. import spidev
# Module-level logger named after this module; the CC1101 class below uses
# it for its debug and info messages.
_LOGGER = logging.getLogger(__name__)
  6. class CC1101:
  7. # > All transfers on the SPI interface are done
  8. # > most significant bit first.
  9. # > All transactions on the SPI interface start with
  10. # > a header byte containing a R/W bit, a access bit (B),
  11. # > and a 6-bit address (A5 - A0).
  12. # > [...]
  13. # > Table 45: SPI Address Space
  14. _WRITE_SINGLE_BYTE = 0x00
  15. # > Registers with consecutive addresses can be
  16. # > accessed in an efficient way by setting the
  17. # > burst bit (B) in the header byte. The address
  18. # > bits (A5 - A0) set the start address in an
  19. # > internal address counter. This counter is
  20. # > incremented by one each new byte [...]
  21. _WRITE_BURST = 0x40
  22. _READ_SINGLE_BYTE = 0x80
  23. _READ_BURST = 0xC0
  24. class _SPIAddress(enum.IntEnum):
  25. # see "Table 45: SPI Address Space"
  26. # > The configuration registers on the CC1101 are
  27. # > located on SPI addresses from 0x00 to 0x2E.
  28. PKTLEN = 0x06
  29. FREQ2 = 0x0D
  30. FREQ1 = 0x0E
  31. FREQ0 = 0x0F
  32. MCSM0 = 0x18
  33. # > For register addresses in the range 0x30-0x3D,
  34. # > the burst bit is used to select between
  35. # > status registers when burst bit is one, and
  36. # > between command strobes when burst bit is
  37. # > zero. [...]
  38. # > Because of this, burst access is not available
  39. # > for status registers and they must be accessed
  40. # > one at a time. The status registers can only be
  41. # > read.
  42. SRES = 0x30
  43. STX = 0x35
  44. SFTX = 0x3B
  45. PARTNUM = 0x30
  46. VERSION = 0x31
  47. MARCSTATE = 0x35
  48. # see "10.5 FIFO Access"
  49. # > When the R/W-bit is zero, the TX FIFO is
  50. # > accessed, and the RX FIFO is accessed when
  51. # > the R/W-bit is one.
  52. TXFIFO = 0x3F
  53. class MainRadioControlStateMachineState(enum.IntEnum):
  54. """
  55. MARCSTATE - Main Radio Control State Machine State
  56. """
  57. # see "Figure 13: Simplified State Diagram"
  58. # and "Figure 25: Complete Radio Control State Diagram"
  59. IDLE = 0x01
  60. STARTCAL = 0x08 # after IDLE
  61. BWBOOST = 0x09 # after STARTCAL
  62. FS_LOCK = 0x0A
  63. TX = 0x13
  64. # 29.3 Status Register Details
  65. _SUPPORTED_PARTNUM = 0
  66. _SUPPORTED_VERSION = 0x14
  67. _CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ = 26e6
  68. # see "21 Frequency Programming"
  69. # > f_carrier = f_XOSC / 2**16 * (FREQ + CHAN * ((256 + CHANSPC_M) * 2**CHANSPC_E-2))
  70. _FREQUENCY_CONTROL_WORD_HERTZ_FACTOR = _CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / 2 ** 16
  71. def __init__(self) -> None:
  72. self._spi = spidev.SpiDev()
  73. @staticmethod
  74. def _log_chip_status_byte(chip_status: int) -> None:
  75. # see "10.1 Chip Status Byte" & "Table 23: Status Byte Summary"
  76. _LOGGER.debug(
  77. "chip status byte: CHIP_RDYn=%d STATE=%s FIFO_BYTES_AVAILBLE=%d",
  78. chip_status >> 7,
  79. bin((chip_status >> 4) & 0b111),
  80. chip_status & 0b1111,
  81. )
  82. def _read_burst(self, start_register: _SPIAddress, length: int) -> typing.List[int]:
  83. response = self._spi.xfer([start_register | self._READ_BURST] + [0] * length)
  84. assert len(response) == length + 1, response
  85. self._log_chip_status_byte(response[0])
  86. return response[1:]
  87. def _read_status_register(self, register: _SPIAddress) -> int:
  88. _LOGGER.debug("reading status register 0x%02x", register)
  89. values = self._read_burst(start_register=register, length=1)
  90. assert len(values) == 1, values
  91. return values[0]
  92. def _command_strobe(self, register: _SPIAddress) -> None:
  93. # see "10.4 Command Strobes"
  94. _LOGGER.debug("sending command strobe 0x%02x", register)
  95. response = self._spi.xfer([register | self._WRITE_SINGLE_BYTE])
  96. assert len(response) == 1, response
  97. self._log_chip_status_byte(response[0])
  98. def _write_burst(
  99. self, start_register: _SPIAddress, values: typing.List[int]
  100. ) -> None:
  101. _LOGGER.debug(
  102. "writing burst: start_register=0x%02x values=%s", start_register, values
  103. )
  104. response = self._spi.xfer([start_register | self._WRITE_BURST] + values)
  105. assert len(response) == len(values) + 1, response
  106. self._log_chip_status_byte(response[0])
  107. assert all(v == 0x0F for v in response[1:]), response # TODO why?
  108. def _reset(self) -> None:
  109. self._command_strobe(self._SPIAddress.SRES)
  110. def __enter__(self) -> "CC1101":
  111. # https://docs.python.org/3/reference/datamodel.html#object.__enter__
  112. self._spi.open(0, 0)
  113. self._spi.max_speed_hz = 55700 # empirical
  114. self._reset()
  115. partnum = self._read_status_register(self._SPIAddress.PARTNUM)
  116. if partnum != self._SUPPORTED_PARTNUM:
  117. raise ValueError(
  118. "unexpected chip part number {} (expected: {})".format(
  119. partnum, self._SUPPORTED_PARTNUM
  120. )
  121. )
  122. version = self._read_status_register(self._SPIAddress.VERSION)
  123. if version != self._SUPPORTED_VERSION:
  124. raise ValueError(
  125. "unexpected chip version number {} (expected: {})".format(
  126. version, self._SUPPORTED_VERSION
  127. )
  128. )
  129. # 7:6 unused
  130. # 5:4 FS_AUTOCAL: calibrate when going from IDLE to RX or TX
  131. # 3:2 PO_TIMEOUT: default
  132. # 1 PIN_CTRL_EN: default
  133. # 0 XOSC_FORCE_ON: default
  134. self._write_burst(self._SPIAddress.MCSM0, [0b010100])
  135. marcstate = self.get_main_radio_control_state_machine_state()
  136. if marcstate != self.MainRadioControlStateMachineState.IDLE:
  137. raise ValueError("expected marcstate idle (actual: {})".format(marcstate))
  138. return self
  139. def __exit__(self, exc_type, exc_value, traceback) -> bool:
  140. # https://docs.python.org/3/reference/datamodel.html#object.__exit__
  141. self._spi.close()
  142. return False
  143. def get_main_radio_control_state_machine_state(
  144. self
  145. ) -> MainRadioControlStateMachineState:
  146. return self.MainRadioControlStateMachineState(
  147. self._read_status_register(self._SPIAddress.MARCSTATE)
  148. )
  149. def get_marc_state(self) -> MainRadioControlStateMachineState:
  150. """
  151. alias for get_main_radio_control_state_machine_state()
  152. """
  153. return self.get_main_radio_control_state_machine_state()
  154. @classmethod
  155. def _frequency_control_word_to_hertz(cls, control_word: typing.List[int]) -> float:
  156. return (
  157. int.from_bytes(control_word, byteorder="big", signed=False)
  158. * cls._FREQUENCY_CONTROL_WORD_HERTZ_FACTOR
  159. )
  160. @classmethod
  161. def _hertz_to_frequency_control_word(cls, hertz: float) -> typing.List[int]:
  162. return list(
  163. round(hertz / cls._FREQUENCY_CONTROL_WORD_HERTZ_FACTOR).to_bytes(
  164. length=3, byteorder="big", signed=False
  165. )
  166. )
  167. def _get_base_frequency_control_word(self) -> typing.List[int]:
  168. # > The base or start frequency is set by the 24 bitfrequency
  169. # > word located in the FREQ2, FREQ1, FREQ0 registers.
  170. return self._read_burst(start_register=self._SPIAddress.FREQ2, length=3)
  171. def _set_base_frequency_control_word(self, control_word: typing.List[int]) -> None:
  172. self._write_burst(start_register=self._SPIAddress.FREQ2, values=control_word)
  173. def get_base_frequency_hertz(self) -> float:
  174. return self._frequency_control_word_to_hertz(
  175. self._get_base_frequency_control_word()
  176. )
  177. def set_base_frequency_hertz(self, freq: float) -> None:
  178. self._set_base_frequency_control_word(
  179. self._hertz_to_frequency_control_word(freq)
  180. )
  181. def __str__(self) -> str:
  182. return "CC1101(marcstate={}, base_frequency={:.2f}MHz)".format(
  183. self.get_main_radio_control_state_machine_state().name.lower(),
  184. self.get_base_frequency_hertz() / 10 ** 6,
  185. )
  186. def _get_packet_length(self) -> int:
  187. """
  188. packet length in fixed packet length mode,
  189. maximum packet length in variable packet length mode.
  190. """
  191. return self._read_burst(start_register=self._SPIAddress.PKTLEN, length=1)[0]
  192. def _flush_tx_fifo_buffer(self) -> str:
  193. # > Only issue SFTX in IDLE or TXFIFO_UNDERFLOW states.
  194. _LOGGER.debug("flushing tx fifo buffer")
  195. self._command_strobe(self._SPIAddress.SFTX)
  196. def transmit(self, payload: typing.List[int]) -> None:
  197. # see "15.2 Packet Format"
  198. # > In variable packet length mode, [...]
  199. # > The first byte written to the TXFIFO must be different from 0.
  200. if payload[0] == 0:
  201. raise ValueError(
  202. "in variable packet length mode the first byte of payload must not be null"
  203. + "\npayload: {}".format(payload)
  204. )
  205. marcstate = self.get_main_radio_control_state_machine_state()
  206. if marcstate != self.MainRadioControlStateMachineState.IDLE:
  207. raise Exception(
  208. "device must be idle before transmission (current marcstate: {})".format(
  209. marcstate.name
  210. )
  211. )
  212. max_packet_length = self._get_packet_length()
  213. if len(payload) > max_packet_length:
  214. raise ValueError(
  215. "payload exceeds maximum payload length of {} bytes".format(
  216. max_packet_length
  217. )
  218. + "\npayload: {}".format(payload)
  219. )
  220. self._flush_tx_fifo_buffer()
  221. self._write_burst(self._SPIAddress.TXFIFO, payload)
  222. _LOGGER.info("transmitting %s", payload)
  223. self._command_strobe(self._SPIAddress.STX)