My Micro Python Distribution
Fleeting: my micro python distribution
helpers
import binascii
import json
import gc
import requests
def b64decode(content):
    """Decode base64-encoded *content* and return the raw bytes."""
    decoded = binascii.a2b_base64(content)
    return decoded
class Event:
    """Client for the Consul event API, scoped to the ``iot/`` event prefix."""

    def fire(self, name, **json):
        """Fire the event ``iot/<name>`` with the keyword arguments as JSON body.

        Note: inside this method ``json`` is the dict of keyword arguments,
        not the ``json`` module.
        """
        requests.put(f"http://home/consul/v1/event/fire/iot/{name}", json=json)

    def list(self, name, index=None, wait=None):
        """List the ``iot/<name>`` events.

        :param index: when truthy, sent as ``index`` query argument
        :param wait: when truthy, sent as ``wait`` query argument, in seconds
        :return: ``(index, content)`` where ``index`` is the raw
            ``X-Consul-Index`` response header and ``content`` the decoded
            JSON body
        :raises Exception: when the response status is not 2xx
        """
        url = f"http://home/consul/v1/event/list?name=iot/{name}"
        if index:
            url += f"&index={index}"
        if wait:
            url += f"&wait={wait}s"
        resp = requests.get(url)
        if resp.status_code // 100 != 2:
            raise Exception(resp)
        content = resp.json()
        index = resp.headers["X-Consul-Index"]
        return index, content

    def walk(self, name, index=None, wait=3600 * 24, token=None, dump_old=False):
        """Generate the decoded payload of the ``iot/<name>`` events, forever.

        :param index: index given to the initial, non waiting, listing
        :param wait: how long each following listing may wait, in seconds
        :param token: currently unused
        :param dump_old: when true, first yield the payload of every event
            returned by the initial listing
        :raises NotImplementedError: when the returned index is lower than 1
        """
        def payload(event):
            return json.loads(b64decode(event["Payload"]))

        raw_index, events = self.list(name=name, index=index)
        # The header is a string: convert it so that indexes are compared as
        # numbers instead of lexicographically (or, worse, against an int).
        index = int(raw_index)
        if dump_old:
            for event in events:
                yield payload(event)
        while True:
            raw_index, events = self.list(name=name, wait=wait, index=index)
            new_index = int(raw_index)
            if new_index == index:
                print("nothing new")
                continue
            if new_index < 1:
                # clients should sanity check that their index is at least 1 after
                # each blocking response is handled (Sanity check index is greater
                # than zero) -> https://developer.hashicorp.com/consul/api-docs/features/blocking
                raise NotImplementedError()
            if new_index < index:
                # Reset the index if it goes backwards -> https://developer.hashicorp.com/consul/api-docs/features/blocking
                # list() omits a falsy index, so the next request is a plain
                # listing and the index it returns is then adopted below.
                index = 0
                continue
            index = new_index
            # it repeats the old events. Only the last one is of interest
            if events:
                yield payload(events[-1])
class KV:
    """Client for the Consul key/value store, scoped to the ``iot/`` prefix."""

    def get(self, path, index=None, wait=None):
        """Read the value stored at ``iot/<path>``.

        :param index: ``None`` to only get the value, ``True`` to also get the
            current index, or an index to send as ``index`` query argument
        :param wait: when truthy, sent as ``wait`` query argument, in seconds
        :return: the decoded value (``None`` on a 404 response, ``b""`` when
            the stored value is null), or ``(newindex, value)`` when ``index``
            is truthy
        :raises Exception: when the response status is neither 2xx nor 404
        """
        url = f"http://home/consul/v1/kv/iot/{path}?"
        args = []
        # Identity checks: ``1 in (True, None)`` is true because 1 == True,
        # which would silently drop a legitimate index of 1.
        if index is not True and index is not None:
            args.append(f"index={index}")
        if wait:
            args.append(f"wait={wait}s")
        if args:
            url += "&".join(args)
        print(f"getting: {url}")
        resp = requests.get(url)
        if resp.status_code != 404 and resp.status_code // 100 != 2:
            # Same error handling as Event.list
            raise Exception(resp)
        newindex = resp.headers["X-Consul-Index"]
        if resp.status_code == 404:
            value = None
        else:
            value = resp.json()[-1]["Value"]
            # A null "Value" cannot be given to b64decode
            value = b"" if value is None else b64decode(value)
        if index:
            return newindex, value
        else:
            return value

    def follow(self, path, current=True, wait=None):
        """Generate the successive values of ``iot/<path>``, forever.

        :param current: when true, start by yielding the current value
        :param wait: given to every following :meth:`get`
        """
        index, value = self.get(path, index=True)
        if current:
            yield value
        while True:
            index, value = self.get(path, index=index, wait=wait)
            yield value
