Source code for circuitpython_nrf24l01.fake_ble

# The MIT License (MIT)
#
# Copyright (c) 2019 Brendan Doherty
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""Original research was done by `Dmitry Grinberg and his write-up can be
found here <http://dmitry.gr/index.php?r=05.Projects&proj=11.%20Bluetooth%20LE%20fakery>`_"""
from os import urandom
import struct
from .rf24 import RF24

def swap_bits(original):
    """Reverse the bit order of a single byte.

    :param int original: The value to mirror. Only its 8 least significant
        bits are considered.
    :returns: An `int` in range [0, 255] whose bit 7 is the input's bit 0,
        bit 6 is the input's bit 1, and so on.
    """
    remaining = original & 0xFF
    flipped = 0
    for _ in range(8):
        # shift the result left and append the current lowest input bit
        flipped = (flipped << 1) | (remaining & 1)
        remaining >>= 1
    return flipped
def reverse_bits(original):
    """Reverse the bit order of every byte in a buffer.

    The order of the bytes themselves is kept; only the bits inside each
    byte are mirrored (see `swap_bits()`).

    :param bytes,bytearray original: The buffer whose bytes are to be mirrored.
    :returns: A new `bytearray` of the same length as ``original``.
    """
    return bytearray(map(swap_bits, original))
def chunk(buf, data_type=0x16):
    """Pack data into a length-prefixed, typed block for a BLE payload.

    :param bytes,bytearray buf: The data to wrap.
    :param int data_type: The 1-byte type code stored after the length byte.
        Only the 8 least significant bits are used. Defaults to ``0x16``.
    :returns: A `bytearray` laid out as ``[len(buf) + 1, data_type, *buf]``;
        the length byte counts the type byte plus the data.
    """
    header = bytearray(2)
    header[0] = len(buf) + 1  # length covers the type byte and the data
    header[1] = data_type & 0xFF
    return header + buf
def crc24_ble(data, deg_poly=0x65B, init_val=0x555555):
    """Calculate a 24-bit checksum over a buffer of any length.

    :param bytes,bytearray data: The buffer to calculate the checksum for.
    :param int deg_poly: The polynomial XOR-ed into the register whenever a
        set bit is shifted out of its 24-bit window. Defaults to ``0x65B``.
    :param int init_val: The starting value of the register. Defaults to
        ``0x555555``.
    :returns: A 3-byte `bytearray`: the register serialized big-endian and
        then passed through `reverse_bits()`.
    """
    crc = init_val
    for byte in data:
        # each incoming byte is mirrored, then folded into the top 8 bits
        crc ^= swap_bits(byte) << 16
        for _ in range(8):
            crc <<= 1
            if crc & 0x1000000:  # the bit that just left the 24-bit window
                crc ^= deg_poly
            crc &= 0xFFFFFF
    return reverse_bits(crc.to_bytes(3, "big"))
#: The nRF24L01 channel numbers that `FakeBLE` is allowed to transmit on.
#: The BLE channel number is different from the nRF channel number:
#: `FakeBLE.whiten()` treats index ``i`` of this tuple as BLE channel ``37 + i``.
BLE_FREQ = (2, 26, 80)
class FakeBLE:
    """A class to implement BLE advertisements using the nRF24L01.

    :param spi: The SPI bus object, passed unchanged to
        :py:class:`~circuitpython_nrf24l01.rf24.RF24`.
    :param csn: The radio's CSN pin object, passed unchanged to
        :py:class:`~circuitpython_nrf24l01.rf24.RF24`.
    :param ce: The radio's CE pin object, passed unchanged to
        :py:class:`~circuitpython_nrf24l01.rf24.RF24`.
    :param int spi_frequency: The SPI bus frequency handed to
        :py:class:`~circuitpython_nrf24l01.rf24.RF24`. Defaults to 10 MHz.
    """

    def __init__(self, spi, csn, ce, spi_frequency=10000000):
        self._radio = RF24(spi, csn, ce, spi_frequency=spi_frequency)
        self._chan = 0  # index into BLE_FREQ; also seeds whiten()
        self._to_android = 0x42  # first byte of every assembled payload
        self._show_dbm = False
        self._ble_name = None
        self._mac = urandom(6)
        with self:
            # fixed 32-byte payloads, no auto-ack, no retries, no radio CRC:
            # the checksum is computed in software by crc24_ble()
            self._radio.dynamic_payloads = False
            self._radio.payload_length = 32
            self._radio.data_rate = 1
            self._radio.arc = 0
            self._radio.address_length = 4
            # NOTE(review): presumably the BLE advertising access address
            # (0x8E89BED6) with the bits of each byte reversed -- confirm.
            self._radio.open_rx_pipe(0, b"\x71\x91\x7D\x6B")
            self._radio.open_tx_pipe(b"\x71\x91\x7D\x6B")
            self._radio.auto_ack = False
            self._radio.crc = 0
            self._radio.flush_rx()

    def __enter__(self):
        self._radio = self._radio.__enter__()
        return self

    def __exit__(self, *exc):
        # leaving the context resets the optional payload decorations
        self._show_dbm = False
        self._ble_name = None
        return self._radio.__exit__()

    @property
    def to_android(self):
        """A `bool` attribute to specify if advertisements should be
        compatible with Android smartphones.

        `True` stores ``0x42`` as the payload's first byte, `False` stores
        ``0x40``.
        """
        return self._to_android == 0x42

    @to_android.setter
    def to_android(self, enable):
        self._to_android = 0x42 if enable else 0x40

    @property
    def mac(self):
        """This attribute returns a 6-byte buffer that is used as the
        arbitrary mac address of the BLE device being emulated.

        Assigning `None` generates a new random address. An `int` is stored
        as 6 bytes in little endian. A `bytes` or `bytearray` object is
        copied (the caller's object is never modified); if it is shorter than
        6 bytes, random bytes are appended. Any other type leaves the current
        address unchanged.
        """
        return self._mac

    @mac.setter
    def mac(self, address):
        if address is None:
            new_mac = urandom(6)
        elif isinstance(address, int):
            new_mac = address.to_bytes(6, "little")
        elif isinstance(address, (bytearray, bytes)):
            # copy so that padding below cannot mutate the caller's buffer
            new_mac = bytes(address)
        else:
            new_mac = self._mac
        if len(new_mac) < 6:
            new_mac += urandom(6 - len(new_mac))
        self._mac = new_mac

    @property
    def name(self):
        """The broadcasted BLE name of the nRF24L01.

        Must be a `bytes` or `bytearray` object, or `None` to omit the name
        from the advertisement.

        :raises ValueError: If the assigned value is not a `bytes` or
            `bytearray` object, or if it is too long.
        """
        return self._ble_name

    @name.setter
    def name(self, n):
        if n is not None:
            if not isinstance(n, (bytes, bytearray)):
                raise ValueError("name must be a bytearray or bytes object.")
            # NOTE(review): this bound is looser than what available()
            # permits (16 - 3 * show_pa_level); over-long names that pass
            # here are still rejected when the payload is assembled.
            if len(n) > (21 - self._show_dbm * 3):
                raise ValueError("name length exceeds maximum.")
        self._ble_name = n

    @property
    def show_pa_level(self):
        """If this attribute is `True`, the payload will automatically
        include the nRF24L01's pa_level in the advertisement.

        :raises ValueError: If enabling this while the current `name` is
            longer than 16 bytes.
        """
        return bool(self._show_dbm)

    @show_pa_level.setter
    def show_pa_level(self, enable):
        current_name = self.name
        # the name defaults to None, which occupies no room in the payload
        if enable and current_name is not None and len(current_name) > 16:
            raise ValueError("there is not enough room to show the pa_level.")
        self._show_dbm = bool(enable)

    def hop_channel(self):
        """Trigger an automatic change of BLE compliant channels.

        Advances to the next entry of `BLE_FREQ` (wrapping around after the
        last one) and tunes the radio to it.
        """
        self._chan = (self._chan + 1) % len(BLE_FREQ)
        self._radio.channel = BLE_FREQ[self._chan]

    def whiten(self, data):
        """Whitening the BLE packet data ensures there's no long repetition
        of bits.

        The whitening sequence is seeded from the currently selected BLE
        channel, so applying this function twice on the same channel restores
        the original data.

        :param bytes,bytearray data: The packet data to (de-)whiten.
        :returns: A new `bytearray` of the same length as ``data``.
        """
        data = bytearray(data)
        coef = (self._chan + 37) | 0x40
        for i, byte in enumerate(data):
            mask = 1
            for _ in range(8):
                if coef & 1:
                    coef ^= 0x88
                    byte ^= mask
                mask <<= 1
                coef >>= 1
            data[i] = byte
        return data

    def _make_payload(self, payload):
        """Assemble the entire packet to be transmitted as a payload.

        :param bytes,bytearray payload: The already chunked advertisement
            data that follows the mandatory header, flags, and optional
            pa_level and name chunks.
        :returns: The assembled packet including the trailing 3-byte checksum.
        :raises ValueError: If ``payload`` does not fit in the space reported
            by `available()`.
        """
        remaining = self.available(payload)
        if remaining < 0:
            raise ValueError(
                "Payload length exceeds maximum buffer size by "
                "{} bytes".format(abs(remaining))
            )
        name = self.name
        name_length = (len(name) + 2) if name is not None else 0
        # 9 = 6-byte mac + 3-byte flags chunk
        pl_size = 9 + len(payload) + name_length + self._show_dbm * 3
        buf = bytes([self._to_android, pl_size]) + self.mac
        buf += chunk(b"\x05", 1)
        if self._show_dbm:
            buf += chunk(struct.pack(">b", self._radio.pa_level), 0x0A)
        if name_length:
            buf += chunk(name, 0x08)
        buf += payload
        buf += crc24_ble(buf)
        return buf

    def available(self, hypothetical=b""):
        """This function calculates how much length (in bytes) is available
        in the next payload.

        :param bytes,bytearray hypothetical: Data whose length is subtracted
            from the result, to check if it would fit.
        :returns: The number of free bytes after accounting for the name
            chunk (name length + 2), the pa_level chunk (3 bytes if shown),
            and ``hypothetical``. A negative result means the data does not
            fit.
        """
        name = self.name
        name_length = (len(name) + 2) if name is not None else 0
        return 18 - name_length - self._show_dbm * 3 - len(hypothetical)

    def advertise(self, buf=b"", data_type=0xFF):
        """This blocking function is used to broadcast a payload.

        :param bytes,bytearray,list,tuple buf: The data to advertise. A
            `list` or `tuple` is treated as a sequence of already chunked
            buffers that are concatenated as-is. A single buffer is wrapped
            with `chunk()` using ``data_type``. An empty buffer sends only
            the mandatory parts of the advertisement.
        :param int data_type: The type code used when ``buf`` is a single
            buffer. Defaults to ``0xFF``.
        :raises ValueError: If ``buf`` is not one of the accepted types, or
            if the resulting payload is too long.
        """
        if not isinstance(buf, (bytearray, bytes, list, tuple)):
            raise ValueError("buffer is an invalid format")
        if isinstance(buf, (list, tuple)):
            payload = b"".join(buf)
        else:
            payload = chunk(buf, data_type) if buf else b""
        payload = self._make_payload(payload)
        self._radio.send(reverse_bits(self.whiten(payload)))

    @property
    def pa_level(self):
        """See :py:attr:`~circuitpython_nrf24l01.rf24.RF24.pa_level` for
        more details."""
        return self._radio.pa_level

    @pa_level.setter
    def pa_level(self, value):
        self._radio.pa_level = value

    @property
    def channel(self):
        """The only allowed channels are those contained in the `BLE_FREQ`
        tuple.

        :raises ValueError: If the assigned value is not in `BLE_FREQ`.
        """
        return self._radio.channel

    @channel.setter
    def channel(self, value):
        if value not in BLE_FREQ:
            raise ValueError(
                "nrf channel {} is not a valid BLE frequency".format(value)
            )
        self._radio.channel = value
        # keep the whitening seed and hop_channel() in step with the radio
        self._chan = BLE_FREQ.index(value)

    @property
    def payload_length(self):
        """This attribute is best left at 32 bytes for all BLE operations."""
        return self._radio.payload_length

    @payload_length.setter
    def payload_length(self, value):
        self._radio.payload_length = value

    @property
    def power(self):
        """See :py:attr:`~circuitpython_nrf24l01.rf24.RF24.power` for more
        details."""
        return self._radio.power

    @power.setter
    def power(self, is_on):
        self._radio.power = is_on

    @property
    def is_lna_enabled(self):
        """See :py:attr:`~circuitpython_nrf24l01.rf24.RF24.is_lna_enabled`
        for more details."""
        return self._radio.is_lna_enabled

    @property
    def is_plus_variant(self):
        """See :py:attr:`~circuitpython_nrf24l01.rf24.RF24.is_plus_variant`
        for more details."""
        return self._radio.is_plus_variant

    def interrupt_config(self, data_recv=True, data_sent=True):
        """See :py:func:`~circuitpython_nrf24l01.rf24.RF24.interrupt_config()`
        for more details.

        .. warning:: The :py:attr:`~circuitpython_nrf24l01.rf24.RF24.irq_df`
            attribute (and also this function's ``data_fail`` parameter) is
            not implemented for BLE operations.
        """
        self._radio.interrupt_config(data_recv=data_recv, data_sent=data_sent)

    @property
    def irq_ds(self):
        """See :py:attr:`~circuitpython_nrf24l01.rf24.RF24.irq_ds` for more
        details."""
        return self._radio.irq_ds

    @property
    def irq_dr(self):
        """See :py:attr:`~circuitpython_nrf24l01.rf24.RF24.irq_dr` for more
        details."""
        return self._radio.irq_dr

    def clear_status_flags(self):
        """See :py:func:`~circuitpython_nrf24l01.rf24.RF24.clear_status_flags()`
        for more details."""
        self._radio.clear_status_flags()

    def update(self):
        """See :py:func:`~circuitpython_nrf24l01.rf24.RF24.update()` for more
        details."""
        self._radio.update()

    def what_happened(self, dump_pipes=False):
        """See :py:func:`~circuitpython_nrf24l01.rf24.RF24.what_happened()`
        for more details."""
        self._radio.what_happened(dump_pipes=dump_pipes)
class ServiceData:
    """An abstract helper class to package specific service data using
    Bluetooth SIG defined 16-bit UUID flags to describe the data type.

    :param int uuid: The 16-bit UUID that identifies the service. It is
        stored packed as 2 bytes in little endian.
    """

    def __init__(self, uuid):
        self._data = b""
        self._type = struct.pack("<H", uuid)

    @property
    def uuid(self):
        """The 16-bit Service UUID as a 2-byte `bytes` object in little
        endian. (read-only)"""
        return self._type

    @property
    def data(self):
        """The service's data. In this base class the assigned value is
        stored as-is; its format is defined by relative Bluetooth Service
        Specifications (and GATT supplemental specifications)."""
        return self._data

    @data.setter
    def data(self, value):
        self._data = value

    @property
    def buffer(self):
        """The UUID header followed by the service's data, as an immutable
        `bytes` object (read-only)."""
        combined = self._type + self.data
        return bytes(combined)

    def __len__(self):
        """For convenience, this class is compatible with python's builtin
        :py:func:`len()` method. In this context, this will return the length
        of the object (in bytes) as it would appear in the advertisement
        payload."""
        header_size = len(self._type)
        data_size = len(self.data)
        return header_size + data_size
class TemperatureServiceData(ServiceData):
    """This derivative of the `ServiceData` class can be used to represent
    temperature data values as a `float` value.

    The service UUID is fixed to ``0x1809``.
    """

    def __init__(self):
        super().__init__(0x1809)

    @ServiceData.data.setter
    def data(self, value):
        """Encode ``value`` as 4 bytes: a 24-bit little-endian two's
        complement count of hundredths followed by the byte ``0xFE``.

        NOTE(review): ``0xFE`` is -2 as a signed byte, presumably the base-10
        exponent that matches the x100 scaling -- confirm against the
        service specification.
        """
        # round() instead of int(): binary floats such as 0.29 * 100
        # evaluate to 28.999999999999996, which truncation would encode as 28
        hundredths = round(value * 100) & 0xFFFFFF
        self._data = struct.pack("<i", hundredths)[:3] + bytes([0xFE])
class BatteryServiceData(ServiceData):
    """This derivative of the `ServiceData` class can be used to represent
    battery charge percentage as a 1-byte value.

    The service UUID is fixed to ``0x180F``.
    """

    def __init__(self):
        super().__init__(0x180F)

    @ServiceData.data.setter
    def data(self, value):
        """Store ``value`` packed as a single unsigned byte."""
        packed = struct.pack(">B", value)
        self._data = packed
class UrlServiceData(ServiceData):
    """This derivative of the `ServiceData` class can be used to represent
    URL data as a `bytes` value.

    The service UUID is fixed to ``0xFEAA``. Two more bytes are appended to
    the header: ``0x10`` and the signed `pa_level_at_1_meter` byte.
    """

    # Substitutions applied, in this exact order, to the assigned URL.
    # Longer patterns come before the shorter patterns they contain
    # (e.g. ".com/" before ".com") so the more specific code wins.
    _URL_CODES = (
        ("http://www.", "\x00"),
        ("https://www.", "\x01"),
        ("http://", "\x02"),
        ("https://", "\x03"),
        (".com/", "\x00"),
        (".org/", "\x01"),
        (".edu/", "\x02"),
        (".net/", "\x03"),
        (".info/", "\x04"),
        (".biz/", "\x05"),
        (".gov/", "\x06"),
        (".com", "\x07"),
        (".org", "\x08"),
        (".edu", "\x09"),
        (".net", "\x0A"),
        (".info", "\x0B"),
        (".biz", "\x0C"),
        (".gov", "\x0D"),
    )

    def __init__(self):
        super().__init__(0xFEAA)
        self._type += bytes([0x10]) + struct.pack(">b", -25)

    @property
    def pa_level_at_1_meter(self):
        """The TX power level (in dBm) at 1 meter from the nRF24L01. This
        defaults to -25 (due to testing when broadcasting with 0 dBm) and
        must be a 1-byte signed `int`."""
        last_byte = self._type[-1:]
        (level,) = struct.unpack(">b", last_byte)
        return level

    @pa_level_at_1_meter.setter
    def pa_level_at_1_meter(self, value):
        self._type = self._type[:-1] + struct.pack(">b", int(value))

    @property
    def uuid(self):
        """The first 2 bytes of the header: the 16-bit Service UUID in
        little endian. (read-only)"""
        return self._type[:2]

    @ServiceData.data.setter
    def data(self, value):
        """Compress the URL `str` by replacing known prefixes and suffixes
        with 1-character codes, then store it UTF-8 encoded."""
        for text, code in self._URL_CODES:
            value = value.replace(text, code)
        self._data = value.encode("utf-8")