__init__.py 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996
  1. # python-cc1101 - Python Library to Transmit RF Signals via CC1101 Transceivers
  2. #
  3. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. from __future__ import annotations
  18. import contextlib
  19. import datetime
  20. import enum
  21. import fcntl
  22. import logging
  23. import math
  24. import typing
  25. import warnings
  26. import spidev
  27. import cc1101._gpio
  28. from cc1101.addresses import (
  29. ConfigurationRegisterAddress,
  30. FIFORegisterAddress,
  31. PatableAddress,
  32. StatusRegisterAddress,
  33. StrobeAddress,
  34. )
  35. from cc1101.options import (
  36. GDOSignalSelection,
  37. ModulationFormat,
  38. PacketLengthMode,
  39. SyncMode,
  40. _TransceiveMode,
  41. )
  42. _LOGGER = logging.getLogger(__name__)
  43. class Pin(enum.Enum):
  44. GDO0 = "GDO0"
  45. class MainRadioControlStateMachineState(enum.IntEnum):
  46. """
  47. MARCSTATE - Main Radio Control State Machine State
  48. """
  49. # see "Figure 13: Simplified State Diagram"
  50. # and "Figure 25: Complete Radio Control State Diagram"
  51. SLEEP = 0x00
  52. IDLE = 0x01
  53. XOFF = 0x02
  54. VCOON_MC = 0x03
  55. REGON_MC = 0x04
  56. MANCAL = 0x05
  57. VCOON = 0x06
  58. REGON = 0x07
  59. STARTCAL = 0x08 # after IDLE
  60. BWBOOST = 0x09 # after STARTCAL
  61. FS_LOCK = 0x0A
  62. IFADCON = 0x0B
  63. ENDCAL = 0x0C
  64. RX = 0x0D
  65. RX_END = 0x0E
  66. RX_RST = 0x0F
  67. TXRX_SWITCH = 0x10
  68. RXFIFO_OVERFLOW = 0x11
  69. FSTXON = 0x12
  70. TX = 0x13
  71. TX_END = 0x14
  72. RXTX_SWITCH = 0x15
  73. TXFIFO_UNDERFLOW = 0x16
  74. class _ReceivedPacket: # unstable
  75. # "Table 31: Typical RSSI_offset Values"
  76. _RSSI_OFFSET_dB = 74
  77. def __init__(
  78. self,
  79. *,
  80. payload: bytes,
  81. rssi_index: int, # byte
  82. checksum_valid: bool,
  83. link_quality_indicator: int, # 7bit
  84. ):
  85. self.payload = payload
  86. self._rssi_index = rssi_index
  87. assert 0 <= rssi_index < (1 << 8), rssi_index
  88. self.checksum_valid = checksum_valid
  89. self.link_quality_indicator = link_quality_indicator
  90. assert 0 <= link_quality_indicator < (1 << 7), link_quality_indicator
  91. @property
  92. def rssi_dbm(self) -> float:
  93. """
  94. Estimated Received Signal Strength Indicator (RSSI) in dBm
  95. see section "17.3 RSSI"
  96. """
  97. if self._rssi_index >= 128:
  98. return (self._rssi_index - 256) / 2 - self._RSSI_OFFSET_dB
  99. return self._rssi_index / 2 - self._RSSI_OFFSET_dB
  100. def __str__(self) -> str:
  101. return f"{type(self).__name__}(RSSI {self.rssi_dbm:.0f}dBm, 0x{self.payload.hex()})"
  102. def _format_patable(settings: typing.Iterable[int], insert_spaces: bool) -> str:
  103. # "Table 39: Optimum PATABLE Settings" uses hexadecimal digits
  104. # "0" for brevity
  105. settings_hex = tuple(map(lambda s: "0" if s == 0 else f"0x{s:x}", settings))
  106. if len(settings_hex) == 1:
  107. return f"({settings_hex[0]},)"
  108. delimiter = ", " if insert_spaces else ","
  109. return f"({delimiter.join(settings_hex)})"
  110. class CC1101:
  111. # pylint: disable=too-many-public-methods
  112. # > All transfers on the SPI interface are done
  113. # > most significant bit first.
  114. # > All transactions on the SPI interface start with
  115. # > a header byte containing a R/W bit, a access bit (B),
  116. # > and a 6-bit address (A5 - A0).
  117. # > [...]
  118. # > Table 45: SPI Address Space
  119. _WRITE_SINGLE_BYTE = 0x00
  120. # > Registers with consecutive addresses can be
  121. # > accessed in an efficient way by setting the
  122. # > burst bit (B) in the header byte. The address
  123. # > bits (A5 - A0) set the start address in an
  124. # > internal address counter. This counter is
  125. # > incremented by one each new byte [...]
  126. _WRITE_BURST = 0x40
  127. _READ_SINGLE_BYTE = 0x80
  128. _READ_BURST = 0xC0
  129. # 29.3 Status Register Details
  130. _SUPPORTED_PARTNUM = 0
  131. # > The two versions of the chip will behave the same.
  132. # https://e2e.ti.com/support/wireless-connectivity/sub-1-ghz/f/156/p/428028/1529544#1529544
  133. _SUPPORTED_VERSIONS = [
  134. 0x04, # https://github.com/fphammerle/python-cc1101/issues/15
  135. 0x14,
  136. ]
  137. _CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ = 26e6
  138. # see "21 Frequency Programming"
  139. # > f_carrier = f_XOSC / 2**16 * (FREQ + CHAN * ((256 + CHANSPC_M) * 2**CHANSPC_E-2))
  140. _FREQUENCY_CONTROL_WORD_HERTZ_FACTOR = _CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / 2**16
  141. # roughly estimated / tested with SDR receiver, docs specify:
  142. # > can [...] be programmed for operation at other frequencies
  143. # > in the 300-348 MHz, 387-464 MHz and 779-928 MHz bands.
  144. _TRANSMIT_MIN_FREQUENCY_HERTZ = 281.7e6
  145. # > The PATABLE is an 8-byte table that defines the PA control settings [...]
  146. _PATABLE_LENGTH_BYTES = 8
  147. def __init__(
  148. self,
  149. spi_bus: int = 0,
  150. spi_chip_select: int = 0,
  151. lock_spi_device: bool = False,
  152. spi_max_speed_hz: int = 55700,
  153. ) -> None:
  154. """
  155. lock_spi_device:
  156. When True, an advisory, exclusive lock will be set on the SPI device file
  157. non-blockingly via flock upon entering the context.
  158. If the SPI device file is already locked (e.g., by a different process),
  159. a BlockingIOError will be raised.
  160. The lock will be removed automatically, when leaving the context.
  161. The lock can optionally be released earlier by calling .unlock_spi_device().
  162. >>> transceiver = cc1101.CC1101(lock_spi_device=True)
  163. >>> # not locked
  164. >>> with transceiver:
  165. >>> # locked
  166. >>> # lock removed
  167. >>> with transceiver:
  168. >>> # locked
  169. >>> transceiver.unlock_spi_device()
  170. >>> # lock removed
  171. """
  172. self._spi = spidev.SpiDev()
  173. self._spi_max_speed_hz = spi_max_speed_hz
  174. self._spi_bus = int(spi_bus)
  175. # > The BCM2835 core common to all Raspberry Pi devices has 3 SPI Controllers:
  176. # > SPI0, with two hardware chip selects, [...]
  177. # > SPI1, with three hardware chip selects, [...]
  178. # > SPI2, also with three hardware chip selects, is only usable on a Compute Module [...]
  179. # https://github.com/raspberrypi/documentation/blob/d41d69f8efa3667b1a8b01a669238b8bd113edc1/hardware/raspberrypi/spi/README.md#hardware
  180. # https://www.raspberrypi.org/documentation/hardware/raspberrypi/spi/README.md
  181. self._spi_chip_select = int(spi_chip_select)
  182. self._lock_spi_device = lock_spi_device
  183. @property
  184. def _spi_device_path(self) -> str:
  185. # https://github.com/doceme/py-spidev/blob/v3.4/spidev_module.c#L1286
  186. return f"/dev/spidev{self._spi_bus}.{self._spi_chip_select}"
  187. @staticmethod
  188. def _log_chip_status_byte(chip_status: int) -> None:
  189. # see "10.1 Chip Status Byte" & "Table 23: Status Byte Summary"
  190. # > The command strobe registers are accessed by transferring
  191. # > a single header byte [...]. That is, only the R/W̄ bit,
  192. # > the burst access bit (set to 0), and the six address bits [...]
  193. # > The R/W̄ bit can be either one or zero and will determine how the
  194. # > FIFO_BYTES_AVAILABLE field in the status byte should be interpreted.
  195. _LOGGER.debug(
  196. "chip status byte: CHIP_RDYn=%d STATE=%s FIFO_BYTES_AVAILBLE=%d",
  197. chip_status >> 7,
  198. bin((chip_status >> 4) & 0b111),
  199. chip_status & 0b1111,
  200. )
  201. def _read_single_byte(
  202. self, register: typing.Union[ConfigurationRegisterAddress, FIFORegisterAddress]
  203. ) -> int:
  204. response = self._spi.xfer([register | self._READ_SINGLE_BYTE, 0])
  205. assert len(response) == 2, response
  206. self._log_chip_status_byte(response[0])
  207. return response[1]
  208. def _read_burst(
  209. self,
  210. start_register: typing.Union[
  211. ConfigurationRegisterAddress, PatableAddress, FIFORegisterAddress
  212. ],
  213. length: int,
  214. ) -> typing.List[int]:
  215. response = self._spi.xfer([start_register | self._READ_BURST] + [0] * length)
  216. assert len(response) == length + 1, response
  217. self._log_chip_status_byte(response[0])
  218. return response[1:]
  219. def _read_status_register(self, register: StatusRegisterAddress) -> int:
  220. # > For register addresses in the range 0x30-0x3D,
  221. # > the burst bit is used to select between
  222. # > status registers when burst bit is one, and
  223. # > between command strobes when burst bit is
  224. # > zero. [...]
  225. # > Because of this, burst access is not available
  226. # > for status registers and they must be accessed
  227. # > one at a time. The status registers can only be
  228. # > read.
  229. response = self._spi.xfer([register | self._READ_BURST, 0])
  230. assert len(response) == 2, response
  231. self._log_chip_status_byte(response[0])
  232. return response[1]
  233. def _command_strobe(self, register: StrobeAddress) -> None:
  234. # see "10.4 Command Strobes"
  235. _LOGGER.debug("sending command strobe 0x%02x", register)
  236. response = self._spi.xfer([register | self._WRITE_SINGLE_BYTE])
  237. assert len(response) == 1, response
  238. self._log_chip_status_byte(response[0])
  239. def _write_burst(
  240. self,
  241. start_register: typing.Union[
  242. ConfigurationRegisterAddress, PatableAddress, FIFORegisterAddress
  243. ],
  244. values: typing.List[int],
  245. ) -> None:
  246. _LOGGER.debug(
  247. "writing burst: start_register=0x%02x values=%s", start_register, values
  248. )
  249. response = self._spi.xfer([start_register | self._WRITE_BURST] + values)
  250. assert len(response) == len(values) + 1, response
  251. self._log_chip_status_byte(response[0])
  252. assert all(v == response[0] for v in response[1:]), response
  253. def _reset(self) -> None:
  254. self._command_strobe(StrobeAddress.SRES)
  255. @classmethod
  256. def _filter_bandwidth_floating_point_to_real(
  257. cls, *, mantissa: int, exponent: int
  258. ) -> float:
  259. """
  260. See "13 Receiver Channel Filter Bandwidth"
  261. """
  262. return cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / (
  263. 8 * (4 + mantissa) * (2**exponent)
  264. )
  265. def _get_filter_bandwidth_hertz(self) -> float:
  266. """
  267. MDMCFG4.CHANBW_E & MDMCFG4.CHANBW_M
  268. > [...] decimation ratio for the delta-sigma ADC input stream
  269. > and thus the channel bandwidth.
  270. See "13 Receiver Channel Filter Bandwidth"
  271. """
  272. mdmcfg4 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4)
  273. return self._filter_bandwidth_floating_point_to_real(
  274. exponent=mdmcfg4 >> 6, mantissa=(mdmcfg4 >> 4) & 0b11
  275. )
  276. def _set_filter_bandwidth(self, *, mantissa: int, exponent: int) -> None:
  277. """
  278. MDMCFG4.CHANBW_E & MDMCFG4.CHANBW_M
  279. """
  280. mdmcfg4 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4)
  281. mdmcfg4 &= 0b00001111
  282. assert 0 <= exponent <= 0b11, exponent
  283. mdmcfg4 |= exponent << 6
  284. assert 0 <= mantissa <= 0b11, mantissa
  285. mdmcfg4 |= mantissa << 4
  286. self._write_burst(
  287. start_register=ConfigurationRegisterAddress.MDMCFG4, values=[mdmcfg4]
  288. )
  289. def _get_symbol_rate_exponent(self) -> int:
  290. """
  291. MDMCFG4.DRATE_E
  292. """
  293. return self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4) & 0b00001111
  294. def _set_symbol_rate_exponent(self, exponent: int):
  295. mdmcfg4 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4)
  296. mdmcfg4 &= 0b11110000
  297. mdmcfg4 |= exponent
  298. self._write_burst(
  299. start_register=ConfigurationRegisterAddress.MDMCFG4, values=[mdmcfg4]
  300. )
  301. def _get_symbol_rate_mantissa(self) -> int:
  302. """
  303. MDMCFG3.DRATE_M
  304. """
  305. return self._read_single_byte(ConfigurationRegisterAddress.MDMCFG3)
  306. def _set_symbol_rate_mantissa(self, mantissa: int) -> None:
  307. self._write_burst(
  308. start_register=ConfigurationRegisterAddress.MDMCFG3, values=[mantissa]
  309. )
  310. @classmethod
  311. def _symbol_rate_floating_point_to_real(
  312. cls, *, mantissa: int, exponent: int
  313. ) -> float:
  314. # see "12 Data Rate Programming"
  315. return (
  316. (256 + mantissa)
  317. * (2**exponent)
  318. * cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ
  319. / (2**28)
  320. )
  321. @classmethod
  322. def _symbol_rate_real_to_floating_point(cls, real: float) -> typing.Tuple[int, int]:
  323. # see "12 Data Rate Programming"
  324. assert real > 0, real
  325. exponent = math.floor(
  326. math.log2(real / cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ) + 20
  327. )
  328. mantissa = round(
  329. real * 2**28 / cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / 2**exponent - 256
  330. )
  331. if mantissa == 256:
  332. exponent += 1
  333. mantissa = 0
  334. assert 0 < exponent <= 2**4, exponent
  335. assert mantissa <= 2**8, mantissa
  336. return mantissa, exponent
  337. def get_symbol_rate_baud(self) -> float:
  338. return self._symbol_rate_floating_point_to_real(
  339. mantissa=self._get_symbol_rate_mantissa(),
  340. exponent=self._get_symbol_rate_exponent(),
  341. )
  342. def set_symbol_rate_baud(self, real: float) -> None:
  343. # > The data rate can be set from 0.6 kBaud to 500 kBaud [...]
  344. mantissa, exponent = self._symbol_rate_real_to_floating_point(real)
  345. self._set_symbol_rate_mantissa(mantissa)
  346. self._set_symbol_rate_exponent(exponent)
  347. def get_modulation_format(self) -> ModulationFormat:
  348. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  349. return ModulationFormat((mdmcfg2 >> 4) & 0b111)
  350. def _set_modulation_format(self, modulation_format: ModulationFormat) -> None:
  351. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  352. mdmcfg2 &= 0b10001111
  353. mdmcfg2 |= modulation_format << 4
  354. self._write_burst(ConfigurationRegisterAddress.MDMCFG2, [mdmcfg2])
  355. def enable_manchester_code(self) -> None:
  356. """
  357. MDMCFG2.MANCHESTER_EN
  358. Enable manchester encoding & decoding for the entire packet,
  359. including the preamble and synchronization word.
  360. """
  361. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  362. mdmcfg2 |= 0b1000
  363. self._write_burst(ConfigurationRegisterAddress.MDMCFG2, [mdmcfg2])
  364. def get_sync_mode(self) -> SyncMode:
  365. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  366. return SyncMode(mdmcfg2 & 0b11)
  367. def set_sync_mode(
  368. self,
  369. mode: SyncMode,
  370. *,
  371. _carrier_sense_threshold_enabled: typing.Optional[bool] = None, # unstable
  372. ) -> None:
  373. """
  374. MDMCFG2.SYNC_MODE
  375. see "14.3 Byte Synchronization"
  376. Carrier Sense (CS) Threshold (when receiving packets, API unstable):
  377. > Carrier sense can be used as a sync word qualifier
  378. > that requires the signal level to be higher than the threshold
  379. > for a sync word > search to be performed [...]
  380. > CS can be used to avoid interference from other RF sources [...]
  381. True: enable, False: disable, None: keep current setting
  382. See "17.4 Carrier Sense (CS)"
  383. """
  384. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  385. mdmcfg2 &= 0b11111100
  386. mdmcfg2 |= mode
  387. if _carrier_sense_threshold_enabled is not None:
  388. if _carrier_sense_threshold_enabled:
  389. mdmcfg2 |= 0b00000100
  390. else:
  391. mdmcfg2 &= 0b11111011
  392. self._write_burst(ConfigurationRegisterAddress.MDMCFG2, [mdmcfg2])
  393. def get_preamble_length_bytes(self) -> int:
  394. """
  395. MDMCFG1.NUM_PREAMBLE
  396. Minimum number of preamble bytes to be transmitted.
  397. See "15.2 Packet Format"
  398. """
  399. index = (
  400. self._read_single_byte(ConfigurationRegisterAddress.MDMCFG1) >> 4
  401. ) & 0b111
  402. return 2 ** (index >> 1) * (2 + (index & 0b1))
  403. def _set_preamble_length_index(self, index: int) -> None:
  404. assert 0 <= index <= 0b111
  405. mdmcfg1 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG1)
  406. mdmcfg1 &= 0b10001111
  407. mdmcfg1 |= index << 4
  408. self._write_burst(ConfigurationRegisterAddress.MDMCFG1, [mdmcfg1])
  409. def set_preamble_length_bytes(self, length: int) -> None:
  410. """
  411. see .get_preamble_length_bytes()
  412. """
  413. if length < 1:
  414. raise ValueError(
  415. f"invalid preamble length {length} given"
  416. "\ncall .set_sync_mode(cc1101.SyncMode.NO_PREAMBLE_AND_SYNC_WORD)"
  417. " to disable preamble"
  418. )
  419. if length % 3 == 0: # pylint: disable=consider-ternary-expression
  420. index = math.log2(length / 3) * 2 + 1
  421. else:
  422. index = math.log2(length / 2) * 2
  423. if not index.is_integer() or index < 0 or index > 0b111:
  424. raise ValueError(
  425. f"unsupported preamble length: {length} bytes"
  426. "\nsee MDMCFG1.NUM_PREAMBLE in cc1101 docs"
  427. )
  428. self._set_preamble_length_index(int(index))
  429. def _get_power_amplifier_setting_index(self) -> int:
  430. """
  431. see ._set_power_amplifier_setting_index
  432. """
  433. return self._read_single_byte(ConfigurationRegisterAddress.FREND0) & 0b111
  434. def _set_power_amplifier_setting_index(self, setting_index: int) -> None:
  435. """
  436. FREND0.PA_POWER
  437. > This value is an index to the PATABLE,
  438. > which can be programmed with up to 8 different PA settings.
  439. > In OOK/ASK mode, this selects the PATABLE index to use
  440. > when transmitting a '1'.
  441. > PATABLE index zero is used in OOK/ASK when transmitting a '0'.
  442. > The PATABLE settings from index 0 to the PA_POWER value are
  443. > used for > ASK TX shaping, [...]
  444. see "Figure 32: Shaping of ASK Signal"
  445. > If OOK modulation is used, the logic 0 and logic 1 power levels
  446. > shall be programmed to index 0 and 1 respectively.
  447. """
  448. assert 0 <= setting_index <= 0b111, setting_index
  449. frend0 = self._read_single_byte(ConfigurationRegisterAddress.FREND0)
  450. frend0 &= 0b11111000
  451. frend0 |= setting_index
  452. self._write_burst(ConfigurationRegisterAddress.FREND0, [frend0])
  453. def _verify_chip(self) -> None:
  454. partnum = self._read_status_register(StatusRegisterAddress.PARTNUM)
  455. if partnum != self._SUPPORTED_PARTNUM:
  456. raise ValueError(
  457. f"unexpected chip part number {partnum} (expected: {self._SUPPORTED_PARTNUM})"
  458. )
  459. version = self._read_status_register(StatusRegisterAddress.VERSION)
  460. if version not in self._SUPPORTED_VERSIONS:
  461. msg = f"Unsupported chip version 0x{version:02x}"
  462. supported_versions = ", ".join(
  463. f"0x{v:02x}" for v in self._SUPPORTED_VERSIONS
  464. )
  465. msg += f" (expected one of [{supported_versions}])"
  466. if version == 0:
  467. msg += (
  468. "\n\nPlease verify that all required pins are connected"
  469. " (see https://github.com/fphammerle/python-cc1101#wiring-raspberry-pi)"
  470. " and that you selected the correct SPI bus and chip/slave select line."
  471. )
  472. raise ValueError(msg)
  473. def _configure_defaults(self) -> None:
  474. # next major/breaking release will probably stick closer to CC1101's defaults
  475. # 6:4 MOD_FORMAT: OOK (default: 2-FSK)
  476. self._set_modulation_format(ModulationFormat.ASK_OOK)
  477. self._set_power_amplifier_setting_index(1)
  478. self._disable_data_whitening()
  479. # 7:6 unused
  480. # 5:4 FS_AUTOCAL: calibrate when going from IDLE to RX or TX
  481. # 3:2 PO_TIMEOUT: default
  482. # 1 PIN_CTRL_EN: default
  483. # 0 XOSC_FORCE_ON: default
  484. self._write_burst(ConfigurationRegisterAddress.MCSM0, [0b010100])
  485. # > Default is CLK_XOSC/192 (See Table 41 on page 62).
  486. # > It is recommended to disable the clock output in initialization,
  487. # > in order to optimize RF performance.
  488. self._write_burst(
  489. ConfigurationRegisterAddress.IOCFG0,
  490. # required for _wait_for_packet()
  491. [GDOSignalSelection.RX_FIFO_AT_OR_ABOVE_THRESHOLD_OR_PACKET_END_REACHED],
  492. )
  493. def __enter__(self) -> CC1101:
  494. # https://docs.python.org/3/reference/datamodel.html#object.__enter__
  495. try:
  496. self._spi.open(self._spi_bus, self._spi_chip_select)
  497. except PermissionError as exc:
  498. raise PermissionError(
  499. f"Could not access {self._spi_device_path}"
  500. "\nVerify that the current user has both read and write access."
  501. "\nOn some systems, like Raspberry Pi OS / Raspbian,"
  502. "\n\tsudo usermod -a -G spi $USER"
  503. "\nfollowed by a re-login grants sufficient permissions."
  504. ) from exc
  505. if self._lock_spi_device:
  506. # advisory, exclusive, non-blocking
  507. # lock removed in __exit__ by SpiDev.close()
  508. fcntl.flock(self._spi.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
  509. try:
  510. self._spi.max_speed_hz = self._spi_max_speed_hz
  511. self._reset()
  512. self._verify_chip()
  513. self._configure_defaults()
  514. marcstate = self.get_main_radio_control_state_machine_state()
  515. if marcstate != MainRadioControlStateMachineState.IDLE:
  516. raise ValueError(f"expected marcstate idle (actual: {marcstate.name})")
  517. except:
  518. self._spi.close()
  519. raise
  520. return self
  521. def __exit__(self, exc_type, exc_value, traceback): # -> typing.Literal[False]
  522. # https://docs.python.org/3/reference/datamodel.html#object.__exit__
  523. self._spi.close()
  524. return False
  525. def unlock_spi_device(self) -> None:
  526. """
  527. Manually release the lock set on the SPI device file.
  528. Alternatively, the lock will be released automatically,
  529. when leaving the context.
  530. Method fails silently, if the SPI device file is not locked.
  531. >>> transceiver = cc1101.CC1101(lock_spi_device=True)
  532. >>> # not locked
  533. >>> with transceiver:
  534. >>> # locked
  535. >>> # lock removed
  536. >>> with transceiver:
  537. >>> # locked
  538. >>> transceiver.unlock_spi_device()
  539. >>> # lock removed
  540. """
  541. fileno = self._spi.fileno()
  542. if fileno != -1:
  543. fcntl.flock(fileno, fcntl.LOCK_UN)
  544. def get_main_radio_control_state_machine_state(
  545. self,
  546. ) -> MainRadioControlStateMachineState:
  547. return MainRadioControlStateMachineState(
  548. self._read_status_register(StatusRegisterAddress.MARCSTATE)
  549. )
  550. def get_marc_state(self) -> MainRadioControlStateMachineState:
  551. """
  552. alias for get_main_radio_control_state_machine_state()
  553. """
  554. return self.get_main_radio_control_state_machine_state()
  555. @classmethod
  556. def _frequency_control_word_to_hertz(cls, control_word: typing.List[int]) -> float:
  557. return (
  558. int.from_bytes(control_word, byteorder="big", signed=False)
  559. * cls._FREQUENCY_CONTROL_WORD_HERTZ_FACTOR
  560. )
  561. @classmethod
  562. def _hertz_to_frequency_control_word(cls, hertz: float) -> typing.List[int]:
  563. return list(
  564. round(hertz / cls._FREQUENCY_CONTROL_WORD_HERTZ_FACTOR).to_bytes(
  565. length=3, byteorder="big", signed=False
  566. )
  567. )
  568. def _get_base_frequency_control_word(self) -> typing.List[int]:
  569. # > The base or start frequency is set by the 24 bitfrequency
  570. # > word located in the FREQ2, FREQ1, FREQ0 registers.
  571. return self._read_burst(
  572. start_register=ConfigurationRegisterAddress.FREQ2, length=3
  573. )
  574. def _set_base_frequency_control_word(self, control_word: typing.List[int]) -> None:
  575. self._write_burst(
  576. start_register=ConfigurationRegisterAddress.FREQ2, values=control_word
  577. )
  578. def get_base_frequency_hertz(self) -> float:
  579. return self._frequency_control_word_to_hertz(
  580. self._get_base_frequency_control_word()
  581. )
  582. def set_base_frequency_hertz(self, freq: float) -> None:
  583. if freq < (self._TRANSMIT_MIN_FREQUENCY_HERTZ - 50e3):
  584. # > [use] warnings.warn() in library code if the issue is avoidable
  585. # > and the client application should be modified to eliminate the warning[.]
  586. # > [use] logging.warning() if there is nothing the client application
  587. # > can do about the situation, but the event should still be noted.
  588. # https://docs.python.org/3/howto/logging.html#when-to-use-logging
  589. warnings.warn(
  590. "CC1101 is unable to transmit at frequencies"
  591. f" below {(self._TRANSMIT_MIN_FREQUENCY_HERTZ / 1e6):.1f} MHz"
  592. )
  593. self._set_base_frequency_control_word(
  594. self._hertz_to_frequency_control_word(freq)
  595. )
  596. def __str__(self) -> str:
  597. sync_mode = self.get_sync_mode()
  598. attrs = (
  599. f"marcstate={self.get_main_radio_control_state_machine_state().name.lower()}",
  600. f"base_frequency={(self.get_base_frequency_hertz() / 1e6):.2f}MHz",
  601. f"symbol_rate={(self.get_symbol_rate_baud() / 1000):.2f}kBaud",
  602. f"modulation_format={self.get_modulation_format().name}",
  603. f"sync_mode={sync_mode.name}",
  604. (
  605. f"preamble_length={self.get_preamble_length_bytes()}B"
  606. if sync_mode != SyncMode.NO_PREAMBLE_AND_SYNC_WORD
  607. else None
  608. ),
  609. (
  610. f"sync_word=0x{self.get_sync_word().hex()}"
  611. if sync_mode != SyncMode.NO_PREAMBLE_AND_SYNC_WORD
  612. else None
  613. ),
  614. "packet_length{}{}B".format( # pylint: disable=consider-using-f-string
  615. (
  616. "≤"
  617. if self.get_packet_length_mode() == PacketLengthMode.VARIABLE
  618. else "="
  619. ),
  620. self.get_packet_length_bytes(),
  621. ),
  622. "output_power="
  623. + _format_patable(self.get_output_power(), insert_spaces=False),
  624. )
  625. # pylint: disable=consider-using-f-string
  626. return "CC1101({})".format(", ".join(filter(None, attrs)))
  627. def get_configuration_register_values(
  628. self,
  629. start_register: ConfigurationRegisterAddress = min(
  630. ConfigurationRegisterAddress
  631. ),
  632. end_register: ConfigurationRegisterAddress = max(ConfigurationRegisterAddress),
  633. ) -> typing.Dict[ConfigurationRegisterAddress, int]:
  634. assert start_register <= end_register, (start_register, end_register)
  635. values = self._read_burst(
  636. start_register=start_register, length=end_register - start_register + 1
  637. )
  638. return {
  639. ConfigurationRegisterAddress(start_register + i): v
  640. for i, v in enumerate(values)
  641. }
  642. def get_sync_word(self) -> bytes:
  643. """
  644. SYNC1 & SYNC0
  645. See "15.2 Packet Format"
  646. The first byte's most significant bit is transmitted first.
  647. """
  648. return bytes(
  649. self._read_burst(
  650. start_register=ConfigurationRegisterAddress.SYNC1, length=2
  651. )
  652. )
  653. def set_sync_word(self, sync_word: bytes) -> None:
  654. """
  655. See .set_sync_word()
  656. """
  657. if len(sync_word) != 2:
  658. raise ValueError(f"expected two bytes, got {sync_word!r}")
  659. self._write_burst(
  660. start_register=ConfigurationRegisterAddress.SYNC1, values=list(sync_word)
  661. )
  662. def get_packet_length_bytes(self) -> int:
  663. """
  664. PKTLEN
  665. Packet length in fixed packet length mode,
  666. maximum packet length in variable packet length mode.
  667. > In variable packet length mode, [...]
  668. > any packet received with a length byte
  669. > with a value greater than PKTLEN will be discarded.
  670. """
  671. return self._read_single_byte(ConfigurationRegisterAddress.PKTLEN)
  672. def set_packet_length_bytes(self, packet_length: int) -> None:
  673. """
  674. see get_packet_length_bytes()
  675. """
  676. assert 1 <= packet_length <= 255, f"unsupported packet length {packet_length}"
  677. self._write_burst(
  678. start_register=ConfigurationRegisterAddress.PKTLEN, values=[packet_length]
  679. )
  680. def _disable_data_whitening(self):
  681. """
  682. PKTCTRL0.WHITE_DATA
  683. see "15.1 Data Whitening"
  684. > By setting PKTCTRL0.WHITE_DATA=1 [default],
  685. > all data, except the preamble and the sync word
  686. > will be XOR-ed with a 9-bit pseudo-random (PN9)
  687. > sequence before being transmitted.
  688. """
  689. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  690. pktctrl0 &= 0b10111111
  691. self._write_burst(
  692. start_register=ConfigurationRegisterAddress.PKTCTRL0, values=[pktctrl0]
  693. )
  def disable_checksum(self) -> None:
      """
      Clear PKTCTRL0.CRC_EN, keeping the other bits of PKTCTRL0.

      Disables automatic 2-byte cyclic redundancy check (CRC) sum
      appending in TX mode and checking in RX mode.

      See "Figure 19: Packet Format".
      """
      current = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
      # 0xFB keeps every bit of the byte except bit 2
      without_checksum = current & 0xFB
      self._write_burst(
          start_register=ConfigurationRegisterAddress.PKTCTRL0,
          values=[without_checksum],
      )
  def _get_transceive_mode(self) -> _TransceiveMode:
      """
      Decode bits 5:4 of the PKTCTRL0 register as a _TransceiveMode.
      """
      register_value = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
      mode_bits = (register_value & 0b00110000) >> 4
      return _TransceiveMode(mode_bits)
  def _set_transceive_mode(self, mode: _TransceiveMode) -> None:
      """
      Store the given mode in bits 5:4 of the PKTCTRL0 register,
      keeping the remaining bits as read from the device.
      """
      _LOGGER.info("changing transceive mode to %s", mode.name)
      current = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
      # clear the two mode bits first, then insert the new mode
      updated = (current & ~0b00110000) | (mode << 4)
      self._write_burst(
          start_register=ConfigurationRegisterAddress.PKTCTRL0, values=[updated]
      )
  def get_packet_length_mode(self) -> PacketLengthMode:
      """
      Decode the two least significant bits of PKTCTRL0
      as a PacketLengthMode.
      """
      register_value = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
      length_mode_bits = register_value & 0b00000011
      return PacketLengthMode(length_mode_bits)
  def set_packet_length_mode(self, mode: PacketLengthMode) -> None:
      """
      Store the given mode in the two least significant bits of PKTCTRL0,
      keeping the remaining bits as read from the device.

      see get_packet_length_mode()
      """
      current = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
      # drop the old mode bits, then insert the new ones
      updated = (current & 0b11111100) | mode
      self._write_burst(
          start_register=ConfigurationRegisterAddress.PKTCTRL0, values=[updated]
      )
  def _get_patable(self) -> typing.Tuple[int, ...]:
      """
      Read _PATABLE_LENGTH_BYTES bytes starting at PATABLE
      and return them as a tuple.

      see "10.6 PATABLE Access" and "24 Output Power Programming"

      default: (0xC6, 0, 0, 0, 0, 0, 0, 0)
      """
      table = self._read_burst(
          start_register=PatableAddress.PATABLE,
          length=self._PATABLE_LENGTH_BYTES,
      )
      return tuple(table)
  def _set_patable(self, settings: typing.Iterable[int]):
      """
      Write the given settings to the device, starting at PATABLE.

      Asserts that every setting fits into one byte and that
      between 1 and _PATABLE_LENGTH_BYTES settings were provided.

      see _get_patable()
      """
      levels = list(settings)
      # each entry must fit into a single byte
      assert all(0 <= level <= 0xFF for level in levels), levels
      # neither empty nor longer than the table
      assert 0 < len(levels) <= self._PATABLE_LENGTH_BYTES, levels
      self._write_burst(values=levels, start_register=PatableAddress.PATABLE)
  def get_output_power(self) -> typing.Tuple[int, ...]:
      """
      Returns the enabled output power settings
      (up to 8 bytes of the PATABLE register).

      These are the PATABLE entries up to and including
      the current power amplifier setting index.

      see .set_output_power()
      """
      patable = self._get_patable()
      last_enabled_index = self._get_power_amplifier_setting_index()
      return patable[: last_enabled_index + 1]
  def set_output_power(self, power_settings: typing.Iterable[int]) -> None:
      """
      Configures output power levels by setting PATABLE and FREND0.PA_POWER.

      Up to 8 bytes may be provided.

      > [PATABLE] provides flexible PA power ramp up and ramp down
      > at the start and end of transmission when using 2-FSK, GFSK,
      > 4-FSK, and MSK modulation as well as ASK modulation shaping.

      For OOK modulation, exactly 2 bytes must be provided:
      0 to turn off the transmission for logical 0,
      and a level > 0 to turn on the transmission for logical 1.

      >>> transceiver.set_output_power((0, 0xC6))

      See "Table 39: Optimum PATABLE Settings for Various Output Power Levels [...]"
      and section "24 Output Power Programming".
      """
      levels = list(power_settings)
      # the index of the last provided setting is stored first;
      # validation happens in the two sub-methods
      last_index = len(levels) - 1
      self._set_power_amplifier_setting_index(last_index)
      self._set_patable(levels)
  def _flush_tx_fifo_buffer(self) -> None:
      """
      Issue the SFTX command strobe.

      > Only issue SFTX in IDLE or TXFIFO_UNDERFLOW states.
      """
      _LOGGER.debug("flushing tx fifo buffer")
      self._command_strobe(StrobeAddress.SFTX)
  def transmit(self, payload: bytes) -> None:
      """
      Write the payload to the TX FIFO and start the transmission.

      The most significant bit is transmitted first.

      In variable packet length mode,
      a byte indicating the packet's length will be prepended.

      > In variable packet length mode,
      > the packet length is configured by the first byte [...].
      > The packet length is defined as the payload data,
      > excluding the length byte and the optional CRC.
      from "15.2 Packet Format"

      Call .set_packet_length_mode(cc1101.PacketLengthMode.FIXED)
      to switch to fixed packet length mode.

      Raises ValueError if the payload does not match the configured
      packet length settings, and RuntimeError if the device is not idle.
      """
      length_mode = self.get_packet_length_mode()
      configured_length = self.get_packet_length_bytes()
      if length_mode == PacketLengthMode.VARIABLE:
          # see "15.2 Packet Format"
          # > In variable packet length mode, [...]
          # > The first byte written to the TXFIFO must be different from 0.
          if not payload:
              raise ValueError(f"empty payload {payload!r}")
          if len(payload) > configured_length:
              raise ValueError(
                  f"payload exceeds maximum payload length of {configured_length} bytes"
                  "\nsee .get_packet_length_bytes()"
                  f"\npayload: {payload!r}"
              )
          length_prefix = len(payload).to_bytes(length=1, byteorder="big")
          payload = length_prefix + payload
      elif length_mode == PacketLengthMode.FIXED:
          if len(payload) != configured_length:
              message = (
                  f"expected payload length of {configured_length} bytes, got {len(payload)}"
                  + "\nsee .set_packet_length_mode() and .get_packet_length_bytes()"
                  + f"\npayload: {payload!r}"
              )
              raise ValueError(message)
      marcstate = self.get_main_radio_control_state_machine_state()
      if marcstate != MainRadioControlStateMachineState.IDLE:
          raise RuntimeError(
              f"device must be idle before transmission (current marcstate: {marcstate.name})"
          )
      # empty the FIFO, fill it with the new packet, then start transmitting
      self._flush_tx_fifo_buffer()
      self._write_burst(FIFORegisterAddress.TX, list(payload))
      _LOGGER.info("transmitting 0x%s (%r)", payload.hex(), payload)
      self._command_strobe(StrobeAddress.STX)
  @contextlib.contextmanager
  def asynchronous_transmission(self) -> typing.Iterator[Pin]:
      """
      Context manager for transmitting in asynchronous serial mode.

      On entry, the transceive mode is changed to asynchronous serial
      and the STX strobe is issued; the yielded pin is GDO0.
      On exit, the SIDLE strobe is issued
      and the transceive mode is changed back to FIFO.

      > [...] the GDO0 pin is used for data input [...]
      > The CC1101 modulator samples the level of the asynchronous input
      > 8 times faster than the programmed data rate.
      see "27.1 Asynchronous Serial Operation"

      >>> with cc1101.CC1101() as transceiver:
      >>>     transceiver.set_base_frequency_hertz(433.92e6)
      >>>     transceiver.set_symbol_rate_baud(600)
      >>>     print(transceiver)
      >>>     with transceiver.asynchronous_transmission():
      >>>         # send digital signal to GDO0 pin
      """
      self._set_transceive_mode(_TransceiveMode.ASYNCHRONOUS_SERIAL)
      self._command_strobe(StrobeAddress.STX)
      try:
          # > In TX, the GDO0 pin is used for data input (TX data).
          data_input_pin = Pin.GDO0
          yield data_input_pin
      finally:
          # runs even if the with-body raised
          self._command_strobe(StrobeAddress.SIDLE)
          self._set_transceive_mode(_TransceiveMode.FIFO)
  def _enable_receive_mode(self) -> None:
      """
      Issue the SRX command strobe.
      """
      receive_strobe = StrobeAddress.SRX
      self._command_strobe(receive_strobe)
  def _get_received_packet(self) -> typing.Optional[_ReceivedPacket]:  # unstable
      """
      Read the bytes reported by the RXBYTES status register from the RX FIFO.

      The last two bytes read are interpreted as the appended status bytes
      (RSSI, then CRC flag & link quality indicator),
      everything before them as the payload.

      Returns None if fewer than 2 bytes are reported.

      see section "20 Data FIFO"
      """
      rxbytes = self._read_status_register(StatusRegisterAddress.RXBYTES)
      # NOTE(review): per the datasheet, bit 7 of RXBYTES is RXFIFO_OVERFLOW;
      # presumably that flag is never set when this is called - confirm.
      # PKTCTRL1.APPEND_STATUS is enabled by default
      if rxbytes < 2:
          return None
      buffer = self._read_burst(start_register=FIFORegisterAddress.RX, length=rxbytes)
      status_byte = buffer[-1]
      return _ReceivedPacket(
          payload=bytes(buffer[:-2]),
          rssi_index=buffer[-2],
          # most significant bit of the second status byte
          checksum_valid=bool(status_byte >> 7),
          # remaining 7 bits (6:0); a 6-bit mask would drop bit 6
          link_quality_indicator=status_byte & 0b01111111,
      )
  def _wait_for_packet(  # unstable
      self,
      timeout: datetime.timedelta,
      gdo0_gpio_line_name: bytes = b"GPIO24",  # recommended in README.md
  ) -> typing.Optional[_ReceivedPacket]:
      """
      Enable receive mode and wait for a rising edge
      on the GPIO line with the given name.

      Returns the result of _get_received_packet() after a rising edge.
      If the timeout is reached first,
      the SIDLE strobe is issued and None is returned.

      depends on IOCFG0 == 0b00000001 (see _configure_defaults)
      """
      # pylint: disable=protected-access
      gdo0_line = cc1101._gpio.GPIOLine.find(name=gdo0_gpio_line_name)
      self._enable_receive_mode()
      edge_detected = gdo0_line.wait_for_rising_edge(
          consumer=b"CC1101:GDO0", timeout=timeout
      )
      if edge_detected:
          return self._get_received_packet()
      # timeout: leave receive mode before reporting
      self._command_strobe(StrobeAddress.SIDLE)
      _LOGGER.debug(
          "reached timeout of %.02f seconds while waiting for packet",
          timeout.total_seconds(),
      )
      return None  # timeout