nRF24L01 Features

Simple test

Ensure your device works with this simple test.

examples/nrf24l01_simple_test.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
"""
Simple example of using the RF24 class.
"""
import time
import struct
import board
import digitalio as dio
# if running this on a ATSAMD21 M0 based board
# from circuitpython_nrf24l01.rf24_lite import RF24
from circuitpython_nrf24l01.rf24 import RF24

# addresses needs to be in a buffer protocol object (bytearray)
address = b"1Node"

# change these (digital output) pins accordingly
ce = dio.DigitalInOut(board.D4)
csn = dio.DigitalInOut(board.D5)

# using board.SPI() automatically selects the MCU's
# available SPI pins, board.SCK, board.MOSI, board.MISO
spi = board.SPI()  # init spi bus object

# we'll be using the dynamic payload size feature (enabled by default)
# initialize the nRF24L01 on the spi bus object
nrf = RF24(spi, csn, ce)

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity
nrf.pa_level = -12


def master(count=5):  # count = 5 will only transmit 5 packets
    """Transmits an incrementing integer every second"""
    # set address of RX node into a TX pipe
    nrf.open_tx_pipe(address)
    # ensures the nRF24L01 is in TX mode
    nrf.listen = False

    while count:
        # use struct.pack to packetize your data
        # into a usable payload
        buffer = struct.pack("<i", count)
        # 'i' means a single 4 byte int value.
        # '<' means little endian byte order. this may be optional
        print("Sending: {} as struct: {}".format(count, buffer))
        now = time.monotonic() * 1000  # start timer
        result = nrf.send(buffer)
        if not result:
            print("send() failed or timed out")
        else:
            print("send() successful")
        # print timer results despite transmission success
        print("Transmission took", time.monotonic() * 1000 - now, "ms")
        time.sleep(1)
        count -= 1


def slave(count=5):
    """Polls the radio and prints the received value. This method expires
    after 6 seconds of no received transmission"""
    # set address of TX node into an RX pipe. NOTE you MUST specify
    # which pipe number to use for RX, we'll be using pipe 0
    # pipe number options range [0,5]
    # the pipe numbers used during a transition don't have to match
    nrf.open_rx_pipe(0, address)
    nrf.listen = True  # put radio into RX mode and power up

    start = time.monotonic()
    while count and (time.monotonic() - start) < 6:
        if nrf.any():
            # print details about the received packet (if any)
            print("Found {} bytes on pipe {}".format(nrf.any(), nrf.pipe))
            # retreive the received packet's payload
            rx = nrf.recv()  # clears flags & empties RX FIFO
            # expecting an int, thus the string format '<i'
            buffer = struct.unpack("<i", rx[:4])
            # print the only item in the resulting tuple from
            # using `struct.unpack()`
            print("Received: {}, Raw: {}".format(buffer[0], repr(rx)))
            start = time.monotonic()
            count -= 1
            # this will listen indefinitely till count == 0
        time.sleep(0.25)

    # recommended behavior is to keep in TX mode while idle
    nrf.listen = False  # put the nRF24L01 is in TX mode


print(
    """\
    nRF24L01 Simple test.\n\
    Run slave() on receiver\n\
    Run master() on transmitter"""
)

ACK Payloads Example

This is a test to show how to use custom acknowledgment payloads. See also documentation on ack and load_ack().

examples/nrf24l01_ack_payload_test.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
"""
Simple example of using the library to transmit
and retrieve custom automatic acknowledgment payloads.
"""
import time
import board
import digitalio as dio
# if running this on a ATSAMD21 M0 based board
# from circuitpython_nrf24l01.rf24_lite import RF24
from circuitpython_nrf24l01.rf24 import RF24

# change these (digital output) pins accordingly
ce = dio.DigitalInOut(board.D4)
csn = dio.DigitalInOut(board.D5)

# using board.SPI() automatically selects the MCU's
# available SPI pins, board.SCK, board.MOSI, board.MISO
spi = board.SPI()  # init spi bus object

# we'll be using the dynamic payload size feature (enabled by default)
# the custom ACK payload feature is disabled by default
# the custom ACK payload feature should not be enabled
# during instantiation due to its singular use nature
# meaning 1 ACK payload per 1 RX'd payload
nrf = RF24(spi, csn, ce)

