# Source code for circuitpython_nrf24l01.rf24

# The MIT License (MIT)
#
# Copyright (c) 2017 Damien P. George
# Copyright (c) 2019 Brendan Doherty
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""rf24 module containing the base class RF24"""
__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/2bndy5/CircuitPython_nRF24L01.git"
import time
from micropython import const

# Use ubus_device's SPIDevice when it can be imported; otherwise fall back to
# the adafruit_bus_device implementation of the same name.
try:
    from ubus_device import SPIDevice
except ImportError:
    from adafruit_bus_device.spi_device import SPIDevice

# Register addresses of the nRF24L01 that the RF24 class reads and writes by
# name. Registers that RF24 touches only occasionally (for example 0x03, 0x05,
# 0x07 and 0x17) are used there as bare literals instead.
CONFIGURE = const(0x00)  # IRQ masking, CRC scheme, PWR control, & RX/TX roles
AUTO_ACK = const(0x01)  # auto-ACK status for all pipes
OPEN_PIPES = const(0x02)  # open/close RX status for all pipes
SETUP_RETR = const(0x04)  # auto-retry count & delay values
RF_PA_RATE = const(0x06)  # RF Power Amplifier & Data Rate values
RX_ADDR_P0 = const(0x0A)  # RX pipe addresses; pipes 0-5 = 0x0A-0x0F
TX_ADDRESS = const(0x10)  # Address used for TX transmissions
RX_PL_LENG = const(0x11)  # RX payload widths; pipes 0-5 = 0x11-0x16
DYN_PL_LEN = const(0x1C)  # dynamic payloads status for all pipes
TX_FEATURE = const(0x1D)  # dynamic TX-payloads, TX-ACK payloads, TX-NO_ACK
# Slept (via time.sleep) inside every SPI transaction made by RF24's
# _reg_read*/_reg_write* helpers, right before the transfer itself.
CSN_DELAY = 0.005
"""The delay time (in seconds) used to let the CSN pin settle,
allowing a clean SPI transaction."""


class RF24:
    """A driver class for the nRF24L01(+) transceiver radios.

    Instances keep "shadow" copies of the radio's registers in private
    attributes. Using an instance as a context manager (``with radio:``)
    writes those shadow copies to the radio on entry, and lowers the CE pin
    and powers the radio down on exit.

    :param spi: The SPI bus object handed to ``SPIDevice``.
    :param csn: The pin object used as the SPI chip select.
    :param ce: The pin object driving the radio's CE line. It is switched to
        an output (initially low) during construction.
    :param int spi_frequency: The baudrate passed to ``SPIDevice``.
    :raises RuntimeError: If the two lowest bits of the CONFIGURE register do
        not read back as ``0b10`` right after they were written.
    """

    def __init__(self, spi, csn, ce, spi_frequency=10000000):
        self._spi = SPIDevice(spi, chip_select=csn, baudrate=spi_frequency)
        self.ce_pin = ce
        self.ce_pin.switch_to_output(value=False)  # pre-empt standby-I mode
        self._status = 0  # status byte returned on all SPI transactions
        # pre-configure the CONFIGURE register:
        #   0x0E = IRQs are all enabled, CRC is enabled with 2 bytes, and
        #   power up in TX mode
        self._config = 0x0E
        self._reg_write(CONFIGURE, self._config)
        if self._reg_read(CONFIGURE) & 3 != 2:
            raise RuntimeError("nRF24L01 Hardware not responding")
        self.power = False
        # init shadow copy of RX addresses for all pipes for context manager
        self._pipes = [bytearray(5), bytearray(5), 0xC3, 0xC4, 0xC5, 0xC6]
        # _open_pipes attribute reflects only RX state on each pipe
        self._open_pipes = 0  # 0 = all pipes closed
        for i in range(6):  # capture RX addresses from registers
            if i < 2:
                self._pipes[i] = self._reg_read_bytes(RX_ADDR_P0 + i)
            else:
                self._pipes[i] = self._reg_read(RX_ADDR_P0 + i)
        # test if nRF24L01 is a plus variant using a command specific to
        # non-plus variants
        self._is_plus_variant = False
        b4_toggle = self._reg_read(TX_FEATURE)
        # derelict ACTIVATE command toggles bits in the TX_FEATURE register
        self._reg_write(0x50, 0x73)
        after_toggle = self._reg_read(TX_FEATURE)
        if b4_toggle == after_toggle:
            self._is_plus_variant = True
        if not after_toggle:  # if features are disabled
            self._reg_write(0x50, 0x73)  # ensure they're enabled
        # init shadow copy of last RX_ADDR_P0 written to pipe 0 needed as
        # open_tx_pipe() appropriates pipe 0 for ACK packet
        self._pipe0_read_addr = None
        # init shadow copy of register about FIFO info
        self._fifo = 0
        # shadow copy of the TX_ADDRESS
        self._tx_address = self._reg_read_bytes(TX_ADDRESS)
        # pre-configure the SETUP_RETR register
        self._retry_setup = 0x53  # ard = 1500; arc = 3
        # pre-configure the RF_SETUP register
        self._rf_setup = 0x07  # 1 Mbps data_rate, and 0 dbm pa_level
        # pre-configure dynamic_payloads & auto_ack for RX operations
        self._dyn_pl = 0x3F  # 0x3F = enable dynamic_payloads on all pipes
        self._aa = 0x3F  # 0x3F = enable auto_ack on all pipes
        # pre-configure features for TX operations:
        #   5 = enable dynamic_payloads, disable custom ack payloads, &
        #   allow ask_no_ack command
        self._features = 5
        self._channel = 76  # 2.476 GHz
        self._addr_len = 5  # 5-byte long addresses
        self._pl_len = [32] * 6  # 32-byte static payloads for all pipes

        with self:  # dumps internal attributes to all registers
            self.flush_rx()
            self.flush_tx()
            self.clear_status_flags()

    def __enter__(self):
        """Write every shadow attribute to its register and return ``self``."""
        self.ce_pin.value = False
        self._reg_write(CONFIGURE, self._config & 0x7C)
        self._reg_write(RF_PA_RATE, self._rf_setup)
        self._reg_write(OPEN_PIPES, self._open_pipes)
        self._reg_write(DYN_PL_LEN, self._dyn_pl)
        self._reg_write(AUTO_ACK, self._aa)
        self._reg_write(TX_FEATURE, self._features)
        self._reg_write(SETUP_RETR, self._retry_setup)
        for i, addr in enumerate(self._pipes):
            # pipes 0 & 1 hold full addresses; pipes 2-5 hold a single byte
            if i < 2:
                self._reg_write_bytes(RX_ADDR_P0 + i, addr)
            else:
                self._reg_write(RX_ADDR_P0 + i, addr)
        self._reg_write_bytes(TX_ADDRESS, self._tx_address)
        self._reg_write(0x05, self._channel)
        self._reg_write(0x03, self._addr_len - 2)
        self.payload_length = self._pl_len
        return self

    def __exit__(self, *exc):
        """Lower the CE pin and power down; exceptions are not suppressed."""
        self.ce_pin.value = False
        self.power = False
        return False

    # pylint: disable=no-member
    def _reg_read(self, reg):
        """Read one byte from register ``reg`` and refresh the status byte."""
        out_buf = bytes([reg, 0])
        in_buf = bytearray([0, 0])
        with self._spi as spi:
            time.sleep(CSN_DELAY)
            spi.write_readinto(out_buf, in_buf)
        self._status = in_buf[0]
        return in_buf[1]

    def _reg_read_bytes(self, reg, buf_len=5):
        """Read ``buf_len`` bytes from ``reg``; returns them as a bytearray."""
        in_buf = bytearray(buf_len + 1)
        out_buf = bytes([reg]) + b"\x00" * buf_len
        with self._spi as spi:
            time.sleep(CSN_DELAY)
            spi.write_readinto(out_buf, in_buf)
        self._status = in_buf[0]
        return in_buf[1:]

    def _reg_write_bytes(self, reg, out_buf):
        """Write the bytes in ``out_buf`` to ``reg`` (0x20 is OR-ed into
        ``reg``) and refresh the status byte."""
        out_buf = bytes([0x20 | reg]) + out_buf
        in_buf = bytearray(len(out_buf))
        with self._spi as spi:
            time.sleep(CSN_DELAY)
            spi.write_readinto(out_buf, in_buf)
        self._status = in_buf[0]

    def _reg_write(self, reg, value=None):
        """Send a bare command byte, or write one byte to a register.

        :param int reg: The command byte (when ``value`` is `None`) or the
            register address.
        :param int value: The byte to write. When given, 0x20 is OR-ed into
            ``reg`` -- except for the ACTIVATE command (0x50), which is a
            command of its own and must be sent verbatim for the plus-variant
            probe in ``__init__`` to work.
        """
        out_buf = bytes([reg])
        if value is not None:
            command = reg if reg == 0x50 else 0x20 | reg
            out_buf = bytes([command, value])
        in_buf = bytearray(len(out_buf))
        with self._spi as spi:
            time.sleep(CSN_DELAY)
            spi.write_readinto(out_buf, in_buf)
        self._status = in_buf[0]

    # pylint: enable=no-member

    @property
    def address_length(self):
        """This `int` attribute specifies the length (in bytes) of addresses
        to be used for RX/TX pipes."""
        return self._reg_read(0x03) + 2

    @address_length.setter
    def address_length(self, length):
        if not 3 <= length <= 5:
            raise ValueError("address_length can only be set in range [3, 5] bytes")
        self._addr_len = int(length)
        # write the coerced int so a float such as 4.0 cannot reach bytes()
        self._reg_write(0x03, self._addr_len - 2)

    def open_tx_pipe(self, address):
        """This function is used to open a data pipe for OTA (over the air)
        TX transmissions.

        :param address: The bytes of the TX address. When `arc` is non-zero
            the same address is also written to RX pipe 0 and that pipe is
            opened.
        """
        if self.arc:
            for i, val in enumerate(address):
                self._pipes[0][i] = val
            self._reg_write_bytes(RX_ADDR_P0, address)
            self._open_pipes = self._open_pipes | 1
            self._reg_write(OPEN_PIPES, self._open_pipes)
        for i, val in enumerate(address):
            self._tx_address[i] = val
        self._reg_write_bytes(TX_ADDRESS, address)

    def close_rx_pipe(self, pipe_number):
        """This function is used to close a specific data pipe from OTA (over
        the air) RX transmissions.

        :raises IndexError: If ``pipe_number`` is not in range [0, 5].
        """
        if pipe_number < 0 or pipe_number > 5:
            raise IndexError("pipe number must be in range [0, 5]")
        self._open_pipes = self._reg_read(OPEN_PIPES)
        if self._open_pipes & (1 << pipe_number):
            self._open_pipes = self._open_pipes & ~(1 << pipe_number)
            self._reg_write(OPEN_PIPES, self._open_pipes)

    def open_rx_pipe(self, pipe_number, address):
        """This function is used to open a specific data pipe for OTA (over
        the air) RX transmissions.

        :param int pipe_number: The pipe to open, in range [0, 5].
        :param address: The bytes of the address. Pipes 2-5 only use the
            first byte.
        :raises IndexError: If ``pipe_number`` is not in range [0, 5].
        :raises ValueError: If ``address`` is empty.
        """
        if not 0 <= pipe_number <= 5:
            raise IndexError("pipe number must be in range [0, 5]")
        if not address:
            raise ValueError("address length cannot be 0")
        if pipe_number < 2:
            if not pipe_number:
                # remembered so the `listen` setter can restore pipe 0
                self._pipe0_read_addr = address
            for i, val in enumerate(address):
                self._pipes[pipe_number][i] = val
            self._reg_write_bytes(RX_ADDR_P0 + pipe_number, address)
        else:
            self._pipes[pipe_number] = address[0]
            self._reg_write(RX_ADDR_P0 + pipe_number, address[0])
        self._open_pipes = self._reg_read(OPEN_PIPES) | (1 << pipe_number)
        self._reg_write(OPEN_PIPES, self._open_pipes)

    @property
    def listen(self):
        """An attribute to represent the nRF24L01 primary role as a radio."""
        return self.power and bool(self._config & 1)

    @listen.setter
    def listen(self, is_rx):
        if self.listen != bool(is_rx):
            self.ce_pin.value = 0
            if is_rx:
                if self._pipe0_read_addr is not None:
                    # restore the RX address that open_tx_pipe() may have
                    # overwritten on pipe 0
                    for i, val in enumerate(self._pipe0_read_addr):
                        self._pipes[0][i] = val
                    self._reg_write_bytes(RX_ADDR_P0, self._pipe0_read_addr)
                self._config = (self._config & 0xFC) | 3
                self._reg_write(CONFIGURE, self._config)
                time.sleep(0.00015)  # mandatory wait to power up radio
                self.flush_rx()
                self.clear_status_flags()
                self.ce_pin.value = 1  # mandatory pulse is > 130 µs
                time.sleep(0.00013)
            else:
                self._config = self._config & 0xFE
                self._reg_write(CONFIGURE, self._config)
                time.sleep(0.00016)

    def any(self):
        """This function checks if the nRF24L01 has received any data at all,
        and then reports the next available payload's length (in bytes).

        :returns: 0 when the status byte reports no pipe holding a payload.
        """
        if self.update() and self.pipe is not None:
            if self._features & 4:
                return self._reg_read(0x60)
            return self._pl_len[self.pipe]
        return 0

    def recv(self, length=None):
        """This function is used to retrieve the next available payload in the
        RX FIFO buffer, then clears the `irq_dr` status flag.

        :param int length: Number of bytes to read; defaults to `any()`.
        :returns: The payload as a bytearray, or `None` when the size to read
            is 0.
        """
        return_size = length if length is not None else self.any()
        if not return_size:
            return None
        result = self._reg_read_bytes(0x61, return_size)
        self.clear_status_flags(True, False, False)
        return result

    def send(self, buf, ask_no_ack=False, force_retry=0, send_only=False):
        """This blocking function is used to transmit payload(s).

        :param buf: One payload, or a `list`/`tuple` of payloads which are
            sent one after another.
        :param bool ask_no_ack: Forwarded to `write()`.
        :param int force_retry: How many times `resend()` is attempted after
            the "Data Failed" flag is raised.
        :param bool send_only: When `True` the RX FIFO is neither flushed nor
            read for an ACK payload.
        :returns: For a single payload: the "Data Sent" flag, or whatever
            `recv()` returns when both "Data Ready" and "Data Sent" are
            flagged and ``send_only`` is false. For a `list`/`tuple`: a list
            of such results.
        """
        self.ce_pin.value = 0
        if isinstance(buf, (list, tuple)):
            return [self.send(b, ask_no_ack, force_retry, send_only) for b in buf]
        self.flush_tx()
        if not send_only:
            self.flush_rx()
        self.write(buf, ask_no_ack)
        time.sleep(0.00001)
        self.ce_pin.value = 0
        while not self._status & 0x70:  # wait for any IRQ flag
            self.update()
        result = self.irq_ds
        if self.irq_df:
            for _ in range(force_retry):
                result = self.resend(send_only)
                if result is None or result:
                    break
        if self._status & 0x60 == 0x60 and not send_only:
            result = self.recv()
        self.clear_status_flags(False)
        return result

    @property
    def tx_full(self):
        """An attribute to represent the nRF24L01's status flag signaling that
        the TX FIFO buffer is full. (read-only)"""
        return bool(self._status & 1)

    @property
    def pipe(self):
        """The identifying number of the data pipe that received the next
        available payload in the RX FIFO buffer. (read only)"""
        result = (self._status & 0x0E) >> 1
        if result <= 5:
            return result
        return None

    @property
    def irq_dr(self):
        """A `bool` that represents the "Data Ready" interrupted flag.
        (read-only)"""
        return bool(self._status & 0x40)

    @property
    def irq_ds(self):
        """A `bool` that represents the "Data Sent" interrupted flag.
        (read-only)"""
        return bool(self._status & 0x20)

    @property
    def irq_df(self):
        """A `bool` that represents the "Data Failed" interrupted flag.
        (read-only)"""
        return bool(self._status & 0x10)

    def clear_status_flags(self, data_recv=True, data_sent=True, data_fail=True):
        """This clears the interrupt flags in the status register."""
        config = bool(data_recv) << 6 | bool(data_sent) << 5 | bool(data_fail) << 4
        self._reg_write(7, config)

    def interrupt_config(self, data_recv=True, data_sent=True, data_fail=True):
        """Sets the configuration of the nRF24L01's IRQ pin. (write-only)"""
        # a set bit in CONFIGURE masks the IRQ, hence the negations
        self._config = (self._reg_read(CONFIGURE) & 0x0F) | (not data_recv) << 6
        self._config |= (not data_fail) << 4 | (not data_sent) << 5
        self._reg_write(CONFIGURE, self._config)

    def what_happened(self, dump_pipes=False):
        """This debugging function aggregates and outputs all status/condition
        related information from the nRF24L01.

        :param bool dump_pipes: When true, the TX address and the state of
            every RX pipe are printed as well.
        """

        def _bits(mask):
            # "0b"-prefixed binary, left-padded with zeros to 6 digits
            return bin(mask).replace("0b", "0b" + "0" * (8 - len(bin(mask))))

        observer = self._reg_read(8)
        print("Is a plus variant_________{}".format(self.is_plus_variant))
        print(
            "Channel___________________{} ~ {} GHz".format(
                self.channel, (self.channel + 2400) / 1000
            )
        )
        print(
            "RF Data Rate______________{} {}".format(
                self.data_rate, "Mbps" if self.data_rate != 250 else "Kbps"
            )
        )
        print("RF Power Amplifier________{} dbm".format(self.pa_level))
        print(
            "RF Low Noise Amplifier____{}".format(
                "Enabled" if self.is_lna_enabled else "Disabled"
            )
        )
        print("CRC bytes_________________{}".format(self.crc))
        print("Address length____________{} bytes".format(self.address_length))
        print("TX Payload lengths________{} bytes".format(self.payload_length[0]))
        print("Auto retry delay__________{} microseconds".format(self.ard))
        print("Auto retry attempts_______{} maximum".format(self.arc))
        print("Re-use TX FIFO____________{}".format(bool(self._reg_read(0x17) & 64)))
        print(
            "Packets lost on current channel_____________________{}".format(
                (observer & 0xF0) >> 4
            )
        )
        print(
            "Retry attempts made for last transmission___________{}".format(
                observer & 0x0F
            )
        )
        print(
            "IRQ - Data Ready______{} Data Ready___________{}".format(
                "_True" if not bool(self._config & 0x40) else "False", self.irq_dr
            )
        )
        print(
            "IRQ - Data Fail_______{} Data Failed__________{}".format(
                "_True" if not bool(self._config & 0x10) else "False", self.irq_df
            )
        )
        print(
            "IRQ - Data Sent_______{} Data Sent____________{}".format(
                "_True" if not bool(self._config & 0x20) else "False", self.irq_ds
            )
        )
        print(
            "TX FIFO full__________{} TX FIFO empty________{}".format(
                "_True" if bool(self.tx_full) else "False", bool(self.fifo(True, True))
            )
        )
        # self._fifo was refreshed by the fifo() call just above
        print(
            "RX FIFO full__________{} RX FIFO empty________{}".format(
                "_True" if bool(self._fifo & 2) else "False", bool(self._fifo & 1)
            )
        )
        print(
            "Ask no ACK_________{} Custom ACK Payload___{}".format(
                "_Allowed" if bool(self._features & 1) else "Disabled",
                "Enabled" if self.ack else "Disabled",
            )
        )
        print(
            "Dynamic Payloads___{} Auto Acknowledgment__{}".format(
                "_Enabled"
                if self._dyn_pl == 0x3F
                else (_bits(self._dyn_pl) if self._dyn_pl else "Disabled"),
                "Enabled"
                if self._aa == 0x3F
                else (_bits(self._aa) if self._aa else "Disabled"),
            )
        )
        print(
            "Primary Mode_____________{} Power Mode___________{}".format(
                "RX" if self.listen else "TX",
                ("Standby-II" if self.ce_pin.value else "Standby-I")
                if self._config & 2
                else "Off",
            )
        )
        if dump_pipes:
            print("TX address____________", self.address())
            self._open_pipes = self._reg_read(OPEN_PIPES)
            for i in range(6):
                is_open = self._open_pipes & (1 << i)
                print(
                    "Pipe",
                    i,
                    "( open )" if is_open else "(closed)",
                    "bound:",
                    self.address(i),
                )
                if is_open:
                    print("\t\texpecting", self._pl_len[i], "byte static payloads")

    @property
    def is_plus_variant(self):
        """A `bool` attribute to describe if the nRF24L01 is a plus variant or
        not (read-only)."""
        return self._is_plus_variant

    @property
    def dynamic_payloads(self):
        """This `bool` attribute controls the nRF24L01's dynamic payload
        length feature for each pipe.

        The setter accepts a `bool`/`int` (applied to all pipes) or a
        `list`/`tuple` with one entry per pipe, where a negative entry leaves
        that pipe unchanged. Any other type raises a `ValueError`.
        """
        self._dyn_pl = self._reg_read(DYN_PL_LEN)
        self._features = self._reg_read(TX_FEATURE)
        return bool(self._dyn_pl) and self._features & 4 == 4

    @dynamic_payloads.setter
    def dynamic_payloads(self, enable):
        self._features = self._reg_read(TX_FEATURE)
        if isinstance(enable, (bool, int)):
            self._dyn_pl = 0x3F if enable else 0
        elif isinstance(enable, (list, tuple)):
            for i, val in enumerate(enable):
                if i < 6 and val >= 0:  # skip pipe if val is negative
                    self._dyn_pl = (self._dyn_pl & ~(1 << i)) | (bool(val) << i)
        else:
            raise ValueError(
                "dynamic_payloads: {} is an invalid input".format(enable)
            )
        if bool(self._features & 4) != (self._dyn_pl & 1):
            self._features = (self._features & 3) | ((self._dyn_pl & 1) << 2)
            self._reg_write(TX_FEATURE, self._features)
        if self._dyn_pl != (self._aa & self._dyn_pl):
            # every pipe using dynamic payloads also gets auto-ack enabled
            self._aa |= self._dyn_pl
            self._reg_write(AUTO_ACK, self._aa)
            self._config = self._reg_read(CONFIGURE)
        self._reg_write(DYN_PL_LEN, self._dyn_pl)

    @property
    def payload_length(self):
        """This `int` attribute specifies the length (in bytes) of static
        payloads for each pipe.

        The getter returns the list of all 6 lengths. The setter accepts an
        `int` (applied to all pipes) or a `list`/`tuple`; entries outside
        [1, 32] are skipped silently.
        """
        return self._pl_len

    @payload_length.setter
    def payload_length(self, length):
        if isinstance(length, int):
            length = [length] * 6
        elif not isinstance(length, (list, tuple)):
            raise ValueError("length {} is not a valid input".format(length))
        for i, val in enumerate(length):
            if i < 6:
                if 0 < val <= 32:  # don't throw exception, just skip pipe
                    self._pl_len[i] = val
                    self._reg_write(RX_PL_LENG + i, val)

    @property
    def arc(self):
        """This `int` attribute specifies the nRF24L01's number of attempts to
        re-transmit TX payload when acknowledgment packet is not received.
        """
        self._retry_setup = self._reg_read(SETUP_RETR)
        return self._retry_setup & 0x0F

    @arc.setter
    def arc(self, count):
        if not 0 <= count <= 15:
            raise ValueError("automatic re-transmit count must in range [0, 15]")
        self._retry_setup = (self._retry_setup & 0xF0) | count
        self._reg_write(SETUP_RETR, self._retry_setup)

    @property
    def ard(self):
        """This `int` attribute specifies the nRF24L01's delay (in
        microseconds) between attempts to automatically re-transmit the TX
        payload when an expected acknowledgement (ACK) packet is not
        received."""
        self._retry_setup = self._reg_read(SETUP_RETR)
        return ((self._retry_setup & 0xF0) >> 4) * 250 + 250

    @ard.setter
    def ard(self, delta):
        if not 250 <= delta <= 4000:
            raise ValueError("automatic re-transmit delay must be in range [250, 4000]")
        self._retry_setup = (self._retry_setup & 15) | int((delta - 250) / 250) << 4
        self._reg_write(SETUP_RETR, self._retry_setup)

    @property
    def auto_ack(self):
        """This `bool` attribute controls the nRF24L01's automatic
        acknowledgment feature during the process of receiving a packet.

        The setter accepts a `bool`/`int` (applied to all pipes) or a
        `list`/`tuple` with one entry per pipe, where a negative entry leaves
        that pipe unchanged. Any other type raises a `ValueError`.
        """
        self._aa = self._reg_read(AUTO_ACK)
        return bool(self._aa)

    @auto_ack.setter
    def auto_ack(self, enable):
        if isinstance(enable, (bool, int)):
            self._aa = 0x3F if enable else 0
        elif isinstance(enable, (list, tuple)):
            for i, val in enumerate(enable):
                if i < 6 and val >= 0:  # skip pipe if val is negative
                    self._aa = (self._aa & ~(1 << i)) | (bool(val) << i)
        else:
            raise ValueError("auto_ack: {} is not a valid input".format(enable))
        self._reg_write(AUTO_ACK, self._aa)
        if self._aa:  # refresh crc data if enabled
            self._config = self._reg_read(CONFIGURE)

    @property
    def ack(self):
        """This `bool` attribute represents the status of the nRF24L01's
        capability to use custom payloads as part of the automatic
        acknowledgment (ACK) packet."""
        self._aa = self._reg_read(AUTO_ACK)
        self._dyn_pl = self._reg_read(DYN_PL_LEN)
        self._features = self._reg_read(TX_FEATURE)
        return bool((self._features & 6) == 6 and ((self._aa & self._dyn_pl) & 1))

    @ack.setter
    def ack(self, enable):
        if self.ack != bool(enable):
            # custom ACK payloads need auto-ack and dynamic payloads on pipe 0
            self.auto_ack = (1,)
            self._dyn_pl = self._dyn_pl & ~1 | 1
            self._reg_write(DYN_PL_LEN, self._dyn_pl)
            self._features = self._features & 3 | 4
        self._features = self._features & 5 | bool(enable) << 1
        self._reg_write(TX_FEATURE, self._features)

    def load_ack(self, buf, pipe_number):
        """This allows the MCU to specify a payload to be allocated into the
        TX FIFO buffer for use on a specific data pipe.

        :returns: `True` if the payload was written, `False` if the last
            known status byte reports a full TX FIFO.
        :raises IndexError: If ``pipe_number`` is not in range [0, 5].
        :raises ValueError: If ``buf`` is empty or longer than 32 bytes.
        """
        if pipe_number < 0 or pipe_number > 5:
            raise IndexError("pipe_number must be in range [0, 5]")
        if not buf or len(buf) > 32:
            raise ValueError("payload must have a byte length in range [1, 32]")
        if not bool((self._features & 6) == 6 and ((self._aa & self._dyn_pl) & 1)):
            self.ack = True
        if not self.tx_full:
            self._reg_write_bytes(0xA8 | pipe_number, buf)
            return True
        return False

    def read_ack(self):
        """Allows user to read the automatic acknowledgement (ACK) payload (if
        any). This simply returns the result of `recv()`."""
        return self.recv()

    @property
    def data_rate(self):
        """This `int` attribute specifies the nRF24L01's frequency data rate
        for OTA (over the air) transmissions.

        Valid values are 1 (Mbps), 2 (Mbps) and 250 (kbps). Setting 250 raises
        a `NotImplementedError` when the radio is not a plus variant.
        """
        self._rf_setup = self._reg_read(RF_PA_RATE)
        rf_setup = self._rf_setup & 0x28
        return (2 if rf_setup == 8 else 250) if rf_setup else 1

    @data_rate.setter
    def data_rate(self, speed):
        if speed not in (1, 2, 250):
            raise ValueError("data_rate must be 1 (Mbps), 2 (Mbps), or 250 (kbps)")
        if not self.is_plus_variant and speed == 250:
            raise NotImplementedError(
                "250 kbps data rate is not available for the non-plus "
                "variants of the nRF24L01 transceivers."
            )
        if self.data_rate != speed:
            speed = 0 if speed == 1 else (0x20 if speed != 2 else 8)
            self._rf_setup = self._rf_setup & 0xD7 | speed
            self._reg_write(RF_PA_RATE, self._rf_setup)

    @property
    def channel(self):
        """This `int` attribute specifies the nRF24L01's frequency."""
        return self._reg_read(5)

    @channel.setter
    def channel(self, channel):
        if not 0 <= int(channel) <= 125:
            raise ValueError("channel can only be set in range [0, 125]")
        self._channel = int(channel)
        self._reg_write(5, self._channel)

    @property
    def crc(self):
        """This `int` attribute specifies the nRF24L01's CRC (cyclic
        redundancy checking) encoding scheme in terms of byte length."""
        self._config = self._reg_read(CONFIGURE)
        return max(0, ((self._config & 0x0C) >> 2) - 1)

    @crc.setter
    def crc(self, length):
        if not 0 <= length <= 2:
            raise ValueError("CRC byte length must be an int equal to 0 (off), 1, or 2")
        if self.crc != length:
            length = (length + 1) << 2 if length else 0
            self._config = self._config & 0x73 | length
            self._reg_write(0, self._config)

    @property
    def power(self):
        """This `bool` attribute controls the power state of the nRF24L01."""
        self._config = self._reg_read(CONFIGURE)
        return bool(self._config & 2)

    @power.setter
    def power(self, is_on):
        self._config = self._reg_read(CONFIGURE)
        if self.power != bool(is_on):
            self._config = self._config & 0x7D | bool(is_on) << 1
            self._reg_write(CONFIGURE, self._config)
            time.sleep(0.00016)

    @property
    def pa_level(self):
        """This `int` attribute specifies the nRF24L01's power amplifier level
        (in dBm).

        The setter accepts -18, -12, -6 or 0, or a `list`/`tuple` of
        ``(level, lna_enable)``; without the second item the LNA bit is set.
        """
        self._rf_setup = self._reg_read(RF_PA_RATE)
        return (3 - ((self._rf_setup & 6) >> 1)) * -6

    @pa_level.setter
    def pa_level(self, power):
        lna_bit = True
        if isinstance(power, (list, tuple)) and len(power) > 1:
            lna_bit, power = bool(power[1]), int(power[0])
        if power not in (-18, -12, -6, 0):
            raise ValueError("pa_level must be -18, -12, -6, or 0 (in dBm)")
        power = (3 - int(power / -6)) * 2
        self._rf_setup = (self._rf_setup & 0xF8) | power | lna_bit
        self._reg_write(RF_PA_RATE, self._rf_setup)

    @property
    def is_lna_enabled(self):
        """A read-only `bool` attribute about the LNA (Low Noise Amplifier)
        gain feature."""
        self._rf_setup = self._reg_read(RF_PA_RATE)
        return bool(self._rf_setup & 1)

    def update(self):
        """This function is only used to get an updated status byte over SPI
        from the nRF24L01. It always returns `True`."""
        self._reg_write(0xFF)
        return True

    def resend(self, send_only=False):
        """Use this function to manually re-send the previous payload in the
        top level (first out) of the TX FIFO buffer.

        :param bool send_only: When `True` the RX FIFO is neither flushed nor
            read for an ACK payload.
        :returns: `False` when the TX FIFO is empty; otherwise the "Data Sent"
            flag, or whatever `recv()` returns when both "Data Ready" and
            "Data Sent" are flagged and ``send_only`` is false.
        """
        result = False
        if not self.fifo(True, True):
            self.ce_pin.value = 0
            if not send_only:
                self.flush_rx()
            self.clear_status_flags()
            self._reg_write(0xE3)
            self.ce_pin.value = 1
            time.sleep(0.00001)
            self.ce_pin.value = 0
            while not self._status & 0x70:  # wait for any IRQ flag
                self.update()
            result = self.irq_ds
            if self._status & 0x60 == 0x60 and not send_only:
                result = self.recv()
            self.clear_status_flags(False)
        return result

    def write(self, buf, ask_no_ack=False, write_only=False):
        """This non-blocking function (when used as alternative to `send()`)
        is meant for asynchronous applications and can only handle one payload
        at a time as it is a helper function to `send()`.

        :param buf: The payload, 1 to 32 bytes long. When dynamic payloads are
            not in use on pipe 0 it is zero-padded or truncated to the static
            length of pipe 0; the caller's object is never modified.
        :param bool ask_no_ack: Selects the no-ACK variant of the payload
            write command and enables that feature if it is not yet enabled.
        :param bool write_only: When `True` the CE pin is left untouched.
        :returns: `False` if the status byte reports a full TX FIFO, else
            `True`.
        :raises ValueError: If ``buf`` is empty or longer than 32 bytes.
        """
        if not buf or len(buf) > 32:
            raise ValueError("buffer must have a length in range [1, 32]")
        self.clear_status_flags()
        if self.tx_full:
            return False
        if self._config & 3 != 2:  # is radio powered up in TX mode?
            self._config = (self._reg_read(CONFIGURE) & 0x7C) | 2
            self._reg_write(CONFIGURE, self._config)
            time.sleep(0.00016)
        if not bool((self._dyn_pl & 1) and (self._features & 4)):
            shortfall = self._pl_len[0] - len(buf)
            if shortfall > 0:
                # build a new object; `buf += ...` would grow a caller-owned
                # bytearray in place
                buf = buf + b"\x00" * shortfall
            elif shortfall < 0:
                buf = buf[: self._pl_len[0]]
        if ask_no_ack:
            if self._features & 1 == 0:
                self._features = self._features & 0xFE | 1
                self._reg_write(TX_FEATURE, self._features)
        self._reg_write_bytes(0xA0 | (bool(ask_no_ack) << 4), buf)
        if not write_only:
            self.ce_pin.value = 1
        return True

    def flush_rx(self):
        """A helper function to flush the nRF24L01's RX FIFO buffer."""
        self._reg_write(0xE2)

    def flush_tx(self):
        """A helper function to flush the nRF24L01's TX FIFO buffer."""
        self._reg_write(0xE1)

    def fifo(self, about_tx=False, check_empty=None):
        """This provides *some* precision determining the status of the TX/RX
        FIFO buffers. (read-only)

        :param bool about_tx: `True` inspects the TX FIFO bits, `False` the
            RX FIFO bits.
        :param bool check_empty: `True` tests the "empty" bit, `False` the
            "full" bit. When `None`, both bits are returned as an `int` in
            range [0, 3].
        """
        self._fifo, about_tx = (self._reg_read(0x17), bool(about_tx))
        if check_empty is None:
            return (self._fifo & (0x30 if about_tx else 0x03)) >> (4 * about_tx)
        return bool(self._fifo & ((2 - bool(check_empty)) << (4 * about_tx)))

    def address(self, index=-1):
        """Returns the current address set to a specified data pipe or the TX
        address. (read-only)

        :param int index: A pipe number in range [0, 5]; a negative value
            selects the TX address. Pipes 2-5 combine their own first byte
            with the remaining bytes of pipe 1.
        :raises IndexError: If ``index`` is greater than 5.
        """
        if index > 5:
            raise IndexError("index {} is out of bounds [0,5]".format(index))
        if index < 0:
            return self._tx_address
        if index <= 1:
            return self._pipes[index]
        return bytes([self._pipes[index]]) + self._pipes[1][1:]

    @property
    def rpd(self):
        """This read-only attribute returns `True` if RPD (Received Power
        Detector) is triggered or `False` if not triggered."""
        return bool(self._reg_read(0x09))

    def start_carrier_wave(self):
        """Starts a continuous carrier wave test."""
        self.power = 0
        self.ce_pin.value = 0
        self.power = 1
        self.listen = 0
        self._rf_setup |= 0x90
        self._reg_write(RF_PA_RATE, self._rf_setup)
        if not self.is_plus_variant:
            self.auto_ack = False
            self._retry_setup = 0
            self._reg_write(SETUP_RETR, self._retry_setup)
            self._tx_address = bytearray([0xFF] * 5)
            self._reg_write_bytes(TX_ADDRESS, self._tx_address)
            self._reg_write_bytes(0xA0, b"\xFF" * 32)
            self.crc = 0
            self.ce_pin.value = 1
            time.sleep(0.001)
            self.ce_pin.value = 0
            # NOTE(review): this spins while an IRQ flag is set, yet nothing
            # in the loop clears the flags -- confirm the intended condition.
            while self._status & 0x70:
                self.update()
            self._reg_write(0x17, 0x40)
        self.ce_pin.value = 1

    def stop_carrier_wave(self):
        """Stops a continuous carrier wave test."""
        self.ce_pin.value = 0
        self.power = 0
        self._rf_setup &= ~0x90
        self._reg_write(RF_PA_RATE, self._rf_setup)