__init__.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. import enum
  2. import logging
  3. import math
  4. import typing
  5. import spidev
  6. _LOGGER = logging.getLogger(__name__)
  7. class CC1101:
  8. # > All transfers on the SPI interface are done
  9. # > most significant bit first.
  10. # > All transactions on the SPI interface start with
  11. # > a header byte containing a R/W bit, a access bit (B),
  12. # > and a 6-bit address (A5 - A0).
  13. # > [...]
  14. # > Table 45: SPI Address Space
  15. _WRITE_SINGLE_BYTE = 0x00
  16. # > Registers with consecutive addresses can be
  17. # > accessed in an efficient way by setting the
  18. # > burst bit (B) in the header byte. The address
  19. # > bits (A5 - A0) set the start address in an
  20. # > internal address counter. This counter is
  21. # > incremented by one each new byte [...]
  22. _WRITE_BURST = 0x40
  23. _READ_SINGLE_BYTE = 0x80
  24. _READ_BURST = 0xC0
  25. class _SPIAddress(enum.IntEnum):
  26. # see "Table 45: SPI Address Space"
  27. # > The configuration registers on the CC1101 are
  28. # > located on SPI addresses from 0x00 to 0x2E.
  29. PKTLEN = 0x06
  30. FREQ2 = 0x0D
  31. FREQ1 = 0x0E
  32. FREQ0 = 0x0F
  33. MDMCFG4 = 0x10
  34. MDMCFG3 = 0x11
  35. MDMCFG2 = 0x12
  36. MCSM0 = 0x18
  37. # > For register addresses in the range 0x30-0x3D,
  38. # > the burst bit is used to select between
  39. # > status registers when burst bit is one, and
  40. # > between command strobes when burst bit is
  41. # > zero. [...]
  42. # > Because of this, burst access is not available
  43. # > for status registers and they must be accessed
  44. # > one at a time. The status registers can only be
  45. # > read.
  46. SRES = 0x30
  47. STX = 0x35
  48. SFTX = 0x3B
  49. PARTNUM = 0x30
  50. VERSION = 0x31
  51. MARCSTATE = 0x35
  52. # see "10.5 FIFO Access"
  53. # > When the R/W-bit is zero, the TX FIFO is
  54. # > accessed, and the RX FIFO is accessed when
  55. # > the R/W-bit is one.
  56. TXFIFO = 0x3F
  57. class ModulationFormat(enum.IntEnum):
  58. """
  59. MDMCFG2.MOD_FORMAT
  60. """
  61. FSK2 = 0b000
  62. GFSK = 0b001
  63. ASK_OOK = 0b011
  64. FSK4 = 0b100
  65. MSK = 0b111
  66. class MainRadioControlStateMachineState(enum.IntEnum):
  67. """
  68. MARCSTATE - Main Radio Control State Machine State
  69. """
  70. # see "Figure 13: Simplified State Diagram"
  71. # and "Figure 25: Complete Radio Control State Diagram"
  72. IDLE = 0x01
  73. STARTCAL = 0x08 # after IDLE
  74. BWBOOST = 0x09 # after STARTCAL
  75. FS_LOCK = 0x0A
  76. TX = 0x13
  77. # 29.3 Status Register Details
  78. _SUPPORTED_PARTNUM = 0
  79. _SUPPORTED_VERSION = 0x14
  80. _CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ = 26e6
  81. # see "21 Frequency Programming"
  82. # > f_carrier = f_XOSC / 2**16 * (FREQ + CHAN * ((256 + CHANSPC_M) * 2**CHANSPC_E-2))
  83. _FREQUENCY_CONTROL_WORD_HERTZ_FACTOR = _CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / 2 ** 16
  84. def __init__(self) -> None:
  85. self._spi = spidev.SpiDev()
  86. @staticmethod
  87. def _log_chip_status_byte(chip_status: int) -> None:
  88. # see "10.1 Chip Status Byte" & "Table 23: Status Byte Summary"
  89. _LOGGER.debug(
  90. "chip status byte: CHIP_RDYn=%d STATE=%s FIFO_BYTES_AVAILBLE=%d",
  91. chip_status >> 7,
  92. bin((chip_status >> 4) & 0b111),
  93. chip_status & 0b1111,
  94. )
  95. def _read_single_byte(self, register: _SPIAddress) -> int:
  96. response = self._spi.xfer([register | self._READ_SINGLE_BYTE, 0])
  97. assert len(response) == 2, response
  98. self._log_chip_status_byte(response[0])
  99. return response[1]
  100. def _read_burst(self, start_register: _SPIAddress, length: int) -> typing.List[int]:
  101. response = self._spi.xfer([start_register | self._READ_BURST] + [0] * length)
  102. assert len(response) == length + 1, response
  103. self._log_chip_status_byte(response[0])
  104. return response[1:]
  105. def _read_status_register(self, register: _SPIAddress) -> int:
  106. _LOGGER.debug("reading status register 0x%02x", register)
  107. values = self._read_burst(start_register=register, length=1)
  108. assert len(values) == 1, values
  109. return values[0]
  110. def _command_strobe(self, register: _SPIAddress) -> None:
  111. # see "10.4 Command Strobes"
  112. _LOGGER.debug("sending command strobe 0x%02x", register)
  113. response = self._spi.xfer([register | self._WRITE_SINGLE_BYTE])
  114. assert len(response) == 1, response
  115. self._log_chip_status_byte(response[0])
  116. def _write_burst(
  117. self, start_register: _SPIAddress, values: typing.List[int]
  118. ) -> None:
  119. _LOGGER.debug(
  120. "writing burst: start_register=0x%02x values=%s", start_register, values
  121. )
  122. response = self._spi.xfer([start_register | self._WRITE_BURST] + values)
  123. assert len(response) == len(values) + 1, response
  124. self._log_chip_status_byte(response[0])
  125. assert all(v == 0x0F for v in response[1:]), response # TODO why?
  126. def _reset(self) -> None:
  127. self._command_strobe(self._SPIAddress.SRES)
  128. def _get_symbol_rate_exponent(self) -> int:
  129. """
  130. MDMCFG4.DRATE_E
  131. """
  132. return self._read_single_byte(self._SPIAddress.MDMCFG4) & 0b00001111
  133. def _set_symbol_rate_exponent(self, exponent: int):
  134. mdmcfg4 = self._read_single_byte(self._SPIAddress.MDMCFG4)
  135. mdmcfg4 &= 0b11110000
  136. mdmcfg4 |= exponent
  137. self._write_burst(start_register=self._SPIAddress.MDMCFG4, values=[mdmcfg4])
  138. def _get_symbol_rate_mantissa(self) -> int:
  139. """
  140. MDMCFG3.DRATE_M
  141. """
  142. return self._read_single_byte(self._SPIAddress.MDMCFG3)
  143. def _set_symbol_rate_mantissa(self, mantissa: int) -> int:
  144. self._write_burst(start_register=self._SPIAddress.MDMCFG3, values=[mantissa])
  145. @classmethod
  146. def _symbol_rate_floating_point_to_real(cls, mantissa: int, exponent: int) -> float:
  147. # see "12 Data Rate Programming"
  148. return (
  149. (256 + mantissa)
  150. * (2 ** exponent)
  151. * cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ
  152. / (2 ** 28)
  153. )
  154. @classmethod
  155. def _symbol_rate_real_to_floating_point(cls, real: float) -> typing.Tuple[int, int]:
  156. # see "12 Data Rate Programming"
  157. assert real > 0, real
  158. exponent = math.floor(
  159. math.log2(real / cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ) + 20
  160. )
  161. mantissa = round(
  162. real * 2 ** 28 / cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / 2 ** exponent
  163. - 256
  164. )
  165. if mantissa == 256:
  166. exponent += 1
  167. mantissa = 0
  168. assert 0 < exponent <= 2 ** 4, exponent
  169. assert mantissa <= 2 ** 8, mantissa
  170. return mantissa, exponent
  171. def get_symbol_rate_baud(self) -> float:
  172. return self._symbol_rate_floating_point_to_real(
  173. mantissa=self._get_symbol_rate_mantissa(),
  174. exponent=self._get_symbol_rate_exponent(),
  175. )
  176. def set_symbol_rate_baud(self, real: float) -> None:
  177. mantissa, exponent = self._symbol_rate_real_to_floating_point(real)
  178. self._set_symbol_rate_mantissa(mantissa)
  179. self._set_symbol_rate_exponent(exponent)
  180. def get_modulation_format(self) -> ModulationFormat:
  181. mdmcfg2 = self._read_single_byte(self._SPIAddress.MDMCFG2)
  182. return self.ModulationFormat((mdmcfg2 >> 4) & 0b111)
  183. def _set_modulation_format(self, modulation_format: ModulationFormat) -> None:
  184. mdmcfg2 = self._read_single_byte(self._SPIAddress.MDMCFG2)
  185. mdmcfg2 &= ~(modulation_format << 4)
  186. mdmcfg2 |= modulation_format << 4
  187. self._write_burst(self._SPIAddress.MDMCFG2, [mdmcfg2])
  188. def __enter__(self) -> "CC1101":
  189. # https://docs.python.org/3/reference/datamodel.html#object.__enter__
  190. self._spi.open(0, 0)
  191. self._spi.max_speed_hz = 55700 # empirical
  192. self._reset()
  193. partnum = self._read_status_register(self._SPIAddress.PARTNUM)
  194. if partnum != self._SUPPORTED_PARTNUM:
  195. raise ValueError(
  196. "unexpected chip part number {} (expected: {})".format(
  197. partnum, self._SUPPORTED_PARTNUM
  198. )
  199. )
  200. version = self._read_status_register(self._SPIAddress.VERSION)
  201. if version != self._SUPPORTED_VERSION:
  202. raise ValueError(
  203. "unexpected chip version number {} (expected: {})".format(
  204. version, self._SUPPORTED_VERSION
  205. )
  206. )
  207. # 6:4 MOD_FORMAT: OOK (default: 2-FSK)
  208. self._set_modulation_format(self.ModulationFormat.ASK_OOK)
  209. # 7:6 unused
  210. # 5:4 FS_AUTOCAL: calibrate when going from IDLE to RX or TX
  211. # 3:2 PO_TIMEOUT: default
  212. # 1 PIN_CTRL_EN: default
  213. # 0 XOSC_FORCE_ON: default
  214. self._write_burst(self._SPIAddress.MCSM0, [0b010100])
  215. marcstate = self.get_main_radio_control_state_machine_state()
  216. if marcstate != self.MainRadioControlStateMachineState.IDLE:
  217. raise ValueError("expected marcstate idle (actual: {})".format(marcstate))
  218. return self
  219. def __exit__(self, exc_type, exc_value, traceback) -> bool:
  220. # https://docs.python.org/3/reference/datamodel.html#object.__exit__
  221. self._spi.close()
  222. return False
  223. def get_main_radio_control_state_machine_state(
  224. self
  225. ) -> MainRadioControlStateMachineState:
  226. return self.MainRadioControlStateMachineState(
  227. self._read_status_register(self._SPIAddress.MARCSTATE)
  228. )
  229. def get_marc_state(self) -> MainRadioControlStateMachineState:
  230. """
  231. alias for get_main_radio_control_state_machine_state()
  232. """
  233. return self.get_main_radio_control_state_machine_state()
  234. @classmethod
  235. def _frequency_control_word_to_hertz(cls, control_word: typing.List[int]) -> float:
  236. return (
  237. int.from_bytes(control_word, byteorder="big", signed=False)
  238. * cls._FREQUENCY_CONTROL_WORD_HERTZ_FACTOR
  239. )
  240. @classmethod
  241. def _hertz_to_frequency_control_word(cls, hertz: float) -> typing.List[int]:
  242. return list(
  243. round(hertz / cls._FREQUENCY_CONTROL_WORD_HERTZ_FACTOR).to_bytes(
  244. length=3, byteorder="big", signed=False
  245. )
  246. )
  247. def _get_base_frequency_control_word(self) -> typing.List[int]:
  248. # > The base or start frequency is set by the 24 bitfrequency
  249. # > word located in the FREQ2, FREQ1, FREQ0 registers.
  250. return self._read_burst(start_register=self._SPIAddress.FREQ2, length=3)
  251. def _set_base_frequency_control_word(self, control_word: typing.List[int]) -> None:
  252. self._write_burst(start_register=self._SPIAddress.FREQ2, values=control_word)
  253. def get_base_frequency_hertz(self) -> float:
  254. return self._frequency_control_word_to_hertz(
  255. self._get_base_frequency_control_word()
  256. )
  257. def set_base_frequency_hertz(self, freq: float) -> None:
  258. self._set_base_frequency_control_word(
  259. self._hertz_to_frequency_control_word(freq)
  260. )
  261. def __str__(self) -> str:
  262. return "CC1101(marcstate={}, base_frequency={:.2f}MHz, symbol_rate={:.2f}kBaud, modulation_format={})".format(
  263. self.get_main_radio_control_state_machine_state().name.lower(),
  264. self.get_base_frequency_hertz() / 10 ** 6,
  265. self.get_symbol_rate_baud() / 1000,
  266. self.get_modulation_format().name,
  267. )
  268. def _get_packet_length(self) -> int:
  269. """
  270. packet length in fixed packet length mode,
  271. maximum packet length in variable packet length mode.
  272. """
  273. return self._read_burst(start_register=self._SPIAddress.PKTLEN, length=1)[0]
  274. def _flush_tx_fifo_buffer(self) -> str:
  275. # > Only issue SFTX in IDLE or TXFIFO_UNDERFLOW states.
  276. _LOGGER.debug("flushing tx fifo buffer")
  277. self._command_strobe(self._SPIAddress.SFTX)
  278. def transmit(self, payload: typing.List[int]) -> None:
  279. # see "15.2 Packet Format"
  280. # > In variable packet length mode, [...]
  281. # > The first byte written to the TXFIFO must be different from 0.
  282. if payload[0] == 0:
  283. raise ValueError(
  284. "in variable packet length mode the first byte of payload must not be null"
  285. + "\npayload: {}".format(payload)
  286. )
  287. marcstate = self.get_main_radio_control_state_machine_state()
  288. if marcstate != self.MainRadioControlStateMachineState.IDLE:
  289. raise Exception(
  290. "device must be idle before transmission (current marcstate: {})".format(
  291. marcstate.name
  292. )
  293. )
  294. max_packet_length = self._get_packet_length()
  295. if len(payload) > max_packet_length:
  296. raise ValueError(
  297. "payload exceeds maximum payload length of {} bytes".format(
  298. max_packet_length
  299. )
  300. + "\npayload: {}".format(payload)
  301. )
  302. self._flush_tx_fifo_buffer()
  303. self._write_burst(self._SPIAddress.TXFIFO, payload)
  304. _LOGGER.info("transmitting %s", payload)
  305. self._command_strobe(self._SPIAddress.STX)