Presto: Are there any Bluetooth examples?

I suppose the title says it all. I am looking to use Bluetooth.

Ultimately I want to use the screen touch sensors to transmit MIDI data to an iPad, but a simple Bluetooth demonstration example will do to start with.

Thanks for reading.

This might be of interest to you.
BlueTooth on Pi Pico W - WORKING! - Support - Pimoroni Buccaneers

Thanks, but I don't believe that the Presto has a Pico W inside it. Correct me if I am wrong.

No, it doesn't, but I believe they both use the same RM2 Wi-Fi module?
Either way, I was thinking it may give you a hint at how to do it.

I found some Bluetooth examples but I can't find them again. I only saved one, called Simple_Bluetooth_example, and when I run that I get this error message:

File "<stdin>", line 7, in <module>
ImportError: no module named 'ble_advertising'

As I said, I can't find the other examples again. The first time, they were in the Raspberry Pi Pico window just under the gallery entry, but they are not there anymore.

For reference this is the code I saved:

# This example demonstrates a UART peripheral.

import bluetooth
import random
import struct
import time
from ble_advertising import advertising_payload

from micropython import const

_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)

_FLAG_READ = const(0x0002)
_FLAG_WRITE_NO_RESPONSE = const(0x0004)
_FLAG_WRITE = const(0x0008)
_FLAG_NOTIFY = const(0x0010)

_UART_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX = (
    bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
    _FLAG_READ | _FLAG_NOTIFY,
)
_UART_RX = (
    bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
    _FLAG_WRITE | _FLAG_WRITE_NO_RESPONSE,
)
_UART_SERVICE = (
    _UART_UUID,
    (_UART_TX, _UART_RX),
)


class BLESimplePeripheral:
    def __init__(self, ble, name="mpy-uart"):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        ((self._handle_tx, self._handle_rx),) = self._ble.gatts_register_services((_UART_SERVICE,))
        self._connections = set()
        self._write_callback = None
        self._payload = advertising_payload(name=name, services=[_UART_UUID])
        self._advertise()

    def _irq(self, event, data):
        # Track connections so we can send notifications.
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _ = data
            print("New connection", conn_handle)
            self._connections.add(conn_handle)
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            print("Disconnected", conn_handle)
            self._connections.remove(conn_handle)
            # Start advertising again to allow a new connection.
            self._advertise()
        elif event == _IRQ_GATTS_WRITE:
            conn_handle, value_handle = data
            value = self._ble.gatts_read(value_handle)
            if value_handle == self._handle_rx and self._write_callback:
                self._write_callback(value)

    def send(self, data):
        for conn_handle in self._connections:
            self._ble.gatts_notify(conn_handle, self._handle_tx, data)

    def is_connected(self):
        return len(self._connections) > 0

    def _advertise(self, interval_us=500000):
        print("Starting advertising")
        self._ble.gap_advertise(interval_us, adv_data=self._payload)

    def on_write(self, callback):
        self._write_callback = callback


def demo():
    ble = bluetooth.BLE()
    p = BLESimplePeripheral(ble)

    def on_rx(v):
        print("RX", v)

    p.on_write(on_rx)

    i = 0
    while True:
        if p.is_connected():
            # Short burst of queued notifications.
            for _ in range(3):
                data = str(i) + "_"
                print("TX", data)
                p.send(data)
                i += 1
        time.sleep_ms(100)


if __name__ == "__main__":
    demo()

These, maybe?
micropython/examples/bluetooth at master · micropython/micropython

Thanks,

I downloaded all of those but I am not sure how to use them together.

When I ran BLE_advertising.py I got an error saying the payload was too big, so I changed the constant controlling this:

#_ADV_MAX_PAYLOAD = const(31)
_ADV_MAX_PAYLOAD = const(38)

Then it ran and gave me this output:

bytearray(b'\x02\x01\x06\x0c\tmicropython\x03\x03\x1a\x18\x11\x07\x9e\xca\xdc$\x0e\xe5\xa9\xe0\x93\xf3\xa3\xb5\x01\x00@n')
micropython
[UUID(0x181a), UUID('6e400001-b5a3-f393-e0a9-e50e24dcca9e')]

So what should I do with this output? I tried adding this before, after, and instead of the code I posted in a previous answer, but to no effect. I kept getting errors of one sort or another.

Any idea what to try next?
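
One possible reading of that output, offered as a sketch rather than a definitive answer: the bytearray is the advertising packet the demo built, laid out as a run of length/type/data fields. Walking the fields in plain Python (no board needed) shows the demo payload is 38 bytes. Legacy BLE advertising carries at most 31 bytes, so raising `_ADV_MAX_PAYLOAD` most likely only hides the check rather than making the packet valid. The names `split_fields`, `describe` and `LEGACY_ADV_LIMIT` are made up for this illustration.

```python
# The bytes printed by the demo, copied from the output above.
PAYLOAD = (
    b"\x02\x01\x06\x0c\tmicropython\x03\x03\x1a\x18\x11\x07"
    b"\x9e\xca\xdc$\x0e\xe5\xa9\xe0\x93\xf3\xa3\xb5\x01\x00@n"
)

# Legacy BLE advertising data is limited to 31 bytes.
LEGACY_ADV_LIMIT = 31


def split_fields(payload):
    """Return a list of (adv_type, data) tuples from an advertising payload."""
    fields = []
    i = 0
    while i + 1 < len(payload):
        length = payload[i]        # counts the type byte plus the data bytes
        adv_type = payload[i + 1]
        data = payload[i + 2 : i + 1 + length]
        fields.append((adv_type, bytes(data)))
        i += 1 + length
    return fields


def describe(payload):
    """Print each field, then the total size against the legacy limit."""
    for adv_type, data in split_fields(payload):
        print("type 0x%02x, %d data bytes: %r" % (adv_type, len(data), data))
    print("total:", len(payload), "bytes, limit:", LEGACY_ADV_LIMIT)


describe(PAYLOAD)
```

The four fields are the flags (0x01), the name (0x09), one 16-bit service UUID (0x03) and one 128-bit service UUID (0x07). The printout is only what the helper's own `demo()` produces; nothing has to be done with it by hand.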

Sorry, can't help you there. :(
I haven't tried to do anything Bluetooth related on a Pi or Pico.
@Tonygo2 or @bablokb might be able to help though.

@Grumpy_Mike

First question: do you understand the architecture of BLE? If not, you should search the net for a primer that gives you a basic understanding.

Once you know what a central, a peripheral, a service and so on are, you can write code that you actually understand and can use.

And I would not start with the creation of my own BLE service as in one of the code snippets you gave above. Start with something common like the NUS (Nordic UART Service). You will find code, examples and usage scenarios for well-known services like the NUS.
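
As a rough picture of what a service looks like, here is the NUS laid out as plain Python data. The UUIDs and flag values are copied from the UART example earlier in the thread; the helper `flag_names` and the plain strings standing in for `bluetooth.UUID(...)` are assumptions made so that the sketch runs without a board.

```python
# Flag values as used in the UART example earlier in the thread.
FLAG_READ = 0x0002
FLAG_WRITE_NO_RESPONSE = 0x0004
FLAG_WRITE = 0x0008
FLAG_NOTIFY = 0x0010

# On the board each UUID string would be wrapped in bluetooth.UUID(...).
NUS_SERVICE = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
NUS_RX = ("6E400002-B5A3-F393-E0A9-E50E24DCCA9E", FLAG_WRITE | FLAG_WRITE_NO_RESPONSE)
NUS_TX = ("6E400003-B5A3-F393-E0A9-E50E24DCCA9E", FLAG_READ | FLAG_NOTIFY)

# A service is (service_uuid, (characteristic, characteristic, ...)),
# and each characteristic is (characteristic_uuid, flags).
SERVICE = (NUS_SERVICE, (NUS_TX, NUS_RX))

FLAG_TABLE = (
    (FLAG_READ, "read"),
    (FLAG_WRITE_NO_RESPONSE, "write-no-response"),
    (FLAG_WRITE, "write"),
    (FLAG_NOTIFY, "notify"),
)


def flag_names(flags):
    """Hypothetical helper: list the names of the flag bits that are set."""
    return [name for bit, name in FLAG_TABLE if flags & bit]


service_uuid, characteristics = SERVICE
print("service", service_uuid)
for char_uuid, flags in characteristics:
    print("  characteristic", char_uuid, flag_names(flags))
```

In this layout the peripheral offers the service, the central writes to RX to send data to the peripheral, and the peripheral notifies on TX to send data back.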


This pair worked when I wrote them. They may provide some insight.

# Save as BLE_server.py
# Modified to send random integers much faster
# Tony Goodhew 19th June 2023
import bluetooth
import random
import struct
import time
import machine
import ubinascii
from ble_advertising import advertising_payload
from micropython import const
from machine import Pin
import random ############################# EXTRA LINE ###########
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_INDICATE_DONE = const(20)

_FLAG_READ = const(0x0002)
_FLAG_NOTIFY = const(0x0010)
_FLAG_INDICATE = const(0x0020)

# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)

# org.bluetooth.characteristic.temperature
_TEMP_CHAR = (
    bluetooth.UUID(0x2A6E),
    _FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE,
)
_ENV_SENSE_SERVICE = (
    _ENV_SENSE_UUID,
    (_TEMP_CHAR,),
)

# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
 
class BLETemperature:
    def __init__(self, ble, name=""):
        self._sensor_temp = machine.ADC(4)
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        ((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
        self._connections = set()
        if len(name) == 0:
            name = 'Pico %s' % ubinascii.hexlify(self._ble.config('mac')[1],':').decode().upper()
        print('Sensor name %s' % name)
        self._payload = advertising_payload(
            name=name, services=[_ENV_SENSE_UUID]
        )
        self._advertise()

    def _irq(self, event, data):
        # Track connections so we can send notifications.
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _ = data
            self._connections.add(conn_handle)
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            self._connections.remove(conn_handle)
            # Start advertising again to allow a new connection.
            self._advertise()
        elif event == _IRQ_GATTS_INDICATE_DONE:
            conn_handle, value_handle, status = data
        
    def update_temperature(self, notify=False, indicate=False):
        # Write the local value, ready for a central to read.
        data = random.randint(0,9999)   # Generate a random number
#        print("write temp %.2f degc" % temp_deg_c);
        print(data, "Current value being sent")
        self._ble.gatts_write(self._handle, struct.pack("<h", data)) ###
        if notify or indicate:
            for conn_handle in self._connections:
                if notify:
                    # Notify connected centrals.
                    self._ble.gatts_notify(conn_handle, self._handle)
                if indicate:
                    # Indicate connected centrals.
                    self._ble.gatts_indicate(conn_handle, self._handle)
  
    def _advertise(self, interval_us=5000):  # was 500000
        self._ble.gap_advertise(interval_us, adv_data=self._payload)
''' 
    def _get_temp(self):   #################### Modified here ##########
        x = random.randint(0,9999)
        
        return float(x)
'''

def demo():
    ble = bluetooth.BLE()
    temp = BLETemperature(ble)
    counter = 0
    led = Pin('LED', Pin.OUT)
    while True:
        if counter % 2 == 0:
            temp.update_temperature(notify=True, indicate=False)
        led.toggle()
        time.sleep_ms(100)
        counter += 1

if __name__ == "__main__":
    demo()


############ SECOND PROGRAM BELOW ############
    
# Save as BLE_reader.py
# Finds and connects to a BLE temperature sensor but reads integers
# Modified to receive random integers much faster
# Tony Goodhew 19th June 2023
import bluetooth
import random
import struct
import time
import micropython
from ble_advertising import decode_services, decode_name
from micropython import const
from machine import Pin
 
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)

_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
_ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03)

# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_TEMP_UUID = bluetooth.UUID(0x2A6E)
_TEMP_CHAR = (
    _TEMP_UUID,
    bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,
)
_ENV_SENSE_SERVICE = (
    _ENV_SENSE_UUID,
    (_TEMP_CHAR,),
)

class BLETemperatureCentral:
    def __init__(self, ble):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        self._reset()
        self._led = Pin('LED', Pin.OUT)

    def _reset(self):
        # Cached name and address from a successful scan.
        self._name = None
        self._addr_type = None
        self._addr = None

        # Cached value (if we have one)
        self._value = None

        # Callbacks for completion of various operations.
        # These reset back to None after being invoked.
        self._scan_callback = None
        self._conn_callback = None
        self._read_callback = None
         
        # Persistent callback for when new data is notified from the device.
        self._notify_callback = None

        # Connected device.
        self._conn_handle = None
        self._start_handle = None
        self._end_handle = None
        self._value_handle = None

    def _irq(self, event, data):
        if event == _IRQ_SCAN_RESULT:
            addr_type, addr, adv_type, rssi, adv_data = data
            if adv_type in (_ADV_IND, _ADV_DIRECT_IND):
                type_list = decode_services(adv_data)
                if _ENV_SENSE_UUID in type_list:
                    # Found a potential device, remember it and stop scanning.
                    self._addr_type = addr_type
                    self._addr = bytes(addr) # Note: addr buffer is owned by caller so need to copy it.
                    self._name = decode_name(adv_data) or "?"
                    self._ble.gap_scan(None)

        elif event == _IRQ_SCAN_DONE:
            if self._scan_callback:
                if self._addr:
                    # Found a device during the scan (and the scan was explicitly stopped).
                    self._scan_callback(self._addr_type, self._addr, self._name)
                    self._scan_callback = None
                else:
                    # Scan timed out.
                    self._scan_callback(None, None, None)

        elif event == _IRQ_PERIPHERAL_CONNECT:
            # Connect successful.
            conn_handle, addr_type, addr = data
            if addr_type == self._addr_type and addr == self._addr:
                self._conn_handle = conn_handle
                self._ble.gattc_discover_services(self._conn_handle)

        elif event == _IRQ_PERIPHERAL_DISCONNECT:
            # Disconnect (either initiated by us or the remote end).
            conn_handle, _, _ = data
            if conn_handle == self._conn_handle:
                # If it was initiated by us, it'll already be reset.
                self._reset()

        elif event == _IRQ_GATTC_SERVICE_RESULT:
            # Connected device returned a service.
            conn_handle, start_handle, end_handle, uuid = data
            if conn_handle == self._conn_handle and uuid == _ENV_SENSE_UUID:
                self._start_handle, self._end_handle = start_handle, end_handle
         
        elif event == _IRQ_GATTC_SERVICE_DONE:
            # Service query complete.
            if self._start_handle and self._end_handle:
                self._ble.gattc_discover_characteristics(
                    self._conn_handle, self._start_handle, self._end_handle)
        
            else:
                print("Failed to find environmental sensing service.")

        elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
            # Connected device returned a characteristic.
            conn_handle, def_handle, value_handle, properties, uuid = data
            if conn_handle == self._conn_handle and uuid == _TEMP_UUID:
                self._value_handle = value_handle

        elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
            # Characteristic query complete.
            if self._value_handle:
                # We've finished connecting and discovering device, fire the connect callback.
                if self._conn_callback:
                    self._conn_callback()
            else:
                print("Failed to find temperature characteristic.")

        elif event == _IRQ_GATTC_READ_RESULT:
            # A read completed successfully.
            conn_handle, value_handle, char_data = data
            if conn_handle == self._conn_handle and value_handle == self._value_handle:
                self._update_value(char_data)
                if self._read_callback:
                    self._read_callback(self._value)
                    self._read_callback = None

        elif event == _IRQ_GATTC_READ_DONE:
            # Read completed (no-op).
            conn_handle, value_handle, status = data
         
        elif event == _IRQ_GATTC_NOTIFY:
            # The ble_temperature.py demo periodically notifies its value.
            conn_handle, value_handle, notify_data = data
            if conn_handle == self._conn_handle and value_handle == self._value_handle:
                self._update_value(notify_data)
                if self._notify_callback:
                    self._notify_callback(self._value)

    # Returns true if we've successfully connected and discovered characteristics.
    def is_connected(self):
        return self._conn_handle is not None and self._value_handle is not None

    # Find a device advertising the environmental sensor service.
    def scan(self, callback=None):
        self._addr_type = None
        self._addr = None
        self._scan_callback = callback
        self._ble.gap_scan(2000, 30000, 30000)

    # Connect to the specified device (otherwise use cached address from a scan).
    def connect(self, addr_type=None, addr=None, callback=None):
        self._addr_type = addr_type or self._addr_type
        self._addr = addr or self._addr
        self._conn_callback = callback
        if self._addr_type is None or self._addr is None:
            return False
        self._ble.gap_connect(self._addr_type, self._addr)
        return True

    # Disconnect from current device.
    def disconnect(self):
        if not self._conn_handle:
            return
        self._ble.gap_disconnect(self._conn_handle)
        self._reset()

    # Issues an (asynchronous) read, will invoke callback with data.
    def read(self, callback):
        if not self.is_connected():
            return
        self._read_callback = callback
        try:
            self._ble.gattc_read(self._conn_handle, self._value_handle)
        except OSError as error:
            print(error)

    # Sets a callback to be invoked when the device notifies us.
    def on_notify(self, callback):
        self._notify_callback = callback

    def _update_value(self, data):
        # Data is a little-endian sint16; this modified pair sends a random integer rather than a temperature.
        try:
            self._value = struct.unpack("<h", data)[0]   ####### Modified
        except OSError as error:
            print(error)

    def value(self):
        return self._value

def sleep_ms_flash_led(self, flash_count, delay_ms):
    self._led.off()
    while(delay_ms > 0):
        for i in range(flash_count):
            self._led.on()
            time.sleep_ms(100)
            self._led.off()
            time.sleep_ms(100)
            delay_ms -= 200
        time.sleep_ms(10) # was 1000
        delay_ms -= 1000
     
def print_temp(result):
    print(result, "Received")  ### Modified

def demo(ble, central):
    not_found = False

    def on_scan(addr_type, addr, name):
        if addr_type is not None:
            print("Found sensor: %s" % name)
            central.connect()
        else:
            nonlocal not_found
            not_found = True
            print("No sensor found.")

    central.scan(callback=on_scan)

    # Wait for connection...
    while not central.is_connected():
        time.sleep_ms(100)
        if not_found:
            return
 
    print("Connected")
 
    # Explicitly issue reads
    while central.is_connected():
        central.read(callback=print_temp)
        sleep_ms_flash_led(central, 1, 2) ## was 2000

    print("Disconnected")
 
if __name__ == "__main__":
    ble = bluetooth.BLE()
    central = BLETemperatureCentral(ble)
    while(True):
        demo(ble, central)
        sleep_ms_flash_led(central, 1, 10000)


You may find Kev's video offers some help:

 https://www.youtube.com/watch?v=HeGr8RnzVqk

A very nice example. It has everything you need to understand the architecture: UUIDs, service/characteristic, central/peripheral, client/server. Added bonus is that the central is the client.

All these terms sound complicated, but once you get used to them, they do make sense.
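
To make one of those terms concrete: a characteristic value is just a few bytes. In the pair above the server packs an integer with `struct.pack("<h", ...)` and the reader unpacks it with the same format. A minimal sketch of that round trip, runnable on a desktop; the function names `encode_value` and `decode_value` are made up here.

```python
import struct


def encode_value(n):
    """Server side: pack an integer as a little-endian signed 16-bit value."""
    return struct.pack("<h", n)


def decode_value(raw):
    """Reader side: unpack the two bytes back into an integer."""
    return struct.unpack("<h", raw)[0]


raw = encode_value(1234)
print(raw, len(raw), decode_value(raw))
```

A signed 16-bit value covers -32768 to 32767, so the 0 to 9999 range used by the server fits without overflow.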

Thanks Tony,

The first example gives the error:

Traceback (most recent call last):
  File "<stdin>", line 10, in <module>
ImportError: no module named 'ble_advertising' 

And the second example gives:

Traceback (most recent call last):
  File "<stdin>", line 12, in <module>
ImportError: no module named 'ble_advertising'

It would seem that my problem is not knowing how to create a module called ble_advertising.

I am not trying to make my own BLE-services, I am just trying to get the given examples to work.

In the past I have used the Adafruit Feather nRF52832 example and got it to work.

This is my guiding light when trying to get a new concept working.

# Helpers for generating BLE advertising payloads.

from micropython import const
import struct
import bluetooth

# Advertising payloads are repeated packets of the following form:
#   1 byte data length (N + 1)
#   1 byte type (see constants below)
#   N bytes type-specific data

_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_UUID32_COMPLETE = const(0x5)
_ADV_TYPE_UUID128_COMPLETE = const(0x7)
_ADV_TYPE_UUID16_MORE = const(0x2)
_ADV_TYPE_UUID32_MORE = const(0x4)
_ADV_TYPE_UUID128_MORE = const(0x6)
_ADV_TYPE_APPEARANCE = const(0x19)


# Generate a payload to be passed to gap_advertise(adv_data=...).
def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0):
    payload = bytearray()

    def _append(adv_type, value):
        nonlocal payload
        payload += struct.pack("BB", len(value) + 1, adv_type) + value

    _append(
        _ADV_TYPE_FLAGS,
        struct.pack("B", (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)),
    )

    if name:
        _append(_ADV_TYPE_NAME, name)

    if services:
        for uuid in services:
            b = bytes(uuid)
            if len(b) == 2:
                _append(_ADV_TYPE_UUID16_COMPLETE, b)
            elif len(b) == 4:
                _append(_ADV_TYPE_UUID32_COMPLETE, b)
            elif len(b) == 16:
                _append(_ADV_TYPE_UUID128_COMPLETE, b)

    # See org.bluetooth.characteristic.gap.appearance.xml
    if appearance:
        _append(_ADV_TYPE_APPEARANCE, struct.pack("<h", appearance))

    return payload


def decode_field(payload, adv_type):
    i = 0
    result = []
    while i + 1 < len(payload):
        if payload[i + 1] == adv_type:
            result.append(payload[i + 2 : i + payload[i] + 1])
        i += 1 + payload[i]
    return result


def decode_name(payload):
    n = decode_field(payload, _ADV_TYPE_NAME)
    return str(n[0], "utf-8") if n else ""


def decode_services(payload):
    services = []
    for u in decode_field(payload, _ADV_TYPE_UUID16_COMPLETE):
        services.append(bluetooth.UUID(struct.unpack("<h", u)[0]))
    for u in decode_field(payload, _ADV_TYPE_UUID32_COMPLETE):
        services.append(bluetooth.UUID(struct.unpack("<d", u)[0]))
    for u in decode_field(payload, _ADV_TYPE_UUID128_COMPLETE):
        services.append(bluetooth.UUID(u))
    return services


def demo():
    payload = advertising_payload(
        name="micropython",
        services=[bluetooth.UUID(0x181A), bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")],
    )
    print(payload)
    print(decode_name(payload))
    print(decode_services(payload))


if __name__ == "__main__":
    demo()

See if this helps.
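
In case it is the file name that is the sticking point: the `ImportError` is about where the file lives, not about its contents. `from ble_advertising import ...` looks for a file named exactly `ble_advertising.py` (lower case) somewhere the interpreter searches, which on the board means saved to the device's own filesystem rather than only run from the editor. Run by itself, the helper only executes its `demo()`, which prints a test payload. The sketch below shows the mechanism on desktop Python, with a hypothetical cut-down stand-in for the helper written to a temporary folder.

```python
import importlib
import os
import sys
import tempfile

# Hypothetical cut-down helper, standing in for the full listing above.
HELPER_SOURCE = """
def advertising_payload(name=None):
    payload = bytearray((0x02, 0x01, 0x06))  # flags field
    if name:
        raw = name.encode()
        payload += bytes((len(raw) + 1, 0x09)) + raw  # complete local name
    return payload
"""

# Save the helper under the exact name the import statement asks for.
folder = tempfile.mkdtemp()
with open(os.path.join(folder, "ble_advertising.py"), "w") as f:
    f.write(HELPER_SOURCE)

# Make the folder searchable; on the board the device filesystem already is.
sys.path.insert(0, folder)
importlib.invalidate_caches()

# This line now succeeds because a file with the matching name can be found.
from ble_advertising import advertising_payload

print(advertising_payload(name="mpy-uart"))
```

On the board the equivalent step would be to save the full helper listing to the device as `ble_advertising.py` (in Thonny, for example, Save as and then choosing the device, assuming that is the editor in use) and then run the server or UART example, not the helper.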

Thanks.

Well, just like before, I got this output:

bytearray(b'\x02\x01\x06\x0c\tmicropython\x03\x03\x1a\x18\x11\x07\x9e\xca\xdc$\x0e\xe5\xa9\xe0\x93\xf3\xa3\xb5\x01\x00@n')
micropython
[UUID(0x181a), UUID('6e400001-b5a3-f393-e0a9-e50e24dcca9e')]

So I still don't know what to do with this. It still produces the same errors as I had before.

I have been trying to upgrade the software and use the ExWiFi app, but I think I may have screwed something up, because the word clock will not connect. I needed to re-enter the secrets data because that was wiped, but after saving it the word clock still would not connect. I get the error message "Unable to get time Check your network try again".

I seem to be going backwards here!

But as I say I am still no closer to knowing how to chain these results together.

Thanks again.

Just found this:

Raspberry Pi Pico W: Bluetooth Low Energy (BLE) with MicroPython | Random Nerd Tutorials

Hope it helps.

Thanks.

However this is for a PicoW and I am not sure how relevant this is to the Presto.
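
One hedged observation on relevance: in the first listing the traceback points at line 7, the `ble_advertising` import, which comes after `import bluetooth` on line 3. That suggests the Presto firmware does provide the `bluetooth` module. A quick check, as a sketch (`ble_available` is a made-up name):

```python
def ble_available():
    """Return True if this interpreter provides bluetooth.BLE, else False."""
    try:
        import bluetooth
    except ImportError:
        return False
    return hasattr(bluetooth, "BLE")


print("BLE available:", ble_available())
```

On a desktop Python this will normally print False; the interesting result is the one from the board itself.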

I have another thread open about my specific problem, the software upgrade trashing my whole Wi-Fi system, at

Upgrade bricked my WI-FI

which has now been solved. Basically it was down to my dyslexia.

Now solved

So I am back up and running with the latest version of the software, v0.1.0.

I still get

MPY: soft reboot
bytearray(b'\x02\x01\x06\x0c\tmicropython\x03\x03\x1a\x18\x11\x07\x9e\xca\xdc$\x0e\xe5\xa9\xe0\x93\xf3\xa3\xb5\x01\x00@n')
micropython
[UUID(0x181a), UUID('6e400001-b5a3-f393-e0a9-e50e24dcca9e')]

From running that last example you sent (posts don't seem to be numbered on this forum, is this right?)

But I still do not know what to do with this information in order to run one of the Bluetooth examples.