Konubinix' opinionated web of thoughts

IOT Heart Again, With Micropython

Fleeting

IOT heart again, with micropython on my wemos/lolin d1 (some ESP8266)

See how to play with the wemos d1.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from helpers import lowpower, async_retry
from comm import ntfy, wifi, setup_espnow
from state import KVAsync
import asyncio
import time

import json
import machine
import neopixel
import gc
import ntptime
lowpower()

leds = {
    "left": {
        "ctrl": neopixel.NeoPixel(machine.Pin(4), 5),
        "up": [1, 2, 3],
        "bottom": [0, 4],
    },
    "right": {
        "ctrl": neopixel.NeoPixel(machine.Pin(5), 5),
        "up": [1, 2, 3],
        "bottom": [0, 4],
    }
}

def run_command(name, position, command):
    now = time.localtime()
    h = now[3]
    m = now[4]
    print(f"{h:02d}:{m:02d} - {gc.mem_free()}: {name}-{position}: Running command: {command}")
    led = leds[name]
    ctrl = led["ctrl"]
    if position == "fill":
        ctrl.fill(command)
    else:
        for cell in led[position]:
            ctrl[cell] = command
    ctrl.write()

class Connection:
    def __init__(self):
        self.kv = KVAsync()
        self.espnow = None

    def connect_kv(self):
        self.kv.setup_follow("heart/command")

    def connect(self):
        run_command("left", "fill", [0, 0, 0])
        run_command("right", "fill", [0, 0, 0])
        self.espnow, wlan, _ = setup_espnow(async_=True)
        wifi.on()
        ntfy(f"""mac: {wlan.config('mac')}, channel: {wlan.config('channel')}""",
             channel="iotheart")

        run_command("left", "fill", [0, 0, 0])
        run_command("right", "fill", [0, 50, 0])

        try:
            ntptime.settime()
        except OSError as e:
            ntfy(f"""{e}
while trying to set the time""", channel="iotheart")

    async def try_get(self):
        try:
            return await self.kv.next()
        except StopIteration:
            self.connect_kv()
            return None
        except Exception as e:
            print(f"{e}")
            self.connect_kv()
            return None

    async def step(self):
        print(f"Do step")
        commands = await self.try_get()
        print(f"After step")
        if commands is None:
            print("Got nothing, using a default value")
            commands = {
                "left": {"up": [5, 0, 0], "bottom": [5, 0, 0]},
                "right": {"up": [5, 0, 0], "bottom": [5, 0, 0]},
            }
        else:
            commands = json.loads(commands)
        if commands.get("off"):
            commands = {
                "left": {"fill": [0, 0, 0]},
                "right": {"fill": [0, 0, 0]},
            }
        for side, command in commands.items():
            for position, value in command.items():
                run_command(side, position, value)

initialsleeptime = 1

connection = Connection()

@async_retry(channel="iotheart")
async def post_to_iothelper(payload):
    import asynciohttp
    async with asynciohttp.ClientSession() as session:
        async with session.post(f"http://home/iothelper/{payload['path']}", json=payload["json"]) as resp:
            if resp.status != 200:
                raise Exception(f"HTTP {resp.status}: {resp.reason}")

async def proxy_espnow():
    async for data in connection.espnow:
        ntfy(f"Received via ESPNow: {data}", channel="iotheart")
        print(f"Got via espnow {data}")
        payload = json.loads(data[1])
        try:
            await post_to_iothelper(payload)
        except Exception:
            pass


async def follow_colors():
    sleeptime = initialsleeptime
    while True:
        try:
            await connection.step()
            sleeptime = initialsleeptime
        except Exception as e:
            sleeptime *= 2
            if sleeptime > 60:
                sleeptime = 60
            try:
                now = time.localtime()
                h = now[3]
                m = now[4]
                ntfy(f"""{e}\nwill wait for {sleeptime}s""", channel="iotheart")
            except Exception as e2:
                print(f"{h:02d}:{m:02d}: Could not notify: {e2}")

        print(f"Waiting for {sleeptime}s")
        machine.lightsleep(sleeptime * 1000)

async def run():
    print(f"first {gc.mem_free()}")
    connection.connect()
    connection.connect_kv()

    await asyncio.gather(follow_colors(), proxy_espnow())

from heart import run

async def main():
    await run()
Tue Feb 24 21:50:08 CET 2026
Compiling Python files to .mpy (march=xtensa)...
  Staging app.py for compilation...
  Staging helpers.py for compilation...
  Staging state.py for compilation...
  Staging comm.py for compilation...
  Staging control.py for compilation...
  Staging aioespnow.py for compilation...
  Staging asynciohttp.py for compilation...
  Staging heart.py for compilation...
  Running mpy-cross via Earthly...
  Compiled app.py -> app.mpy
  Compiled helpers.py -> helpers.mpy
  Compiled state.py -> state.mpy
  Compiled comm.py -> comm.mpy
  Compiled control.py -> control.mpy
  Compiled aioespnow.py -> aioespnow.mpy
  Compiled asynciohttp.py -> asynciohttp.mpy
  Compiled heart.py -> heart.mpy
Checking the current installation
Reading install.json from iotheart
Writing into comm.mpy in iotheart
Writing into heart.mpy in iotheart
Writing into install.json in iotheart
Writing into error.log in iotheart
Writing into resetdeepsleep in iotheart

Notes linking here