Konubinix' opinionated web of thoughts

My Micro Python Distribution

Fleeting

my micro python distribution

helpers

import network
import time
import os
import json
import requests
import machine
import utime
from machine import Pin
import network
from espnow import ESPNow

# Platform detection: the port name reported by os.uname() selects the
# board-specific code paths further down (deep sleep in particular).
sysname = os.uname().sysname
esp8266 = sysname == 'esp8266'
esp32 = sysname == 'esp32'

# Module-level ESP-NOW interface; setup_espnow() activates it and adds peers.
espnow = ESPNow()

# NOTE(review): six raw bytes, presumably the MAC address of an ESP-NOW peer
# named "iothelper" -- confirm; nothing in the visible code reads it.
iothelper = b'\xc4[\xbeb\xc2\xdb'

class WiFiConnectionTimeout(Exception):
    """Raised by WiFi.__enter__ when the station did not connect in time."""

class WiFi:
    """Context manager that powers the station (STA) interface up and down.

    Entering connects with the credentials already stored on the device
    (``connect()`` is called without arguments). Leaving disconnects and
    deactivates the interface, unless the link was already up on entry
    (nested use) or the block was left through a KeyboardInterrupt.
    """

    def __init__(self, progress=None, connect_timeout=30):
        """Create the wrapper around the STA interface.

        progress: optional callable returning an iterator; ``next()`` is
            called on it once per polling round while waiting for the link.
        connect_timeout: number of polling rounds before giving up. Rounds
            are 0.3 s apart, so this is NOT a duration in seconds.
        """
        self.wlan = network.WLAN(network.STA_IF)
        self.wasconnected = False
        self.progress = progress
        self.connect_timeout = connect_timeout

    def __enter__(self):
        """Connect if needed and return the WLAN object.

        Raises WiFiConnectionTimeout when the link is not up after
        ``connect_timeout`` polling rounds.
        """
        if self.wlan.isconnected():
            self.wasconnected = True
            print("Noop, most likely a nested call")
            # Return the interface here too, so `with wifi as w` gets the
            # same object whether or not the call is nested.
            return self.wlan
        print("Connecting to wifi")
        self.wlan.active(True)
        self.wlan.connect()
        if self.progress:
            p = self.progress()
        timeout = self.connect_timeout
        while True:
            timeout = timeout - 1
            if timeout <= 0:
                # __exit__ is not run when __enter__ raises, so switch the
                # radio off here instead of leaving it powered.
                self.wlan.disconnect()
                self.wlan.active(False)
                raise WiFiConnectionTimeout()

            if self.wlan.isconnected():
                print("Connected to wifi")
                return self.wlan
            if self.progress:
                next(p)
            time.sleep(0.3)

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Tear the link down, except for nested calls and Ctrl-C."""
        if isinstance(exc_value, KeyboardInterrupt):
            print("Ctrl-c => Keep wifi on for debugging")
        elif self.wasconnected:
            print("Noop, don't disturb the nested call")
            self.wasconnected = False
        else:
            print("Disconnecting from wifi")
            self.wlan.disconnect()
            self.wlan.active(False)

    def on(self):
        """Connect outside of a ``with`` block; returns the WLAN object."""
        return self.__enter__()

    def off(self):
        """Unconditionally disconnect, even if the link pre-dated on()."""
        self.wasconnected = False
        self.__exit__()

    def reset(self):
        """Bring the wifi to a coherent state and return ``(sta, ap)``.

        Activating the STA and disconnecting prevents ghost isconnected
        reports left over from a previous run before reset.

        see https://docs.micropython.org/en/latest/library/espnow.html
        """
        sta = network.WLAN(network.WLAN.IF_STA)
        sta.active(False)
        ap = network.WLAN(network.WLAN.IF_AP)
        ap.active(False)
        sta.active(True)
        while not sta.active():
            time.sleep(0.1)
        sta.disconnect()
        while sta.isconnected():
            time.sleep(0.1)
        return sta, ap

# Shared module-level instance used by setup_espnow(), post() and ntfy().
wifi = WiFi()

def setup_espnow(peers=()):
    """Prepare the radio for ESP-NOW and register ``peers``.

    peers: iterable of peer addresses handed to ``espnow.add_peer``.

    Returns the ``(sta, ap)`` WLAN objects obtained from ``wifi.reset()``.
    Blocks (without timeout) until the station has connected once.
    """
    sta, ap = wifi.reset()
    # Connect once, then drop the link again.
    # NOTE(review): presumably this leaves the radio on the access point's
    # channel so ESP-NOW peers can be reached -- confirm.
    sta.connect()
    while not sta.isconnected():
        print("Waiting for first connection")
        time.sleep(1)
    sta.disconnect()
    # somehow, using the AP does not seem to do it anymore but I cannot understand why
    # sta.active(False)
    # ap.active(True)
    # ap.config(essid="ESP_NOW_AP", hidden=True, channel=11, pm=network.WLAN.PM_POWERSAVE)
    espnow.active(True)
    for peer in peers:
        espnow.add_peer(peer)
    return sta, ap

def post(*args, **kwargs):
    """Call ``requests.post`` with the shared wifi link up for the request."""
    with wifi:
        response = requests.post(*args, **kwargs)
    return response

def ntfy(message, title="mp", channel="n", priority="low"):
    """POST ``message`` to the notification endpoint and return the response.

    The title and priority travel as HTTP headers; ``channel`` is the URL
    path. Raises Exception when the status code is not in the 2xx family.
    """
    url = f"http://home:9705/{channel}"
    headers = {"title": title, "priority": priority}
    with wifi:
        res = requests.post(url, headers=headers, data=message)
        if res.status_code // 100 == 2:
            return res
        raise Exception(f"ntfy failed: {res.status_code} {res.text}")

# Real-time clock handle; the deep-sleep helpers below use its memory() to
# persist their context and its alarm to wake the board.
rtc = machine.RTC()

def datetime_to_ts(datetime):
    """Convert an RTC datetime tuple to a timestamp in seconds.

    datetime: tuple as returned by ``machine.RTC().datetime()``.
        NOTE(review): assumes the layout (year, month, day, weekday, hour,
        minute, second, subseconds) -- confirm for the target port.

    Returns the value of ``utime.mktime`` for that date and time; the
    weekday and sub-second fields are ignored.
    """
    import utime

    year, month, day, _weekday, hour, minute, second = datetime[:7]
    # mktime expects (year, month, mday, hour, minute, second, weekday,
    # yearday); the last two are passed as zero, as the original did.
    return utime.mktime((year, month, day, hour, minute, second, 0, 0))

def pulse(pin, duration_ms):
    """Drive ``pin`` high for ``duration_ms`` milliseconds, then low again.

    The pin is switched off even if the wait is interrupted (e.g. by
    Ctrl-C), so the output is never left energised.
    """
    pin.on()
    try:
        utime.sleep_ms(duration_ms)
    finally:
        pin.off()

class LatchingRelay:
    """Relay driven through two output pins, each pulsed for 4 ms."""

    def __init__(self, pin_a, pin_b):
        """Configure both pins as outputs and make sure they start low."""
        self.pin_a = Pin(pin_a, Pin.OUT)
        self.pin_b = Pin(pin_b, Pin.OUT)
        for line in (self.pin_a, self.pin_b):
            line.off()

    def a(self):
        """Send a 4 ms pulse on pin A."""
        pulse(self.pin_a, 4)

    def b(self):
        """Send a 4 ms pulse on pin B."""
        pulse(self.pin_b, 4)


# Longest single deep-sleep leg, in seconds, requested on the esp8266. Longer
# sleeps are cut to this length and resumed by deep_sleep_handler() on wake.
esp8266_deep_sleep_max = 3600
# esp8266_deep_sleep_max = 10

# Version stamp stored in the deep-sleep context kept in RTC memory; a saved
# context carrying a different version is discarded on wake-up.
deep_sleep_context_version = 0


def dump_exception(e, path='error.log'):
    """Append the traceback of ``e`` to ``path`` and return it as a string.

    Each entry is prefixed with the current ``time.time()`` value.
    """
    import io
    import sys

    capture = io.StringIO()
    sys.print_exception(e, capture)
    trace = capture.getvalue()
    with open(path, 'a+') as log:
        log.write(f'{time.time()}: ')
        log.write(trace)
    return trace



def deep_sleep_handler():
    """Resume a long deep sleep that was split into several legs.

    Only the esp8266 needs this: esp8266_deep_sleep() stores the target
    wake-up time in RTC memory when the requested sleep exceeds
    esp8266_deep_sleep_max. Call this early at boot; it either goes straight
    back to deep sleep (and does not return) or clears the stored context
    and returns so normal execution can continue.
    """
    if not esp8266:
        print("Not an esp8266, I don't need a complicated stuff to handle deep sleep")
        return
    mem = rtc.memory()
    # Beware that hard resetting while in deep sleep will still cause a
    # DEEPSLEEP_RESET.
    if machine.reset_cause() != machine.DEEPSLEEP_RESET:
        print("Normal reset, clearing deep sleep memory to avoid mistakenly resuming a sleep triggered by a previous algorithm")
        rtc.memory('')
        return
    # A "resetdeepsleep" marker file asks for the stored context to be dropped.
    if "resetdeepsleep" in os.listdir():
        print("New algorithm uploaded, reseting the deep sleep memory")
        rtc.memory('')
        os.unlink("resetdeepsleep")
        return
    if not mem:
        # Woke from a plain (single-leg) deep sleep: nothing to resume.
        return

    print("Woke from deep sleep with memory:", mem)
    try:
        context = json.loads(mem)
    except ValueError:
        print("Invalid deep sleep context, continuing execution")
        rtc.memory('')
        return
    # Anything that is not a dict carrying the expected version is stale.
    if not isinstance(context, dict) or context.get("version") != deep_sleep_context_version:
        print("Incompatible deep sleep context version, continuing execution")
        rtc.memory('')
        return

    now = utime.mktime(utime.gmtime())
    wake_on = context["wake_on"]
    remaining = wake_on - now
    threshold = 5
    if remaining > threshold:
        print(f"now ({now}) < target ({wake_on}). Going back to deep sleep for remaining {remaining}s")
        # deep_sleep() pads the duration on the esp8266, whose RTC tends to
        # wake too early.
        deep_sleep(remaining)
    elif remaining > 0:
        print(f"Almost reached target wake time, don't waste power in a {threshold}s deep sleep")
        time.sleep(remaining)
        rtc.memory('')
    else:
        print(f"No remaining sleep. Target missed by {remaining}s, continuing execution")
        rtc.memory('')

def deep_sleep(seconds):
    """Put the board into deep sleep for ``seconds`` seconds.

    Dispatches on the detected platform; raises NotImplementedError on
    anything that is neither an esp8266 nor an esp32.
    """
    if esp8266:
        # the esp8266 sucks at keeping a good deep sleep, so ask for 30% more
        padded = int(seconds * 1.3)
        return esp8266_deep_sleep(padded)
    if not esp32:
        raise NotImplementedError()
    machine.deepsleep(seconds * 1000)

def esp8266_deep_sleep(seconds):
    """Deep-sleep an esp8266 for at most esp8266_deep_sleep_max seconds.

    When ``seconds`` exceeds that limit, the target wake-up time is saved as
    JSON in RTC memory so deep_sleep_handler() can resume the sleep after
    the board wakes, and only the maximum leg is slept now.
    """
    rtc.irq(trigger=rtc.ALARM0, wake=machine.DEEPSLEEP)
    print("Going to deep sleep for", seconds, "seconds")
    leg = seconds
    if seconds > esp8266_deep_sleep_max:
        wake_on = utime.mktime(utime.gmtime()) + seconds
        rtc.memory(json.dumps(
            {"version": deep_sleep_context_version, "wake_on": wake_on}
        ))
        print(f"recording to wake in {esp8266_deep_sleep_max}s to resume sleeping after")
        leg = esp8266_deep_sleep_max
    # expected time (in seconds) spent processing the deep sleep itself
    overhead = 1
    alarm_ms = (leg - overhead) * 1000
    rtc.alarm(rtc.ALARM0, alarm_ms)
    machine.deepsleep()

def runner(debugpin=None, debugtime=5):
    """Build a decorator that runs the program's entry point right away.

    debugpin: optional pin object; when its ``value()`` is 1 the deep-sleep
        handler is skipped and the board waits ``debugtime`` seconds instead.
    debugtime: length of that wait, in seconds.

    The decorator calls the function immediately and returns None, so the
    decorated name ends up bound to None. Any Exception raised by the
    function is appended to the error log, printed, sent through ntfy() on a
    best-effort basis, and then re-raised.
    """
    def decorator(func):
        if debugpin and debugpin.value() == 1:
            print(f"Debug pin is high, skipping deep sleep handler, waiting for {debugtime}s instead")
            time.sleep(debugtime)
        else:
            deep_sleep_handler()
        try:
            func()
        except Exception as e:
            # Log first: nothing below may prevent the error from being saved.
            value = dump_exception(e)
            err = f'Exception in main: {value}'
            print(err)
            try:
                # Imported inside the guarded section so that a missing or
                # broken conf module cannot replace the original exception.
                from conf import BOARD
                print("Notifying the error")
                ntfy(err, title=f"{BOARD}: mp exception", priority="high")
            except Exception as ne:
                print("Failed to notify exception:", ne)
            raise e
    return decorator

def lowpower():
    """Clock the CPU at 80 MHz and select light sleep where available."""
    machine.freq(80000000)
    if not hasattr(machine, 'sleep_type'):
        # this port has no sleep_type() to configure
        return
    machine.sleep_type(machine.SLEEP_LIGHT)

utils

;; Build the install manifest: a JSON object mapping each file name to the
;; value the konix/...ipfa helpers return for its content.
;; NOTE(review): presumably that value is an IPFS address -- confirm.
;; `block', `file', `name' and `extra' are supplied from outside this form.
(let* ((main-entry
        ;; main.py is only listed when a main block name was given
        (unless (s-equals-p block "")
          (list (cons "main.py" (konix/org-babel-ipfa-block block file)))))
       (conf-entry
        ;; conf.py is generated on the fly and only defines BOARD
        (list (cons "conf.py"
                    (with-temp-buffer
                      (insert "# Configuration for " name "\n"
                              "BOARD = \"" name "\"\n")
                      (konix/ipfa-buffer nil)))))
       (extra-entries
        ;; one "<block>.py" entry per extra block, in the given order
        (mapcar (lambda (extra_file)
                  (cons (concat extra_file ".py")
                        (konix/org-babel-ipfa-block extra_file)))
                extra)))
  (json-encode (append main-entry conf-entry extra-entries)))

echo "${manifest}"
Mon Nov  3 12:20:08 CET 2025
clk mp remote install https://ipfs.konubinix.eu/p/bafkreifhphvejkizynvk3bjyp62kth2lni4w6hr5ufijcdg2xqlsqaukui

boot

# This file is executed on every boot (including wake-boot from deepsleep)
import network
#import esp
#esp.osdebug(None)
import os, machine
#os.dupterm(None, 1) # disable REPL on UART(0)
import gc

# Bring the station interface up and start connecting; connect() is not
# waited on here, so the link may still be coming up when this file ends.
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
# NOTE(review): `ssid` and `key` are not defined in the visible text;
# presumably they are substituted when this file is generated -- confirm.
wlan.connect(ssid, key)

# Start the WebREPL service, then reclaim memory before main.py runs.
import webrepl
webrepl.start()
gc.collect()

# stores the webrepl password
Mon Nov 10 15:39:14 CET 2025
Writing into conf.py in mpremote
Writing into boot.py in mpremote
Writing into webrepl_cfg.py in mpremote
Writing into error.log in mpremote
Writing into resetdeepsleep in mpremote

print("hello world")
Mon Nov  3 12:51:26 CET 2025
Checking the current installation
Reading install.json from mpremote
Writing into main.py in mpremote
Writing into helpers.py in mpremote
Writing into install.json in mpremote
Writing into error.log in mpremote
Writing into resetdeepsleep in mpremote

tests

exception handling

from helpers import deep_sleep, deep_sleep_handler, rtc, runner

# runner() calls the decorated function immediately, so loading this file is
# what runs the test; the error below exercises runner's exception handling.
@runner()
def run():
    1 / 0  # trigger a ZeroDivisionError
Mon Nov  3 12:23:03 CET 2025
clk mp remote install --reset https://ipfs.konubinix.eu/p/bafkreifhphvejkizynvk3bjyp62kth2lni4w6hr5ufijcdg2xqlsqaukui

deepsleep

from helpers import deep_sleep, deep_sleep_handler, rtc, runner
import time

@runner()
def run():
    """Show the RTC memory, stay awake briefly, then deep-sleep for 30 s."""
    awake_s = 2
    nap_s = 30

    print(rtc.memory())
    print("hello")
    time.sleep(awake_s)
    print("going into deep sleep")
    deep_sleep(nap_s)

Notes linking here