__init__.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. import contextlib
  2. import enum
  3. import logging
  4. import math
  5. import typing
  6. import spidev
  7. from cc1101.addresses import (
  8. StrobeAddress,
  9. ConfigurationRegisterAddress,
  10. StatusRegisterAddress,
  11. FIFORegisterAddress,
  12. )
  13. _LOGGER = logging.getLogger(__name__)
  14. class CC1101:
  15. # > All transfers on the SPI interface are done
  16. # > most significant bit first.
  17. # > All transactions on the SPI interface start with
  18. # > a header byte containing a R/W bit, a access bit (B),
  19. # > and a 6-bit address (A5 - A0).
  20. # > [...]
  21. # > Table 45: SPI Address Space
  22. _WRITE_SINGLE_BYTE = 0x00
  23. # > Registers with consecutive addresses can be
  24. # > accessed in an efficient way by setting the
  25. # > burst bit (B) in the header byte. The address
  26. # > bits (A5 - A0) set the start address in an
  27. # > internal address counter. This counter is
  28. # > incremented by one each new byte [...]
  29. _WRITE_BURST = 0x40
  30. _READ_SINGLE_BYTE = 0x80
  31. _READ_BURST = 0xC0
  32. class Pin(enum.Enum):
  33. GDO0 = "GDO0"
  34. class _TransceiveMode(enum.IntEnum):
  35. """
  36. PKTCTRL0.PKT_FORMAT
  37. """
  38. FIFO = 0b00
  39. SYNCHRONOUS_SERIAL = 0b01
  40. RANDOM_TRANSMISSION = 0b10
  41. ASYNCHRONOUS_SERIAL = 0b11
  42. class ModulationFormat(enum.IntEnum):
  43. """
  44. MDMCFG2.MOD_FORMAT
  45. """
  46. FSK2 = 0b000
  47. GFSK = 0b001
  48. ASK_OOK = 0b011
  49. FSK4 = 0b100
  50. MSK = 0b111
  51. class MainRadioControlStateMachineState(enum.IntEnum):
  52. """
  53. MARCSTATE - Main Radio Control State Machine State
  54. """
  55. # see "Figure 13: Simplified State Diagram"
  56. # and "Figure 25: Complete Radio Control State Diagram"
  57. IDLE = 0x01
  58. STARTCAL = 0x08 # after IDLE
  59. BWBOOST = 0x09 # after STARTCAL
  60. FS_LOCK = 0x0A
  61. TX = 0x13
  62. # 29.3 Status Register Details
  63. _SUPPORTED_PARTNUM = 0
  64. _SUPPORTED_VERSION = 0x14
  65. _CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ = 26e6
  66. # see "21 Frequency Programming"
  67. # > f_carrier = f_XOSC / 2**16 * (FREQ + CHAN * ((256 + CHANSPC_M) * 2**CHANSPC_E-2))
  68. _FREQUENCY_CONTROL_WORD_HERTZ_FACTOR = _CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / 2 ** 16
  69. def __init__(self) -> None:
  70. self._spi = spidev.SpiDev()
  71. @staticmethod
  72. def _log_chip_status_byte(chip_status: int) -> None:
  73. # see "10.1 Chip Status Byte" & "Table 23: Status Byte Summary"
  74. _LOGGER.debug(
  75. "chip status byte: CHIP_RDYn=%d STATE=%s FIFO_BYTES_AVAILBLE=%d",
  76. chip_status >> 7,
  77. bin((chip_status >> 4) & 0b111),
  78. chip_status & 0b1111,
  79. )
  80. def _read_single_byte(
  81. self, register: typing.Union[ConfigurationRegisterAddress, FIFORegisterAddress]
  82. ) -> int:
  83. response = self._spi.xfer([register | self._READ_SINGLE_BYTE, 0])
  84. assert len(response) == 2, response
  85. self._log_chip_status_byte(response[0])
  86. return response[1]
  87. def _read_burst(
  88. self,
  89. start_register: typing.Union[ConfigurationRegisterAddress, FIFORegisterAddress],
  90. length: int,
  91. ) -> typing.List[int]:
  92. response = self._spi.xfer([start_register | self._READ_BURST] + [0] * length)
  93. assert len(response) == length + 1, response
  94. self._log_chip_status_byte(response[0])
  95. return response[1:]
  96. def _read_status_register(self, register: StatusRegisterAddress) -> int:
  97. # > For register addresses in the range 0x30-0x3D,
  98. # > the burst bit is used to select between
  99. # > status registers when burst bit is one, and
  100. # > between command strobes when burst bit is
  101. # > zero. [...]
  102. # > Because of this, burst access is not available
  103. # > for status registers and they must be accessed
  104. # > one at a time. The status registers can only be
  105. # > read.
  106. response = self._spi.xfer([register | self._READ_BURST, 0])
  107. assert len(response) == 2, response
  108. self._log_chip_status_byte(response[0])
  109. return response[1]
  110. def _command_strobe(self, register: StrobeAddress) -> None:
  111. # see "10.4 Command Strobes"
  112. _LOGGER.debug("sending command strobe 0x%02x", register)
  113. response = self._spi.xfer([register | self._WRITE_SINGLE_BYTE])
  114. assert len(response) == 1, response
  115. self._log_chip_status_byte(response[0])
  116. def _write_burst(
  117. self,
  118. start_register: typing.Union[ConfigurationRegisterAddress, FIFORegisterAddress],
  119. values: typing.List[int],
  120. ) -> None:
  121. _LOGGER.debug(
  122. "writing burst: start_register=0x%02x values=%s", start_register, values
  123. )
  124. response = self._spi.xfer([start_register | self._WRITE_BURST] + values)
  125. assert len(response) == len(values) + 1, response
  126. self._log_chip_status_byte(response[0])
  127. assert all(v == response[0] for v in response[1:]), response
  128. def _reset(self) -> None:
  129. self._command_strobe(StrobeAddress.SRES)
  130. def _get_symbol_rate_exponent(self) -> int:
  131. """
  132. MDMCFG4.DRATE_E
  133. """
  134. return self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4) & 0b00001111
  135. def _set_symbol_rate_exponent(self, exponent: int):
  136. mdmcfg4 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4)
  137. mdmcfg4 &= 0b11110000
  138. mdmcfg4 |= exponent
  139. self._write_burst(
  140. start_register=ConfigurationRegisterAddress.MDMCFG4, values=[mdmcfg4]
  141. )
  142. def _get_symbol_rate_mantissa(self) -> int:
  143. """
  144. MDMCFG3.DRATE_M
  145. """
  146. return self._read_single_byte(ConfigurationRegisterAddress.MDMCFG3)
  147. def _set_symbol_rate_mantissa(self, mantissa: int) -> None:
  148. self._write_burst(
  149. start_register=ConfigurationRegisterAddress.MDMCFG3, values=[mantissa]
  150. )
  151. @classmethod
  152. def _symbol_rate_floating_point_to_real(cls, mantissa: int, exponent: int) -> float:
  153. # see "12 Data Rate Programming"
  154. return (
  155. (256 + mantissa)
  156. * (2 ** exponent)
  157. * cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ
  158. / (2 ** 28)
  159. )
  160. @classmethod
  161. def _symbol_rate_real_to_floating_point(cls, real: float) -> typing.Tuple[int, int]:
  162. # see "12 Data Rate Programming"
  163. assert real > 0, real
  164. exponent = math.floor(
  165. math.log2(real / cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ) + 20
  166. )
  167. mantissa = round(
  168. real * 2 ** 28 / cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / 2 ** exponent
  169. - 256
  170. )
  171. if mantissa == 256:
  172. exponent += 1
  173. mantissa = 0
  174. assert 0 < exponent <= 2 ** 4, exponent
  175. assert mantissa <= 2 ** 8, mantissa
  176. return mantissa, exponent
  177. def get_symbol_rate_baud(self) -> float:
  178. return self._symbol_rate_floating_point_to_real(
  179. mantissa=self._get_symbol_rate_mantissa(),
  180. exponent=self._get_symbol_rate_exponent(),
  181. )
  182. def set_symbol_rate_baud(self, real: float) -> None:
  183. # > The data rate can be set from 0.6 kBaud to 500 kBaud [...]
  184. mantissa, exponent = self._symbol_rate_real_to_floating_point(real)
  185. self._set_symbol_rate_mantissa(mantissa)
  186. self._set_symbol_rate_exponent(exponent)
  187. def get_modulation_format(self) -> ModulationFormat:
  188. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  189. return self.ModulationFormat((mdmcfg2 >> 4) & 0b111)
  190. def _set_modulation_format(self, modulation_format: ModulationFormat) -> None:
  191. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  192. mdmcfg2 &= ~(modulation_format << 4)
  193. mdmcfg2 |= modulation_format << 4
  194. self._write_burst(ConfigurationRegisterAddress.MDMCFG2, [mdmcfg2])
  195. def enable_manchester_code(self) -> None:
  196. """
  197. MDMCFG2.MANCHESTER_EN
  198. Enable manchester encoding & decoding.
  199. """
  200. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  201. mdmcfg2 |= 0b1000
  202. self._write_burst(ConfigurationRegisterAddress.MDMCFG2, [mdmcfg2])
  203. def _set_power_amplifier_setting_index(self, setting_index: int) -> None:
  204. """
  205. FREND0.PA_POWER
  206. > This value is an index to the PATABLE,
  207. > which can be programmed with up to 8 different PA settings.
  208. > In OOK/ASK mode, this selects the PATABLE index to use
  209. > when transmitting a '1'.
  210. > PATABLE index zero is used in OOK/ASK when transmitting a '0'.
  211. > The PATABLE settings from index 0 to the PA_POWER value are
  212. > used for > ASK TX shaping, [...]
  213. see "Figure 32: Shaping of ASK Signal"
  214. > If OOK modulation is used, the logic 0 and logic 1 power levels
  215. > shall be programmed to index 0 and 1 respectively.
  216. """
  217. frend0 = self._read_single_byte(ConfigurationRegisterAddress.FREND0)
  218. frend0 &= 0b000
  219. frend0 |= setting_index
  220. self._write_burst(ConfigurationRegisterAddress.FREND0, [setting_index])
  221. def __enter__(self) -> "CC1101":
  222. # https://docs.python.org/3/reference/datamodel.html#object.__enter__
  223. self._spi.open(0, 0)
  224. self._spi.max_speed_hz = 55700 # empirical
  225. self._reset()
  226. partnum = self._read_status_register(StatusRegisterAddress.PARTNUM)
  227. if partnum != self._SUPPORTED_PARTNUM:
  228. raise ValueError(
  229. "unexpected chip part number {} (expected: {})".format(
  230. partnum, self._SUPPORTED_PARTNUM
  231. )
  232. )
  233. version = self._read_status_register(StatusRegisterAddress.VERSION)
  234. if version != self._SUPPORTED_VERSION:
  235. raise ValueError(
  236. "unexpected chip version number {} (expected: {})".format(
  237. version, self._SUPPORTED_VERSION
  238. )
  239. )
  240. # 6:4 MOD_FORMAT: OOK (default: 2-FSK)
  241. self._set_modulation_format(self.ModulationFormat.ASK_OOK)
  242. self._set_power_amplifier_setting_index(1)
  243. self._disable_data_whitening()
  244. # 7:6 unused
  245. # 5:4 FS_AUTOCAL: calibrate when going from IDLE to RX or TX
  246. # 3:2 PO_TIMEOUT: default
  247. # 1 PIN_CTRL_EN: default
  248. # 0 XOSC_FORCE_ON: default
  249. self._write_burst(ConfigurationRegisterAddress.MCSM0, [0b010100])
  250. marcstate = self.get_main_radio_control_state_machine_state()
  251. if marcstate != self.MainRadioControlStateMachineState.IDLE:
  252. raise ValueError("expected marcstate idle (actual: {})".format(marcstate))
  253. return self
  254. def __exit__(self, exc_type, exc_value, traceback) -> bool:
  255. # https://docs.python.org/3/reference/datamodel.html#object.__exit__
  256. self._spi.close()
  257. return False
  258. def get_main_radio_control_state_machine_state(
  259. self
  260. ) -> MainRadioControlStateMachineState:
  261. return self.MainRadioControlStateMachineState(
  262. self._read_status_register(StatusRegisterAddress.MARCSTATE)
  263. )
  264. def get_marc_state(self) -> MainRadioControlStateMachineState:
  265. """
  266. alias for get_main_radio_control_state_machine_state()
  267. """
  268. return self.get_main_radio_control_state_machine_state()
  269. @classmethod
  270. def _frequency_control_word_to_hertz(cls, control_word: typing.List[int]) -> float:
  271. return (
  272. int.from_bytes(control_word, byteorder="big", signed=False)
  273. * cls._FREQUENCY_CONTROL_WORD_HERTZ_FACTOR
  274. )
  275. @classmethod
  276. def _hertz_to_frequency_control_word(cls, hertz: float) -> typing.List[int]:
  277. return list(
  278. round(hertz / cls._FREQUENCY_CONTROL_WORD_HERTZ_FACTOR).to_bytes(
  279. length=3, byteorder="big", signed=False
  280. )
  281. )
  282. def _get_base_frequency_control_word(self) -> typing.List[int]:
  283. # > The base or start frequency is set by the 24 bitfrequency
  284. # > word located in the FREQ2, FREQ1, FREQ0 registers.
  285. return self._read_burst(
  286. start_register=ConfigurationRegisterAddress.FREQ2, length=3
  287. )
  288. def _set_base_frequency_control_word(self, control_word: typing.List[int]) -> None:
  289. self._write_burst(
  290. start_register=ConfigurationRegisterAddress.FREQ2, values=control_word
  291. )
  292. def get_base_frequency_hertz(self) -> float:
  293. return self._frequency_control_word_to_hertz(
  294. self._get_base_frequency_control_word()
  295. )
  296. def set_base_frequency_hertz(self, freq: float) -> None:
  297. self._set_base_frequency_control_word(
  298. self._hertz_to_frequency_control_word(freq)
  299. )
  300. def __str__(self) -> str:
  301. return "CC1101(marcstate={}, base_frequency={:.2f}MHz, symbol_rate={:.2f}kBaud, modulation_format={})".format(
  302. self.get_main_radio_control_state_machine_state().name.lower(),
  303. self.get_base_frequency_hertz() / 10 ** 6,
  304. self.get_symbol_rate_baud() / 1000,
  305. self.get_modulation_format().name,
  306. )
  307. def _get_packet_length(self) -> int:
  308. """
  309. packet length in fixed packet length mode,
  310. maximum packet length in variable packet length mode.
  311. """
  312. return self._read_single_byte(ConfigurationRegisterAddress.PKTLEN)
  313. def get_configuration_register_values(
  314. self,
  315. start_register: ConfigurationRegisterAddress = min(
  316. ConfigurationRegisterAddress
  317. ),
  318. end_register: ConfigurationRegisterAddress = max(ConfigurationRegisterAddress),
  319. ) -> typing.Dict[ConfigurationRegisterAddress, int]:
  320. assert start_register <= end_register, (start_register, end_register)
  321. values = self._read_burst(
  322. start_register=start_register, length=end_register - start_register + 1
  323. )
  324. return {
  325. ConfigurationRegisterAddress(start_register + i): v
  326. for i, v in enumerate(values)
  327. }
  328. def _disable_data_whitening(self):
  329. """
  330. PKTCTRL0.WHITE_DATA
  331. see "15.1 Data Whitening"
  332. > By setting PKTCTRL0.WHITE_DATA=1 [default],
  333. > all data, except the preamble and the sync word
  334. > will be XOR-ed with a 9-bit pseudo-random (PN9)
  335. > sequence before being transmitted.
  336. """
  337. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  338. pktctrl0 &= 0b10111111
  339. self._write_burst(
  340. start_register=ConfigurationRegisterAddress.PKTCTRL0, values=[pktctrl0]
  341. )
  342. def _get_transceive_mode(self) -> _TransceiveMode:
  343. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  344. return self._TransceiveMode((pktctrl0 >> 4) & 0b11)
  345. def _set_transceive_mode(self, mode: _TransceiveMode) -> None:
  346. _LOGGER.info("changing transceive mode to %s", mode.name)
  347. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  348. pktctrl0 &= ~0b00110000
  349. pktctrl0 |= mode << 4
  350. self._write_burst(
  351. start_register=ConfigurationRegisterAddress.PKTCTRL0, values=[pktctrl0]
  352. )
  353. def _flush_tx_fifo_buffer(self) -> None:
  354. # > Only issue SFTX in IDLE or TXFIFO_UNDERFLOW states.
  355. _LOGGER.debug("flushing tx fifo buffer")
  356. self._command_strobe(StrobeAddress.SFTX)
  357. def transmit(self, payload: typing.List[int]) -> None:
  358. # see "15.2 Packet Format"
  359. # > In variable packet length mode, [...]
  360. # > The first byte written to the TXFIFO must be different from 0.
  361. if payload[0] == 0:
  362. raise ValueError(
  363. "in variable packet length mode the first byte of payload must not be null"
  364. + "\npayload: {}".format(payload)
  365. )
  366. marcstate = self.get_main_radio_control_state_machine_state()
  367. if marcstate != self.MainRadioControlStateMachineState.IDLE:
  368. raise Exception(
  369. "device must be idle before transmission (current marcstate: {})".format(
  370. marcstate.name
  371. )
  372. )
  373. max_packet_length = self._get_packet_length()
  374. if len(payload) > max_packet_length:
  375. raise ValueError(
  376. "payload exceeds maximum payload length of {} bytes".format(
  377. max_packet_length
  378. )
  379. + "\npayload: {}".format(payload)
  380. )
  381. self._flush_tx_fifo_buffer()
  382. self._write_burst(FIFORegisterAddress.TX, payload)
  383. _LOGGER.info("transmitting %s", payload)
  384. self._command_strobe(StrobeAddress.STX)
  385. @contextlib.contextmanager
  386. def asynchronous_transmission(self) -> typing.Iterator[Pin]:
  387. """
  388. see "27.1 Asynchronous Serial Operation"
  389. >>> with cc1101.CC1101() as transceiver:
  390. >>> transceiver.set_base_frequency_hertz(433.92e6)
  391. >>> transceiver.set_symbol_rate_baud(600)
  392. >>> print(transceiver)
  393. >>> with transceiver.asynchronous_transmission():
  394. >>> # send digital signal to GDO0 pin
  395. """
  396. self._set_transceive_mode(self._TransceiveMode.ASYNCHRONOUS_SERIAL)
  397. self._command_strobe(StrobeAddress.STX)
  398. try:
  399. # > In TX, the GDO0 pin is used for data input (TX data).
  400. yield self.Pin.GDO0
  401. finally:
  402. self._command_strobe(StrobeAddress.SIDLE)
  403. self._set_transceive_mode(self._TransceiveMode.FIFO)