Simple test¶
Ensure your device works with this simple test.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 | """
Simple example of library usage.
"""
import time
import struct
import board
import digitalio as dio
from circuitpython_nrf24l01 import RF24
# addresses need to be in a buffer protocol object (a bytes literal is used here)
address = b'1Node'
# change these (digital output) pins accordingly
ce = dio.DigitalInOut(board.D4)
csn = dio.DigitalInOut(board.D5)
# using board.SPI() automatically selects the MCU's
# available SPI pins: board.SCK, board.MOSI, board.MISO
spi = board.SPI() # init spi bus object
# we'll be using the dynamic payload size feature (enabled by default)
# initialize the nRF24L01 on the spi bus object; this single module-level
# object is shared by master() and slave() below
nrf = RF24(spi, csn, ce)
def master(count=5):  # count = 5 will only transmit 5 packets
    """Transmit the remaining packet count as a packed integer once per second.

    :param int count: number of packets to send; the value being sent is the
        count itself, so it decreases by 1 with every transmission.
    """
    # set address of RX node into a TX pipe
    nrf.open_tx_pipe(address)
    # ensures the nRF24L01 is in TX mode
    nrf.listen = False

    while count:
        # struct.pack turns the int into a payload:
        # 'i' is a single 4 byte int, '<' is little endian byte order
        payload = struct.pack('<i', count)
        print("Sending: {} as struct: {}".format(count, payload))
        started_ms = time.monotonic() * 1000  # start timer
        outcome = nrf.send(payload)
        # None is reported as a timeout, any other falsy value as a failure
        if outcome is None:
            status = 'send() timed out'
        elif not outcome:
            status = 'send() failed'
        else:
            status = 'send() successful'
        print(status)
        # the elapsed time is printed whether or not the send succeeded
        print('Transmission took',
              time.monotonic() * 1000 - started_ms, 'ms')
        time.sleep(1)
        count -= 1
def slave(count=3):
    """Poll the radio and print every integer payload that arrives.

    :param int count: number of payloads to receive before returning.

    The loop also stops once 6 seconds pass without a payload; that window
    restarts each time a payload is received.
    """
    # set address of TX node into an RX pipe. NOTE you MUST specify
    # which pipe number to use for RX; pipe 0 is used here
    # (pipe number options range [0,5])
    nrf.open_rx_pipe(0, address)
    nrf.listen = True  # put radio into RX mode and power up

    last_rx = time.monotonic()
    while count and (time.monotonic() - last_rx) < 6:
        if nrf.any():
            # print details about the received packet
            print("Found {} bytes on pipe {}".format(repr(nrf.any()), nrf.pipe()))
            # retrieve the received packet's payload
            raw = nrf.recv()  # clears flags & empties RX FIFO
            # expecting a single int, thus the format '<i'; unpack() returns
            # a 1-item tuple
            (value,) = struct.unpack('<i', raw)
            print("Received: {}, Raw: {}".format(value, repr(raw)))
            last_rx = time.monotonic()  # restart the 6 second window
            count -= 1
        time.sleep(0.25)  # pause between polls

    # recommended behavior is to keep in TX mode while idle
    nrf.listen = False  # put the nRF24L01 in TX mode
