Source code for circuitpython_nrf24l01.rf24_network

# The MIT License (MIT)
#
# Copyright (c) 2020 Brendan Doherty
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""rf24_network module containing the base class RF24Network"""
try:
    from typing import Union
except ImportError:
    pass
import busio  # type:ignore[import]
from digitalio import DigitalInOut  # type:ignore[import]
from .network.mixins import NetworkMixin
from .network.structs import RF24NetworkHeader, RF24NetworkFrame, is_address_valid
from .network.constants import (
    NETWORK_MULTICAST_ADDR,
    AUTO_ROUTING,
    TX_NORMAL,
    TX_PHYSICAL,
    TX_LOGICAL,
    TX_MULTICAST,
    MAX_FRAG_SIZE,
)


[docs] class RF24NetworkRoutingOnly(NetworkMixin): """A minimal Networking implementation for nodes that are meant for strictly routing data amidst a network of nodes.""" def __init__( self, spi: busio.SPI, csn_pin: DigitalInOut, ce_pin: DigitalInOut, node_address: int, spi_frequency=10000000, ): if not is_address_valid(node_address): raise ValueError("node_address argument is invalid or malformed") super().__init__(spi, csn_pin, ce_pin, spi_frequency) self._begin(node_address) # setup radio @NetworkMixin.node_address.setter # type: ignore def node_address(self, val: int): if not is_address_valid(val): return self._begin(val) def update(self) -> int: """This function is used to keep the network layer current.""" return self._net_update()
[docs] class RF24Network(RF24NetworkRoutingOnly): """The object used to instantiate the nRF24L01 as a network node."""
[docs] def send(self, header: RF24NetworkHeader, message: Union[bytes, bytearray]) -> bool: """Deliver a message according to the header information.""" return self.write(RF24NetworkFrame(header, message))
[docs] def write( self, frame: RF24NetworkFrame, traffic_direct: int = AUTO_ROUTING ) -> bool: """Deliver a network frame.""" if not isinstance(frame, RF24NetworkFrame): raise TypeError("frame expected object of type RF24NetworkFrame.") if not is_address_valid(frame.header.to_node): raise AttributeError("frame destined for an invalid address") if not self._validate_msg_len(len(frame.message)): frame.message = frame.message[:MAX_FRAG_SIZE] frame.header.from_node = self._addr return self._pre_write(frame, traffic_direct)
def _pre_write( self, frame: RF24NetworkFrame, traffic_direct: int = AUTO_ROUTING ) -> bool: """Helper to do prep work for _write_to_pipe(); like to TMRh20's _write()""" self.frame_buf = frame if traffic_direct != AUTO_ROUTING: # Payload is multicast to the first node, and routed normally to the next send_type = TX_LOGICAL if self.frame_buf.header.to_node == NETWORK_MULTICAST_ADDR: send_type = TX_MULTICAST if self.frame_buf.header.to_node == traffic_direct: # Payload is multicast to the first node, which is the recipient send_type = TX_PHYSICAL return self._write(traffic_direct, send_type) return self._write(self.frame_buf.header.to_node, TX_NORMAL)