class KVAsync:
    """Asynchronous client for the Consul key/value store (``iot/`` prefix).

    Call :meth:`setup_follow` once, then await :meth:`next` repeatedly to get
    the successive values of a key.
    """

    def __init__(self):
        self.path = None
        self.index = None
        self.wait = None
        self.current = None

    async def get(self, path, index=None, wait=None):
        """Read the value stored at ``iot/<path>``.

        The free memory is printed at several steps (A, A2, A3, B, C, D, E).

        :param index: ``None`` to only get the value, ``True`` to also get the
            current index, or an index to send as ``index`` query argument
        :param wait: when truthy, sent as ``wait`` query argument, in seconds
        :return: the decoded value (``None`` on a 404 response, ``b""`` when
            the stored value is null), or ``(newindex, value)`` when ``index``
            is truthy
        :raises Exception: when the response status is neither 2xx nor 404
        """
        print(f"A {gc.mem_free()}")
        import asynciohttp
        print(f"A2 {gc.mem_free()}")
        gc.collect()
        print(f"A3 {gc.mem_free()}")
        url = f"http://home/consul/v1/kv/iot/{path}?"
        args = []
        # Identity checks: ``1 in (True, None)`` is true because 1 == True,
        # which would silently drop a legitimate index of 1.
        if index is not True and index is not None:
            args.append(f"index={index}")
        if wait:
            args.append(f"wait={wait}s")
        if args:
            url += "&".join(args)
        print(f"getting: {url}")
        print(f"B {gc.mem_free()}")
        async with asynciohttp.ClientSession() as session:
            print(f"C {gc.mem_free()}")
            async with session.get(url) as resp:
                print(f"D {gc.mem_free()}")
                if resp.status != 404 and resp.status // 100 != 2:
                    raise Exception(resp)
                newindex = resp.headers["X-Consul-Index"]
                if resp.status == 404:
                    value = None
                else:
                    data = await resp.json()
                    print(f"E {gc.mem_free()}")
                    value = data[-1]["Value"]
                    # A null "Value" cannot be given to b64decode
                    value = b"" if value is None else b64decode(value)
        if index:
            return newindex, value
        else:
            return value

    def setup_follow(self, path, current=True, wait=None):
        """Record what :meth:`next` follows and forget any previous index.

        :param current: when true, the first :meth:`next` returns the current
            value instead of waiting for the following one
        :param wait: given to :meth:`get` by every :meth:`next` but the first
        """
        self.path = path
        self.current = current
        self.wait = wait
        self.index = None

    async def next(self):
        """Return the next value of the key recorded by :meth:`setup_follow`."""
        if self.index is None:
            self.index, value = await self.get(self.path, index=True)
            if self.current:
                return value
            else:
                # The index is now known: this second call takes the other branch
                return await self.next()
        else:
            self.index, value = await self.get(self.path, index=self.index, wait=self.wait)
            return value
import time
import network
import requests
# NOTE(review): 6 bytes, presumably the MAC address of the "iothelper" ESP-NOW peer — confirm
iothelper = b'\xc4[\xbeb\xc2\xdb'
class WiFiConnectionTimeout(Exception):
    """Raised by WiFi.__enter__ when the connection is not established in time."""
