Konubinix' opinionated web of thoughts

My Micro Python Distribution

Fleeting

my micro python distribution

compiling to .mpy

The deploy script supports on-demand compilation of .py files to .mpy bytecode using mpy-cross. This reduces RAM usage on the device and speeds up boot time. However, the build step itself takes a significant amount of RAM and time.

usage

By default, compilation is disabled. To enable it, add compile="true" to the #+call: directive:

# Deploy without compilation (default)
#+call: deploy(device="iotheart", manifest=manifest(block="main"))

# Deploy WITH mpy compilation (ESP8266)
#+call: deploy(device="iotheart", manifest=manifest(block="main"), compile="true")

# Deploy with compilation for ESP32
#+call: deploy(device="myesp32", manifest=manifest(block="main"), compile="true", march="xtensawin")

architecture flags

Device march value
ESP8266 xtensa (default)
ESP32 xtensawin
ESP32-S2 xtensawin
ESP32-S3 xtensawin
ESP32-C3 rv32imc
RP2040 armv6m

excluded files

The following files are NOT compiled (kept as .py):

  • boot.py - required by MicroPython boot sequence
  • conf.py - configuration file

how it works

  1. .py files from the manifest are staged in nomad/docker/mpy/
  2. Earthly runs mpy-cross via the +micropython-mpy target
  3. Compiled .mpy files replace .py entries in the manifest
  4. Temporary files are cleaned up

helpers

import binascii
import json
import gc
import requests

def b64decode(content):
    """Decode the base64 text ``content`` and return the raw bytes."""
    decoded = binascii.a2b_base64(content)
    return decoded

class Event:
    """Small client for the Consul event API, scoped to the ``iot/`` prefix."""

    def fire(self, name, **json):
        """Fire the event ``iot/<name>``; the keyword arguments become the JSON body."""
        # ``json`` is the public keyword-collecting parameter and shadows the
        # module inside this method only.
        requests.put(f"http://home/consul/v1/event/fire/iot/{name}", json=json)

    def list(self, name, index=None, wait=None):
        """List the ``iot/<name>`` events.

        ``index`` and ``wait`` (seconds) are appended to the query string when
        truthy, which turns the call into a Consul blocking query.

        Returns ``(index, content)`` where ``index`` is the raw
        ``X-Consul-Index`` response header (a string) and ``content`` the
        decoded JSON body. Raises ``Exception`` on a non-2xx status.
        """
        url = f"http://home/consul/v1/event/list?name=iot/{name}"
        if index:
            url += f"&index={index}"
        if wait:
            url += f"&wait={wait}s"
        resp = requests.get(url)
        if resp.status_code // 100 != 2:
            raise Exception(resp)
        content = resp.json()
        index = resp.headers["X-Consul-Index"]
        return index, content

    def walk(self, name, index=None, wait=3600 * 24, token=None, dump_old=False):
        """Yield the decoded payload of each new ``iot/<name>`` event, forever.

        With ``dump_old`` the payloads of the events already listed are yielded
        first. ``token`` is accepted but not used by this implementation.
        """
        def payload(event):
            return json.loads(b64decode(event["Payload"]))

        events = self.list(name=name, index=index)
        # The header value is a string: convert it so that the ordering tests
        # below are numeric ("9" < "10") instead of lexicographic, and so that
        # the comparison against the integer 1 does not raise TypeError.
        index = int(events[0])

        if dump_old:
            for event in events[1]:
                yield payload(event)

        while True:
            new_events = self.list(name=name, wait=wait, index=events[0])
            new_index = int(new_events[0])

            if new_index == index:
                print("nothing new")
                continue
            elif new_index < index:
                # Reset the index if it goes backwards -> https://developer.hashicorp.com/consul/api-docs/features/blocking
                index = 0
            elif index <= 1:
                # clients should sanity check that their index is at least 1 after
                # each blocking response is handled (Sanity check index is greater
                # than zero) -> https://developer.hashicorp.com/consul/api-docs/features/blocking
                # This case (including the round following a reset above) is
                # deliberately left unimplemented.
                raise NotImplementedError()
            else:
                index = new_index

            events = new_events
            # it repeats the old events. Only the last one is of interest
            yield payload(events[1][-1])

class KV:
    """Synchronous reader for the Consul KV store under the ``iot/`` prefix."""

    def get(self, path, index=None, wait=None):
        """Fetch ``iot/<path>`` and return its decoded value (``None`` on 404).

        When ``index`` is truthy the result is ``(new_index, value)`` instead,
        ``new_index`` being the ``X-Consul-Index`` response header. ``index``
        is only sent to the server when it is neither ``True`` nor ``None``;
        ``wait`` (seconds) is sent when truthy.
        """
        query = []
        if index not in (True, None):
            query.append(f"index={index}")
        if wait:
            query.append(f"wait={wait}s")
        # Joining an empty list yields "", leaving the bare trailing "?".
        url = f"http://home/consul/v1/kv/iot/{path}?" + "&".join(query)
        print(f"getting: {url}")
        resp = requests.get(url)
        newindex = resp.headers["X-Consul-Index"]
        value = None if resp.status_code == 404 else b64decode(resp.json()[-1]["Value"])
        return (newindex, value) if index else value

    def follow(self, path, current=True, wait=None):
        """Yield the value of ``iot/<path>`` after every blocking query, forever.

        The value read by the initial, non-blocking query is yielded first
        when ``current`` is true.
        """
        cursor, latest = self.get(path, index=True)
        if current:
            yield latest
        while True:
            cursor, latest = self.get(path, index=cursor, wait=wait)
            yield latest