# usage hint printed once when this example is loaded
print("""\
nRF24L01 Simple test.\n\
Run slave() on receiver\n\
Run master() on transmitter""")
|
ACK Payloads Example¶
This is a test to show how to use custom acknowledgment payloads.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 | """
Simple example of using the library to transmit
and retrieve custom automatic acknowledgment payloads.
"""
import time
import board
import digitalio as dio
from circuitpython_nrf24l01 import RF24
# change these (digital output) pins accordingly
ce = dio.DigitalInOut(board.D4)
csn = dio.DigitalInOut(board.D5)
# using board.SPI() automatically selects the MCU's
# available SPI pins: board.SCK, board.MOSI, board.MISO
spi = board.SPI() # init spi bus object
# we'll be using the dynamic payload size feature (enabled by default)
# the custom ACK payload feature is disabled by default
# the custom ACK payload feature should not be enabled
# during instantiation due to its singular use nature
# meaning 1 ACK payload per 1 RX'd payload
nrf = RF24(spi, csn, ce)
# NOTE the custom ACK payload feature will be enabled
# automatically when you call load_ack() passing:
# a buffer protocol object (bytearray) of
# length ranging [1,32]. The pipe number always needs
# to be an int ranging [0,5]
# here the custom ACK payload feature is enabled explicitly
nrf.ack = True # False disables again
# addresses need to be in a buffer protocol object (a bytes literal is used here)
address = b'1Node'
# payloads need to be in a buffer protocol object too
tx = b'Hello '
# NOTE ACK payloads (like regular payloads and addresses)
# need to be in a buffer protocol object
ACK = b'World '
def master(count=5):  # count = 5 will only transmit 5 packets
    """Transmit a dummy payload every second and print the ACK payload.

    :param int count: number of payloads to send. The remaining count is
        appended to each payload as one byte (count + 48, i.e. the ASCII
        digit for counts 0-9).
    """
    # recommended behavior is to keep in TX mode while idle
    nrf.listen = False  # put radio in TX mode
    # set address of RX node into a TX pipe
    nrf.open_tx_pipe(address)

    while count:
        outgoing = tx + bytes([count + 48])  # output buffer
        print("Sending (raw): {}".format(repr(outgoing)))
        nrf.ack = True  # enable the custom ACK feature before send()
        started_ms = time.monotonic() * 1000  # start timer
        response = nrf.send(outgoing)
        # None is reported as a timeout, any other falsy value as a failure;
        # a truthy result is printed as the received ACK payload
        if response is None:
            report = 'send() timed out'
        elif not response:
            report = 'send() failed'
        else:
            report = 'raw ACK: {}'.format(repr(response))
        print(report)
        # the elapsed time is printed whether or not the send succeeded
        print('Transmission took',
              time.monotonic() * 1000 - started_ms, 'ms')
        time.sleep(1)
        count -= 1
def slave(count=3):
    """Print each received payload and queue a dummy ACK payload for the next.

    :param int count: number of payloads to receive before returning.

    The wait window is ``count * 2`` seconds (using the remaining count) and
    restarts each time a payload is received.
    """
    # set address of TX node into an RX pipe. NOTE you MUST specify
    # which pipe number to use for RX; pipe 0 is used here
    nrf.open_rx_pipe(0, address)
    # put radio into RX mode, then load the ACK payload for the first
    # response together with its pipe number [0,5]
    nrf.listen = True
    nrf.load_ack(ACK + bytes([count + 48]), 0)

    last_rx = time.monotonic()
    while count and (time.monotonic() - last_rx) < (count * 2):
        if not nrf.any():
            continue  # nothing received yet; poll again
        count -= 1
        # print details about the received packet
        print("Found {} bytes on pipe {}".format(repr(nrf.any()), nrf.pipe()))
        # retrieve the received packet's payload
        incoming = nrf.recv()  # clears flags & empties RX FIFO
        print("Received (raw): {}".format(repr(incoming)))
        last_rx = time.monotonic()
        if count:  # going again? then load the ACK for the next response
            nrf.load_ack(ACK + bytes([count + 48]), 0)

    # recommended behavior is to keep in TX mode while idle
    nrf.listen = False  # put radio in TX mode
    nrf.flush_tx()  # flush any ACK payload