# NOTE the the custom ACK payload feature will be enabled
# automatically when you call load_ack() passing:
# a buffer protocol object (bytearray) of
# length ranging [1,32]. And pipe number always needs
# to be an int ranging [0,5]

# to enable the custom ACK payload feature
nrf.ack = True  # False disables again

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity
nrf.pa_level = -12

# addresses needs to be in a buffer protocol object (bytearray)
address = b"1Node"

# NOTE ACK payloads (like regular payloads and addresses)
# need to be in a buffer protocol object (bytearray)
ACK = b"World "


def master(count=5):  # count = 5 will only transmit 5 packets
    """Transmits a dummy payload every second and prints the ACK payload"""
    # recommended behavior is to keep in TX mode while idle
    nrf.listen = False  # put radio in TX mode

    # set address of RX node into a TX pipe
    nrf.open_tx_pipe(address)

    while count:
        buffer = b"Hello " + bytes([count + 48])  # output buffer
        print("Sending (raw): {}".format(repr(buffer)))
        # to read the ACK payload during TX mode we
        # pass the parameter read_ack as True.
        nrf.ack = True  # enable feature before send()
        now = time.monotonic() * 1000  # start timer
        result = nrf.send(buffer)  # becomes the response buffer
        if not result:
            print("send() failed or timed out")
        else:
            # print the received ACK that was automatically
            # fetched and saved to "buffer" via send()
            print("raw ACK: {}".format(repr(result)))
            # the ACK payload should now be in buffer
        # print timer results despite transmission success
        print("Transmission took", time.monotonic() * 1000 - now, "ms")
        time.sleep(1)
        count -= 1


def slave(count=5):
    """Prints the received value and sends a dummy ACK payload"""
    # set address of TX node into an RX pipe. NOTE you MUST specify
    # which pipe number to use for RX, we'll be using pipe 0
    nrf.open_rx_pipe(0, address)

    # put radio into RX mode, power it up, and set the first
    # transmission's ACK payload and pipe number
    nrf.listen = True
    buffer = ACK + bytes([count + 48])
    # we must set the ACK payload data and corresponding
    # pipe number [0,5]
    nrf.load_ack(buffer, 0)  # load ACK for first response

    start = time.monotonic()
    while count and (time.monotonic() - start) < (count * 2):
        if nrf.any():
            # this will listen indefinitely till count == 0
            count -= 1
            # print details about the received packet (if any)
            print("Found {} bytes on pipe {}".format(nrf.any(), nrf.pipe))
            # retreive the received packet's payload
            rx = nrf.recv()  # clears flags & empties RX FIFO
            print("Received (raw): {}".format(repr(rx)))
            start = time.monotonic()
            if count:  # Going again?
                # build new ACK
                buffer = ACK + bytes([count + 48])
                # load ACK for next response
                nrf.load_ack(buffer, 0)

    # recommended behavior is to keep in TX mode while idle
    nrf.listen = False  # put radio in TX mode
    nrf.flush_tx()  # flush any ACK payload


print(
    """\
    nRF24L01 ACK test\n\
    Run slave() on receiver\n\
    Run master() on transmitter"""
)

Multiceiver Example

This example shows how use a group of 6 nRF24L01 transceivers to transmit to 1 nRF24L01 transceiver. This technique is called “Multiceiver” in the nRF24L01 Specifications Sheet

Note

This example follows the diagram illistrated in figure 12 of section 7.7 of the nRF24L01 Specifications Sheet Please note that if auto_ack (on the base station) and arc (on the trnasmitting nodes) are disabled, then figure 10 of section 7.7 of the nRF24L01 Specifications Sheet would be a better illustration.

Hint

A paraphrased note from the the nRF24L01 Specifications Sheet:

Only when a data pipe receives a complete packet can other data pipes begin to receive data. When multiple [nRF24L01]s are transmitting to [one nRF24L01], the ard can be used to skew the auto retransmission so that they only block each other once.

This basically means that it might help packets get received if the ard attribute is set to various values among multiple transmitting nRF24L01 transceivers.