class WiFi:
    """Context manager that keeps the station interface connected during a block.

    Entering connects (unless already connected); leaving disconnects and
    deactivates the interface, unless the block was entered while already
    connected or was interrupted with Ctrl-C.
    """

    def __init__(self, progress=None, connect_timeout=30):
        """
        :param progress: optional callable returning an iterator; ``next()`` is
            called on it at every polling round while waiting for the connection
        :param connect_timeout: number of polling rounds (0.3 s sleep each, not
            seconds) before giving up
        """
        self.wlan = network.WLAN(network.STA_IF)
        self.wasconnected = False
        self.progress = progress
        self.connect_timeout = connect_timeout

    def __enter__(self):
        """Connect and return the WLAN object.

        :raises WiFiConnectionTimeout: when the connection is not established
            after ``connect_timeout`` polling rounds
        """
        if self.wlan.isconnected():
            self.wasconnected = True
            print("Noop, most likely a nested call")
            # Return the interface here too, so that `with wifi as wlan` gets
            # the same object whether or not the call is nested.
            return self.wlan
        print("Connecting to wifi")
        self.wlan.active(True)
        self.wlan.connect()
        p = self.progress() if self.progress else None
        timeout = self.connect_timeout
        while True:
            timeout = timeout - 1
            if timeout <= 0:
                raise WiFiConnectionTimeout()
            if self.wlan.isconnected():
                print("Connected to wifi")
                return self.wlan
            if p is not None:
                next(p)
            time.sleep(0.3)

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Disconnect, except after Ctrl-C or when the wifi was already connected on entry."""
        if isinstance(exc_value, KeyboardInterrupt):
            print("Ctrl-c => Keep wifi on for debugging")
        elif self.wasconnected:
            print("Noop, don't disturb the nested call")
            self.wasconnected = False
        else:
            print("Disconnecting from wifi")
            self.wlan.disconnect()
            self.wlan.active(False)

    def on(self):
        """Connect, outside of a ``with`` block."""
        self.__enter__()

    def off(self):
        """Disconnect, even when the wifi was already connected at the last entry."""
        self.wasconnected = False
        self.__exit__()

    def reset(self):
        """Make sure the wifi is at a coherent state.

        Activating the STA and disconnecting prevent from ghost isconnected messages
        from previous run before reset.

        see https://docs.micropython.org/en/latest/library/espnow.html

        :return: the ``(sta, ap)`` interfaces; STA is active and disconnected,
            AP is inactive
        """
        sta = network.WLAN(network.WLAN.IF_STA)
        sta.active(False)
        ap = network.WLAN(network.WLAN.IF_AP)
        ap.active(False)
        sta.active(True)
        while not sta.active():
            time.sleep(0.1)
        sta.disconnect()
        while sta.isconnected():
            time.sleep(0.1)
        return sta, ap
# Module-level instance used by setup_espnow(), post() and ntfy()
wifi = WiFi()
def setup_espnow(peers=None, async_=False):
    """Create and activate an ESP-NOW object and register its peers.

    The station interface is reset, connected once, then disconnected again
    before ESP-NOW is activated.

    :param peers: iterable of peers given to ``add_peer`` (none by default)
    :param async_: when true, use ``aioespnow.AIOESPNow`` instead of
        ``espnow.ESPNow``
    :return: ``(e, sta, ap)``: the ESP-NOW object and both WLAN interfaces
    """
    if async_:
        import aioespnow
        e = aioespnow.AIOESPNow()
    else:
        import espnow
        e = espnow.ESPNow()
    sta, ap = wifi.reset()
    sta.connect()
    while not sta.isconnected():
        print("Waiting for first connection")
        time.sleep(1)
    sta.disconnect()
    # somehow, using the AP does not seem to do it anymore but I cannot understand why
    # sta.active(False)
    # ap.active(True)
    # ap.config(essid="ESP_NOW_AP", hidden=True, channel=11, pm=network.WLAN.PM_POWERSAVE)
    e.active(True)
    # `peers` defaults to None rather than to a shared mutable list
    for peer in peers or ():
        e.add_peer(peer)
    return e, sta, ap
def post(*args, **kwargs):
    """Call ``requests.post`` with the given arguments while the wifi is on."""
    with wifi:
        response = requests.post(*args, **kwargs)
    return response
def ntfy(message, title="mp", channel="n", priority="low"):
    """Post *message* to ``http://home:9705/<channel>`` while the wifi is on.

    The title header is prefixed with ``BOARD`` (from ``conf``).

    :return: the response of the POST request
    :raises Exception: when the response status is not 2xx
    """
    print(f"ntfy: {message}")
    with wifi:
        from conf import BOARD
        url = f"http://home:9705/{channel}"
        headers = {"title": f"{BOARD}: {title}", "priority": priority}
        res = requests.post(url, headers=headers, data=message)
        succeeded = res.status_code // 100 == 2
        if not succeeded:
            raise Exception(f"ntfy failed: {res.status_code} {res.text}")
        return res
import utime
def pulse(pin, duration_ms):
    """Drive *pin* high for *duration_ms* milliseconds, then low again.

    The pin is switched off even when the wait is interrupted (e.g. Ctrl-C),
    so that it is never left high.
    """
    pin.on()
    try:
        utime.sleep_ms(duration_ms)
    finally:
        pin.off()
class LatchingRelay:
    """Two-pin relay driver: each pin is pulsed to trigger one of the two actions."""

    def __init__(self, pin_a, pin_b):
        """Configure both pins as outputs and drive them low.

        :param pin_a: pin identifier given to ``machine.Pin`` for :meth:`a`
        :param pin_b: pin identifier given to ``machine.Pin`` for :meth:`b`
        """
        # Pin is not imported at module level: import it here so that the
        # class does not fail with a NameError.
        from machine import Pin

        self.pin_a = Pin(pin_a, Pin.OUT)
        self.pin_b = Pin(pin_b, Pin.OUT)
        self.pin_a.off()
        self.pin_b.off()

    def a(self):
        """Send a 4 ms pulse on pin A."""
        pulse(self.pin_a, 4)

    def b(self):
        """Send a 4 ms pulse on pin B."""
        pulse(self.pin_b, 4)
import json
import os
import time
import machine
import utime
# Platform name reported by os.uname(); deep_sleep() branches on the two flags below
sysname = os.uname().sysname
esp8266 = sysname == 'esp8266'
esp32 = sysname == 'esp32'
rtc = machine.RTC()
# Longest single sleep (in seconds) that esp8266_deep_sleep() requests at once;
# for longer sleeps it records the target wake time in the RTC memory
esp8266_deep_sleep_max = 3600
# esp8266_deep_sleep_max = 10
# Version written in the deep sleep context; deep_sleep_handler() discards a
# context carrying another version
deep_sleep_context_version = 0
def dump_exception(e, path='error.log'):
    """Append the traceback of *e*, prefixed with the current time, to *path*.

    :return: the traceback text
    """
    import io
    import sys

    stream = io.StringIO()
    sys.print_exception(e, stream)
    with open(path, 'a+') as log:
        log.write(f'{time.time()}: ')
        log.write(stream.getvalue())
    trace = stream.getvalue()
    return trace
def deep_sleep_handler():
    """Resume a long esp8266 deep sleep recorded in the RTC memory, if any.

    Does nothing on other platforms. The RTC memory is cleared and execution
    continues after a reset that is not a deep sleep wake up, when a
    ``resetdeepsleep`` file exists (it is then removed), when the recorded
    context is invalid or of another version, or when the target wake time is
    reached or missed. Otherwise the board goes back to deep sleep, or simply
    waits when at most 5 seconds remain.

    NOTE(review): deep_sleep() applies its own 1.3 factor on the esp8266, so
    the remaining time is stretched again at every resume — confirm this is
    intended.
    """
    if not esp8266:
        print("Not an esp8266, I don't need a complicated stuff to handle deep sleep")
        return

    mem = rtc.memory()

    # beware that hard reseting while in deep sleep will still cause a DEEPSLEEP_RESET
    if machine.reset_cause() != machine.DEEPSLEEP_RESET:
        print("Normal reset, clearing deep sleep memory to avoid mistakenly resuming a sleep triggered by a previous algorithm")
        rtc.memory('')
        return

    if "resetdeepsleep" in os.listdir():
        print("New algorithm uploaded, reseting the deep sleep memory")
        rtc.memory('')
        os.unlink("resetdeepsleep")
        return

    if not (mem and machine.reset_cause() == machine.DEEPSLEEP_RESET):
        return

    print("Woke from deep sleep with memory:", mem)
    try:
        context = json.loads(mem)
    except ValueError:
        print("Invalid deep sleep context, continuing execution")
        rtc.memory('')
        return

    if context.get("version") != deep_sleep_context_version:
        print("Incompatible deep sleep context version, continuing execution")
        rtc.memory('')
        return

    now = utime.mktime(utime.gmtime())
    wake_on = context["wake_on"]
    remaining = wake_on - now
    threshold = 5
    if remaining <= 0:
        print(f"No remaining sleep. Target missed by {remaining}s, continuing execution")
        rtc.memory('')
    elif remaining <= threshold:
        print(f"Almost reached target wake time, don't waste power in a {threshold}s deep sleep")
        time.sleep(remaining)
        rtc.memory('')
    else:
        print(f"now ({now}) < target ({wake_on}). Going back to deep sleep for remaining {remaining}s")
        # the rtc of the esp8266 sucks at keeping a good time sync. It tends to wake too early. Let's compensate a bit
        deep_sleep(remaining)
def deep_sleep(seconds):
    """Put the board in deep sleep for about *seconds* seconds.

    :param seconds: sleep duration; may be a float, it is converted to a whole
        number of seconds (esp8266) or milliseconds (esp32)
    :raises NotImplementedError: on any platform but the esp8266 and the esp32
    """
    if esp8266:
        # the esp8266 sucks at keeping a good deep sleep, let's help it a little
        return esp8266_deep_sleep(int(seconds * 1.3))
    elif esp32:
        # int(): a float number of seconds must not reach machine.deepsleep,
        # like in the esp8266 branch
        machine.deepsleep(int(seconds * 1000))
    else:
        raise NotImplementedError()
def esp8266_deep_sleep(seconds):
    """Deep sleep on the esp8266, at most ``esp8266_deep_sleep_max`` seconds at once.

    When *seconds* exceeds that maximum, the target wake time is stored as
    JSON in the RTC memory and the sleep is shortened to the maximum.
    """
    rtc.irq(trigger=rtc.ALARM0, wake=machine.DEEPSLEEP)
    print("Going to deep sleep for", seconds, "seconds")
    if seconds > esp8266_deep_sleep_max:
        wake_on = utime.mktime(utime.gmtime()) + seconds
        rtc.memory(json.dumps({"version": deep_sleep_context_version, "wake_on": wake_on}))
        print(f"recording to wake in {esp8266_deep_sleep_max}s to resume sleeping after")
        seconds = esp8266_deep_sleep_max
    overhead = 1  # expected time to process the deep sleep
    # overhead = 2
    alarm_ms = (seconds - overhead) * 1000
    rtc.alarm(rtc.ALARM0, alarm_ms)
    machine.deepsleep()
def runner(debugpin=None, debugtime=5):
    """Return a decorator that immediately runs the decorated function.

    Before running it, either deep_sleep_handler() is called or, when
    *debugpin* reads 1, the board simply waits *debugtime* seconds. When the
    function returns an object with ``__next__``, it is run with
    ``asyncio.run``.

    Any exception is dumped with dump_exception(), printed, notified with
    ``comm.ntfy`` on a best-effort basis, then raised again.

    The decorator returns nothing: the decorated name is bound to None.

    :param debugpin: optional object whose ``value()`` is compared to 1
    :param debugtime: seconds to wait when the debug pin is high
    """
    def decorator(func):
        if debugpin and debugpin.value() == 1:
            print(f"Debug pin is high, skipping deep sleep handler, waiting for {debugtime}s instead")
            time.sleep(debugtime)
        else:
            deep_sleep_handler()
        try:
            res = func()
            if hasattr(res, "__next__"):  # pre coroutine implem
                import asyncio
                asyncio.run(res)
        except Exception as e:
            value = dump_exception(e)
            err = f'Exception in main: {value}'
            print(err)
            try:
                print("Notifying the error")
                from comm import ntfy
                # ntfy() already prefixes the title with the board name. Not
                # importing conf here also means that a broken conf cannot
                # hide the original exception.
                ntfy(err, title="mp exception", priority="high")
            except Exception as ne:
                print("Failed to notify exception:", ne)
            raise e
    return decorator
def lowpower():
    """Set the CPU frequency to 80 MHz and, where ``machine.sleep_type`` exists, select light sleep."""
    machine.freq(80000000)
    if not hasattr(machine, 'sleep_type'):
        return
    machine.sleep_type(machine.SLEEP_LIGHT)
asyncio http
code taken from https://github.com/micropython/micropython-lib/
utils
;; Build the install manifest: an alist of (FILE-NAME . VALUE) pairs, returned
;; JSON-encoded. VALUE is whatever the konix ipfa helpers return (presumably an
;; IPFS address -- TODO confirm). `block', `file', `name', `distro' and `extra'
;; are bound outside this form.
(let (
;; "main.py" is only part of the manifest when `block' is not the empty string
(result (and (not (s-equals-p block "")) `(("main.py" . ,(konix/org-babel-ipfa-block block file)))))
)
;; "conf.py" is generated on the fly and defines BOARD as `name'
(setq result (append result `(("conf.py" . ,(with-temp-buffer
(insert "# Configuration for " name "\n")
(insert "BOARD = \"" name "\"\n")
(konix/ipfa-buffer nil))))))
;; One "<name>.py" entry per element of `distro' then `extra'
(mapc (lambda (extra_file)
(let* (
(file_name (concat extra_file ".py"))
(file_content (konix/org-babel-ipfa-block extra_file))
)
(setq result
(append result
`((,file_name . ,file_content))
)
)
)
)
(append distro extra)
)
(json-encode result)
)
echo "${manifest}"
Mon Nov 3 12:20:08 CET 2025
clk mp remote install https://ipfs.konubinix.eu/p/bafkreifhphvejkizynvk3bjyp62kth2lni4w6hr5ufijcdg2xqlsqaukui
boot
# aioespnow module for MicroPython on ESP32 and ESP8266
# MIT license; Copyright (c) 2022 Glenn Moloney @glenn20
import asyncio
import espnow
# Modelled on the asyncio.Stream class (extmod/asyncio/stream.py)
# NOTE: Relies on internal implementation of asyncio.core (_io_queue)
class AIOESPNow(espnow.ESPNow):
    # Adds awaitable variants of recv(), irecv() and send() to espnow.ESPNow,
    # plus "async for" iteration over the incoming messages.

    # Read one ESPNow message
    async def arecv(self):
        # Queue this object for reading on the asyncio IO queue, then read
        # with a timeout of 0 (presumably the task resumes once a message is
        # available — relies on asyncio internals, confirm)
        yield asyncio.core._io_queue.queue_read(self)
        return self.recv(0)  # type: ignore[misc]

    # Same as arecv(), but reads with irecv()
    async def airecv(self):
        yield asyncio.core._io_queue.queue_read(self)
        return self.irecv(0)  # type: ignore[misc]

    # Send msg to mac; with a single argument, that argument is the message
    # and mac is None
    async def asend(self, mac, msg=None, sync=None):
        if msg is None:
            msg, mac = mac, None  # If msg is None: swap mac and msg
        yield asyncio.core._io_queue.queue_write(self)
        return self.send(mac, msg, sync)  # type: ignore[misc]

    # "async for" support
    def __aiter__(self):
        return self

    # Each iteration awaits airecv()
    async def __anext__(self):
        return await self.airecv()
# This file is executed on every boot (including wake-boot from deepsleep)
import network
#import esp
#esp.osdebug(None)
import os, machine
#os.dupterm(None, 1) # disable REPL on UART(0)
import gc
# Activate the station interface and start connecting to the wifi
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
# NOTE(review): `ssid` and `key` are not defined in the visible part of this
# script — presumably filled in when boot.py is generated; confirm
wlan.connect(ssid, key)
import webrepl
# Start the WebREPL
webrepl.start()
gc.collect()
# stores the webrepl password
Fri Nov 28 17:34:20 CET 2025
Writing into conf.py in mpremote
Writing into boot.py in mpremote
Writing into webrepl_cfg.py in mpremote
Writing into aioespnow.py in mpremote
Writing into aioespnow.mpy in mpremote
Writing into asynciohttp.mpy in mpremote
Writing into error.log in mpremote
Writing into resetdeepsleep in mpremote
hello world
Wed Nov 26 19:26:45 CET 2025
Checking the current installation
Reading install.json from mpremote
Writing into main.py in mpremote
Writing into install.json in mpremote
Writing into error.log in mpremote
Writing into resetdeepsleep in mpremote
print("hello")
Fri Nov 28 17:34:38 CET 2025
Checking the current installation
Reading install.json from mpremote
Writing into main.py in mpremote
Writing into conf.py in mpremote
Writing into helpers.py in mpremote
Writing into state.py in mpremote
Writing into comm.py in mpremote
Writing into control.py in mpremote
Writing into aioespnow.mpy in mpremote
Writing into asynciohttp.mpy in mpremote
Writing into install.json in mpremote
Writing into error.log in mpremote
Writing into resetdeepsleep in mpremote
tests
exception handling
from helpers import deep_sleep, deep_sleep_handler, rtc, runner
@runner()
def run():
    """Raise on purpose; runner() calls this function at decoration time."""
    1 / 0 # trigger a ZeroDivisionError
Mon Nov 3 12:23:03 CET 2025
clk mp remote install --reset https://ipfs.konubinix.eu/p/bafkreifhphvejkizynvk3bjyp62kth2lni4w6hr5ufijcdg2xqlsqaukui
deepsleep
from helpers import deep_sleep, deep_sleep_handler, rtc, runner
import time
@runner()
def run():
    """Print the RTC memory, wait 2 seconds, then request a 30 second deep sleep."""
    # Shows what the previous run left in the RTC memory, if anything
    print(rtc.memory())
    print("hello")
    time.sleep(2)
    print("going into deep sleep")
    deep_sleep(30)