# usage hint printed once when this example is loaded
print("""\
nRF24L01 ACK test\n\
Run slave() on receiver\n\
Run master() on transmitter""")
|
IRQ Pin Example¶
This is a test to show how to use the nRF24L01’s interrupt pin.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 | """
Simple example of detecting (and verifying) the IRQ
interrupt pin on the nRF24L01
"""
import time
import board
import digitalio as dio
from circuitpython_nrf24l01 import RF24
# address needs to be in a buffer protocol object (bytearray is preferred)
address = b'1Node'
# select your digital input pin that's connected to the IRQ pin on the nRF24L01
# NOTE this must NOT be the same pin as CE or CSN below: a pin can only be
# claimed by one DigitalInOut object at a time
irq = dio.DigitalInOut(board.D12)
irq.switch_to_input()  # make sure it's an input object
# change these (digital output) pins accordingly
ce = dio.DigitalInOut(board.D4)
csn = dio.DigitalInOut(board.D5)
# using board.SPI() automatically selects the MCU's
# available SPI pins: board.SCK, board.MOSI, board.MISO
spi = board.SPI()  # init spi bus object
# we'll be using the dynamic payload size feature (enabled by default)
# initialize the nRF24L01 on the spi bus object
nrf = RF24(spi, csn, ce)
nrf.arc = 15  # turn up automatic retries to the max. default is 3
def master(timeout=5):  # will only wait 5 seconds for slave to respond
    """Run three IRQ pin checks: data sent, data ready, and data fail.

    :param timeout: seconds to wait for the slave's pong during the
        "data ready" check.

    Each check compares the radio's IRQ flag with the ``irq`` input pin,
    which is expected to read low (falsy) while an interrupt is active.
    """
    # set address of RX node into a TX pipe
    nrf.open_tx_pipe(address)
    # ensures the nRF24L01 is in TX mode
    nrf.listen = 0

    # --- on data sent test ---
    print("Pinging: enslaved nRF24L01 without auto_ack")
    nrf.write(b'ping')
    time.sleep(0.00001)  # mandatory 10 microsecond pulse starts transmission
    nrf.ce.value = 0  # end 10 us pulse; now in active TX
    # the irq_* attributes don't update themselves; update() refreshes them
    while not (nrf.irq_DS or nrf.irq_DF):
        nrf.update()
    print(
        'interrupt on data sent successful'
        if nrf.irq_DS and not irq.value else
        'IRQ on data sent is not active, check your wiring and call interrupt_config()')
    nrf.clear_status_flags()  # clear all flags for next test

    # --- on data ready test ---
    nrf.listen = 1
    nrf.open_rx_pipe(0, address)
    began = time.monotonic()
    while not nrf.any() and time.monotonic() - began < timeout:
        pass  # wait for slave to send
    if nrf.any():
        print('Pong received')
        print(
            'interrupt on data ready successful'
            if nrf.irq_DR and not irq.value else
            'IRQ on data ready is not active, check your wiring and call interrupt_config()')
        nrf.flush_rx()
    else:
        print('pong reception timed out!. make sure to run slave() on the other nRF24L01')
    nrf.clear_status_flags()  # clear all flags for next test

    # --- on data fail test ---
    nrf.listen = False  # put the nRF24L01 in TX mode
    # the writing pipe should still be open since close_tx_pipe() wasn't called
    nrf.flush_tx()  # just in case the previous "on data sent" test failed
    nrf.write(b'dummy')  # slave isn't listening anymore
    time.sleep(0.00001)  # mandatory 10 microsecond pulse starts transmission
    nrf.ce.value = 0  # end 10 us pulse; now in active TX
    while not (nrf.irq_DS or nrf.irq_DF):
        nrf.update()  # refreshes irq_DR, irq_DF and irq_DS
    print(
        'interrupt on data fail successful'
        if nrf.irq_DF and not irq.value else
        'IRQ on data fail is not active, check your wiring and call interrupt_config()')
    nrf.clear_status_flags()  # clear all flags for next test
def slave(timeout=10):  # will listen for 10 seconds before timing out
    """Act as the ponging RX node that lets the master's checks complete.

    :param timeout: seconds to listen for the master's ping.

    A pong is sent after the wait ends, whether or not a ping was received.
    """
    # setup radio to receive ping
    nrf.open_rx_pipe(0, address)
    nrf.listen = 1
    began = time.monotonic()
    while not nrf.any() and time.monotonic() - began < timeout:
        pass  # nrf.any() also updates the status byte on every call
    print("ping received. sending pong now." if nrf.any()
          else 'listening timed out, please try again')
    nrf.flush_rx()
    # switch to TX mode and answer
    nrf.listen = 0
    nrf.open_tx_pipe(address)
    nrf.send(b'pong')  # send a payload to complete the on data ready test
    # we're done on this side