class KVAsync:
    """Asyncio flavour of the Consul KV reader, with a stateful ``next()``."""

    def __init__(self):
        # Everything is filled in by setup_follow().
        self.path = self.index = self.wait = self.current = None

    async def get(self, path, index=None, wait=None):
        """Fetch ``iot/<path>`` and return its decoded value (``None`` on 404).

        When ``index`` is truthy the result is ``(new_index, value)`` instead,
        ``new_index`` being the ``X-Consul-Index`` response header.
        """
        import asynciohttp
        gc.collect()
        query = []
        if index not in (True, None):
            query.append(f"index={index}")
        if wait:
            query.append(f"wait={wait}s")
        # Joining an empty list yields "", leaving the bare trailing "?".
        url = f"http://home/consul/v1/kv/iot/{path}?" + "&".join(query)
        print(f"getting: {url}")
        async with asynciohttp.ClientSession() as session:
            async with session.get(url) as resp:
                newindex = resp.headers["X-Consul-Index"]
                if resp.status == 404:
                    value = None
                else:
                    entries = await resp.json()
                    value = b64decode(entries[-1]["Value"])
        return (newindex, value) if index else value

    def setup_follow(self, path, current=True, wait=None):
        """Record what next() should follow and forget any previous index."""
        self.index = None
        self.path, self.current, self.wait = path, current, wait

    async def next(self):
        """Return the next value of the followed key.

        The first call performs a non-blocking read; its value is returned
        when ``current`` is set, otherwise the call goes straight on to a
        blocking query. Later calls always block on the stored index.
        """
        if self.index is not None:
            self.index, value = await self.get(self.path, index=self.index, wait=self.wait)
            return value
        self.index, value = await self.get(self.path, index=True)
        if self.current:
            return value
        return await self.next()

import time

import network
import requests
# Raw 6-byte value; presumably the MAC address of the "iothelper" board, to be
# used as an ESP-NOW peer -- TODO confirm against the callers.
iothelper = b'\xc4[\xbeb\xc2\xdb'

class WiFiConnectionTimeout(Exception):
    """Raised when the station is still not connected once the polling budget is spent."""

