An updated version with vertical scrolling text and a smaller outline on the font - this works well - use the config.py and mqtt_as.py from further up the thread. It should say Checking WiFi integrity.
Got reliable connection
Connecting to broker.
Connected to broker.
Messages then come in every three minutes or so…
from interstate75 import Interstate75, SWITCH_A, SWITCH_B
from mqtt_as import MQTTClient, config
from config import wifi_led, blue_led
import uasyncio as asyncio
import machine
import time
# Constants for controlling scrolling text
BACKGROUND_COLOUR = (0, 0, 0) # Black background to turn off the screen
HOLD_TIME = 2.0
BLANK_SCREEN_TIME = 5.0
BUFFER_PIXELS = 2 # Increased buffer to ensure full scroll off
SCROLL_SPEED_LEVEL = 7 # Set the desired scrolling speed level (1 to 10)
SCROLL_SPEED = 1 / SCROLL_SPEED_LEVEL # Convert to a delay in seconds
# State constants
STATE_PRE_SCROLL = 0
STATE_SCROLLING = 1
STATE_POST_SCROLL = 2
STATE_BLANK_SCREEN = 3
# Create Interstate75 object and graphics surface for drawing
i75 = Interstate75(display=Interstate75.DISPLAY_INTERSTATE75_64X32)
graphics = i75.display
width = i75.width
height = i75.height
# Display-related functions
def set_background(color):
graphics.set_pen(graphics.create_pen(*color))
graphics.clear()
def draw_text_with_outline_multiline(text, x, y, scale=1, text_color=(255, 255, 255), outline_color=(0, 0, 0), line_height=8):
graphics.set_font("bitmap8")
lines = text.split('\n')
for line_num, line in enumerate(lines):
y_offset = y + (line_num * line_height * scale)
# Draw the outline by drawing the text with minimal offsets
graphics.set_pen(graphics.create_pen(*outline_color))
offsets = [(-1, 0), (1, 0), (0, -1), (0, 1)]
for dx, dy in offsets:
graphics.text(line, x + dx, y_offset + dy, -1, scale)
# Draw the main text
graphics.set_pen(graphics.create_pen(*text_color))
graphics.text(line, x, y_offset, -1, scale)
# MQTT Message Subscription and Display
def sub_cb(topic, msg, retained):
global STATE_PRE_SCROLL, STATE_SCROLLING, STATE_POST_SCROLL, STATE_BLANK_SCREEN, width, height
print(f'Topic: "{topic.decode()}" Message: "{msg.decode()}" Retained: {retained}')
state = STATE_PRE_SCROLL
shift = 0
scroll = 0
DATA = msg.decode('utf-8')
MESSAGE = " " + DATA + " "
# Split the message into lines
words = MESSAGE.split()
lines = []
current_line = ""
for word in words:
if graphics.measure_text(current_line + " " + word, 1) <= width - 2 * (BUFFER_PIXELS + 1):
current_line += " " + word
else:
lines.append(current_line.strip())
current_line = word
if current_line:
lines.append(current_line.strip())
message_lines = "\n".join(lines)
num_lines = len(lines)
line_height = 8 # Font height for scale 1
last_time = time.ticks_ms()
while True:
time_ms = time.ticks_ms()
if state == STATE_PRE_SCROLL and time_ms - last_time > HOLD_TIME * 1000:
state = STATE_SCROLLING
last_time = time_ms
if state == STATE_SCROLLING and time_ms - last_time > SCROLL_SPEED * 1000:
scroll += 1
if scroll >= num_lines * line_height + height + BUFFER_PIXELS + 1:
state = STATE_POST_SCROLL
last_time = time.ticks_ms()
last_time = time.ticks_ms()
if state == STATE_POST_SCROLL and time.ticks_ms() - last_time > HOLD_TIME * 1000:
state = STATE_BLANK_SCREEN
last_time = time.ticks_ms()
if state == STATE_BLANK_SCREEN and time.ticks_ms() - last_time > BLANK_SCREEN_TIME * 1000:
set_background(BACKGROUND_COLOUR)
i75.update(graphics)
break
set_background(BACKGROUND_COLOUR)
if "Time" in MESSAGE:
set_background((255, 255, 0))
elif "News" in MESSAGE:
set_background((255, 0, 0))
elif "Weather" in MESSAGE:
set_background((0, 255, 255))
draw_text_with_outline_multiline(message_lines, x=BUFFER_PIXELS + 1, y=height - scroll + BUFFER_PIXELS, scale=1)
i75.update(graphics)
time.sleep(0.001)
# Demonstrate scheduler is operational.
async def heartbeat():
s = True
while True:
await asyncio.sleep_ms(500)
blue_led(s)
s = not s
async def wifi_han(state):
wifi_led(not state)
print('Wifi is ', 'up' if state else 'down')
await asyncio.sleep(1)
# If you connect with clean_session True, must re-subscribe (MQTT spec 3.1.2.4)
async def conn_han(client):
# MQTT Subscribe Topic
await client.subscribe('personal/ucfnaps/led/#', 1)
async def main(client):
try:
await client.connect()
except OSError:
print('Connection failed.')
machine.reset()
return
while True:
await asyncio.sleep(5)
# Define configuration
config['subs_cb'] = sub_cb
config['wifi_coro'] = wifi_han
config['connect_coro'] = conn_han
config['clean'] = True
# Set up client
MQTTClient.DEBUG = True # Optional
client = MQTTClient(config)
asyncio.create_task(heartbeat())
try:
asyncio.run(main(client))
finally:
client.close() # Prevent LmacRxBlk:1 errors
asyncio.new_event_loop()