# usage hint printed once when this example is loaded
print("""\
nRF24L01 Interrupt test\n\
Run master() to run IRQ pin tests\n\
Run slave() on the non-testing nRF24L01 to complete the test successfully""")
|
Stream Example¶
This is a test to show how to use send() to transmit multiple payloads.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 | """
Example of library usage for streaming multiple payloads.
"""
import time
import board
import digitalio as dio
from circuitpython_nrf24l01 import RF24
# addresses need to be in a buffer protocol object (a bytes literal is used here)
address = b'1Node'
# change these (digital output) pins accordingly
ce = dio.DigitalInOut(board.D4)
csn = dio.DigitalInOut(board.D5)
# using board.SPI() automatically selects the MCU's
# available SPI pins: board.SCK, board.MOSI, board.MISO
spi = board.SPI() # init spi bus object
# we'll be using the dynamic payload size feature (enabled by default)
# initialize the nRF24L01 on the spi bus object; this single module-level
# object is shared by master() and slave() below
nrf = RF24(spi, csn, ce)
# let's create a list of payloads to be streamed to the nRF24L01 running slave()
SIZE = 32  # used for both the number of payloads in the list and each payload's length
buffers = []
for i in range(SIZE):
    # distance of this payload's index from the middle of the list
    spread = abs(SIZE / 2 - i)
    # each byte is ASCII '1' (48 + True) when its position lies at or beyond
    # `spread` from the middle of the payload, otherwise ASCII '0' (48 + False).
    # the payload is built in one bytes() call rather than by appending one
    # byte at a time, which would allocate a new bytes object per byte
    buffers.append(bytes([
        48 + (j >= SIZE / 2 + spread or j < SIZE / 2 - spread)
        for j in range(SIZE)
    ]))
def master(count=1):  # count = 5 will transmit the list 5 times
    """Transmit the whole ``buffers`` list ``count`` times and print the success rate.

    :param int count: how many times the list of payloads is streamed. With a
        count of 0 nothing is sent and a success rate of 0 is reported.
    """
    # set address of RX node into a TX pipe
    nrf.open_tx_pipe(address)
    # ensures the nRF24L01 is in TX mode
    nrf.listen = False

    delivered = 0
    for _ in range(count):
        now = time.monotonic() * 1000  # start timer
        # passing a list makes send() return one result per payload
        result = nrf.send(buffers)
        print('Transmission took', time.monotonic() * 1000 - now, 'ms')
        delivered += sum(1 for r in result if r)  # count truthy results only
    attempted = SIZE * count
    # guard the division: count == 0 means no payload was attempted
    success_percentage = delivered / attempted if attempted > 0 else 0
    print('successfully sent', success_percentage * 100, '%')
def slave(timeout=5):
    """Print every received payload until ``timeout`` seconds pass with none.

    :param timeout: seconds of silence after which listening stops; the
        window restarts each time a payload is received.
    """
    # set address of TX node into an RX pipe. NOTE you MUST specify
    # which pipe number to use for RX; pipe 0 is used here
    # (pipe number options range [0,5])
    nrf.open_rx_pipe(0, address)
    nrf.listen = True  # put radio into RX mode and power up

    received = 0
    last_rx = time.monotonic()  # start timer
    while time.monotonic() < last_rx + timeout:
        if not nrf.any():
            continue  # nothing received yet; poll again
        received += 1
        # retrieve the received packet's payload
        payload = nrf.recv()  # clears flags & empties RX FIFO
        print("Received (raw): {} - {}".format(repr(payload), received))
        last_rx = time.monotonic()

    # recommended behavior is to keep in TX mode while idle
    nrf.listen = False  # put the nRF24L01 in TX mode