examples/nrf24l01_multiceiver_test.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""
Simple example of using 1 nRF24L01 to receive data from up to 6 other
transceivers. This technique is called "multiceiver" in the datasheet.
For fun, this example also sends an ACK payload from the base station
to the node-1 transmitter.
"""
import time
import board
import digitalio as dio

# if running this on a ATSAMD21 M0 based board
# from circuitpython_nrf24l01.rf24_lite import RF24
from circuitpython_nrf24l01.rf24 import RF24

# change these (digital output) pins accordingly
ce = dio.DigitalInOut(board.D4)
csn = dio.DigitalInOut(board.D5)

# using board.SPI() automatically selects the MCU's
# available SPI pins, board.SCK, board.MOSI, board.MISO
spi = board.SPI()  # init spi bus object

# we'll be using the dynamic payload size feature (enabled by default)
# initialize the nRF24L01 on the spi bus object
nrf = RF24(spi, csn, ce)

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity
nrf.pa_level = -12

# setup the addresses for all transmitting nRF24L01 nodes
addresses = [
    b"\x78" * 5,
    b"\xF1\xB3\xB4\xB5\xB6",
    b"\xCD\xB3\xB4\xB5\xB6",
    b"\xA3\xB3\xB4\xB5\xB6",
    b"\x0F\xB3\xB4\xB5\xB6",
    b"\x05\xB3\xB4\xB5\xB6"
]

# to use custom ACK payloads, we must enable that feature
nrf.ack = True
# let this be the ACk payload
ACK = b"Yak Back ACK"


def base(timeout=10):
    """Use the nRF24L01 as a base station for lisening to all nodes"""
    # write the addresses to all pipes.
    for pipe_n, addr in enumerate(addresses):
        nrf.open_rx_pipe(pipe_n, addr)
    while nrf.fifo(True, False):  # fill TX FIFO with ACK payloads
        nrf.load_ack(ACK, 1)  # only send ACK payload to node 1
    nrf.listen = True  # put base station into RX mode
    start_timer = time.monotonic()  # start timer
    while time.monotonic() - start_timer < timeout:
        while not nrf.fifo(False, True):  # keep RX FIFO empty for reception
            # show the pipe number that received the payload
            print("node", nrf.pipe, "sent:", nrf.recv())
            start_timer = time.monotonic()  # reset timer with every payload
            if nrf.load_ack(ACK, 1):  # keep TX FIFO full with ACK payloads
                print("\t ACK re-loaded")
    nrf.listen = False


def node(node_number, count=6):
    """start transmitting to the base station.

        :param int node_number: the node's identifying index (from the
            the `addresses` list)
        :param int count: the number of times that the node will transmit
            to the base station.
    """
    nrf.listen = False
    # set the TX address to the address of the base station.
    nrf.open_tx_pipe(addresses[node_number])
    counter = 0
    # use the node_number to identify where the payload came from
    node_id = b"PTX-" + bytes([node_number + 48])
    while counter < count:
        counter += 1
        # payloads will include the node_number and a payload ID character
        payload = node_id + b" payload-ID: " + bytes([node_number + 48])
        payload += bytes([counter + (65 if 0 <= counter < 26 else 71)])
        # show something to see it isn't frozen
        print("attempt {} returned {}".format(counter, nrf.send(payload)))
        time.sleep(0.5)  # slow down the test for readability


print(
    """\
    nRF24L01 Multiceiver test.\n\
    Run base() on the receiver\n\
        base() sends ACK payloads to node 1\n\
    Run node(node_number) on a transmitter\n\
        node()'s parameter, `node_number`, must be in range [0, 5]"""
)

IRQ Pin Example

This is a test to show how to use nRF24L01’s interrupt pin. Be aware that send() clears all IRQ events on exit, so we use the non-blocking write() instead. Also the ack attribute is enabled to trigger the irq_dr event when the master node receives ACK payloads. Simply put, this example is the most advanced example script (in this library), and it runs VERY quickly.

examples/nrf24l01_interrupt_test.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""
Simple example of detecting (and verifying) the IRQ (interrupt) pin on the
nRF24L01
    .. note:: this script uses the non-blocking `write()` function because
        the function `send()` clears the IRQ flags upon returning
