I have a UDP destination working and sending updates to my display. The crazy colors are for development only.
Here is the module I wrote. Don’t know if everything is necessary as it is still very much in development. Sending a burst of UPD messages causes dropped packets. The “time.sleep” after the send seemed to have fix this, at least temporarily.
#
import network
import socket
import select
import json
import time
# import enviro firmware, this will trigger provisioning if needed
import enviro
#-------------------------------------------------------------------------------
# UpdateUDPClient
#-------------------------------------------------------------------------------
class UpdateUDPClient :
def __init__ (self ,
server_ip = "192.168.19.100" ,
port = 5010
) :
self.server_ip = server_ip
self.port = port
self.wlan = network.WLAN(network.STA_IF)
self.udp_socket = socket.socket (socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.bind (('0.0.0.0', self.port))
self.poller = select.poll()
self.poller.register(self.udp_socket, select.POLLOUT) # event 1
self.id_prefix = enviro.config.nickname + "."
self.reading_ids = {
"temperature" : "temp_c" ,
"humidity" : "hum" ,
"aqi" : "aqi"
}
def send_update (self, ordered_dict) :
self.check_connection ()
updates = {}
for reading_id, reading_value in ordered_dict.items() :
if reading_id in self.reading_ids :
updates[self.reading_ids[reading_id]] = reading_value
if "temp_c" in updates :
updates ["temp_f"] = "{: 3.1f}".format ((updates ["temp_c"] * 1.8) + 32)
updates ["temp_c"] = "{: 3.1f}".format (updates ["temp_c"])
if "hum" in updates :
updates ["hum"] = "{: 3.1f}".format (updates ["hum"])
if "aqi" in updates :
updates ["aqi"] = round (updates ["aqi"], 1)
reading_dict = None
for reading_id, reading_value in updates.items () :
reading_dict = {
"jsonrpc" : "2.0" ,
"method" : "update_area" ,
"params" : {
"area" : self.id_prefix + reading_id ,
"text" : str (reading_value)
}
}
#print (reading_dict)
#enviro.logging.info ("UDP >", reading_dict)
#self.udp_socket.sendto (bytes(json.dumps (reading_dict), "utf-8"), (self.server_ip, self.port))
self.send_message (reading_dict)
reading_dict = {
"jsonrpc" : "2.0" ,
"method" : "update_area" ,
"params" : {
"area_id" : self.id_prefix + "time"
}
}
#enviro.logging.info ("UDP >", reading_dict)
#self.udp_socket.sendto (bytes(json.dumps (reading_dict), "utf-8"), (self.server_ip, self.port))
self.send_message (reading_dict)
def send_message (self,
message_dict) :
#enviro.logging.info ("UDP >", message_dict)
'''
while True :
events = self.poller.poll ()
print (events)
for event in events :
if event [1] == 1 :
break
'''
self.udp_socket.sendto (bytes (json.dumps (message_dict), "utf-8"), (self.server_ip, self.port))
time.sleep_ms (500)
def check_connection (self) :
if self.wlan.isconnected () :
#print ("Already Connected")
return True
self.wlan.active (True)
self.wlan.connect ("MY_WIFI", "MY_PW")
connect_max_ms = time.ticks_add (time.ticks_ms (), (10 * 1000))
while time.ticks_diff (connect_max_ms, time.ticks_ms ()) > 0 :
if self.wlan.isconnected () :
print ("Connected")
return True
enviro.logging.info ("Connect failed")
return False
## UpdateUDPClient ##
if __name__ == "__main__" :
client = UpdateUDPClient ()
reading = enviro.get_sensor_readings()
client.send_update (reading)
'''
client.send_update (({'temperature': 19.93,
'humidity': 39.96684,
'pressure': 998.76,
'gas_resistance': 65373,
'aqi': 12.7,
'luminance': 5,
'color_temperature': 3799}))
'''
The main.py source with changes I made. The UPD client is called right after the the sensors are read.
# Enviro - wireless environmental monitoring and logging
#
# On first run Enviro will go into provisioning mode where it appears
# as a wireless access point called "Enviro <board type> Setup". Connect
# to the access point with your phone, tablet or laptop and follow the
# on screen instructions.
#
# The provisioning process will generate a `config.py` file which
# contains settings like your wifi username/password, how often you
# want to log data, and where to upload your data once it is collected.
#
# You can use enviro out of the box with the options that we supply
# or alternatively you can create your own firmware that behaves how
# you want it to - please share your setups with us! :-)
#
# Need help? check out https://pimoroni.com/enviro-guide
#
# Happy data hoarding folks,
#
# - the Pimoroni pirate crew
# uncomment the below two lines to change the amount of logging enviro will do
# from phew import logging
# logging.disable_logging_types(logging.LOG_DEBUG)
# Issue #117 where neeed to sleep on startup otherwis emight not boot
from time import sleep
sleep(0.5)
# import enviro firmware, this will trigger provisioning if needed
import enviro
import os
from update_udpclient import UpdateUDPClient
udp_update = UpdateUDPClient()
try:
# initialise enviro
enviro.startup()
# if the clock isn't set...
if not enviro.is_clock_set():
enviro.logging.info("> clock not set, synchronise from ntp server")
if not enviro.sync_clock_from_ntp():
# failed to talk to ntp server go back to sleep for another cycle
enviro.halt("! failed to synchronise clock")
# check disk space...
if enviro.low_disk_space():
# less than 10% of diskspace left, this probably means cached results
# are not getting uploaded so warn the user and halt with an error
# Issue #126 to try and upload if disk space is low
# is an upload destination set?
if enviro.config.destination:
enviro.logging.error("! low disk space. Attempting to upload file(s)")
# if we have enough cached uploads...
enviro.logging.info(f"> {enviro.cached_upload_count()} cache file(s) need uploading")
if not enviro.upload_readings():
enviro.halt("! reading upload failed")
else:
# no destination so go to sleep
enviro.halt("! low disk space")
# TODO this seems to be useful to keep around?
filesystem_stats = os.statvfs(".")
enviro.logging.debug(f"> {filesystem_stats[3]} blocks free out of {filesystem_stats[2]}")
# TODO should the board auto take a reading when the timer has been set, or wait for the time?
# take a reading from the onboard sensors
enviro.logging.debug(f"> taking new reading")
reading = enviro.get_sensor_readings()
udp_update.send_update (reading)
#enviro.logging.debug(reading)
#for key, value in reading.items():
#enviro.logging.debug(str(key) + "->" + str(value))
#enviro.logging.debug(reading["OrderedDict"]["Temperature"])
# here you can customise the sensor readings by adding extra information
# or removing readings that you don't want, for example:
#
# del readings["temperature"] # remove the temperature reading
#
# readings["custom"] = my_reading() # add my custom reading value
# is an upload destination set?
if enviro.config.destination:
# if so cache this reading for upload later
enviro.logging.debug(f"> caching reading for upload")
enviro.cache_upload(reading)
# if we have enough cached uploads...
if enviro.is_upload_needed():
enviro.logging.info(f"> {enviro.cached_upload_count()} cache file(s) need uploading")
if not enviro.upload_readings():
enviro.halt("! reading upload failed")
else:
enviro.logging.info(f"> {enviro.cached_upload_count()} cache file(s) not being uploaded. Waiting until there are {enviro.config.upload_frequency} file(s)")
else:
# otherwise save reading to local csv file (look in "/readings")
enviro.logging.debug(f"> saving reading locally")
enviro.save_reading(reading)
# go to sleep until our next scheduled reading
enviro.sleep()
# handle any unexpected exception that has occurred
except Exception as exc:
enviro.exception(exc)