# usage hint printed once when this example is loaded
print("""\
nRF24L01 Stream test\n\
Run slave() on receiver\n\
Run master() on transmitter""")
|
Context Example¶
This is a test to show how to use “with” statements to manage multiple different nRF24L01 configurations on 1 transceiver.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | """
Simple example of library usage in context.
This will not transmit anything, but rather
display settings after changing contexts ( & thus configurations)
"""
import board
import digitalio as dio
from circuitpython_nrf24l01 import RF24
# change these (digital output) pins accordingly
ce = dio.DigitalInOut(board.D4)
csn = dio.DigitalInOut(board.D5)

# using board.SPI() automatically selects the MCU's
# available SPI pins: board.SCK, board.MOSI, board.MISO
spi = board.SPI()  # init spi bus object

# initialize two nRF24L01 objects on the same spi bus object and pins.
# the first object keeps the default settings and additionally enables
# the option to use custom ACK payloads
nrf = RF24(spi, csn, ce, ack=True)

# the second object has most features disabled/altered
# (auto_ack is left enabled)
basicRF = RF24(
    spi, csn, ce,
    dynamic_payloads=False,  # dynamic payloads disabled
    irq_DR=False,            # with irq_DS also off, the IRQ pin only
    irq_DS=False,            # goes active on "data fail"
    channel=2,               # a different channel (default is 76)
    crc=1,                   # CRC is 1 byte long
    data_rate=2,             # data rate is 2 Mbps
    payload_length=8,        # payload length is 8 bytes
    address_length=3,        # NOTE address length is 3 bytes
    pa_level=-12,            # RF power amplifier is set to -12 dbm
    ard=1000,                # automatic retry delay is 1000 microseconds
    arc=15)                  # automatic retry attempts (maximum allowed)

print("\nsettings configured by the nrf object")
with nrf:
    # NOTE the pipe is opened inside the "with" block. If pipes are opened
    # outside of the "with" block, you may encounter conflicts in the
    # differences between address_length attributes: the address_length
    # attribute must equal the length of addresses.
    # only the first character gets written because it is on a pipe_number > 1
    nrf.open_rx_pipe(5, b'1Node')
    # display current settings of the nrf object
    nrf.what_happened(True)  # True dumps pipe info

print("\nsettings configured by the basicRF object")
with basicRF as nerf:  # the "as nerf" part is optional
    nerf.open_rx_pipe(2, b'SOS')  # again only uses the first character
    nerf.what_happened(1)