"""
import time
import board
import digitalio as dio
# if running this on a ATSAMD21 M0 based board
# from circuitpython_nrf24l01.rf24_lite import RF24
from circuitpython_nrf24l01.rf24 import RF24

# address needs to be in a buffer protocol object (bytearray is preferred)
address = b"1Node"

# select your digital input pin that's connected to the IRQ pin on the nRF4L01
irq_pin = dio.DigitalInOut(board.D12)
irq_pin.switch_to_input()  # make sure its an input object
# change these (digital output) pins accordingly
ce = dio.DigitalInOut(board.D4)
csn = dio.DigitalInOut(board.D5)

# using board.SPI() automatically selects the MCU's
# available SPI pins, board.SCK, board.MOSI, board.MISO
spi = board.SPI()  # init spi bus object

# we'll be using the dynamic payload size feature (enabled by default)
# initialize the nRF24L01 on the spi bus object
nrf = RF24(spi, csn, ce)

# this example uses the ACK payload to trigger the IRQ pin active for
# the "on data received" event
nrf.ack = True  # enable ACK payloads

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity
nrf.pa_level = -12


def _ping_and_prompt():
    """transmit 1 payload, wait till irq_pin goes active, print IRQ status
    flags."""
    ce.value = 1  # tell the nRF24L01 to prepare sending a single packet
    time.sleep(0.00001)  # mandatory 10 microsecond pulse starts transmission
    ce.value = 0  # end 10 us pulse; use only 1 buffer from TX FIFO
    while irq_pin.value:  # IRQ pin is active when LOW
        pass
    print("IRQ pin went active LOW.")
    nrf.update()  # update irq_d? status flags
    print(
        "\tirq_ds: {}, irq_dr: {}, irq_df: {}".format(
            nrf.irq_ds, nrf.irq_dr, nrf.irq_df
        )
    )

def master():
    """Transmits 3 times: successfully receive ACK payload first, successfully
    transmit on second, and intentionally fail transmit on the third"""
    # set address of RX node into a TX pipe
    nrf.open_tx_pipe(address)
    # ensures the nRF24L01 is in TX mode
    nrf.listen = False
    # NOTE nrf.power is automatically set to True on first call to nrf.write()
    # NOTE nrf.write() internally calls nrf.clear_status_flags() first

    # load 2 buffers into the TX FIFO; write_only=True leaves CE pin LOW
    nrf.write(b"Ping ", write_only=True)
    nrf.write(b"Pong ", write_only=True)

    # on data ready test
    print("\nConfiguring IRQ pin to only ignore 'on data sent' event")
    nrf.interrupt_config(data_sent=False)
    print("    Pinging slave node for an ACK payload...", end=" ")
    _ping_and_prompt()  # CE pin is managed by this function
    if nrf.irq_dr:
        print("\t'on data ready' event test successful")
    else:
        print("\t'on data ready' event test unsucessful")

    # on data sent test
    print("\nConfiguring IRQ pin to only ignore 'on data ready' event")
    nrf.interrupt_config(data_recv=False)
    print("    Pinging slave node again...             ", end=" ")
    _ping_and_prompt()  # CE pin is managed by this function
    if nrf.irq_ds:
        print("\t'on data sent' event test successful")
    else:
        print("\t'on data sent' event test unsucessful")

    # trigger slave node to exit by filling the slave node's RX FIFO
    print("\nSending one extra payload to fill RX FIFO on slave node.")
    if nrf.send(b"Radio", send_only=True):
        # when send_only parameter is True, send() ignores RX FIFO usage
        print("Slave node should not be listening anymore.")
    else:
        print("Slave node was unresponsive.")

    # on data fail test
    print("\nConfiguring IRQ pin to go active for all events.")
    nrf.interrupt_config()
    print("    Sending a ping to inactive slave node...", end=" ")
    nrf.flush_tx()  # just in case any previous tests failed
    nrf.write(b"Dummy", write_only=True)  # CE pin is left LOW
    _ping_and_prompt()  # CE pin is managed by this function
    if nrf.irq_df:
        print("\t'on data failed' event test successful")
    else:
        print("\t'on data failed' event test unsucessful")
    nrf.flush_tx()  # flush artifact payload in TX FIFO from last test
    # all 3 ACK payloads received were 4 bytes each, and RX FIFO is full
    # so, fetching 12 bytes from the RX FIFO also flushes RX FIFO
    print("\nComplete RX FIFO:", nrf.recv(12))


def slave(timeout=6):  # will listen for 6 seconds before timing out
    """Only listen for 3 payload from the master node"""
    # setup radio to recieve pings, fill TX FIFO with ACK payloads
    nrf.open_rx_pipe(0, address)
    nrf.load_ack(b"Yak ", 0)
    nrf.load_ack(b"Back", 0)
    nrf.load_ack(b" ACK", 0)
    nrf.listen = True  # start listening & clear irq_dr flag
    start_timer = time.monotonic()  # start timer now
    while not nrf.fifo(0, 0) and time.monotonic() - start_timer < timeout:
        # if RX FIFO is not full and timeout is not reached, then keep going
        pass
    nrf.listen = False  # put nRF24L01 in Standby-I mode when idling
    if not nrf.fifo(False, True):  # if RX FIFO is not empty
        # all 3 payloads received were 5 bytes each, and RX FIFO is full
        # so, fetching 15 bytes from the RX FIFO also flushes RX FIFO
        print("Complete RX FIFO:", nrf.recv(15))
    nrf.flush_tx()  # discard any pending ACK payloads


print(
    """\
    nRF24L01 Interrupt pin test.\n\
    Make sure the IRQ pin is connected to the MCU\n\
    Run slave() on receiver\n\
    Run master() on transmitter"""
)

Library-Specific Features

Stream Example

This is a test to show how to use the send() to transmit multiple payloads with 1 function call.

examples/nrf24l01_stream_test.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""
Example of library usage for streaming multiple payloads.
"""
import time
import board
import digitalio as dio
# if running this on a ATSAMD21 M0 based board
# from circuitpython_nrf24l01.rf24_lite import RF24
from circuitpython_nrf24l01.rf24 import RF24