class WiFi:
    """Context manager that brings the station (STA) interface up and down.

    Entering while the interface is already connected is treated as a nested
    use: nothing is changed on entry and the matching exit leaves the
    connection alone.

    Parameters:
        progress: optional callable returning an iterator; ``next()`` is
            called on it after every unsuccessful poll (e.g. to blink a led).
        connect_timeout: polling budget. It is decremented once per poll and
            each poll is followed by a 0.3 s sleep, so it is a number of
            iterations rather than a duration in seconds.
    """

    def __init__(self, progress=None, connect_timeout=30):
        self.wlan = network.WLAN(network.STA_IF)
        self.wasconnected = False
        self.progress = progress
        self.connect_timeout = connect_timeout

    def __enter__(self):
        """Connect and return the WLAN object.

        Raises:
            WiFiConnectionTimeout: the polling budget ran out.
        """
        if self.wlan.isconnected():
            self.wasconnected = True
            print("Noop, most likely a nested call")
            # Return the interface here too, so that ``with wifi as wlan``
            # yields the same object whether or not the call is nested.
            return self.wlan
        print("Connecting to wifi")
        self.wlan.active(True)
        # No credentials are given: the call relies on whatever the interface
        # already has configured.
        self.wlan.connect()
        if self.progress:
            p = self.progress()
        timeout = self.connect_timeout
        while True:
            timeout = timeout - 1
            if timeout <= 0:
                raise WiFiConnectionTimeout()

            if self.wlan.isconnected():
                print("Connected to wifi")
                return self.wlan
            if self.progress:
                next(p)
            time.sleep(0.3)

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Disconnect, unless interrupted by Ctrl-C or closing a nested use."""
        if isinstance(exc_value, KeyboardInterrupt):
            print("Ctrl-c => Keep wifi on for debugging")
        elif self.wasconnected:
            print("Noop, don't disturb the nested call")
            self.wasconnected = False
        else:
            print("Disconnecting from wifi")
            self.wlan.disconnect()
            self.wlan.active(False)

    def on(self):
        """Connect outside of a ``with`` statement."""
        self.__enter__()

    def off(self):
        """Disconnect unconditionally, even if the connection pre-existed."""
        self.wasconnected = False
        self.__exit__()

    def reset(self):
        """Make sure the wifi is in a coherent state.

        Activating the STA and disconnecting prevents ghost isconnected
        messages from a previous run before reset.

        see https://docs.micropython.org/en/latest/library/espnow.html

        Returns the ``(sta, ap)`` interface objects.
        """
        sta = network.WLAN(network.WLAN.IF_STA)
        sta.active(False)
        ap = network.WLAN(network.WLAN.IF_AP)
        ap.active(False)
        sta.active(True)
        while not sta.active():
            time.sleep(0.1)
        sta.disconnect()
        while sta.isconnected():
            time.sleep(0.1)
        return sta, ap

# Module-level instance shared by the helpers below (post, ntfy, setup_espnow).
wifi = WiFi()

def setup_espnow(peers=None, async_=False):
    """Create and activate an ESP-NOW object and register its peers.

    Parameters:
        peers: iterable of peer addresses to register (list, tuple, ...);
            ``None`` or empty registers only the built-in debug peer.
        async_: use ``aioespnow.AIOESPNow`` instead of ``espnow.ESPNow``.

    Returns:
        ``(e, sta, ap)``: the ESP-NOW object and both WLAN interfaces.
    """
    if async_:
        import aioespnow
        e = aioespnow.AIOESPNow()
    else:
        import espnow
        e = espnow.ESPNow()

    sta, ap = wifi.reset()
    # Connect once with the already-configured credentials, then drop the
    # connection again before enabling ESP-NOW.
    sta.connect()
    while not sta.isconnected():
        print("Waiting for first connection")
        time.sleep(1)
    sta.disconnect()
    # somehow, using the AP does not seem to do it anymore but I cannot understand why
    # sta.active(False)
    # ap.active(True)
    # ap.config(essid="ESP_NOW_AP", hidden=True, channel=11, pm=network.WLAN.PM_POWERSAVE)
    e.active(True)
    to_debug = b'|\\\xf8\xb3\xe0R'
    # Build a fresh list: the caller's sequence is never modified and any
    # iterable is accepted, not only a list.
    for peer in list(peers or ()) + [to_debug]:
        e.add_peer(peer)
    return e, sta, ap


def post(*args, **kwargs):
    """Forward to ``requests.post`` with the wifi held up for the call."""
    with wifi:
        response = requests.post(*args, **kwargs)
    return response

def ntfy(message, title="mp", channel="n", priority="low"):
    """Post ``message`` to ``http://home:9705/<channel>`` and return the response.

    The title header is prefixed with ``BOARD`` from ``conf``. A non-2xx
    status raises ``Exception``.
    """
    print(f"ntfy: {message}")
    with wifi:
        from conf import BOARD
        headers = {"title": f"{BOARD}: {title}", "priority": priority}
        res = requests.post(f"http://home:9705/{channel}", headers=headers, data=message)
        succeeded = res.status_code // 100 == 2
        if not succeeded:
            raise Exception(f"ntfy failed: {res.status_code} {res.text}")
        return res

import utime


def pulse(pin, duration_ms):
    """Switch ``pin`` on, wait ``duration_ms`` milliseconds, switch it off."""
    pin.on()
    utime.sleep_ms(duration_ms)
    pin.off()

class LatchingRelay:
    """Two-coil relay driven by a short pulse on one of two output pins.

    Parameters:
        pin_a, pin_b: pin identifiers handed to ``machine.Pin``.
        pulse_ms: pulse width in milliseconds (default 4, the value that
            used to be hard-coded).
    """

    def __init__(self, pin_a, pin_b, pulse_ms=4):
        # Imported here because nothing else in this module brings ``Pin``
        # into scope.
        from machine import Pin

        self.pulse_ms = pulse_ms
        self.pin_a = Pin(pin_a, Pin.OUT)
        self.pin_b = Pin(pin_b, Pin.OUT)
        # Start with both outputs released.
        self.pin_a.off()
        self.pin_b.off()

    def a(self):
        """Pulse the first pin."""
        pulse(self.pin_a, self.pulse_ms)

    def b(self):
        """Pulse the second pin."""
        pulse(self.pin_b, self.pulse_ms)

import json
import os
import time

import machine
import utime

# Platform detection, based on the name reported by the firmware.
sysname = os.uname().sysname
esp8266 = sysname == 'esp8266'
esp32 = sysname == 'esp32'

# Shared RTC handle: its memory holds the deep sleep context and its alarm
# wakes the esp8266.
rtc = machine.RTC()

# Longest single deep sleep (in seconds) requested on the esp8266; longer
# sleeps are split and resumed by deep_sleep_handler().
esp8266_deep_sleep_max = 3600
# esp8266_deep_sleep_max = 10

# Stored in the deep sleep context; a context with another version is discarded.
deep_sleep_context_version = 0

# File receiving the tracebacks written by dump_exception().
ERROR_LOG_PATH = 'error.log'


def dump_exception(e, path=ERROR_LOG_PATH):
    """Append the traceback of ``e`` to ``path`` and return it as a string.

    The entry is prefixed with the current ``time.time()`` value.
    """
    import io
    import sys
    sink = io.StringIO()
    sys.print_exception(e, sink)
    trace = sink.getvalue()
    with open(path, 'a+') as log:
        log.write(f'{time.time()}: ')
        log.write(trace)
    return trace


def deep_sleep_handler():
    """Resume an interrupted long deep sleep on the esp8266.

    Does nothing on other platforms. On the esp8266 the RTC memory is cleared
    after a normal reset or when a ``resetdeepsleep`` marker file exists
    (the marker is then deleted). Otherwise the JSON context left in RTC
    memory by esp8266_deep_sleep() is read: the board goes back to deep
    sleep if more than 5 s remain before ``wake_on``, sleeps the short
    remainder in place, or simply continues if the target has passed.
    """
    if not esp8266:
        print("Not an esp8266, I don't need a complicated stuff to handle deep sleep")
        return
    mem = rtc.memory()
    # beware that hard reseting while in deep sleep will still cause a DEEPSLEEP_RESET
    if machine.reset_cause() != machine.DEEPSLEEP_RESET:
          print("Normal reset, clearing deep sleep memory to avoid mistakenly resuming a sleep triggered by a previous algorithm")
          rtc.memory('')
          return
    if "resetdeepsleep" in os.listdir():
          print("New algorithm uploaded, reseting the deep sleep memory")
          rtc.memory('')
          os.unlink("resetdeepsleep")
          return
    # The reset cause was already checked above, so only ``mem`` decides here.
    if mem and machine.reset_cause() == machine.DEEPSLEEP_RESET:
        print("Woke from deep sleep with memory:", mem)
        try:
            context = json.loads(mem)
        except ValueError:
            print("Invalid deep sleep context, continuing execution")
            rtc.memory('')
            return
        if context.get("version") != deep_sleep_context_version:
            print("Incompatible deep sleep context version, continuing execution")
            rtc.memory('')
            return

        now = utime.mktime(utime.gmtime())
        wake_on = context["wake_on"]
        remaining = wake_on - now
        # Below this many seconds a new deep sleep cycle is not worth it.
        threshold = 5
        if remaining > threshold:
            print(f"now ({now}) < target ({wake_on}). Going back to deep sleep for remaining {remaining}s")
            # the rtc of the esp8266 sucks at keeping a good time sync. It tends to wake too early. Let's compensate a bit
            # NOTE(review): deep_sleep() scales its argument by 1.3 and
            # esp8266_deep_sleep() records a new wake_on from the scaled
            # value, so the target appears to move later on every resume --
            # confirm this repeated compensation is intended.
            deep_sleep(remaining)
        elif remaining > 0:
            print(f"Almost reached target wake time, don't waste power in a {threshold}s deep sleep")
            time.sleep(remaining)
            rtc.memory('')
        else:
            print(f"No remaining sleep. Target missed by {remaining}s, continuing execution")
            rtc.memory('')

def deep_sleep(seconds):
    """Put the board into deep sleep for ``seconds``.

    On the esp8266 the duration is stretched by 30% and delegated to
    esp8266_deep_sleep(); on the esp32 it is handed to ``machine.deepsleep``
    in milliseconds. Any other platform raises ``NotImplementedError``.
    """
    if esp8266:
        # the esp8266 sucks at keeping a good deep sleep, let's help it a little
        compensated = int(seconds * 1.3)
        return esp8266_deep_sleep(compensated)
    if esp32:
        machine.deepsleep(seconds * 1000)
        return
    raise NotImplementedError()

def esp8266_deep_sleep(seconds):
    """Deep sleep on the esp8266, in chunks of at most ``esp8266_deep_sleep_max``.

    When ``seconds`` exceeds the maximum, the absolute wake-up target is
    saved as JSON in RTC memory and only one maximum-length chunk is slept.
    """
    rtc.irq(trigger=rtc.ALARM0, wake=machine.DEEPSLEEP)
    print("Going to deep sleep for", seconds, "seconds")
    needs_resume = seconds > esp8266_deep_sleep_max
    if needs_resume:
        target = utime.mktime(utime.gmtime()) + seconds
        rtc.memory(json.dumps({"version": deep_sleep_context_version, "wake_on": target}))
        print(f"recording to wake in {esp8266_deep_sleep_max}s to resume sleeping after")
        seconds = esp8266_deep_sleep_max
    # expected time to process the deep sleep (alternative value tried: 2)
    overhead = 1
    alarm_ms = (seconds - overhead) * 1000
    rtc.alarm(rtc.ALARM0, alarm_ms)
    machine.deepsleep()

def runner(func, debugpin=None, debugtime=5):
    """Run ``func`` with deep sleep resumption and error reporting around it.

    Steps, in order:
      1. if ``debugpin`` reads 1, wait ``debugtime`` seconds; otherwise call
         deep_sleep_handler();
      2. send the content of a non-empty error.log through ntfy and empty
         the file (failures of this step are only printed);
      3. call ``func``; a result exposing ``__next__`` is run with
         ``asyncio.run``;
      4. on exception: log it with dump_exception(), try to notify it, then
         re-raise.
    """
    debugging = debugpin and debugpin.value() == 1
    if debugging:
        print(f"Debug pin is high, skipping deep sleep handler, waiting for {debugtime}s instead")
        time.sleep(debugtime)
    else:
        deep_sleep_handler()

    # Report what a previous run left in error.log
    if ERROR_LOG_PATH in os.listdir():
        try:
            with open(ERROR_LOG_PATH, 'r') as log:
                previous = log.read()
            if previous:
                print("Found previous errors in error.log, notifying")
                from comm import ntfy
                ntfy(previous, title="previous error.log", priority="max")
                # Opening for writing truncates the log
                with open(ERROR_LOG_PATH, 'w'):
                    pass
        except Exception as log_error:
            print("Failed to process error.log:", log_error)

    try:
        outcome = func()
        is_coroutine = hasattr(outcome, "__next__")  # pre coroutine implem
        if is_coroutine:
            import asyncio
            asyncio.run(outcome)

    except Exception as e:
        err = f'Exception in main: {dump_exception(e)}'
        print(err)
        try:
            print("Notifying the error")
            from comm import ntfy
            ntfy(err, title="mp exception", priority="high")
        except Exception as notify_error:
            print("Failed to notify exception:", notify_error)
        raise e

def lowpower():
    """Set the clock to 80 MHz and, where the port has it, select light sleep."""
    machine.freq(80000000)
    if not hasattr(machine, 'sleep_type'):
        return
    machine.sleep_type(machine.SLEEP_LIGHT)

def async_retry(retries=5, delay=2, channel="n"):
    """Decorator factory retrying an async function on any ``Exception``.

    Parameters:
        retries: maximum number of attempts.
        delay: seconds slept between two attempts.
        channel: ntfy channel used for the notifications.

    A notification is sent when the call succeeds after at least one failure
    and when every attempt failed; in the latter case the last exception is
    re-raised. Notifications are best effort: a failure to notify is printed
    and never triggers a new attempt nor replaces the result or the error of
    the decorated function.
    """
    def notify(message):
        try:
            # Imported lazily so that the happy path needs neither the
            # module nor the network.
            from comm import ntfy
            ntfy(message, channel=channel)
        except Exception as ne:
            print("Failed to notify:", ne)

    def decorator(fn):
        description = fn.__name__.replace("_", " ")

        async def wrapper(*args, **kwargs):
            import asyncio
            last_error = None
            for attempt in range(retries):
                try:
                    result = await fn(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    if attempt < retries - 1:
                        await asyncio.sleep(delay)
                else:
                    # Kept out of the ``try`` so that a notification problem
                    # is not mistaken for a failure of ``fn``.
                    if attempt > 0:
                        notify(f"{description}: succeeded after {attempt + 1} attempts")
                    return result
            notify(f"{description}: failed after {retries} attempts: {last_error}")
            raise last_error
        return wrapper
    return decorator

distro_main

This is the generated main.py that imports the user’s app and runs it through the runner.

import os

# Files scheduled for deletion are listed in toremove.json.
if "toremove.json" in os.listdir():
    import json
    with open("toremove.json") as f:
        for path in json.load(f):
            print(f"Removing {path}")
            try:
                os.remove(path)
            except OSError as e:
                # A path that cannot be removed (e.g. already deleted by an
                # interrupted previous boot) must not abort the boot:
                # toremove.json would otherwise never be cleared and every
                # later boot would fail at the same place.
                print(f"Could not remove {path}: {e}")
    os.remove("toremove.json")

# Hand the user's entry point over to the runner.
from helpers import runner
import app
runner(app.main)

aioespnow

# aioespnow module for MicroPython on ESP32 and ESP8266
# MIT license; Copyright (c) 2022 Glenn Moloney @glenn20

import asyncio
import espnow


# Modelled on the asyncio.Stream class (extmod/asyncio/stream.py)
# NOTE: Relies on internal implementation of asyncio.core (_io_queue)
class AIOESPNow(espnow.ESPNow):
    """``espnow.ESPNow`` with awaitable receive/send and ``async for`` support.

    Each awaitable first yields to the asyncio I/O queue (read or write side)
    and then calls the matching ESPNow method. The ``async def`` + ``yield``
    + ``return value`` combination relies on MicroPython's coroutine
    implementation.
    """

    # Read one ESPNow message
    async def arecv(self):
        """Queue for reading, then return ``self.recv(0)``."""
        yield asyncio.core._io_queue.queue_read(self)
        return self.recv(0)  # type: ignore[misc]

    async def airecv(self):
        """Queue for reading, then return ``self.irecv(0)``."""
        yield asyncio.core._io_queue.queue_read(self)
        return self.irecv(0)  # type: ignore[misc]

    async def asend(self, mac, msg=None, sync=None):
        """Queue for writing, then return ``self.send(mac, msg, sync)``.

        Called with a single argument, that argument is the message and
        ``mac`` is passed as ``None``.
        """
        if msg is None:
            msg, mac = mac, None  # If msg is None: swap mac and msg
        yield asyncio.core._io_queue.queue_write(self)
        return self.send(mac, msg, sync)  # type: ignore[misc]

    # "async for" support
    def __aiter__(self):
        return self

    async def __anext__(self):
        """Return the next message, as produced by airecv()."""
        return await self.airecv()

asyncio http

code taken from https://github.com/micropython/micropython-lib/

# MicroPython aiohttp library
# MIT license; Copyright (c) 2023 Carlos Gil

import asyncio
import json as _json

HttpVersion10 = "HTTP/1.0"
HttpVersion11 = "HTTP/1.1"


class ClientResponse:
    """Response with a body of known (or unbounded) length.

    ``status`` and ``headers`` are attached to the instance by the code that
    creates it; only the stream reader is taken by the constructor.
    """

    def __init__(self, reader):
        self.content = reader

    def _get_header(self, keyname, default):
        """Return the header whose lower-cased name equals ``keyname``, else ``default``."""
        for name in self.headers:
            if name.lower() == keyname:
                return self.headers[name]
        return default

    def _decode(self, data):
        """Decompress ``data`` according to Content-Encoding when possible.

        The data is returned unchanged for any other encoding, for the
        combined "gzip,deflate" value, or when ``deflate`` cannot be imported.
        """
        encoding = self._get_header("content-encoding", None)
        if encoding not in ("gzip", "deflate", "gzip,deflate"):
            return data
        try:
            import io

            import deflate

            if encoding == "deflate":
                with deflate.DeflateIO(io.BytesIO(data), deflate.ZLIB) as stream:
                    return stream.read()
            elif encoding == "gzip":
                with deflate.DeflateIO(io.BytesIO(data), deflate.GZIP, 15) as stream:
                    return stream.read()
        except ImportError:
            print("WARNING: deflate module required")
        return data

    async def read(self, sz=-1):
        """Read ``sz`` bytes exactly, or whatever ``read(-1)`` gives, and decode them."""
        if sz == -1:
            raw = await self.content.read(sz)
        else:
            raw = await self.content.readexactly(sz)
        return self._decode(raw)

    async def text(self, encoding="utf-8"):
        """Return the body, sized by Content-Length when present, as a string."""
        length = int(self._get_header("content-length", -1))
        body = await self.read(length)
        return body.decode(encoding)

    async def json(self):
        """Return the body, sized by Content-Length when present, parsed as JSON."""
        length = int(self._get_header("content-length", -1))
        body = await self.read(length)
        return _json.loads(body)

    def __repr__(self):
        return "<ClientResponse %d %s>" % (self.status, self.headers)


class ChunkedClientResponse(ClientResponse):
    """Response whose body uses the chunked transfer encoding."""

    def __init__(self, reader):
        self.content = reader
        # Bytes still to be read from the chunk in progress; 0 means that the
        # next read starts with a chunk-size line.
        self.chunk_size = 0

    async def read(self, sz=4 * 1024 * 1024):
        """Return at most ``sz`` bytes of the current chunk, decoded.

        Returns ``b""`` once the terminating zero-sized chunk is reached.
        """
        if self.chunk_size == 0:
            l = await self.content.readline()
            # Drop anything after ";" on the size line before parsing the
            # hexadecimal size.
            l = l.split(b";", 1)[0]
            self.chunk_size = int(l, 16)
            if self.chunk_size == 0:
                # End of message
                sep = await self.content.readexactly(2)
                assert sep == b"\r\n"
                return b""
        data = await self.content.readexactly(min(sz, self.chunk_size))
        self.chunk_size -= len(data)
        if self.chunk_size == 0:
            # Every chunk is followed by a CRLF that is not part of the data.
            sep = await self.content.readexactly(2)
            assert sep == b"\r\n"
        return self._decode(data)

    def __repr__(self):
        return "<ChunkedClientResponse %d %s>" % (self.status, self.headers)


class _RequestContextManager:
    def __init__(self, client, request_co):
        self.reqco = request_co
        self.client = client

    async def __aenter__(self):
        return await self.reqco

    async def __aexit__(self, *args):
        await self.client._reader.aclose()
        return await asyncio.sleep(0)


class ClientSession:
    """Minimal asyncio HTTP client; every request opens its own connection.

    Parameters:
        base_url: prefix prepended to the url of every request.
        headers: headers added to the built-in ``Connection: close`` and
            ``User-Agent`` ones for every request.
        version: HTTP version string sent on the request line.
    """

    def __init__(self, base_url="", headers={}, version=HttpVersion10):
        self._reader = None
        self._base_url = base_url
        self._base_headers = {"Connection": "close", "User-Agent": "compat"}
        self._base_headers.update(**headers)
        self._http_version = version

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        return await asyncio.sleep(0)

    # TODO: Implement timeouts

    async def _request(
        self, method, url, data=None, json=None, ssl=None, params=None, headers={}
    ):
        """Send the request, follow 301-303 redirects and build the response.

        The returned object is a ``ChunkedClientResponse`` when the server
        announced a chunked transfer encoding, a ``ClientResponse`` otherwise.
        """
        redir_cnt = 0
        while redir_cnt < 2:
            reader = await self.request_raw(
                method, url, data, json, ssl, params, headers
            )
            _headers = []
            sline = await reader.readline()
            sline = sline.split(None, 2)
            status = int(sline[1])
            chunked = False
            while True:
                line = await reader.readline()
                if not line or line == b"\r\n":
                    break
                _headers.append(line)
                if line.startswith(b"Transfer-Encoding:"):
                    if b"chunked" in line:
                        chunked = True
                elif line.startswith(b"Location:"):
                    url = line.rstrip().split(None, 1)[1].decode()

            if 301 <= status <= 303:
                redir_cnt += 1
                await reader.aclose()
                continue
            break

        if chunked:
            resp = ChunkedClientResponse(reader)
        else:
            resp = ClientResponse(reader)
        resp.status = status
        resp.reason = sline[2].decode().strip() if len(sline) > 2 else ""
        resp.headers = _headers
        resp.url = url
        if params:
            resp.url += "?" + "&".join(f"{k}={params[k]}" for k in sorted(params))
        try:
            resp.headers = {
                val.split(":", 1)[0]: val.split(":", 1)[-1].strip()
                for val in [hed.decode().strip() for hed in _headers]
            }
        except Exception:
            # Keep the raw header lines when they cannot be turned into a dict.
            pass
        # Remembered so that the request context manager can close it.
        self._reader = reader
        return resp

    async def request_raw(
        self,
        method,
        url,
        data=None,
        json=None,
        ssl=None,
        params=None,
        headers={},
        is_handshake=False,
        version=None,
    ):
        """Open a connection, write the request and return the stream reader.

        Returns ``(reader, writer)`` instead when ``is_handshake`` is set.
        Raises ``ValueError`` for a protocol other than http/https.
        """
        # Work on a copy: the Host/Content-* entries added below must neither
        # leak into the caller's dict (a redirect to another host would be
        # sent the previous Host header) nor into the shared default.
        headers = dict(headers) if headers else {}
        if json and isinstance(json, dict):
            data = _json.dumps(json)
        if data is not None and method == "GET":
            method = "POST"
        if params:
            url += "?" + "&".join(f"{k}={params[k]}" for k in sorted(params))
        try:
            proto, dummy, host, path = url.split("/", 3)
        except ValueError:
            proto, dummy, host = url.split("/", 2)
            path = ""

        if proto == "http:":
            port = 80
        elif proto == "https:":
            port = 443
            if ssl is None:
                ssl = True
        else:
            raise ValueError("Unsupported protocol: " + proto)

        if ":" in host:
            host, port = host.split(":", 1)
            port = int(port)

        reader, writer = await asyncio.open_connection(host, port, ssl=ssl)

        # Use protocol 1.0, because 1.1 always allows to use chunked transfer-encoding
        # But explicitly set Connection: close, even though this should be default for 1.0,
        # because some servers misbehave w/o it.
        if version is None:
            version = self._http_version
        if "Host" not in headers:
            headers.update(Host=host)
        if not data:
            query = b"%s /%s %s\r\n%s\r\n" % (
                method,
                path,
                version,
                "\r\n".join(f"{k}: {v}" for k, v in headers.items()) + "\r\n"
                if headers
                else "",
            )
        else:
            if json:
                headers.update(**{"Content-Type": "application/json"})
            if isinstance(data, bytes):
                headers.update(**{"Content-Type": "application/octet-stream"})
            else:
                data = data.encode()

            headers.update(**{"Content-Length": len(data)})
            query = b"""%s /%s %s\r\n%s\r\n%s""" % (
                method,
                path,
                version,
                "\r\n".join(f"{k}: {v}" for k, v in headers.items()) + "\r\n",
                data,
            )
        await writer.awrite(query)
        if is_handshake:
            return reader, writer
        return reader

    def request(
        self, method, url, data=None, json=None, ssl=None, params=None, headers={}
    ):
        """Return an async context manager performing the request on entry."""
        return _RequestContextManager(
            self,
            self._request(
                method,
                self._base_url + url,
                data=data,
                json=json,
                ssl=ssl,
                params=params,
                headers=dict(**self._base_headers, **headers),
            ),
        )

    def get(self, url, **kwargs):
        return self.request("GET", url, **kwargs)

    def post(self, url, **kwargs):
        return self.request("POST", url, **kwargs)

    def put(self, url, **kwargs):
        return self.request("PUT", url, **kwargs)

    def patch(self, url, **kwargs):
        return self.request("PATCH", url, **kwargs)

    def delete(self, url, **kwargs):
        return self.request("DELETE", url, **kwargs)

    def head(self, url, **kwargs):
        return self.request("HEAD", url, **kwargs)

    def options(self, url, **kwargs):
        return self.request("OPTIONS", url, **kwargs)

    def ws_connect(self, url, ssl=None):
        """Websockets are not available in this copy."""
        raise NotImplementedError()

    async def _ws_connect(self, url, ssl=None):
        raise NotImplementedError()

utils

;; Build the deployment manifest and return it JSON-encoded.
;;
;; The manifest is an alist of (FILE-NAME . VALUE) pairs where VALUE is what
;; `konix/org-babel-ipfa-block' (or `konix/ipfa-buffer' for conf.py) returns.
;; `block', `file', `name', `distro' and `extra' are not bound here;
;; presumably they are the org-babel variables of this block -- TODO confirm.
;;
;; Entries, in order:
;;   - app.py and main.py, only when `block' is not the empty string;
;;   - conf.py, generated on the fly with the board name;
;;   - one NAME.py per element of `distro', read from the current file;
;;   - one NAME.py per element of `extra', read from `file'.
(let (
      ;; User code goes into app.py, distro_main becomes main.py
      (result (and (not (s-equals-p block "")) `(("app.py" . ,(konix/org-babel-ipfa-block block file))
                                                 ("main.py" . ,(konix/org-babel-ipfa-block "distro_main")))))
      )
  (setq result (append result `(("conf.py" . ,(with-temp-buffer
                          (insert "# Configuration for " name "\n")
                          (insert "BOARD = \"" name "\"\n")
                          (konix/ipfa-buffer nil))))))
  ;; distro blocks are in the current file (my_micro_python_distribution.org)
  (mapc (lambda (distro_file)
          (let* (
                 (file_name (concat distro_file ".py"))
                 (file_content (konix/org-babel-ipfa-block distro_file))
                 )
            (setq result
                  (append result
                          `((,file_name . ,file_content))
                          )
                  )
            )
          )
        distro
        )
  ;; extra blocks are in the file specified by the 'file' parameter
  (mapc (lambda (extra_file)
          (let* (
                 (file_name (concat extra_file ".py"))
                 (file_content (konix/org-babel-ipfa-block extra_file file))
                 )
            (setq result
                  (append result
                          `((,file_name . ,file_content))
                          )
                  )
            )
          )
        extra
        )
  (json-encode result)
  )

echo "${manifest}"
Mon Nov  3 12:20:08 CET 2025
clk mp remote install https://ipfs.konubinix.eu/p/bafkreifhphvejkizynvk3bjyp62kth2lni4w6hr5ufijcdg2xqlsqaukui

boot

# This file is executed on every boot (including wake-boot from deepsleep)
# It brings the station interface up, starts connecting and launches webrepl.
import network
#import esp
#esp.osdebug(None)
import os, machine
#os.dupterm(None, 1) # disable REPL on UART(0)
import gc

wlan = network.WLAN(network.STA_IF)
wlan.active(True)
# NOTE(review): ``ssid`` and ``key`` are not defined in this block; presumably
# they are substituted when the file is generated -- confirm.
wlan.connect(ssid, key)

import webrepl
webrepl.start()
# Reclaim what the setup above allocated before the application starts.
gc.collect()

# stores the webrepl password
Sun Feb 22 17:10:01 CET 2026
Writing into conf.py in mpremote
Writing into boot.py in mpremote
Writing into webrepl_cfg.py in mpremote
Writing into error.log in mpremote
Writing into resetdeepsleep in mpremote

hello world

Wed Nov 26 19:26:45 CET 2025
Checking the current installation
Reading install.json from mpremote
Writing into main.py in mpremote
Writing into install.json in mpremote
Writing into error.log in mpremote
Writing into resetdeepsleep in mpremote

# Smallest possible payload: print a greeting.
print("hello")
Thu Dec 11 13:34:49 CET 2025
Checking the current installation
Reading install.json from mpremote
Writing into main.py in mpremote
Writing into install.json in mpremote
Writing into error.log in mpremote
Writing into resetdeepsleep in mpremote

tests

exception handling

def main():
    """Divide by zero on purpose (exception-handling test entry point)."""
    return 1 / 0  # trigger a ZeroDivisionError
Mon Nov  3 12:23:03 CET 2025
clk mp remote install --reset https://ipfs.konubinix.eu/p/bafkreifhphvejkizynvk3bjyp62kth2lni4w6hr5ufijcdg2xqlsqaukui

deepsleep

from helpers import deep_sleep, rtc
import time

def main():
    """Print the RTC memory, wait 2 s, then request a 30 s deep sleep."""
    # Shows whatever context a previous deep sleep left behind.
    print(rtc.memory())

    print("hello")
    time.sleep(2)
    print("going into deep sleep")
    deep_sleep(30)

Notes linking here