# if you examine the outputs from what_happened() you'll see:
#   pipe 5 is opened using the nrf object, but closed using the basicRF object.
#   pipe 2 is closed using the nrf object, but opened using the basicRF object.
# this is because the "with" statements load the existing settings
# for the RF24 object specified after the word "with".
# the things that remain consistent despite the use of "with"
# statements include the power mode (standby or sleep) and the
# primary role (RX/TX mode)
# NOTE this library uses the addresses' reset values and closes all pipes upon
# instantiation
|
Working with TMRh20’s Arduino library¶
This test is meant to prove compatibility with the popular Arduino library for the nRF24L01 by TMRh20 (available for install via the Arduino IDE’s Library Manager). The following code has been designed for and tested with the TMRh20 library example named “GettingStarted_HandlingData.ino”.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 | """
Example of library driving the nRF24L01 to communicate with a nRF24L01 driven by
the TMRh20 Arduino library. The Arduino program/sketch that this example was
designed for is named GettingStarted_HandlingData.ino and can be found in the "RF24"
examples after the TMRh20 library is installed from the Arduino Library Manager.
"""
import time
import struct
import board
import digitalio as dio
from circuitpython_nrf24l01 import RF24
# addresses need to be in a buffer protocol object (bytes literals are used here)
# address[0] is used for the TX pipe and address[1] for RX pipe 1 below
address = [b'1Node', b'2Node']
# change these (digital output) pins accordingly
ce = dio.DigitalInOut(board.D4)
csn = dio.DigitalInOut(board.D5)
# using board.SPI() automatically selects the MCU's
# available SPI pins: board.SCK, board.MOSI, board.MISO
spi = board.SPI() # init spi bus object
# initialize the nRF24L01 on the spi bus object
nrf = RF24(spi, csn, ce, ask_no_ack=False)
nrf.dynamic_payloads = False # this is the default in the TMRh20 arduino library
# set address of TX node into a RX pipe
nrf.open_rx_pipe(1, address[1])
# set address of RX node into a TX pipe
nrf.open_tx_pipe(address[0])
def master(count=5):  # count = 5 will only transmit 5 packets
    """Send a timestamp plus a float every second and wait for the echo.

    :param int count: number of transmission attempts to make.

    After each successful send the radio listens for up to 200 ms for a
    response; the float from the response is reused in the next payload.
    """
    # for the "HandlingData" part of the test from the TMRh20 library example
    float_value = 0.01
    while count:
        nrf.listen = False  # ensures the nRF24L01 is in TX mode
        print("Now Sending")
        start_timer = int(time.monotonic() * 1000)  # start timer
        # use struct.pack to packetize the data into a usable payload:
        # '<' means little endian byte order
        # 'L' means a single 4 byte unsigned long value
        # 'f' means a single 4 byte float value
        buffer = struct.pack('<Lf', start_timer, float_value)
        result = nrf.send(buffer)
        if result is None:
            print('send() timed out')
        elif not result:
            print('send() failed')
        else:
            nrf.listen = True  # get radio ready to receive a response
            # the arbitrary 200 ms timeout value is also used in the TMRh20 example
            while time.monotonic() * 1000 - start_timer < 200:
                if nrf.any():
                    end_timer = time.monotonic() * 1000  # end timer
                    reply = struct.unpack('<Lf', nrf.recv()[:8])
                    # print total time to send and receive data
                    print('Sent', struct.unpack('<Lf', buffer), 'Got Response:', reply)
                    print('Round-trip delay:', end_timer - start_timer, 'ms')
                    float_value = reply[1]  # save float value for next iteration
                    break
            else:
                # the loop ran out of time without receiving anything
                print("failed to get a response; timed out")
        count -= 1
        time.sleep(1)
def slave(count=3):
    """Poll the radio and echo each payload back with its float incremented.

    :param int count: number of payloads to answer before returning.

    The loop also stops once 6 seconds pass without a payload; that window
    restarts each time a payload is received.
    """
    start_timer = time.monotonic()
    while count and (time.monotonic() - start_timer) < 6:
        nrf.listen = True  # put radio into RX mode and power up
        if not nrf.any():
            continue  # nothing received yet; poll again
        # retrieve the received packet's payload
        buffer = nrf.recv()  # clears flags & empties RX FIFO
        # increment floating value as part of the "HandlingData" test
        float_value = struct.unpack('<f', buffer[4:8])[0] + 0.01
        nrf.listen = False  # ensures the nRF24L01 is in TX mode
        start_timer = time.monotonic()  # in seconds
        # echo buffer[:4] appended with incremented float
        result = nrf.send(buffer[:4] + struct.pack('<f', float_value))
        end_timer = time.monotonic() * 1000  # in milliseconds
        # expecting an unsigned long & a float, thus the string format '<Lf'
        received = struct.unpack('<Lf', buffer[:8])  # "[:8]" ignores the padded 0s
        # print the unsigned long and float data sent in the response
        print("Responding: {}, {}".format(received[0], received[1] + 0.01))
        if result is None:
            print('response timed out')
        elif not result:
            print('response failed')
        else:
            # start_timer is in seconds, so it is scaled to milliseconds here
            print('successful response took', end_timer - start_timer * 1000, 'ms')
        count -= 1

    # recommended behavior is to keep in TX mode when in idle
    nrf.listen = False  # put the nRF24L01 in TX mode + Standby-I power state
# usage hint printed once when this example is loaded
print("""\
nRF24L01 communicating with an Arduino running the\n\
TMRh20 library's "GettingStarted_HandlingData.ino" example.\n\
Run slave() on receiver\n\
Run master() on transmitter""")
|