__init__.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809
  1. # python-cc1101 - Python Library to Transmit RF Signals via C1101 Transceivers
  2. #
  3. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import contextlib
  18. import enum
  19. import fcntl
  20. import logging
  21. import math
  22. import typing
  23. import spidev
  24. from cc1101.addresses import (
  25. StrobeAddress,
  26. ConfigurationRegisterAddress,
  27. StatusRegisterAddress,
  28. FIFORegisterAddress,
  29. )
  30. from cc1101.options import PacketLengthMode, SyncMode, ModulationFormat
  31. _LOGGER = logging.getLogger(__name__)
  32. class Pin(enum.Enum):
  33. GDO0 = "GDO0"
  34. class _TransceiveMode(enum.IntEnum):
  35. """
  36. PKTCTRL0.PKT_FORMAT
  37. """
  38. FIFO = 0b00
  39. SYNCHRONOUS_SERIAL = 0b01
  40. RANDOM_TRANSMISSION = 0b10
  41. ASYNCHRONOUS_SERIAL = 0b11
  42. class MainRadioControlStateMachineState(enum.IntEnum):
  43. """
  44. MARCSTATE - Main Radio Control State Machine State
  45. """
  46. # see "Figure 13: Simplified State Diagram"
  47. # and "Figure 25: Complete Radio Control State Diagram"
  48. IDLE = 0x01
  49. STARTCAL = 0x08 # after IDLE
  50. BWBOOST = 0x09 # after STARTCAL
  51. FS_LOCK = 0x0A
  52. RX = 0x0D
  53. RXFIFO_OVERFLOW = 0x11
  54. TX = 0x13
  55. # TXFIFO_UNDERFLOW = 0x16
  56. class _ReceivedPacket: # unstable
  57. # "Table 31: Typical RSSI_offset Values"
  58. _RSSI_OFFSET_dB = 74
  59. def __init__(
  60. self,
  61. # *,
  62. data: bytes,
  63. rssi_index: int, # byte
  64. checksum_valid: bool,
  65. link_quality_indicator: int, # 7bit
  66. ):
  67. self.data = data
  68. self._rssi_index = rssi_index
  69. assert 0 <= rssi_index < (1 << 8), rssi_index
  70. self.checksum_valid = checksum_valid
  71. self.link_quality_indicator = link_quality_indicator
  72. assert 0 <= link_quality_indicator < (1 << 7), link_quality_indicator
  73. @property
  74. def rssi_dbm(self) -> float:
  75. """
  76. Estimated Received Signal Strength Indicator (RSSI) in dBm
  77. see section "17.3 RSSI"
  78. """
  79. if self._rssi_index >= 128:
  80. return (self._rssi_index - 256) / 2 - self._RSSI_OFFSET_dB
  81. return self._rssi_index / 2 - self._RSSI_OFFSET_dB
  82. def __str__(self) -> str:
  83. return "{}(RSSI {:.0f}dBm, 0x{})".format(
  84. type(self).__name__,
  85. self.rssi_dbm,
  86. "".join("{:02x}".format(b) for b in self.data),
  87. )
  88. class CC1101:
  89. # pylint: disable=too-many-public-methods
  90. # > All transfers on the SPI interface are done
  91. # > most significant bit first.
  92. # > All transactions on the SPI interface start with
  93. # > a header byte containing a R/W bit, a access bit (B),
  94. # > and a 6-bit address (A5 - A0).
  95. # > [...]
  96. # > Table 45: SPI Address Space
  97. _WRITE_SINGLE_BYTE = 0x00
  98. # > Registers with consecutive addresses can be
  99. # > accessed in an efficient way by setting the
  100. # > burst bit (B) in the header byte. The address
  101. # > bits (A5 - A0) set the start address in an
  102. # > internal address counter. This counter is
  103. # > incremented by one each new byte [...]
  104. _WRITE_BURST = 0x40
  105. _READ_SINGLE_BYTE = 0x80
  106. _READ_BURST = 0xC0
  107. # 29.3 Status Register Details
  108. _SUPPORTED_PARTNUM = 0
  109. _SUPPORTED_VERSION = 0x14
  110. _CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ = 26e6
  111. # see "21 Frequency Programming"
  112. # > f_carrier = f_XOSC / 2**16 * (FREQ + CHAN * ((256 + CHANSPC_M) * 2**CHANSPC_E-2))
  113. _FREQUENCY_CONTROL_WORD_HERTZ_FACTOR = _CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / 2 ** 16
  114. def __init__(
  115. self, spi_bus: int = 0, spi_chip_select: int = 0, lock_spi_device: bool = False
  116. ) -> None:
  117. self._spi = spidev.SpiDev()
  118. self._spi_bus = int(spi_bus)
  119. # > The BCM2835 core common to all Raspberry Pi devices has 3 SPI Controllers:
  120. # > SPI0, with two hardware chip selects, [...]
  121. # > SPI1, with three hardware chip selects, [...]
  122. # > SPI2, also with three hardware chip selects, is only usable on a Compute Module [...]
  123. # https://github.com/raspberrypi/documentation/blob/d41d69f8efa3667b1a8b01a669238b8bd113edc1/hardware/raspberrypi/spi/README.md#hardware
  124. # https://www.raspberrypi.org/documentation/hardware/raspberrypi/spi/README.md
  125. self._spi_chip_select = int(spi_chip_select)
  126. self._lock_spi_device = lock_spi_device
  127. @property
  128. def _spi_device_path(self) -> str:
  129. # https://github.com/doceme/py-spidev/blob/v3.4/spidev_module.c#L1286
  130. return "/dev/spidev{}.{}".format(self._spi_bus, self._spi_chip_select)
  131. @staticmethod
  132. def _log_chip_status_byte(chip_status: int) -> None:
  133. # see "10.1 Chip Status Byte" & "Table 23: Status Byte Summary"
  134. # > The command strobe registers are accessed by transferring
  135. # > a single header byte [...]. That is, only the R/W̄ bit,
  136. # > the burst access bit (set to 0), and the six address bits [...]
  137. # > The R/W̄ bit can be either one or zero and will determine how the
  138. # > FIFO_BYTES_AVAILABLE field in the status byte should be interpreted.
  139. _LOGGER.debug(
  140. "chip status byte: CHIP_RDYn=%d STATE=%s FIFO_BYTES_AVAILBLE=%d",
  141. chip_status >> 7,
  142. bin((chip_status >> 4) & 0b111),
  143. chip_status & 0b1111,
  144. )
  145. def _read_single_byte(
  146. self, register: typing.Union[ConfigurationRegisterAddress, FIFORegisterAddress]
  147. ) -> int:
  148. response = self._spi.xfer([register | self._READ_SINGLE_BYTE, 0])
  149. assert len(response) == 2, response
  150. self._log_chip_status_byte(response[0])
  151. return response[1]
  152. def _read_burst(
  153. self,
  154. start_register: typing.Union[ConfigurationRegisterAddress, FIFORegisterAddress],
  155. length: int,
  156. ) -> typing.List[int]:
  157. response = self._spi.xfer([start_register | self._READ_BURST] + [0] * length)
  158. assert len(response) == length + 1, response
  159. self._log_chip_status_byte(response[0])
  160. return response[1:]
  161. def _read_status_register(self, register: StatusRegisterAddress) -> int:
  162. # > For register addresses in the range 0x30-0x3D,
  163. # > the burst bit is used to select between
  164. # > status registers when burst bit is one, and
  165. # > between command strobes when burst bit is
  166. # > zero. [...]
  167. # > Because of this, burst access is not available
  168. # > for status registers and they must be accessed
  169. # > one at a time. The status registers can only be
  170. # > read.
  171. response = self._spi.xfer([register | self._READ_BURST, 0])
  172. assert len(response) == 2, response
  173. self._log_chip_status_byte(response[0])
  174. return response[1]
  175. def _command_strobe(self, register: StrobeAddress) -> None:
  176. # see "10.4 Command Strobes"
  177. _LOGGER.debug("sending command strobe 0x%02x", register)
  178. response = self._spi.xfer([register | self._WRITE_SINGLE_BYTE])
  179. assert len(response) == 1, response
  180. self._log_chip_status_byte(response[0])
  181. def _write_burst(
  182. self,
  183. start_register: typing.Union[ConfigurationRegisterAddress, FIFORegisterAddress],
  184. values: typing.List[int],
  185. ) -> None:
  186. _LOGGER.debug(
  187. "writing burst: start_register=0x%02x values=%s", start_register, values
  188. )
  189. response = self._spi.xfer([start_register | self._WRITE_BURST] + values)
  190. assert len(response) == len(values) + 1, response
  191. self._log_chip_status_byte(response[0])
  192. assert all(v == response[0] for v in response[1:]), response
  193. def _reset(self) -> None:
  194. self._command_strobe(StrobeAddress.SRES)
  195. @classmethod
  196. def _filter_bandwidth_floating_point_to_real(
  197. cls, mantissa: int, exponent: int
  198. ) -> float:
  199. """
  200. See "13 Receiver Channel Filter Bandwidth"
  201. """
  202. return cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / (
  203. 8 * (4 + mantissa) * (2 ** exponent)
  204. )
  205. def _get_filter_bandwidth_hertz(self) -> float:
  206. """
  207. See "13 Receiver Channel Filter Bandwidth"
  208. MDMCFG4.CHANBW_E & MDMCFG4.CHANBW_M
  209. """
  210. mdmcfg4 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4)
  211. return self._filter_bandwidth_floating_point_to_real(
  212. exponent=mdmcfg4 >> 6, mantissa=(mdmcfg4 >> 4) & 0b11
  213. )
  214. def _set_filter_bandwidth(self, *, mantissa: int, exponent: int) -> None:
  215. """
  216. MDMCFG4.CHANBW_E & MDMCFG4.CHANBW_M
  217. """
  218. mdmcfg4 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4)
  219. mdmcfg4 &= 0b00001111
  220. assert 0 <= exponent <= 0b11, exponent
  221. mdmcfg4 |= exponent << 6
  222. assert 0 <= mantissa <= 0b11, mantissa
  223. mdmcfg4 |= mantissa << 4
  224. self._write_burst(
  225. start_register=ConfigurationRegisterAddress.MDMCFG4, values=[mdmcfg4]
  226. )
  227. def _get_symbol_rate_exponent(self) -> int:
  228. """
  229. MDMCFG4.DRATE_E
  230. """
  231. return self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4) & 0b00001111
  232. def _set_symbol_rate_exponent(self, exponent: int):
  233. mdmcfg4 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4)
  234. mdmcfg4 &= 0b11110000
  235. mdmcfg4 |= exponent
  236. self._write_burst(
  237. start_register=ConfigurationRegisterAddress.MDMCFG4, values=[mdmcfg4]
  238. )
  239. def _get_symbol_rate_mantissa(self) -> int:
  240. """
  241. MDMCFG3.DRATE_M
  242. """
  243. return self._read_single_byte(ConfigurationRegisterAddress.MDMCFG3)
  244. def _set_symbol_rate_mantissa(self, mantissa: int) -> None:
  245. self._write_burst(
  246. start_register=ConfigurationRegisterAddress.MDMCFG3, values=[mantissa]
  247. )
  248. @classmethod
  249. def _symbol_rate_floating_point_to_real(cls, mantissa: int, exponent: int) -> float:
  250. # see "12 Data Rate Programming"
  251. return (
  252. (256 + mantissa)
  253. * (2 ** exponent)
  254. * cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ
  255. / (2 ** 28)
  256. )
  257. @classmethod
  258. def _symbol_rate_real_to_floating_point(cls, real: float) -> typing.Tuple[int, int]:
  259. # see "12 Data Rate Programming"
  260. assert real > 0, real
  261. exponent = math.floor(
  262. math.log2(real / cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ) + 20
  263. )
  264. mantissa = round(
  265. real * 2 ** 28 / cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / 2 ** exponent
  266. - 256
  267. )
  268. if mantissa == 256:
  269. exponent += 1
  270. mantissa = 0
  271. assert 0 < exponent <= 2 ** 4, exponent
  272. assert mantissa <= 2 ** 8, mantissa
  273. return mantissa, exponent
  274. def get_symbol_rate_baud(self) -> float:
  275. return self._symbol_rate_floating_point_to_real(
  276. mantissa=self._get_symbol_rate_mantissa(),
  277. exponent=self._get_symbol_rate_exponent(),
  278. )
  279. def set_symbol_rate_baud(self, real: float) -> None:
  280. # > The data rate can be set from 0.6 kBaud to 500 kBaud [...]
  281. mantissa, exponent = self._symbol_rate_real_to_floating_point(real)
  282. self._set_symbol_rate_mantissa(mantissa)
  283. self._set_symbol_rate_exponent(exponent)
  284. def get_modulation_format(self) -> ModulationFormat:
  285. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  286. return ModulationFormat((mdmcfg2 >> 4) & 0b111)
  287. def _set_modulation_format(self, modulation_format: ModulationFormat) -> None:
  288. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  289. mdmcfg2 &= ~(modulation_format << 4)
  290. mdmcfg2 |= modulation_format << 4
  291. self._write_burst(ConfigurationRegisterAddress.MDMCFG2, [mdmcfg2])
  292. def enable_manchester_code(self) -> None:
  293. """
  294. MDMCFG2.MANCHESTER_EN
  295. Enable manchester encoding & decoding for the entire packet,
  296. including the preamble and synchronization word.
  297. """
  298. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  299. mdmcfg2 |= 0b1000
  300. self._write_burst(ConfigurationRegisterAddress.MDMCFG2, [mdmcfg2])
  301. def get_sync_mode(self) -> SyncMode:
  302. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  303. return SyncMode(mdmcfg2 & 0b11)
  304. def set_sync_mode(
  305. self,
  306. mode: SyncMode,
  307. *,
  308. _carrier_sense_threshold_enabled: typing.Optional[bool] = None # unstable
  309. ) -> None:
  310. """
  311. MDMCFG2.SYNC_MODE
  312. see "14.3 Byte Synchronization"
  313. Carrier Sense (CS) Threshold (when receiving packets, API unstable):
  314. > Carrier sense can be used as a sync word qualifier
  315. > that requires the signal level to be higher than the threshold
  316. > for a sync word > search to be performed [...]
  317. > CS can be used to avoid interference from other RF sources [...]
  318. True: enable, False: disable, None: keep current setting
  319. See "17.4 Carrier Sense (CS)"
  320. """
  321. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  322. mdmcfg2 &= 0b11111100
  323. mdmcfg2 |= mode
  324. if _carrier_sense_threshold_enabled is not None:
  325. if _carrier_sense_threshold_enabled:
  326. mdmcfg2 |= 0b00000100
  327. else:
  328. mdmcfg2 &= 0b11111011
  329. self._write_burst(ConfigurationRegisterAddress.MDMCFG2, [mdmcfg2])
  330. def get_preamble_length_bytes(self) -> int:
  331. """
  332. MDMCFG1.NUM_PREAMBLE
  333. Minimum number of preamble bytes to be transmitted.
  334. See "15.2 Packet Format"
  335. """
  336. index = (
  337. self._read_single_byte(ConfigurationRegisterAddress.MDMCFG1) >> 4
  338. ) & 0b111
  339. return 2 ** (index >> 1) * (2 + (index & 0b1))
  340. def _set_preamble_length_index(self, index: int) -> None:
  341. assert 0 <= index <= 0b111
  342. mdmcfg1 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG1)
  343. mdmcfg1 &= 0b10001111
  344. mdmcfg1 |= index << 4
  345. self._write_burst(ConfigurationRegisterAddress.MDMCFG1, [mdmcfg1])
  346. def set_preamble_length_bytes(self, length: int) -> None:
  347. """
  348. see .get_preamble_length_bytes()
  349. """
  350. if length < 1:
  351. raise ValueError(
  352. "invalid preamble length {} given".format(length)
  353. + "\ncall .set_sync_mode(cc1101.SyncMode.NO_PREAMBLE_AND_SYNC_WORD)"
  354. + " to disable preamble"
  355. )
  356. if length % 3 == 0:
  357. index = math.log2(length / 3) * 2 + 1
  358. else:
  359. index = math.log2(length / 2) * 2
  360. if not index.is_integer() or index < 0 or index > 0b111:
  361. raise ValueError(
  362. "unsupported preamble length: {} bytes".format(length)
  363. + "\nsee MDMCFG1.NUM_PREAMBLE in cc1101 docs"
  364. )
  365. self._set_preamble_length_index(int(index))
  366. def _set_power_amplifier_setting_index(self, setting_index: int) -> None:
  367. """
  368. FREND0.PA_POWER
  369. > This value is an index to the PATABLE,
  370. > which can be programmed with up to 8 different PA settings.
  371. > In OOK/ASK mode, this selects the PATABLE index to use
  372. > when transmitting a '1'.
  373. > PATABLE index zero is used in OOK/ASK when transmitting a '0'.
  374. > The PATABLE settings from index 0 to the PA_POWER value are
  375. > used for > ASK TX shaping, [...]
  376. see "Figure 32: Shaping of ASK Signal"
  377. > If OOK modulation is used, the logic 0 and logic 1 power levels
  378. > shall be programmed to index 0 and 1 respectively.
  379. """
  380. frend0 = self._read_single_byte(ConfigurationRegisterAddress.FREND0)
  381. frend0 &= 0b000
  382. frend0 |= setting_index
  383. self._write_burst(ConfigurationRegisterAddress.FREND0, [setting_index])
  384. def _verify_chip(self) -> None:
  385. partnum = self._read_status_register(StatusRegisterAddress.PARTNUM)
  386. if partnum != self._SUPPORTED_PARTNUM:
  387. raise ValueError(
  388. "unexpected chip part number {} (expected: {})".format(
  389. partnum, self._SUPPORTED_PARTNUM
  390. )
  391. )
  392. version = self._read_status_register(StatusRegisterAddress.VERSION)
  393. if version != self._SUPPORTED_VERSION:
  394. raise ValueError(
  395. "unexpected chip version number {} (expected: {})".format(
  396. version, self._SUPPORTED_VERSION
  397. )
  398. )
  399. def _configure_defaults(self) -> None:
  400. # 6:4 MOD_FORMAT: OOK (default: 2-FSK)
  401. self._set_modulation_format(ModulationFormat.ASK_OOK)
  402. self._set_power_amplifier_setting_index(1)
  403. self._disable_data_whitening()
  404. # 7:6 unused
  405. # 5:4 FS_AUTOCAL: calibrate when going from IDLE to RX or TX
  406. # 3:2 PO_TIMEOUT: default
  407. # 1 PIN_CTRL_EN: default
  408. # 0 XOSC_FORCE_ON: default
  409. self._write_burst(ConfigurationRegisterAddress.MCSM0, [0b010100])
  410. def __enter__(self) -> "CC1101":
  411. # https://docs.python.org/3/reference/datamodel.html#object.__enter__
  412. try:
  413. self._spi.open(self._spi_bus, self._spi_chip_select)
  414. except PermissionError as exc:
  415. raise PermissionError(
  416. "Could not access {}".format(self._spi_device_path)
  417. + "\nVerify that the current user has both read and write access."
  418. + "\nOn some devices, like Raspberry Pis,"
  419. + "\n\tsudo usermod -a -G spi $USER"
  420. + "\nfollowed by a re-login grants sufficient permissions."
  421. ) from exc
  422. if self._lock_spi_device:
  423. # advisory, exclusive, non-blocking
  424. # lock removed in __exit__ by SpiDev.close()
  425. fcntl.flock(self._spi.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
  426. self._spi.max_speed_hz = 55700 # empirical
  427. self._reset()
  428. self._verify_chip()
  429. self._configure_defaults()
  430. marcstate = self.get_main_radio_control_state_machine_state()
  431. if marcstate != MainRadioControlStateMachineState.IDLE:
  432. raise ValueError("expected marcstate idle (actual: {})".format(marcstate))
  433. return self
  434. def __exit__(self, exc_type, exc_value, traceback): # -> typing.Literal[False]
  435. # https://docs.python.org/3/reference/datamodel.html#object.__exit__
  436. self._spi.close()
  437. return False
  438. def get_main_radio_control_state_machine_state(
  439. self,
  440. ) -> MainRadioControlStateMachineState:
  441. return MainRadioControlStateMachineState(
  442. self._read_status_register(StatusRegisterAddress.MARCSTATE)
  443. )
  444. def get_marc_state(self) -> MainRadioControlStateMachineState:
  445. """
  446. alias for get_main_radio_control_state_machine_state()
  447. """
  448. return self.get_main_radio_control_state_machine_state()
  449. @classmethod
  450. def _frequency_control_word_to_hertz(cls, control_word: typing.List[int]) -> float:
  451. return (
  452. int.from_bytes(control_word, byteorder="big", signed=False)
  453. * cls._FREQUENCY_CONTROL_WORD_HERTZ_FACTOR
  454. )
  455. @classmethod
  456. def _hertz_to_frequency_control_word(cls, hertz: float) -> typing.List[int]:
  457. return list(
  458. round(hertz / cls._FREQUENCY_CONTROL_WORD_HERTZ_FACTOR).to_bytes(
  459. length=3, byteorder="big", signed=False
  460. )
  461. )
  462. def _get_base_frequency_control_word(self) -> typing.List[int]:
  463. # > The base or start frequency is set by the 24 bitfrequency
  464. # > word located in the FREQ2, FREQ1, FREQ0 registers.
  465. return self._read_burst(
  466. start_register=ConfigurationRegisterAddress.FREQ2, length=3
  467. )
  468. def _set_base_frequency_control_word(self, control_word: typing.List[int]) -> None:
  469. self._write_burst(
  470. start_register=ConfigurationRegisterAddress.FREQ2, values=control_word
  471. )
  472. def get_base_frequency_hertz(self) -> float:
  473. return self._frequency_control_word_to_hertz(
  474. self._get_base_frequency_control_word()
  475. )
  476. def set_base_frequency_hertz(self, freq: float) -> None:
  477. self._set_base_frequency_control_word(
  478. self._hertz_to_frequency_control_word(freq)
  479. )
  480. def __str__(self) -> str:
  481. sync_mode = self.get_sync_mode()
  482. attrs = (
  483. "marcstate={}".format(
  484. self.get_main_radio_control_state_machine_state().name.lower()
  485. ),
  486. "base_frequency={:.2f}MHz".format(
  487. self.get_base_frequency_hertz() / 10 ** 6
  488. ),
  489. "symbol_rate={:.2f}kBaud".format(self.get_symbol_rate_baud() / 1000),
  490. "modulation_format={}".format(self.get_modulation_format().name),
  491. "sync_mode={}".format(sync_mode.name),
  492. "preamble_length={}B".format(self.get_preamble_length_bytes())
  493. if sync_mode != SyncMode.NO_PREAMBLE_AND_SYNC_WORD
  494. else None,
  495. "sync_word=0x{:02x}{:02x}".format(*self.get_sync_word())
  496. if sync_mode != SyncMode.NO_PREAMBLE_AND_SYNC_WORD
  497. else None,
  498. "packet_length{}{}B".format(
  499. "≤"
  500. if self.get_packet_length_mode() == PacketLengthMode.VARIABLE
  501. else "=",
  502. self.get_packet_length_bytes(),
  503. ),
  504. )
  505. return "CC1101({})".format(", ".join(filter(None, attrs)))
  506. def get_configuration_register_values(
  507. self,
  508. start_register: ConfigurationRegisterAddress = min(
  509. ConfigurationRegisterAddress
  510. ),
  511. end_register: ConfigurationRegisterAddress = max(ConfigurationRegisterAddress),
  512. ) -> typing.Dict[ConfigurationRegisterAddress, int]:
  513. assert start_register <= end_register, (start_register, end_register)
  514. values = self._read_burst(
  515. start_register=start_register, length=end_register - start_register + 1
  516. )
  517. return {
  518. ConfigurationRegisterAddress(start_register + i): v
  519. for i, v in enumerate(values)
  520. }
  521. def get_sync_word(self) -> bytes:
  522. """
  523. SYNC1 & SYNC0
  524. See "15.2 Packet Format"
  525. The first byte's most significant bit is transmitted first.
  526. """
  527. return bytes(
  528. self._read_burst(
  529. start_register=ConfigurationRegisterAddress.SYNC1, length=2
  530. )
  531. )
  532. def set_sync_word(self, sync_word: bytes) -> None:
  533. """
  534. See .set_sync_word()
  535. """
  536. if len(sync_word) != 2:
  537. raise ValueError("expected two bytes, got {!r}".format(sync_word))
  538. self._write_burst(
  539. start_register=ConfigurationRegisterAddress.SYNC1, values=list(sync_word)
  540. )
  541. def get_packet_length_bytes(self) -> int:
  542. """
  543. PKTLEN
  544. Packet length in fixed packet length mode,
  545. maximum packet length in variable packet length mode.
  546. > In variable packet length mode, [...]
  547. > any packet received with a length byte
  548. > with a value greater than PKTLEN will be discarded.
  549. """
  550. return self._read_single_byte(ConfigurationRegisterAddress.PKTLEN)
  551. def set_packet_length_bytes(self, packet_length: int) -> None:
  552. """
  553. see get_packet_length_bytes()
  554. """
  555. assert 1 <= packet_length <= 255, "unsupported packet length {}".format(
  556. packet_length
  557. )
  558. self._write_burst(
  559. start_register=ConfigurationRegisterAddress.PKTLEN, values=[packet_length]
  560. )
  561. def _disable_data_whitening(self):
  562. """
  563. PKTCTRL0.WHITE_DATA
  564. see "15.1 Data Whitening"
  565. > By setting PKTCTRL0.WHITE_DATA=1 [default],
  566. > all data, except the preamble and the sync word
  567. > will be XOR-ed with a 9-bit pseudo-random (PN9)
  568. > sequence before being transmitted.
  569. """
  570. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  571. pktctrl0 &= 0b10111111
  572. self._write_burst(
  573. start_register=ConfigurationRegisterAddress.PKTCTRL0, values=[pktctrl0]
  574. )
  575. def disable_checksum(self) -> None:
  576. """
  577. PKTCTRL0.CRC_EN
  578. Disable automatic 2-byte cyclic redundancy check (CRC) sum
  579. appending in TX mode and checking in RX mode.
  580. See "Figure 19: Packet Format".
  581. """
  582. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  583. pktctrl0 &= 0b11111011
  584. self._write_burst(
  585. start_register=ConfigurationRegisterAddress.PKTCTRL0, values=[pktctrl0]
  586. )
  587. def _get_transceive_mode(self) -> _TransceiveMode:
  588. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  589. return _TransceiveMode((pktctrl0 >> 4) & 0b11)
  590. def _set_transceive_mode(self, mode: _TransceiveMode) -> None:
  591. _LOGGER.info("changing transceive mode to %s", mode.name)
  592. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  593. pktctrl0 &= ~0b00110000
  594. pktctrl0 |= mode << 4
  595. self._write_burst(
  596. start_register=ConfigurationRegisterAddress.PKTCTRL0, values=[pktctrl0]
  597. )
  598. def get_packet_length_mode(self) -> PacketLengthMode:
  599. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  600. return PacketLengthMode(pktctrl0 & 0b11)
  601. def set_packet_length_mode(self, mode: PacketLengthMode) -> None:
  602. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  603. pktctrl0 &= 0b11111100
  604. pktctrl0 |= mode
  605. self._write_burst(
  606. start_register=ConfigurationRegisterAddress.PKTCTRL0, values=[pktctrl0]
  607. )
  608. def _flush_tx_fifo_buffer(self) -> None:
  609. # > Only issue SFTX in IDLE or TXFIFO_UNDERFLOW states.
  610. _LOGGER.debug("flushing tx fifo buffer")
  611. self._command_strobe(StrobeAddress.SFTX)
  612. def transmit(self, payload: bytes) -> None:
  613. """
  614. The most significant bit is transmitted first.
  615. In variable packet length mode,
  616. a byte indicating the packet's length will be prepended.
  617. > In variable packet length mode,
  618. > the packet length is configured by the first byte [...].
  619. > The packet length is defined as the payload data,
  620. > excluding the length byte and the optional CRC.
  621. from "15.2 Packet Format"
  622. Call .set_packet_length_mode(cc1101.PacketLengthMode.FIXED)
  623. to switch to fixed packet length mode.
  624. """
  625. # see "15.2 Packet Format"
  626. # > In variable packet length mode, [...]
  627. # > The first byte written to the TXFIFO must be different from 0.
  628. packet_length_mode = self.get_packet_length_mode()
  629. packet_length = self.get_packet_length_bytes()
  630. if packet_length_mode == PacketLengthMode.VARIABLE:
  631. if not payload:
  632. raise ValueError("empty payload {!r}".format(payload))
  633. if len(payload) > packet_length:
  634. raise ValueError(
  635. "payload exceeds maximum payload length of {} bytes".format(
  636. packet_length
  637. )
  638. + "\nsee .get_packet_length_bytes()"
  639. + "\npayload: {!r}".format(payload)
  640. )
  641. payload = int.to_bytes(len(payload), length=1, byteorder="big") + payload
  642. elif (
  643. packet_length_mode == PacketLengthMode.FIXED
  644. and len(payload) != packet_length
  645. ):
  646. raise ValueError(
  647. "expected payload length of {} bytes, got {}".format(
  648. packet_length, len(payload)
  649. )
  650. + "\nsee .set_packet_length_mode() and .get_packet_length_bytes()"
  651. + "\npayload: {!r}".format(payload)
  652. )
  653. marcstate = self.get_main_radio_control_state_machine_state()
  654. if marcstate != MainRadioControlStateMachineState.IDLE:
  655. raise Exception(
  656. "device must be idle before transmission (current marcstate: {})".format(
  657. marcstate.name
  658. )
  659. )
  660. self._flush_tx_fifo_buffer()
  661. self._write_burst(FIFORegisterAddress.TX, list(payload))
  662. _LOGGER.info(
  663. "transmitting 0x%s (%r)",
  664. "".join("{:02x}".format(b) for b in payload),
  665. payload,
  666. )
  667. self._command_strobe(StrobeAddress.STX)
  668. @contextlib.contextmanager
  669. def asynchronous_transmission(self) -> typing.Iterator[Pin]:
  670. """
  671. see "27.1 Asynchronous Serial Operation"
  672. >>> with cc1101.CC1101() as transceiver:
  673. >>> transceiver.set_base_frequency_hertz(433.92e6)
  674. >>> transceiver.set_symbol_rate_baud(600)
  675. >>> print(transceiver)
  676. >>> with transceiver.asynchronous_transmission():
  677. >>> # send digital signal to GDO0 pin
  678. """
  679. self._set_transceive_mode(_TransceiveMode.ASYNCHRONOUS_SERIAL)
  680. self._command_strobe(StrobeAddress.STX)
  681. try:
  682. # > In TX, the GDO0 pin is used for data input (TX data).
  683. yield Pin.GDO0
  684. finally:
  685. self._command_strobe(StrobeAddress.SIDLE)
  686. self._set_transceive_mode(_TransceiveMode.FIFO)
  687. def _enable_receive_mode(self) -> None: # unstable
  688. self._command_strobe(StrobeAddress.SRX)
  689. def _get_received_packet(self) -> typing.Optional[_ReceivedPacket]: # unstable
  690. """
  691. see section "20 Data FIFO"
  692. """
  693. rxbytes = self._read_status_register(StatusRegisterAddress.RXBYTES)
  694. # PKTCTRL1.APPEND_STATUS is enabled by default
  695. if rxbytes < 2:
  696. return None
  697. buffer = self._read_burst(start_register=FIFORegisterAddress.RX, length=rxbytes)
  698. return _ReceivedPacket(
  699. data=bytes(buffer[:-2]),
  700. rssi_index=buffer[-2],
  701. checksum_valid=bool(buffer[-1] >> 7),
  702. link_quality_indicator=buffer[-1] & 0b0111111,
  703. )