# addresses needs to be in a buffer protocol object (bytearray)
address = b"1Node"

# change these (digital output) pins accordingly
ce = dio.DigitalInOut(board.D4)
csn = dio.DigitalInOut(board.D5)

# using board.SPI() automatically selects the MCU's
# available SPI pins, board.SCK, board.MOSI, board.MISO
spi = board.SPI()  # init spi bus object

# we'll be using the dynamic payload size feature (enabled by default)
# initialize the nRF24L01 on the spi bus object
nrf = RF24(spi, csn, ce)

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity
nrf.pa_level = -12


def master(count=1):  # count = 5 will transmit the list 5 times
    """Transmits a massive buffer of payloads"""
    # lets create a `list` of payloads to be streamed to
    # the nRF24L01 running slave()
    buffers = []
    # we'll use SIZE for the number of payloads in the list and the
    # payloads' length
    size = 32
    for i in range(size):
        # prefix payload with a sequential letter to indicate which
        # payloads were lost (if any)
        buff = bytes([i + (65 if 0 <= i < 26 else 71)])
        for j in range(size - 1):
            char = bool(j >= (size - 1) / 2 + abs((size - 1) / 2 - i))
            char |= bool(j < (size - 1) / 2 - abs((size - 1) / 2 - i))
            buff += bytes([char + 48])
        buffers.append(buff)
        del buff

    # set address of RX node into a TX pipe
    nrf.open_tx_pipe(address)
    # ensures the nRF24L01 is in TX mode
    nrf.listen = False

    successful = 0
    for _ in range(count):
        now = time.monotonic() * 1000  # start timer
        result = nrf.send(buffers, force_retry=2)
        print("Transmission took", time.monotonic() * 1000 - now, "ms")
        for r in result:
            successful += 1 if r else 0
    print(
        "successfully sent {}% ({}/{})".format(
            successful / (size* count) * 100, successful, size * count
        )
    )


def slave(timeout=5):
    """Stops listening after timeout with no response"""
    # set address of TX node into an RX pipe. NOTE you MUST specify
    # which pipe number to use for RX, we'll be using pipe 0
    # pipe number options range [0,5]
    # the pipe numbers used during a transition don't have to match
    nrf.open_rx_pipe(0, address)
    nrf.listen = True  # put radio into RX mode and power up

    count = 0
    now = time.monotonic()  # start timer
    while time.monotonic() < now + timeout:
        if nrf.any():
            count += 1
            # retreive the received packet's payload
            rx = nrf.recv()  # clears flags & empties RX FIFO
            print("Received: {} - {}".format(repr(rx), count))
            now = time.monotonic()

    # recommended behavior is to keep in TX mode while idle
    nrf.listen = False  # put the nRF24L01 is in TX mode


print(
    """\
    nRF24L01 Stream test\n\
    Run slave() on receiver\n\
    Run master() on transmitter"""
)

Context Example

This is a test to show how to use The with statement blocks to manage multiple different nRF24L01 configurations on 1 transceiver.

