# The MIT License (MIT)
#
# Copyright (c) 2020 Brendan Doherty
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""rf24_network module containing the base class RF24Network"""
import time
import struct
try:
import json
except ImportError:
pass # some CircuitPython boards don't have the json module
try:
from typing import Union, Dict, List, Optional, Callable, Any
except ImportError:
pass
import busio # type:ignore[import]
from digitalio import DigitalInOut # type:ignore[import]
from .network.constants import (
MESH_ADDR_REQUEST,
MESH_ADDR_RESPONSE,
NETWORK_DEFAULT_ADDR,
NETWORK_MULTICAST_ADDR,
NETWORK_POLL,
MESH_ADDR_RELEASE,
MESH_ADDR_LOOKUP,
MESH_ID_LOOKUP,
MESH_LOOKUP_TIMEOUT,
MESH_WRITE_TIMEOUT,
MESH_MAX_POLL,
MESH_MAX_CHILDREN,
TX_NORMAL,
TX_PHYSICAL,
TX_MULTICAST,
MAX_FRAG_SIZE,
)
from .network.structs import RF24NetworkHeader, is_address_valid
from .network.mixins import NetworkMixin, _lvl_2_addr
[docs]
class RF24MeshNoMaster(NetworkMixin):
"""A descendant of the same mixin class that `RF24Network` inherits from. This
class adds easy Mesh networking capability (non-master nodes only)."""
def __init__(
self,
spi: busio.SPI,
csn_pin: DigitalInOut,
ce_pin: DigitalInOut,
node_id: int,
spi_frequency: int = 10000000,
):
super().__init__(spi, csn_pin, ce_pin, spi_frequency)
self._id = min(255, node_id)
#: This variable can be assigned a function to perform during long operations.
self.block_less_callback: Optional[Callable[[], Any]] = None
self.ret_sys_msg = True # force _net_update() to return system message types
self._begin(0 if not node_id else NETWORK_DEFAULT_ADDR) # setup radio
@property
def node_id(self) -> int:
"""The unique ID number (1 byte long) of the mesh network node."""
return self._id
@node_id.setter
def node_id(self, _id: int):
if self._addr != NETWORK_DEFAULT_ADDR:
self.release_address()
self._id = _id & 0xFF
def print_details(self, dump_pipes: bool = False, network_only: bool = False):
"""See RF24.print_details() and Shared Networking API docs"""
super().print_details(False, network_only)
print("Network node id____________{}".format(self.node_id))
print("Mesh node allows children__{}".format(self._parenthood))
if dump_pipes:
self.print_pipes()
def release_address(self) -> bool:
"""Forces an address lease to expire from the master."""
if self._addr != NETWORK_DEFAULT_ADDR:
self.frame_buf.header.to_node = 0
self.frame_buf.header.from_node = self._addr
self.frame_buf.header.message_type = MESH_ADDR_RELEASE
self.frame_buf.message = b""
if self._write(0, TX_NORMAL):
super()._begin(NETWORK_DEFAULT_ADDR)
return True
return False
def renew_address(self, timeout: Union[float, int] = 7.5):
"""Connect to the mesh network and request a new `node_address`."""
if self._rf24.available():
self.update()
if self._addr != NETWORK_DEFAULT_ADDR:
super()._begin(NETWORK_DEFAULT_ADDR)
total_requests, request_count = (0, 0)
end_timer = timeout + time.monotonic()
while not self._request_address(request_count):
if time.monotonic() > end_timer:
return None
time.sleep((25 + ((total_requests + 1) * (request_count + 1)) * 2) / 1000)
request_count = (request_count + 1) % 4
total_requests = (total_requests + 1) % 10
return self._addr
def lookup_address(self, node_id: Optional[int] = None) -> int:
"""Convert a node's unique ID number into its corresponding
:ref:`Logical Address <Logical Address>`."""
if not node_id:
return 0
if self._addr == NETWORK_DEFAULT_ADDR:
return -2
return self._lookup_2_master(node_id, MESH_ADDR_LOOKUP)
def lookup_node_id(self, address: Optional[int] = None) -> int:
"""Convert a node's :ref:`Logical Address <Logical Address>` into its
corresponding unique ID number."""
if not address:
return self._id if address is None else 0
if self._addr == NETWORK_DEFAULT_ADDR:
return -2
return self._lookup_2_master(address, MESH_ID_LOOKUP)
def _lookup_2_master(self, number: int, lookup_type: int) -> int:
"""Returns False if timed out, otherwise lookup result"""
self.frame_buf.header.to_node = 0
self.frame_buf.header.from_node = self._addr
self.frame_buf.header.message_type = lookup_type
if lookup_type == MESH_ID_LOOKUP:
self.frame_buf.message = struct.pack("<H", number)
else:
self.frame_buf.message = bytes([number])
if not self._write(0, TX_NORMAL):
return -1
timeout = MESH_LOOKUP_TIMEOUT * 1000000 + time.monotonic_ns()
while self._net_update() not in (MESH_ID_LOOKUP, MESH_ADDR_LOOKUP):
if callable(self.block_less_callback):
self.block_less_callback() # pylint: disable=not-callable
if time.monotonic_ns() > timeout:
return -1
if lookup_type == MESH_ADDR_LOOKUP:
return struct.unpack("<H", self.frame_buf.message[:2])[0]
return self.frame_buf.message[0]
def check_connection(self) -> bool:
"""Check for network connectivity (not for use on master node)."""
# do a double check as a manual retry in lack of using auto-ack
if self.lookup_address(self._id) < 1:
if self.lookup_address(self._id) < 1:
return False
return True
def update(self) -> int:
"""Checks for incoming network data and returns last message type (if any)"""
msg_t = self._net_update()
if self._addr == NETWORK_DEFAULT_ADDR:
return msg_t
return msg_t
def _request_address(self, level: int) -> bool:
"""Get a new address assigned from the master node"""
contacts = self._make_contact(level)
# print("Got", len(contacts), "responses on level",level)
if not contacts:
return False
def _get_level(address: int) -> int:
count = 0
while address:
address >>= 3
count += 1
return count
new_addr = None
for contact in contacts:
# print("Requesting address from", oct(contact))
self.frame_buf.header.to_node = contact
self.frame_buf.header.from_node = NETWORK_DEFAULT_ADDR
self.frame_buf.header.message_type = MESH_ADDR_REQUEST
self.frame_buf.header.reserved = self._id
self.frame_buf.message = b""
self._write(contact, TX_PHYSICAL) # do a no auto-ack write
timeout = 225000000 + time.monotonic_ns()
while time.monotonic_ns() < timeout: # wait for network ack
if (
self._net_update() == MESH_ADDR_RESPONSE
and self.frame_buf.header.reserved == self.node_id
):
new_addr = struct.unpack("<H", self.frame_buf.message[:2])[0]
test_addr = new_addr & ~(0xFFFF << (_get_level(contact) * 3))
if test_addr != contact:
new_addr = None
else:
break
if callable(self.block_less_callback):
self.block_less_callback() # pylint: disable=not-callable
if new_addr is None:
return False
super()._begin(new_addr)
# print("new address assigned:", oct(new_addr))
# do a double check as a manual retry in lack of using auto-ack
if self.lookup_node_id(self._addr) != self._id:
if self.lookup_node_id(self._addr) != self._id:
super()._begin(NETWORK_DEFAULT_ADDR)
return False
return True
def _make_contact(self, lvl: int) -> List[int]:
"""Make a list of connections after multicasting a `NETWORK_POLL` message."""
responders: List[int] = []
self.frame_buf.header.to_node = NETWORK_MULTICAST_ADDR
self.frame_buf.header.from_node = NETWORK_DEFAULT_ADDR
self.frame_buf.header.message_type = NETWORK_POLL
self.frame_buf.message = b""
# self.multicast() does some extra logic to protect from user misuse.
self._write(_lvl_2_addr(lvl), TX_MULTICAST)
timeout = 55000000 + time.monotonic_ns()
while time.monotonic_ns() < timeout and len(responders) < MESH_MAX_POLL:
if self._net_update() == NETWORK_POLL:
contacted = self.frame_buf.header.from_node
is_duplicate = False
for contact in responders:
if contacted == contact:
is_duplicate = True
if not is_duplicate:
responders.append(contacted)
return responders
@property
def allow_children(self) -> bool:
"""Allow/disallow child node to connect to this network node."""
return self._parenthood
@allow_children.setter
def allow_children(self, allow: bool):
self._parenthood = allow
def send(
self,
to_node: int,
message_type: Union[int, str],
message: Union[bytes, bytearray],
) -> bool:
"""Send a message to a mesh `node_id`."""
if self._addr == NETWORK_DEFAULT_ADDR:
return False
if to_node and to_node != self._id:
timeout = MESH_WRITE_TIMEOUT * 1000000 + time.monotonic_ns()
retry_delay = 5
to_node_addr = -2
while to_node_addr < 0:
to_node_addr = self.lookup_address(to_node)
if time.monotonic_ns() >= timeout:
return False
if to_node_addr < 0:
time.sleep(retry_delay / 1000)
retry_delay += 10
to_node = to_node_addr
if to_node == self._id:
to_node = self._addr
return self.write(to_node, message_type, message)
def write(
self,
to_node: int,
message_type: Union[int, str],
message: Union[bytes, bytearray],
) -> bool:
"""Send a message to a network `node_address`."""
if not isinstance(message, (bytes, bytearray)):
raise TypeError("message must be a `bytes` or `bytearray` object")
if not self._validate_msg_len(len(message)):
message = message[:MAX_FRAG_SIZE]
if self._addr == NETWORK_DEFAULT_ADDR or not is_address_valid(to_node):
return False
self.frame_buf.header = RF24NetworkHeader(to_node, message_type)
self.frame_buf.header.from_node = self._addr
self.frame_buf.message = message
return self._write(to_node, TX_NORMAL)
[docs]
class RF24Mesh(RF24MeshNoMaster):
"""A descendant of the base class `RF24MeshNoMaster` that adds algorithms needed
for Mesh network master nodes."""
def __init__(
self,
spi: busio.SPI,
csn_pin: DigitalInOut,
ce_pin: DigitalInOut,
node_id: int,
spi_frequency: int = 10000000,
):
super().__init__(spi, csn_pin, ce_pin, node_id, spi_frequency)
self._do_dhcp = False
#: A `dict` that enables master nodes to act as a DNS.
self.dhcp_dict: Dict[int, int] = {}
def update(self) -> int:
"""Checks for incoming network data and returns last message type (if any)"""
msg_t = super().update()
if msg_t == MESH_ADDR_REQUEST and self.frame_buf.header.reserved:
self._do_dhcp = True
if not self.lookup_node_id(): # if this is the master node
if msg_t in (MESH_ADDR_LOOKUP, MESH_ID_LOOKUP):
self.frame_buf.header.to_node = self.frame_buf.header.from_node
ret_val = 0 # will be -2 for requesting un-assigned nodes
if msg_t == MESH_ADDR_LOOKUP:
ret_val = self.lookup_address(self.frame_buf.message[0])
self.frame_buf.message = struct.pack("<H", ret_val)
else:
ret_val = self.lookup_node_id(
struct.unpack("<H", self.frame_buf.message[:2])[0]
)
self.frame_buf.message = bytes([ret_val])
self._write(self.frame_buf.header.to_node, TX_NORMAL)
elif msg_t == MESH_ADDR_RELEASE:
for n_id, addr in self.dhcp_dict.items():
if addr == self.frame_buf.header.from_node:
del self.dhcp_dict[n_id]
break
self._dhcp()
return msg_t
def _dhcp(self):
"""Updates `_dhcp_dict` of assigned addresses (master node only)."""
if self._do_dhcp:
self._do_dhcp = False
else:
return
new_addr, via_node, shift_val = (0, 0, 0)
if self.frame_buf.header.from_node != NETWORK_DEFAULT_ADDR:
via_node = self.frame_buf.header.from_node
temp = via_node
while temp:
temp >>= 3
shift_val += 3
extra_child = self.frame_buf.header.from_node == NETWORK_DEFAULT_ADDR
for i in range(MESH_MAX_CHILDREN + extra_child, 0, -1):
found_addr, new_addr = (False, via_node | (i << shift_val))
if new_addr == NETWORK_DEFAULT_ADDR:
continue
for n_id, addr in self.dhcp_dict.items():
# print(i, "(in _addr_dict) ID:", n_id, "ADDR:", oct(addr))
if addr == new_addr and n_id != self.frame_buf.header.reserved:
found_addr = True
break
if not found_addr:
self.set_address(self.frame_buf.header.reserved, new_addr)
self.frame_buf.header.message_type = MESH_ADDR_RESPONSE
self.frame_buf.header.to_node = self.frame_buf.header.from_node
self.frame_buf.message = struct.pack("<H", new_addr)
if self.frame_buf.header.from_node != NETWORK_DEFAULT_ADDR:
if not self._write(self.frame_buf.header.to_node, TX_NORMAL):
self._write(self.frame_buf.header.to_node, TX_NORMAL)
else:
self._write(self.frame_buf.header.to_node, TX_PHYSICAL)
break
# print("address", new_addr, "not allocated.")
[docs]
def set_address(
self, node_id: int, node_address: int, search_by_address: bool = False
):
"""Set/change a `node_id` and `node_address` pair in the `dhcp_dict`."""
for n_id, addr in self.dhcp_dict.items():
if not search_by_address:
if n_id == node_id:
self.dhcp_dict[n_id] = node_address
return
else:
if addr == node_address:
del self.dhcp_dict[n_id]
self.dhcp_dict[node_id] = node_address
return
self.dhcp_dict[node_id] = node_address
[docs]
def save_dhcp(self, filename: str = "dhcplist.json", as_bin: bool = False):
"""Save the `dhcp_dict` to a JSON file (meant for master nodes only)."""
with open(filename, "wb") as json_file:
# This throws an OSError if file system is read-only. ALL MCU boards
# running CircuitPython firmware (not RPi) have read-only file system.
if json is not None and not as_bin:
json_file.write(
json.dumps(self.dhcp_dict, indent=2).encode(encoding="utf-8")
)
elif as_bin:
for _id, _addr in self.dhcp_dict.items():
json_file.write(bytes([_id, 0])) # pad id w/ 0 for mem alignment
json_file.write(struct.pack("<H", _addr))
[docs]
def load_dhcp(self, filename: str = "dhcplist.json", as_bin: bool = False):
"""Load the `dhcp_dict` from a JSON file (meant for master nodes only)."""
with open(filename, "rb") as json_file:
if json is not None and not as_bin:
temp_dict: Dict[str, int] = json.load(json_file)
# convert keys from `str` to `int`
for n_id, addr in temp_dict.items():
self.set_address(int(n_id), addr, True)
elif as_bin:
buffer = json_file.read()
for i in range(int(len(buffer) / 4)):
index = i * 4
self.set_address(
buffer[index], # skip index + 1 as it's only used for padding
struct.unpack("<H", buffer[index + 2 : index + 4])[0],
)
def print_details(self, dump_pipes: bool = False, network_only: bool = False):
"""See RF24.print_details() and Shared Networking API docs"""
super().print_details(False, network_only)
if not self._id and self.dhcp_dict: # only on master node
print("DHCP List:\n ID\tAddress\n ---\t-------")
for n_id, addr in self.dhcp_dict.items():
print(" {}\t{}".format(n_id, oct(addr)))
if dump_pipes:
self._rf24.print_pipes()
[docs]
def lookup_address(self, node_id: Optional[int] = None) -> int:
"""Convert a node's unique ID number into its corresponding
:ref:`Logical Address <Logical Address>`."""
if not node_id:
return 0
if self._addr == NETWORK_DEFAULT_ADDR:
return -2
if not self._id:
return self._get_address(node_id, MESH_ADDR_LOOKUP)
return self._lookup_2_master(node_id, MESH_ADDR_LOOKUP)
[docs]
def lookup_node_id(self, address: Optional[int] = None) -> int:
"""Convert a node's :ref:`Logical Address <Logical Address>` into its
corresponding unique ID number."""
if not address:
return self._id if address is None else 0
if self._addr == NETWORK_DEFAULT_ADDR:
return -2
if not self._addr:
return self._get_address(address, MESH_ID_LOOKUP)
return self._lookup_2_master(address, MESH_ID_LOOKUP)
def _get_address(self, number: int, lookup_type: int) -> int:
"""Helper for get_address() and lookup_node_id()"""
for n_id, addr in self.dhcp_dict.items():
if lookup_type == MESH_ID_LOOKUP and addr == number:
return n_id
if lookup_type == MESH_ADDR_LOOKUP and n_id == number:
return addr
return -2