__init__.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. import enum
  2. import logging
  3. import math
  4. import typing
  5. import spidev
  6. from cc1101.addresses import (
  7. StrobeAddress,
  8. ConfigurationRegisterAddress,
  9. StatusRegisterAddress,
  10. FIFORegisterAddress,
  11. )
  12. _LOGGER = logging.getLogger(__name__)
  13. class CC1101:
  14. # > All transfers on the SPI interface are done
  15. # > most significant bit first.
  16. # > All transactions on the SPI interface start with
  17. # > a header byte containing a R/W bit, a access bit (B),
  18. # > and a 6-bit address (A5 - A0).
  19. # > [...]
  20. # > Table 45: SPI Address Space
  21. _WRITE_SINGLE_BYTE = 0x00
  22. # > Registers with consecutive addresses can be
  23. # > accessed in an efficient way by setting the
  24. # > burst bit (B) in the header byte. The address
  25. # > bits (A5 - A0) set the start address in an
  26. # > internal address counter. This counter is
  27. # > incremented by one each new byte [...]
  28. _WRITE_BURST = 0x40
  29. _READ_SINGLE_BYTE = 0x80
  30. _READ_BURST = 0xC0
  31. class _TransceiveMode(enum.IntEnum):
  32. """
  33. PKTCTRL0.PKT_FORMAT
  34. """
  35. FIFO = 0b00
  36. SYNCHRONOUS_SERIAL = 0b01
  37. RANDOM_TRANSMISSION = 0b10
  38. ASYNCHRONOUS_SERIAL = 0b11
  39. class ModulationFormat(enum.IntEnum):
  40. """
  41. MDMCFG2.MOD_FORMAT
  42. """
  43. FSK2 = 0b000
  44. GFSK = 0b001
  45. ASK_OOK = 0b011
  46. FSK4 = 0b100
  47. MSK = 0b111
  48. class MainRadioControlStateMachineState(enum.IntEnum):
  49. """
  50. MARCSTATE - Main Radio Control State Machine State
  51. """
  52. # see "Figure 13: Simplified State Diagram"
  53. # and "Figure 25: Complete Radio Control State Diagram"
  54. IDLE = 0x01
  55. STARTCAL = 0x08 # after IDLE
  56. BWBOOST = 0x09 # after STARTCAL
  57. FS_LOCK = 0x0A
  58. TX = 0x13
  59. # 29.3 Status Register Details
  60. _SUPPORTED_PARTNUM = 0
  61. _SUPPORTED_VERSION = 0x14
  62. _CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ = 26e6
  63. # see "21 Frequency Programming"
  64. # > f_carrier = f_XOSC / 2**16 * (FREQ + CHAN * ((256 + CHANSPC_M) * 2**CHANSPC_E-2))
  65. _FREQUENCY_CONTROL_WORD_HERTZ_FACTOR = _CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / 2 ** 16
  66. def __init__(self) -> None:
  67. self._spi = spidev.SpiDev()
  68. @staticmethod
  69. def _log_chip_status_byte(chip_status: int) -> None:
  70. # see "10.1 Chip Status Byte" & "Table 23: Status Byte Summary"
  71. _LOGGER.debug(
  72. "chip status byte: CHIP_RDYn=%d STATE=%s FIFO_BYTES_AVAILBLE=%d",
  73. chip_status >> 7,
  74. bin((chip_status >> 4) & 0b111),
  75. chip_status & 0b1111,
  76. )
  77. def _read_single_byte(
  78. self, register: typing.Union[ConfigurationRegisterAddress, FIFORegisterAddress]
  79. ) -> int:
  80. response = self._spi.xfer([register | self._READ_SINGLE_BYTE, 0])
  81. assert len(response) == 2, response
  82. self._log_chip_status_byte(response[0])
  83. return response[1]
  84. def _read_burst(
  85. self,
  86. start_register: typing.Union[ConfigurationRegisterAddress, FIFORegisterAddress],
  87. length: int,
  88. ) -> typing.List[int]:
  89. response = self._spi.xfer([start_register | self._READ_BURST] + [0] * length)
  90. assert len(response) == length + 1, response
  91. self._log_chip_status_byte(response[0])
  92. return response[1:]
  93. def _read_status_register(self, register: StatusRegisterAddress) -> int:
  94. # > For register addresses in the range 0x30-0x3D,
  95. # > the burst bit is used to select between
  96. # > status registers when burst bit is one, and
  97. # > between command strobes when burst bit is
  98. # > zero. [...]
  99. # > Because of this, burst access is not available
  100. # > for status registers and they must be accessed
  101. # > one at a time. The status registers can only be
  102. # > read.
  103. response = self._spi.xfer([register | self._READ_BURST, 0])
  104. assert len(response) == 2, response
  105. self._log_chip_status_byte(response[0])
  106. return response[1]
  107. def _command_strobe(self, register: StrobeAddress) -> None:
  108. # see "10.4 Command Strobes"
  109. _LOGGER.debug("sending command strobe 0x%02x", register)
  110. response = self._spi.xfer([register | self._WRITE_SINGLE_BYTE])
  111. assert len(response) == 1, response
  112. self._log_chip_status_byte(response[0])
  113. def _write_burst(
  114. self,
  115. start_register: typing.Union[ConfigurationRegisterAddress, FIFORegisterAddress],
  116. values: typing.List[int],
  117. ) -> None:
  118. _LOGGER.debug(
  119. "writing burst: start_register=0x%02x values=%s", start_register, values
  120. )
  121. response = self._spi.xfer([start_register | self._WRITE_BURST] + values)
  122. assert len(response) == len(values) + 1, response
  123. self._log_chip_status_byte(response[0])
  124. assert all(v == 0x0F for v in response[1:]), response # TODO why?
  125. def _reset(self) -> None:
  126. self._command_strobe(StrobeAddress.SRES)
  127. def _get_symbol_rate_exponent(self) -> int:
  128. """
  129. MDMCFG4.DRATE_E
  130. """
  131. return self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4) & 0b00001111
  132. def _set_symbol_rate_exponent(self, exponent: int):
  133. mdmcfg4 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4)
  134. mdmcfg4 &= 0b11110000
  135. mdmcfg4 |= exponent
  136. self._write_burst(
  137. start_register=ConfigurationRegisterAddress.MDMCFG4, values=[mdmcfg4]
  138. )
  139. def _get_symbol_rate_mantissa(self) -> int:
  140. """
  141. MDMCFG3.DRATE_M
  142. """
  143. return self._read_single_byte(ConfigurationRegisterAddress.MDMCFG3)
  144. def _set_symbol_rate_mantissa(self, mantissa: int) -> None:
  145. self._write_burst(
  146. start_register=ConfigurationRegisterAddress.MDMCFG3, values=[mantissa]
  147. )
  148. @classmethod
  149. def _symbol_rate_floating_point_to_real(cls, mantissa: int, exponent: int) -> float:
  150. # see "12 Data Rate Programming"
  151. return (
  152. (256 + mantissa)
  153. * (2 ** exponent)
  154. * cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ
  155. / (2 ** 28)
  156. )
  157. @classmethod
  158. def _symbol_rate_real_to_floating_point(cls, real: float) -> typing.Tuple[int, int]:
  159. # see "12 Data Rate Programming"
  160. assert real > 0, real
  161. exponent = math.floor(
  162. math.log2(real / cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ) + 20
  163. )
  164. mantissa = round(
  165. real * 2 ** 28 / cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / 2 ** exponent
  166. - 256
  167. )
  168. if mantissa == 256:
  169. exponent += 1
  170. mantissa = 0
  171. assert 0 < exponent <= 2 ** 4, exponent
  172. assert mantissa <= 2 ** 8, mantissa
  173. return mantissa, exponent
  174. def get_symbol_rate_baud(self) -> float:
  175. return self._symbol_rate_floating_point_to_real(
  176. mantissa=self._get_symbol_rate_mantissa(),
  177. exponent=self._get_symbol_rate_exponent(),
  178. )
  179. def set_symbol_rate_baud(self, real: float) -> None:
  180. # > The data rate can be set from 0.6 kBaud to 500 kBaud [...]
  181. mantissa, exponent = self._symbol_rate_real_to_floating_point(real)
  182. self._set_symbol_rate_mantissa(mantissa)
  183. self._set_symbol_rate_exponent(exponent)
  184. def get_modulation_format(self) -> ModulationFormat:
  185. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  186. return self.ModulationFormat((mdmcfg2 >> 4) & 0b111)
  187. def _set_modulation_format(self, modulation_format: ModulationFormat) -> None:
  188. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  189. mdmcfg2 &= ~(modulation_format << 4)
  190. mdmcfg2 |= modulation_format << 4
  191. self._write_burst(ConfigurationRegisterAddress.MDMCFG2, [mdmcfg2])
  192. def __enter__(self) -> "CC1101":
  193. # https://docs.python.org/3/reference/datamodel.html#object.__enter__
  194. self._spi.open(0, 0)
  195. self._spi.max_speed_hz = 55700 # empirical
  196. self._reset()
  197. partnum = self._read_status_register(StatusRegisterAddress.PARTNUM)
  198. if partnum != self._SUPPORTED_PARTNUM:
  199. raise ValueError(
  200. "unexpected chip part number {} (expected: {})".format(
  201. partnum, self._SUPPORTED_PARTNUM
  202. )
  203. )
  204. version = self._read_status_register(StatusRegisterAddress.VERSION)
  205. if version != self._SUPPORTED_VERSION:
  206. raise ValueError(
  207. "unexpected chip version number {} (expected: {})".format(
  208. version, self._SUPPORTED_VERSION
  209. )
  210. )
  211. # 6:4 MOD_FORMAT: OOK (default: 2-FSK)
  212. self._set_modulation_format(self.ModulationFormat.ASK_OOK)
  213. # 7:6 unused
  214. # 5:4 FS_AUTOCAL: calibrate when going from IDLE to RX or TX
  215. # 3:2 PO_TIMEOUT: default
  216. # 1 PIN_CTRL_EN: default
  217. # 0 XOSC_FORCE_ON: default
  218. self._write_burst(ConfigurationRegisterAddress.MCSM0, [0b010100])
  219. marcstate = self.get_main_radio_control_state_machine_state()
  220. if marcstate != self.MainRadioControlStateMachineState.IDLE:
  221. raise ValueError("expected marcstate idle (actual: {})".format(marcstate))
  222. return self
  223. def __exit__(self, exc_type, exc_value, traceback) -> bool:
  224. # https://docs.python.org/3/reference/datamodel.html#object.__exit__
  225. self._spi.close()
  226. return False
  227. def get_main_radio_control_state_machine_state(
  228. self
  229. ) -> MainRadioControlStateMachineState:
  230. return self.MainRadioControlStateMachineState(
  231. self._read_status_register(StatusRegisterAddress.MARCSTATE)
  232. )
  233. def get_marc_state(self) -> MainRadioControlStateMachineState:
  234. """
  235. alias for get_main_radio_control_state_machine_state()
  236. """
  237. return self.get_main_radio_control_state_machine_state()
  238. @classmethod
  239. def _frequency_control_word_to_hertz(cls, control_word: typing.List[int]) -> float:
  240. return (
  241. int.from_bytes(control_word, byteorder="big", signed=False)
  242. * cls._FREQUENCY_CONTROL_WORD_HERTZ_FACTOR
  243. )
  244. @classmethod
  245. def _hertz_to_frequency_control_word(cls, hertz: float) -> typing.List[int]:
  246. return list(
  247. round(hertz / cls._FREQUENCY_CONTROL_WORD_HERTZ_FACTOR).to_bytes(
  248. length=3, byteorder="big", signed=False
  249. )
  250. )
  251. def _get_base_frequency_control_word(self) -> typing.List[int]:
  252. # > The base or start frequency is set by the 24 bitfrequency
  253. # > word located in the FREQ2, FREQ1, FREQ0 registers.
  254. return self._read_burst(
  255. start_register=ConfigurationRegisterAddress.FREQ2, length=3
  256. )
  257. def _set_base_frequency_control_word(self, control_word: typing.List[int]) -> None:
  258. self._write_burst(
  259. start_register=ConfigurationRegisterAddress.FREQ2, values=control_word
  260. )
  261. def get_base_frequency_hertz(self) -> float:
  262. return self._frequency_control_word_to_hertz(
  263. self._get_base_frequency_control_word()
  264. )
  265. def set_base_frequency_hertz(self, freq: float) -> None:
  266. self._set_base_frequency_control_word(
  267. self._hertz_to_frequency_control_word(freq)
  268. )
  269. def __str__(self) -> str:
  270. return "CC1101(marcstate={}, base_frequency={:.2f}MHz, symbol_rate={:.2f}kBaud, modulation_format={})".format(
  271. self.get_main_radio_control_state_machine_state().name.lower(),
  272. self.get_base_frequency_hertz() / 10 ** 6,
  273. self.get_symbol_rate_baud() / 1000,
  274. self.get_modulation_format().name,
  275. )
  276. def _get_packet_length(self) -> int:
  277. """
  278. packet length in fixed packet length mode,
  279. maximum packet length in variable packet length mode.
  280. """
  281. return self._read_single_byte(ConfigurationRegisterAddress.PKTLEN)
  282. def get_configuration_register_values(
  283. self,
  284. start_register: ConfigurationRegisterAddress = min(
  285. ConfigurationRegisterAddress
  286. ),
  287. end_register: ConfigurationRegisterAddress = max(ConfigurationRegisterAddress),
  288. ) -> typing.Dict[ConfigurationRegisterAddress, int]:
  289. assert start_register <= end_register, (start_register, end_register)
  290. values = self._read_burst(
  291. start_register=start_register, length=end_register - start_register + 1
  292. )
  293. return {
  294. ConfigurationRegisterAddress(start_register + i): v
  295. for i, v in enumerate(values)
  296. }
  297. def _get_transceive_mode(self) -> _TransceiveMode:
  298. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  299. return self._TransceiveMode((pktctrl0 >> 4) & 0b11)
  300. def _flush_tx_fifo_buffer(self) -> None:
  301. # > Only issue SFTX in IDLE or TXFIFO_UNDERFLOW states.
  302. _LOGGER.debug("flushing tx fifo buffer")
  303. self._command_strobe(StrobeAddress.SFTX)
  304. def transmit(self, payload: typing.List[int]) -> None:
  305. # see "15.2 Packet Format"
  306. # > In variable packet length mode, [...]
  307. # > The first byte written to the TXFIFO must be different from 0.
  308. if payload[0] == 0:
  309. raise ValueError(
  310. "in variable packet length mode the first byte of payload must not be null"
  311. + "\npayload: {}".format(payload)
  312. )
  313. marcstate = self.get_main_radio_control_state_machine_state()
  314. if marcstate != self.MainRadioControlStateMachineState.IDLE:
  315. raise Exception(
  316. "device must be idle before transmission (current marcstate: {})".format(
  317. marcstate.name
  318. )
  319. )
  320. max_packet_length = self._get_packet_length()
  321. if len(payload) > max_packet_length:
  322. raise ValueError(
  323. "payload exceeds maximum payload length of {} bytes".format(
  324. max_packet_length
  325. )
  326. + "\npayload: {}".format(payload)
  327. )
  328. self._flush_tx_fifo_buffer()
  329. self._write_burst(FIFORegisterAddress.TX, payload)
  330. _LOGGER.info("transmitting %s", payload)
  331. self._command_strobe(StrobeAddress.STX)