__init__.py 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856
  1. # python-cc1101 - Python Library to Transmit RF Signals via CC1101 Transceivers
  2. #
  3. # Copyright (C) 2020 Fabian Peter Hammerle <fabian@hammerle.me>
  4. #
  5. # This program is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # any later version.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. import contextlib
  18. import enum
  19. import fcntl
  20. import logging
  21. import math
  22. import typing
  23. import spidev
  24. from cc1101.addresses import (
  25. StrobeAddress,
  26. ConfigurationRegisterAddress,
  27. StatusRegisterAddress,
  28. FIFORegisterAddress,
  29. )
  30. from cc1101.options import PacketLengthMode, SyncMode, ModulationFormat
  31. _LOGGER = logging.getLogger(__name__)
  32. class Pin(enum.Enum):
  33. GDO0 = "GDO0"
  34. class _TransceiveMode(enum.IntEnum):
  35. """
  36. PKTCTRL0.PKT_FORMAT
  37. """
  38. FIFO = 0b00
  39. SYNCHRONOUS_SERIAL = 0b01
  40. RANDOM_TRANSMISSION = 0b10
  41. ASYNCHRONOUS_SERIAL = 0b11
  42. class MainRadioControlStateMachineState(enum.IntEnum):
  43. """
  44. MARCSTATE - Main Radio Control State Machine State
  45. """
  46. # see "Figure 13: Simplified State Diagram"
  47. # and "Figure 25: Complete Radio Control State Diagram"
  48. IDLE = 0x01
  49. STARTCAL = 0x08 # after IDLE
  50. BWBOOST = 0x09 # after STARTCAL
  51. FS_LOCK = 0x0A
  52. RX = 0x0D
  53. RXFIFO_OVERFLOW = 0x11
  54. TX = 0x13
  55. # TXFIFO_UNDERFLOW = 0x16
  56. class _ReceivedPacket: # unstable
  57. # "Table 31: Typical RSSI_offset Values"
  58. _RSSI_OFFSET_dB = 74
  59. def __init__(
  60. self,
  61. # *,
  62. data: bytes,
  63. rssi_index: int, # byte
  64. checksum_valid: bool,
  65. link_quality_indicator: int, # 7bit
  66. ):
  67. self.data = data
  68. self._rssi_index = rssi_index
  69. assert 0 <= rssi_index < (1 << 8), rssi_index
  70. self.checksum_valid = checksum_valid
  71. self.link_quality_indicator = link_quality_indicator
  72. assert 0 <= link_quality_indicator < (1 << 7), link_quality_indicator
  73. @property
  74. def rssi_dbm(self) -> float:
  75. """
  76. Estimated Received Signal Strength Indicator (RSSI) in dBm
  77. see section "17.3 RSSI"
  78. """
  79. if self._rssi_index >= 128:
  80. return (self._rssi_index - 256) / 2 - self._RSSI_OFFSET_dB
  81. return self._rssi_index / 2 - self._RSSI_OFFSET_dB
  82. def __str__(self) -> str:
  83. return "{}(RSSI {:.0f}dBm, 0x{})".format(
  84. type(self).__name__,
  85. self.rssi_dbm,
  86. "".join("{:02x}".format(b) for b in self.data),
  87. )