examples/nrf24l01_context_test.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
"""
Simple example of library usage in context.
This will not transmit anything, but rather
display settings after changing contexts ( & thus configurations)

    .. warning:: This script is not compatible with the rf24_lite module
"""
import board
import digitalio as dio
from circuitpython_nrf24l01.rf24 import RF24
from circuitpython_nrf24l01.fake_ble import FakeBLE

# change these (digital output) pins accordingly
ce = dio.DigitalInOut(board.D4)
csn = dio.DigitalInOut(board.D5)

# using board.SPI() automatically selects the MCU's
# available SPI pins, board.SCK, board.MOSI, board.MISO
spi = board.SPI()  # init spi bus object

# initialize the nRF24L01 objects on the spi bus object
# the first object will have all the features enabled
nrf = RF24(spi, csn, ce)
# enable the option to use custom ACK payloads
nrf.ack = True
# set the static payload length to 8 bytes
nrf.payload_length = 8
# RF power amplifier is set to -18 dbm
nrf.pa_level = -18

# the second object has most features disabled/altered
ble = FakeBLE(spi, csn, ce)
# the IRQ pin is configured to only go active on "data fail"
# NOTE BLE operations prevent the IRQ pin going active on "data fail" events
ble.interrupt_config(data_recv=False, data_sent=False)
# using a channel 2
ble.channel = 2
# RF power amplifier is set to -12 dbm
ble.pa_level = -12

print("\nsettings configured by the nrf object")
with nrf:
    # only the first character gets written because it is on a pipe_number > 1
    nrf.open_rx_pipe(5, b"1Node")  # NOTE we do this inside the "with" block

    # display current settings of the nrf object
    nrf.what_happened(True)  # True dumps pipe info

print("\nsettings configured by the ble object")
with ble as nerf:  # the "as nerf" part is optional
    nerf.what_happened(1)

# if you examine the outputs from what_happened() you'll see:
#   pipe 5 is opened using the nrf object, but closed using the ble object.
#   pipe 0 is closed using the nrf object, but opened using the ble object.
#   also notice the different addresses bound to the RX pipes
# this is because the "with" statements load the existing settings
# for the RF24 object specified after the word "with".

# NOTE it is not advised to manipulate seperate RF24 objects outside of the
# "with" block; you will encounter bugs about configurations when doing so.
# Be sure to use 1 "with" block per RF24 object when instantiating multiple
# RF24 objects in your program.
# NOTE exiting a "with" block will always power down the nRF24L01
# NOTE upon instantiation, this library closes all RX pipes &
# extracts the TX/RX addresses from the nRF24L01 registers

OTA compatibility

Fake BLE Example

This is a test to show how to use the nRF24L01 as a BLE advertising beacon using the FakeBLE class.

examples/nrf24l01_fake_ble_test.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
This example uses the nRF24L01 as a 'fake' BLE Beacon

    .. warning:: ATSAMD21 M0-based boards have memory allocation
        error when loading 'fake_ble.mpy'
"""
import time
import board
import digitalio as dio
from circuitpython_nrf24l01.fake_ble import (
    chunk,
    FakeBLE,
    UrlServiceData,
    BatteryServiceData,
    TemperatureServiceData,
)

# change these (digital output) pins accordingly
ce = dio.DigitalInOut(board.D4)
csn = dio.DigitalInOut(board.D5)

# using board.SPI() automatically selects the MCU's
# available SPI pins, board.SCK, board.MOSI, board.MISO
spi = board.SPI()  # init spi bus object

# initialize the nRF24L01 on the spi bus object as a BLE compliant radio
nrf = FakeBLE(spi, csn, ce)

# the name parameter is going to be its broadcasted BLE name
# this can be changed at any time using the `name` attribute
# nrf.name = b"foobar"

# you can optionally set the arbitrary MAC address to be used as the
# BLE device's MAC address. Otherwise this is randomly generated upon
# instantiation of the FakeBLE object.
# nrf.mac = b"\x19\x12\x14\x26\x09\xE0"

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceiver in close proximity to the
# BLE scanning application
nrf.pa_level = -12


def _prompt(count, iterator):
    if (count - iterator) % 5 == 0 or (count - iterator) < 5:
        if count - iterator - 1:
            print(count - iterator, "advertisments left to go!")
        else:
            print(count - iterator, "advertisment left to go!")


# create an object for manipulating the battery level data
battery_service = BatteryServiceData()
# battery level data is 1 unsigned byte representing a percentage
battery_service.data = 85


def master(count=50):
    """Sends out the device information twice a second."""
    # using the "with" statement is highly recommended if the nRF24L01 is
    # to be used for more than a BLE configuration
    with nrf as ble:
        ble.name = b"nRF24L01"
        # include the radio's pa_level attribute in the payload
        ble.show_pa_level = True
        print(
            "available bytes in next payload:",
            ble.available(chunk(battery_service.buffer))
        )  # using chunk() gives an accurate estimate of available bytes
        for i in range(count):  # advertise data this many times
            if ble.available(chunk(battery_service.buffer)) >= 0:
                _prompt(count, i)  # something to show that it isn't frozen
                # broadcast the device name, MAC address, &
                # battery charge info; 0x16 means service data
                ble.advertise(battery_service.buffer, data_type=0x16)
                # channel hoping is recommended per BLE specs
                ble.hop_channel()
                time.sleep(0.5)  # wait till next broadcast
    # nrf.show_pa_level & nrf.name both are set to false when
    # exiting a with statement block


# create an object for manipulating temperature measurements
temperature_service = TemperatureServiceData()
# temperature's float data has up to 2 decimal places of percision
temperature_service.data = 42.0


def send_temp(count=50):
    """Sends out a fake temperature twice a second."""
    with nrf as ble:
        ble.name = b"nRF24L01"
        print(
            "available bytes in next payload:",
            ble.available(chunk(temperature_service.buffer))
        )
        for i in range(count):
            if ble.available(chunk(temperature_service.buffer)) >= 0:
                _prompt(count, i)
                # broadcast a temperature measurement; 0x16 means service data
                ble.advertise(temperature_service.buffer, data_type=0x16)
                ble.hop_channel()
                time.sleep(0.2)


# use the Eddystone protocol from Google to broadcast a URL as
# service data. We'll need an object to manipulate that also
url_service = UrlServiceData()
# the data attribute converts a URL string into a simplified
# bytes object using byte codes defined by the Eddystone protocol.
url_service.data = "http://www.google.com"
# Eddystone protocol requires an estimated TX PA level at 1 meter
# lower this estimate since we lowered the actual `ble.pa_level`
url_service.pa_level_at_1_meter = -45  # defaults to -25 dBm

def send_url(count=50):
    """Sends out a URL twice a second."""
    with nrf as ble:
        print(
            "available bytes in next payload:",
            ble.available(chunk(url_service.buffer))
        )
        # NOTE we did NOT set a device name in this with block
        for i in range(count):
            # URLs easily exceed the nRF24L01's max payload length
            if ble.available(chunk(url_service.buffer)) >= 0:
                _prompt(count, i)
                ble.advertise(url_service.buffer, 0x16)
                ble.hop_channel()
                time.sleep(0.2)

print(
    """\
    nRF24L01 fake BLE beacon test.\n\
    Run master() to broadcast the device name, pa_level, & battery charge\n\
    Run send_temp() to broadcast the device name & a temperature\n\
    Run send_url() to broadcast a custom URL link"""
)

TMRh20’s Arduino library

This test is meant to prove compatibility with the popular Arduino library for the nRF24L01 by TMRh20 (available for install via the Arduino IDE’s Library Manager). The following code has been designed/test with the TMRh20 library example named GettingStarted_HandlingData.ino. If you changed the role variable in the TMRh20 sketch, you will have to adjust the addresses assigned to the pipes in this script.

examples/nrf24l01_2arduino_handling_data.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""
Example of library driving the nRF24L01 to communicate with a nRF24L01 driven by
the TMRh20 Arduino library. The Arduino program/sketch that this example was
designed for is named GettingStarted_HandlingData.ino and can be found in the "RF24"
examples after the TMRh20 library is installed from the Arduino Library Manager.
"""
import time
import struct
import board
import digitalio as dio
# if running this on a ATSAMD21 M0 based board
# from circuitpython_nrf24l01.rf24_lite import RF24
from circuitpython_nrf24l01.rf24 import RF24

# addresses needs to be in a buffer protocol object (bytearray)
address = [b"1Node", b"2Node"]

# change these (digital output) pins accordingly
ce = dio.DigitalInOut(board.D4)
csn = dio.DigitalInOut(board.D5)