class CC1101:
    """
    Interface to a CC1101 transceiver accessed through an SPI device (spidev).

    Instances are used as context managers: entering the context opens
    the SPI device, resets and verifies the chip and applies default settings.
    """

    # pylint: disable=too-many-public-methods

    # Header byte flags combined (bitwise or) with the 6-bit register address:
    # > All transfers on the SPI interface are done
    # > most significant bit first.
    # > All transactions on the SPI interface start with
    # > a header byte containing a R/W bit, a access bit (B),
    # > and a 6-bit address (A5 - A0).
    # > [...]
    # > Table 45: SPI Address Space
    _WRITE_SINGLE_BYTE = 0x00
    # > Registers with consecutive addresses can be
    # > accessed in an efficient way by setting the
    # > burst bit (B) in the header byte. The address
    # > bits (A5 - A0) set the start address in an
    # > internal address counter. This counter is
    # > incremented by one each new byte [...]
    _WRITE_BURST = 0x40
    _READ_SINGLE_BYTE = 0x80
    _READ_BURST = 0xC0

    # 29.3 Status Register Details
    # expected content of the PARTNUM status register (checked in _verify_chip)
    _SUPPORTED_PARTNUM = 0
    # accepted contents of the VERSION status register (checked in _verify_chip)
    # > The two versions of the chip will behave the same.
    # https://e2e.ti.com/support/wireless-connectivity/sub-1-ghz/f/156/p/428028/1529544#1529544
    _SUPPORTED_VERSIONS = [
        0x04,  # https://github.com/fphammerle/python-cc1101/issues/15
        0x14,
    ]

    _CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ = 26e6
    # see "21 Frequency Programming"
    # > f_carrier = f_XOSC / 2**16 * (FREQ + CHAN * ((256 + CHANSPC_M) * 2**(CHANSPC_E-2)))
    # frequency in hertz represented by one step of the frequency control word
    _FREQUENCY_CONTROL_WORD_HERTZ_FACTOR = _CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / 2 ** 16
  119. def __init__(
  120. self, spi_bus: int = 0, spi_chip_select: int = 0, lock_spi_device: bool = False
  121. ) -> None:
  122. """
  123. lock_spi_device:
  124. When True, an advisory, exclusive lock will be set on the SPI device file
  125. non-blockingly via flock upon entering the context.
  126. If the SPI device file is already locked (e.g., by a different process),
  127. a BlockingIOError will be raised.
  128. The lock will be removed automatically, when leaving the context.
  129. The lock can optionally be released earlier by calling .unlock_spi_device().
  130. >>> transceiver = cc1101.CC1101(lock_spi_device=True)
  131. >>> # not locked
  132. >>> with transceiver:
  133. >>> # locked
  134. >>> # lock removed
  135. >>> with transceiver:
  136. >>> # locked
  137. >>> transceiver.unlock_spi_device()
  138. >>> # lock removed
  139. """
  140. self._spi = spidev.SpiDev()
  141. self._spi_bus = int(spi_bus)
  142. # > The BCM2835 core common to all Raspberry Pi devices has 3 SPI Controllers:
  143. # > SPI0, with two hardware chip selects, [...]
  144. # > SPI1, with three hardware chip selects, [...]
  145. # > SPI2, also with three hardware chip selects, is only usable on a Compute Module [...]
  146. # https://github.com/raspberrypi/documentation/blob/d41d69f8efa3667b1a8b01a669238b8bd113edc1/hardware/raspberrypi/spi/README.md#hardware
  147. # https://www.raspberrypi.org/documentation/hardware/raspberrypi/spi/README.md
  148. self._spi_chip_select = int(spi_chip_select)
  149. self._lock_spi_device = lock_spi_device
  150. @property
  151. def _spi_device_path(self) -> str:
  152. # https://github.com/doceme/py-spidev/blob/v3.4/spidev_module.c#L1286
  153. return "/dev/spidev{}.{}".format(self._spi_bus, self._spi_chip_select)
  154. @staticmethod
  155. def _log_chip_status_byte(chip_status: int) -> None:
  156. # see "10.1 Chip Status Byte" & "Table 23: Status Byte Summary"
  157. # > The command strobe registers are accessed by transferring
  158. # > a single header byte [...]. That is, only the R/W̄ bit,
  159. # > the burst access bit (set to 0), and the six address bits [...]
  160. # > The R/W̄ bit can be either one or zero and will determine how the
  161. # > FIFO_BYTES_AVAILABLE field in the status byte should be interpreted.
  162. _LOGGER.debug(
  163. "chip status byte: CHIP_RDYn=%d STATE=%s FIFO_BYTES_AVAILBLE=%d",
  164. chip_status >> 7,
  165. bin((chip_status >> 4) & 0b111),
  166. chip_status & 0b1111,
  167. )
  168. def _read_single_byte(
  169. self, register: typing.Union[ConfigurationRegisterAddress, FIFORegisterAddress]
  170. ) -> int:
  171. response = self._spi.xfer([register | self._READ_SINGLE_BYTE, 0])
  172. assert len(response) == 2, response
  173. self._log_chip_status_byte(response[0])
  174. return response[1]
  175. def _read_burst(
  176. self,
  177. start_register: typing.Union[ConfigurationRegisterAddress, FIFORegisterAddress],
  178. length: int,
  179. ) -> typing.List[int]:
  180. response = self._spi.xfer([start_register | self._READ_BURST] + [0] * length)
  181. assert len(response) == length + 1, response
  182. self._log_chip_status_byte(response[0])
  183. return response[1:]
  184. def _read_status_register(self, register: StatusRegisterAddress) -> int:
  185. # > For register addresses in the range 0x30-0x3D,
  186. # > the burst bit is used to select between
  187. # > status registers when burst bit is one, and
  188. # > between command strobes when burst bit is
  189. # > zero. [...]
  190. # > Because of this, burst access is not available
  191. # > for status registers and they must be accessed
  192. # > one at a time. The status registers can only be
  193. # > read.
  194. response = self._spi.xfer([register | self._READ_BURST, 0])
  195. assert len(response) == 2, response
  196. self._log_chip_status_byte(response[0])
  197. return response[1]
  198. def _command_strobe(self, register: StrobeAddress) -> None:
  199. # see "10.4 Command Strobes"
  200. _LOGGER.debug("sending command strobe 0x%02x", register)
  201. response = self._spi.xfer([register | self._WRITE_SINGLE_BYTE])
  202. assert len(response) == 1, response
  203. self._log_chip_status_byte(response[0])
  204. def _write_burst(
  205. self,
  206. start_register: typing.Union[ConfigurationRegisterAddress, FIFORegisterAddress],
  207. values: typing.List[int],
  208. ) -> None:
  209. _LOGGER.debug(
  210. "writing burst: start_register=0x%02x values=%s", start_register, values
  211. )
  212. response = self._spi.xfer([start_register | self._WRITE_BURST] + values)
  213. assert len(response) == len(values) + 1, response
  214. self._log_chip_status_byte(response[0])
  215. assert all(v == response[0] for v in response[1:]), response
  216. def _reset(self) -> None:
  217. self._command_strobe(StrobeAddress.SRES)
  218. @classmethod
  219. def _filter_bandwidth_floating_point_to_real(
  220. cls, mantissa: int, exponent: int
  221. ) -> float:
  222. """
  223. See "13 Receiver Channel Filter Bandwidth"
  224. """
  225. return cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / (
  226. 8 * (4 + mantissa) * (2 ** exponent)
  227. )
  228. def _get_filter_bandwidth_hertz(self) -> float:
  229. """
  230. See "13 Receiver Channel Filter Bandwidth"
  231. MDMCFG4.CHANBW_E & MDMCFG4.CHANBW_M
  232. """
  233. mdmcfg4 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4)
  234. return self._filter_bandwidth_floating_point_to_real(
  235. exponent=mdmcfg4 >> 6, mantissa=(mdmcfg4 >> 4) & 0b11
  236. )
  237. def _set_filter_bandwidth(self, *, mantissa: int, exponent: int) -> None:
  238. """
  239. MDMCFG4.CHANBW_E & MDMCFG4.CHANBW_M
  240. """
  241. mdmcfg4 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4)
  242. mdmcfg4 &= 0b00001111
  243. assert 0 <= exponent <= 0b11, exponent
  244. mdmcfg4 |= exponent << 6
  245. assert 0 <= mantissa <= 0b11, mantissa
  246. mdmcfg4 |= mantissa << 4
  247. self._write_burst(
  248. start_register=ConfigurationRegisterAddress.MDMCFG4, values=[mdmcfg4]
  249. )
  250. def _get_symbol_rate_exponent(self) -> int:
  251. """
  252. MDMCFG4.DRATE_E
  253. """
  254. return self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4) & 0b00001111
  255. def _set_symbol_rate_exponent(self, exponent: int):
  256. mdmcfg4 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG4)
  257. mdmcfg4 &= 0b11110000
  258. mdmcfg4 |= exponent
  259. self._write_burst(
  260. start_register=ConfigurationRegisterAddress.MDMCFG4, values=[mdmcfg4]
  261. )
  262. def _get_symbol_rate_mantissa(self) -> int:
  263. """
  264. MDMCFG3.DRATE_M
  265. """
  266. return self._read_single_byte(ConfigurationRegisterAddress.MDMCFG3)
  267. def _set_symbol_rate_mantissa(self, mantissa: int) -> None:
  268. self._write_burst(
  269. start_register=ConfigurationRegisterAddress.MDMCFG3, values=[mantissa]
  270. )
  271. @classmethod
  272. def _symbol_rate_floating_point_to_real(cls, mantissa: int, exponent: int) -> float:
  273. # see "12 Data Rate Programming"
  274. return (
  275. (256 + mantissa)
  276. * (2 ** exponent)
  277. * cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ
  278. / (2 ** 28)
  279. )
  280. @classmethod
  281. def _symbol_rate_real_to_floating_point(cls, real: float) -> typing.Tuple[int, int]:
  282. # see "12 Data Rate Programming"
  283. assert real > 0, real
  284. exponent = math.floor(
  285. math.log2(real / cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ) + 20
  286. )
  287. mantissa = round(
  288. real * 2 ** 28 / cls._CRYSTAL_OSCILLATOR_FREQUENCY_HERTZ / 2 ** exponent
  289. - 256
  290. )
  291. if mantissa == 256:
  292. exponent += 1
  293. mantissa = 0
  294. assert 0 < exponent <= 2 ** 4, exponent
  295. assert mantissa <= 2 ** 8, mantissa
  296. return mantissa, exponent
  297. def get_symbol_rate_baud(self) -> float:
  298. return self._symbol_rate_floating_point_to_real(
  299. mantissa=self._get_symbol_rate_mantissa(),
  300. exponent=self._get_symbol_rate_exponent(),
  301. )
  302. def set_symbol_rate_baud(self, real: float) -> None:
  303. # > The data rate can be set from 0.6 kBaud to 500 kBaud [...]
  304. mantissa, exponent = self._symbol_rate_real_to_floating_point(real)
  305. self._set_symbol_rate_mantissa(mantissa)
  306. self._set_symbol_rate_exponent(exponent)
  307. def get_modulation_format(self) -> ModulationFormat:
  308. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  309. return ModulationFormat((mdmcfg2 >> 4) & 0b111)
  310. def _set_modulation_format(self, modulation_format: ModulationFormat) -> None:
  311. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  312. mdmcfg2 &= ~(modulation_format << 4)
  313. mdmcfg2 |= modulation_format << 4
  314. self._write_burst(ConfigurationRegisterAddress.MDMCFG2, [mdmcfg2])
  315. def enable_manchester_code(self) -> None:
  316. """
  317. MDMCFG2.MANCHESTER_EN
  318. Enable manchester encoding & decoding for the entire packet,
  319. including the preamble and synchronization word.
  320. """
  321. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  322. mdmcfg2 |= 0b1000
  323. self._write_burst(ConfigurationRegisterAddress.MDMCFG2, [mdmcfg2])
  324. def get_sync_mode(self) -> SyncMode:
  325. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  326. return SyncMode(mdmcfg2 & 0b11)
  327. def set_sync_mode(
  328. self,
  329. mode: SyncMode,
  330. *,
  331. _carrier_sense_threshold_enabled: typing.Optional[bool] = None # unstable
  332. ) -> None:
  333. """
  334. MDMCFG2.SYNC_MODE
  335. see "14.3 Byte Synchronization"
  336. Carrier Sense (CS) Threshold (when receiving packets, API unstable):
  337. > Carrier sense can be used as a sync word qualifier
  338. > that requires the signal level to be higher than the threshold
  339. > for a sync word > search to be performed [...]
  340. > CS can be used to avoid interference from other RF sources [...]
  341. True: enable, False: disable, None: keep current setting
  342. See "17.4 Carrier Sense (CS)"
  343. """
  344. mdmcfg2 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG2)
  345. mdmcfg2 &= 0b11111100
  346. mdmcfg2 |= mode
  347. if _carrier_sense_threshold_enabled is not None:
  348. if _carrier_sense_threshold_enabled:
  349. mdmcfg2 |= 0b00000100
  350. else:
  351. mdmcfg2 &= 0b11111011
  352. self._write_burst(ConfigurationRegisterAddress.MDMCFG2, [mdmcfg2])
  353. def get_preamble_length_bytes(self) -> int:
  354. """
  355. MDMCFG1.NUM_PREAMBLE
  356. Minimum number of preamble bytes to be transmitted.
  357. See "15.2 Packet Format"
  358. """
  359. index = (
  360. self._read_single_byte(ConfigurationRegisterAddress.MDMCFG1) >> 4
  361. ) & 0b111
  362. return 2 ** (index >> 1) * (2 + (index & 0b1))
  363. def _set_preamble_length_index(self, index: int) -> None:
  364. assert 0 <= index <= 0b111
  365. mdmcfg1 = self._read_single_byte(ConfigurationRegisterAddress.MDMCFG1)
  366. mdmcfg1 &= 0b10001111
  367. mdmcfg1 |= index << 4
  368. self._write_burst(ConfigurationRegisterAddress.MDMCFG1, [mdmcfg1])
  369. def set_preamble_length_bytes(self, length: int) -> None:
  370. """
  371. see .get_preamble_length_bytes()
  372. """
  373. if length < 1:
  374. raise ValueError(
  375. "invalid preamble length {} given".format(length)
  376. + "\ncall .set_sync_mode(cc1101.SyncMode.NO_PREAMBLE_AND_SYNC_WORD)"
  377. + " to disable preamble"
  378. )
  379. if length % 3 == 0:
  380. index = math.log2(length / 3) * 2 + 1
  381. else:
  382. index = math.log2(length / 2) * 2
  383. if not index.is_integer() or index < 0 or index > 0b111:
  384. raise ValueError(
  385. "unsupported preamble length: {} bytes".format(length)
  386. + "\nsee MDMCFG1.NUM_PREAMBLE in cc1101 docs"
  387. )
  388. self._set_preamble_length_index(int(index))
  389. def _set_power_amplifier_setting_index(self, setting_index: int) -> None:
  390. """
  391. FREND0.PA_POWER
  392. > This value is an index to the PATABLE,
  393. > which can be programmed with up to 8 different PA settings.
  394. > In OOK/ASK mode, this selects the PATABLE index to use
  395. > when transmitting a '1'.
  396. > PATABLE index zero is used in OOK/ASK when transmitting a '0'.
  397. > The PATABLE settings from index 0 to the PA_POWER value are
  398. > used for > ASK TX shaping, [...]
  399. see "Figure 32: Shaping of ASK Signal"
  400. > If OOK modulation is used, the logic 0 and logic 1 power levels
  401. > shall be programmed to index 0 and 1 respectively.
  402. """
  403. frend0 = self._read_single_byte(ConfigurationRegisterAddress.FREND0)
  404. frend0 &= 0b000
  405. frend0 |= setting_index
  406. self._write_burst(ConfigurationRegisterAddress.FREND0, [setting_index])
  407. def _verify_chip(self) -> None:
  408. partnum = self._read_status_register(StatusRegisterAddress.PARTNUM)
  409. if partnum != self._SUPPORTED_PARTNUM:
  410. raise ValueError(
  411. "unexpected chip part number {} (expected: {})".format(
  412. partnum, self._SUPPORTED_PARTNUM
  413. )
  414. )
  415. version = self._read_status_register(StatusRegisterAddress.VERSION)
  416. if version not in self._SUPPORTED_VERSIONS:
  417. raise ValueError(
  418. "unsupported chip version 0x{:02x} (expected one of [{}])".format(
  419. version,
  420. ", ".join("0x{:02x}".format(v) for v in self._SUPPORTED_VERSIONS),
  421. )
  422. )
  423. def _configure_defaults(self) -> None:
  424. # 6:4 MOD_FORMAT: OOK (default: 2-FSK)
  425. self._set_modulation_format(ModulationFormat.ASK_OOK)
  426. self._set_power_amplifier_setting_index(1)
  427. self._disable_data_whitening()
  428. # 7:6 unused
  429. # 5:4 FS_AUTOCAL: calibrate when going from IDLE to RX or TX
  430. # 3:2 PO_TIMEOUT: default
  431. # 1 PIN_CTRL_EN: default
  432. # 0 XOSC_FORCE_ON: default
  433. self._write_burst(ConfigurationRegisterAddress.MCSM0, [0b010100])
  434. def __enter__(self) -> "CC1101":
  435. # https://docs.python.org/3/reference/datamodel.html#object.__enter__
  436. try:
  437. self._spi.open(self._spi_bus, self._spi_chip_select)
  438. except PermissionError as exc:
  439. raise PermissionError(
  440. "Could not access {}".format(self._spi_device_path)
  441. + "\nVerify that the current user has both read and write access."
  442. + "\nOn some devices, like Raspberry Pis,"
  443. + "\n\tsudo usermod -a -G spi $USER"
  444. + "\nfollowed by a re-login grants sufficient permissions."
  445. ) from exc
  446. if self._lock_spi_device:
  447. # advisory, exclusive, non-blocking
  448. # lock removed in __exit__ by SpiDev.close()
  449. fcntl.flock(self._spi.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
  450. self._spi.max_speed_hz = 55700 # empirical
  451. self._reset()
  452. self._verify_chip()
  453. self._configure_defaults()
  454. marcstate = self.get_main_radio_control_state_machine_state()
  455. if marcstate != MainRadioControlStateMachineState.IDLE:
  456. raise ValueError("expected marcstate idle (actual: {})".format(marcstate))
  457. return self
  458. def __exit__(self, exc_type, exc_value, traceback): # -> typing.Literal[False]
  459. # https://docs.python.org/3/reference/datamodel.html#object.__exit__
  460. self._spi.close()
  461. return False
  462. def unlock_spi_device(self) -> None:
  463. """
  464. Manually release the lock set on the SPI device file.
  465. Alternatively, the lock will be released automatically,
  466. when leaving the context.
  467. Method fails silently, if the SPI device file is not locked.
  468. >>> transceiver = cc1101.CC1101(lock_spi_device=True)
  469. >>> # not locked
  470. >>> with transceiver:
  471. >>> # locked
  472. >>> # lock removed
  473. >>> with transceiver:
  474. >>> # locked
  475. >>> transceiver.unlock_spi_device()
  476. >>> # lock removed
  477. """
  478. fileno = self._spi.fileno()
  479. if fileno != -1:
  480. fcntl.flock(fileno, fcntl.LOCK_UN)
  481. def get_main_radio_control_state_machine_state(
  482. self,
  483. ) -> MainRadioControlStateMachineState:
  484. return MainRadioControlStateMachineState(
  485. self._read_status_register(StatusRegisterAddress.MARCSTATE)
  486. )
  487. def get_marc_state(self) -> MainRadioControlStateMachineState:
  488. """
  489. alias for get_main_radio_control_state_machine_state()
  490. """
  491. return self.get_main_radio_control_state_machine_state()
  492. @classmethod
  493. def _frequency_control_word_to_hertz(cls, control_word: typing.List[int]) -> float:
  494. return (
  495. int.from_bytes(control_word, byteorder="big", signed=False)
  496. * cls._FREQUENCY_CONTROL_WORD_HERTZ_FACTOR
  497. )
  498. @classmethod
  499. def _hertz_to_frequency_control_word(cls, hertz: float) -> typing.List[int]:
  500. return list(
  501. round(hertz / cls._FREQUENCY_CONTROL_WORD_HERTZ_FACTOR).to_bytes(
  502. length=3, byteorder="big", signed=False
  503. )
  504. )
  505. def _get_base_frequency_control_word(self) -> typing.List[int]:
  506. # > The base or start frequency is set by the 24 bitfrequency
  507. # > word located in the FREQ2, FREQ1, FREQ0 registers.
  508. return self._read_burst(
  509. start_register=ConfigurationRegisterAddress.FREQ2, length=3
  510. )
  511. def _set_base_frequency_control_word(self, control_word: typing.List[int]) -> None:
  512. self._write_burst(
  513. start_register=ConfigurationRegisterAddress.FREQ2, values=control_word
  514. )
  515. def get_base_frequency_hertz(self) -> float:
  516. return self._frequency_control_word_to_hertz(
  517. self._get_base_frequency_control_word()
  518. )
  519. def set_base_frequency_hertz(self, freq: float) -> None:
  520. self._set_base_frequency_control_word(
  521. self._hertz_to_frequency_control_word(freq)
  522. )
  523. def __str__(self) -> str:
  524. sync_mode = self.get_sync_mode()
  525. attrs = (
  526. "marcstate={}".format(
  527. self.get_main_radio_control_state_machine_state().name.lower()
  528. ),
  529. "base_frequency={:.2f}MHz".format(
  530. self.get_base_frequency_hertz() / 10 ** 6
  531. ),
  532. "symbol_rate={:.2f}kBaud".format(self.get_symbol_rate_baud() / 1000),
  533. "modulation_format={}".format(self.get_modulation_format().name),
  534. "sync_mode={}".format(sync_mode.name),
  535. "preamble_length={}B".format(self.get_preamble_length_bytes())
  536. if sync_mode != SyncMode.NO_PREAMBLE_AND_SYNC_WORD
  537. else None,
  538. "sync_word=0x{:02x}{:02x}".format(*self.get_sync_word())
  539. if sync_mode != SyncMode.NO_PREAMBLE_AND_SYNC_WORD
  540. else None,
  541. "packet_length{}{}B".format(
  542. "≤"
  543. if self.get_packet_length_mode() == PacketLengthMode.VARIABLE
  544. else "=",
  545. self.get_packet_length_bytes(),
  546. ),
  547. )
  548. return "CC1101({})".format(", ".join(filter(None, attrs)))
  549. def get_configuration_register_values(
  550. self,
  551. start_register: ConfigurationRegisterAddress = min(
  552. ConfigurationRegisterAddress
  553. ),
  554. end_register: ConfigurationRegisterAddress = max(ConfigurationRegisterAddress),
  555. ) -> typing.Dict[ConfigurationRegisterAddress, int]:
  556. assert start_register <= end_register, (start_register, end_register)
  557. values = self._read_burst(
  558. start_register=start_register, length=end_register - start_register + 1
  559. )
  560. return {
  561. ConfigurationRegisterAddress(start_register + i): v
  562. for i, v in enumerate(values)
  563. }
  564. def get_sync_word(self) -> bytes:
  565. """
  566. SYNC1 & SYNC0
  567. See "15.2 Packet Format"
  568. The first byte's most significant bit is transmitted first.
  569. """
  570. return bytes(
  571. self._read_burst(
  572. start_register=ConfigurationRegisterAddress.SYNC1, length=2
  573. )
  574. )
  575. def set_sync_word(self, sync_word: bytes) -> None:
  576. """
  577. See .set_sync_word()
  578. """
  579. if len(sync_word) != 2:
  580. raise ValueError("expected two bytes, got {!r}".format(sync_word))
  581. self._write_burst(
  582. start_register=ConfigurationRegisterAddress.SYNC1, values=list(sync_word)
  583. )
  584. def get_packet_length_bytes(self) -> int:
  585. """
  586. PKTLEN
  587. Packet length in fixed packet length mode,
  588. maximum packet length in variable packet length mode.
  589. > In variable packet length mode, [...]
  590. > any packet received with a length byte
  591. > with a value greater than PKTLEN will be discarded.
  592. """
  593. return self._read_single_byte(ConfigurationRegisterAddress.PKTLEN)
  594. def set_packet_length_bytes(self, packet_length: int) -> None:
  595. """
  596. see get_packet_length_bytes()
  597. """
  598. assert 1 <= packet_length <= 255, "unsupported packet length {}".format(
  599. packet_length
  600. )
  601. self._write_burst(
  602. start_register=ConfigurationRegisterAddress.PKTLEN, values=[packet_length]
  603. )
  604. def _disable_data_whitening(self):
  605. """
  606. PKTCTRL0.WHITE_DATA
  607. see "15.1 Data Whitening"
  608. > By setting PKTCTRL0.WHITE_DATA=1 [default],
  609. > all data, except the preamble and the sync word
  610. > will be XOR-ed with a 9-bit pseudo-random (PN9)
  611. > sequence before being transmitted.
  612. """
  613. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  614. pktctrl0 &= 0b10111111
  615. self._write_burst(
  616. start_register=ConfigurationRegisterAddress.PKTCTRL0, values=[pktctrl0]
  617. )
  618. def disable_checksum(self) -> None:
  619. """
  620. PKTCTRL0.CRC_EN
  621. Disable automatic 2-byte cyclic redundancy check (CRC) sum
  622. appending in TX mode and checking in RX mode.
  623. See "Figure 19: Packet Format".
  624. """
  625. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  626. pktctrl0 &= 0b11111011
  627. self._write_burst(
  628. start_register=ConfigurationRegisterAddress.PKTCTRL0, values=[pktctrl0]
  629. )
  630. def _get_transceive_mode(self) -> _TransceiveMode:
  631. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  632. return _TransceiveMode((pktctrl0 >> 4) & 0b11)
  633. def _set_transceive_mode(self, mode: _TransceiveMode) -> None:
  634. _LOGGER.info("changing transceive mode to %s", mode.name)
  635. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  636. pktctrl0 &= ~0b00110000
  637. pktctrl0 |= mode << 4
  638. self._write_burst(
  639. start_register=ConfigurationRegisterAddress.PKTCTRL0, values=[pktctrl0]
  640. )
  641. def get_packet_length_mode(self) -> PacketLengthMode:
  642. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  643. return PacketLengthMode(pktctrl0 & 0b11)
  644. def set_packet_length_mode(self, mode: PacketLengthMode) -> None:
  645. pktctrl0 = self._read_single_byte(ConfigurationRegisterAddress.PKTCTRL0)
  646. pktctrl0 &= 0b11111100
  647. pktctrl0 |= mode
  648. self._write_burst(
  649. start_register=ConfigurationRegisterAddress.PKTCTRL0, values=[pktctrl0]
  650. )
  651. def _flush_tx_fifo_buffer(self) -> None:
  652. # > Only issue SFTX in IDLE or TXFIFO_UNDERFLOW states.
  653. _LOGGER.debug("flushing tx fifo buffer")
  654. self._command_strobe(StrobeAddress.SFTX)
  655. def transmit(self, payload: bytes) -> None:
  656. """
  657. The most significant bit is transmitted first.
  658. In variable packet length mode,
  659. a byte indicating the packet's length will be prepended.
  660. > In variable packet length mode,
  661. > the packet length is configured by the first byte [...].
  662. > The packet length is defined as the payload data,
  663. > excluding the length byte and the optional CRC.
  664. from "15.2 Packet Format"
  665. Call .set_packet_length_mode(cc1101.PacketLengthMode.FIXED)
  666. to switch to fixed packet length mode.
  667. """
  668. # see "15.2 Packet Format"
  669. # > In variable packet length mode, [...]
  670. # > The first byte written to the TXFIFO must be different from 0.
  671. packet_length_mode = self.get_packet_length_mode()
  672. packet_length = self.get_packet_length_bytes()
  673. if packet_length_mode == PacketLengthMode.VARIABLE:
  674. if not payload:
  675. raise ValueError("empty payload {!r}".format(payload))
  676. if len(payload) > packet_length:
  677. raise ValueError(
  678. "payload exceeds maximum payload length of {} bytes".format(
  679. packet_length
  680. )
  681. + "\nsee .get_packet_length_bytes()"
  682. + "\npayload: {!r}".format(payload)
  683. )
  684. payload = int.to_bytes(len(payload), length=1, byteorder="big") + payload
  685. elif (
  686. packet_length_mode == PacketLengthMode.FIXED
  687. and len(payload) != packet_length
  688. ):
  689. raise ValueError(
  690. "expected payload length of {} bytes, got {}".format(
  691. packet_length, len(payload)
  692. )
  693. + "\nsee .set_packet_length_mode() and .get_packet_length_bytes()"
  694. + "\npayload: {!r}".format(payload)
  695. )
  696. marcstate = self.get_main_radio_control_state_machine_state()
  697. if marcstate != MainRadioControlStateMachineState.IDLE:
  698. raise Exception(
  699. "device must be idle before transmission (current marcstate: {})".format(
  700. marcstate.name
  701. )
  702. )
  703. self._flush_tx_fifo_buffer()
  704. self._write_burst(FIFORegisterAddress.TX, list(payload))
  705. _LOGGER.info(
  706. "transmitting 0x%s (%r)",
  707. "".join("{:02x}".format(b) for b in payload),
  708. payload,
  709. )
  710. self._command_strobe(StrobeAddress.STX)
  711. @contextlib.contextmanager
  712. def asynchronous_transmission(self) -> typing.Iterator[Pin]:
  713. """
  714. see "27.1 Asynchronous Serial Operation"
  715. >>> with cc1101.CC1101() as transceiver:
  716. >>> transceiver.set_base_frequency_hertz(433.92e6)
  717. >>> transceiver.set_symbol_rate_baud(600)
  718. >>> print(transceiver)
  719. >>> with transceiver.asynchronous_transmission():
  720. >>> # send digital signal to GDO0 pin
  721. """
  722. self._set_transceive_mode(_TransceiveMode.ASYNCHRONOUS_SERIAL)
  723. self._command_strobe(StrobeAddress.STX)
  724. try:
  725. # > In TX, the GDO0 pin is used for data input (TX data).
  726. yield Pin.GDO0
  727. finally:
  728. self._command_strobe(StrobeAddress.SIDLE)
  729. self._set_transceive_mode(_TransceiveMode.FIFO)
  730. def _enable_receive_mode(self) -> None: # unstable
  731. self._command_strobe(StrobeAddress.SRX)
  732. def _get_received_packet(self) -> typing.Optional[_ReceivedPacket]: # unstable
  733. """
  734. see section "20 Data FIFO"
  735. """
  736. rxbytes = self._read_status_register(StatusRegisterAddress.RXBYTES)
  737. # PKTCTRL1.APPEND_STATUS is enabled by default
  738. if rxbytes < 2:
  739. return None
  740. buffer = self._read_burst(start_register=FIFORegisterAddress.RX, length=rxbytes)
  741. return _ReceivedPacket(
  742. data=bytes(buffer[:-2]),
  743. rssi_index=buffer[-2],
  744. checksum_valid=bool(buffer[-1] >> 7),
  745. link_quality_indicator=buffer[-1] & 0b0111111,
  746. )