# using board.SPI() automatically selects the MCU's
# available SPI pins, board.SCK, board.MOSI, board.MISO
spi = board.SPI()  # init spi bus object

# initialize the nRF24L01 on the spi bus object
nrf = RF24(spi, csn, ce)
nrf.dynamic_payloads = False  # the default in the TMRh20 arduino library

# set the Power Amplifier level to -12 dBm since this test example is
# usually run with nRF24L01 transceivers in close proximity
nrf.pa_level = -12

# set address of TX node into a RX pipe
nrf.open_rx_pipe(1, address[1])
# set address of RX node into a TX pipe
nrf.open_tx_pipe(address[0])

# pylint: disable=too-few-public-methods
class DataStruct:
    """A data structure to hold transmitted values as the
    'HandlingData' part of the TMRh20 library example"""
    time = 0  # in milliseconds (used as start of timer)
    value = 1.22  # incremented  by 0.01 with every transmission
# pylint: enable=too-few-public-methods

myData = DataStruct()


def master(count=5):  # count = 5 will only transmit 5 packets
    """Transmits an arbitrary unsigned long value every second"""
    while count:
        nrf.listen = False  # ensures the nRF24L01 is in TX mode
        print("Now Sending")
        myData.time = int(time.monotonic() * 1000)  # start timer
        # use struct.pack to packetize your data into a usable payload
        # '<' means little endian byte order.
        # 'L' means a single 4 byte unsigned long value.
        # 'f' means a single 4 byte float value.
        buffer = struct.pack("<Lf", myData.time, myData.value)
        result = nrf.send(buffer)
        if not result:
            print("send() failed or timed out")
        else:
            nrf.listen = True  # get radio ready to receive a response
            timeout = True  # used to determine if response timed out
            while time.monotonic() * 1000 - myData.time < 200:
                # the arbitrary 200 ms timeout value is also used in the
                # TMRh20 library's GettingStarted_HandlingData sketch
                if nrf.any():
                    end_timer = time.monotonic() * 1000  # end timer
                    rx = nrf.recv()
                    rx = struct.unpack("<Lf", rx[:8])
                    myData.value = rx[1]  # save the new float value
                    timeout = False  # skips timeout prompt
                    # print total time to send and receive data
                    print(
                        "Sent {} Got Response: {}".format(
                            struct.unpack("<Lf", buffer),
                            rx
                        )
                    )
                    print("Round-trip delay:", end_timer - myData.time, "ms")
                    break
            if timeout:
                print("failed to get a response; timed out")
        count -= 1
        time.sleep(1)


def slave(count=3):
    """Polls the radio and prints the received value. This method expires
    after 6 seconds of no received transmission"""
    myData.time = time.monotonic() * 1000  # in milliseconds
    while count and (time.monotonic() * 1000 - myData.time) < 6000:
        nrf.listen = True  # put radio into RX mode and power up
        if nrf.any():
            # retreive the received packet's payload
            buffer = nrf.recv()  # clears flags & empties RX FIFO
            # increment floating value as part of the "HandlingData" test
            myData.value = struct.unpack("<f", buffer[4:8])[0] + 0.01
            nrf.listen = False  # ensures the nRF24L01 is in TX mode
            myData.time = time.monotonic() * 1000
            # echo buffer[:4] appended with incremented float
            result = nrf.send(buffer[:4] + struct.pack("<f", myData.value))
            end_timer = time.monotonic() * 1000  # in milliseconds
            # expecting an unsigned long & a float, thus the
            # string format "<Lf"; buffer[:8] ignores the padded 0s
            rx = struct.unpack("<Lf", buffer[:8])
            # print the unsigned long and float data sent in the response
            print("Responding: {}, {}".format(rx[0], rx[1] + 0.01))
            if not result:
                print("response failed or timed out")
            else:
                # print timer results on transmission success
                print(
                    "successful response took {} ms".format(
                        end_timer - myData.time
                    )
                )
            # this will listen indefinitely till counter == 0
            count -= 1
    # recommended behavior is to keep in TX mode when in idle
    nrf.listen = False  # put the nRF24L01 in TX mode + Standby-I power state


print(
    """\
    nRF24L01 communicating with an Arduino running the\n\
    TMRh20 library's "GettingStarted_HandlingData.ino" example.\n\
    Run slave() on receiver\n\
    Run master() on transmitter"""
)