Konubinix' opinionated web of thoughts

IOT Heart Again, With Micropython

Fleeting

IOT heart again, with micropython on my wemos/lolin d1 (some ESP8266)

See how to play with the wemos d1.

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from helpers import lowpower, runner
from comm import ntfy, wifi, setup_espnow
from state import KVAsync
import asyncio
import time

import json
import machine
import neopixel
import network
import esp
import gc
import ntptime
import requests
lowpower()

# led = machine.Pin(2, machine.Pin.OUT)
# led.off()  # on
# time.sleep(2)
# led.on()  # off

leds = {
    "left": {
        "ctrl": neopixel.NeoPixel(machine.Pin(4), 5),
        "up": [1, 2, 3],
        "bottom": [0, 4],
    },
    "right": {
        "ctrl": neopixel.NeoPixel(machine.Pin(5), 5),
        "up": [1, 2, 3],
        "bottom": [0, 4],
    }
}

def run_command(name, position, command):
    now = time.localtime()
    h = now[3]
    m = now[4]
    print(f"{h:02d}:{m:02d} - {gc.mem_free()}: {name}-{position}: Running command: {command}")
    led = leds[name]
    ctrl = led["ctrl"]
    if position == "fill":
        ctrl.fill(command)
    else:
        for cell in led[position]:
            ctrl[cell] = command
    ctrl.write()

class Connection:
    def __init__(self):
        self.kv = KVAsync()
        self.espnow = None

    def connect_kv(self):
        self.kv.setup_follow("heart/command")

    def connect(self):
        run_command("left", "fill", [0, 0, 0])
        run_command("right", "fill", [0, 0, 0])
        self.espnow, wlan, _ = setup_espnow(async_=True)
        wifi.on()
        ntfy(f"""mac: {wlan.config('mac')}, channel: {wlan.config('channel')}""",
             channel="iotheart")

        run_command("left", "fill", [0, 0, 0])
        run_command("right", "fill", [0, 50, 0])

        try:
            ntptime.settime()
        except OSError as e:
            ntfy(f"""{e}
while trying to set the time""", channel="iotheart")

    async def try_get(self):
        try:
            return await self.kv.next()
        except StopIteration:
            self.connect_kv()
            return None
        except Exception as e:
            print(f"{e}")
            return None

    async def step(self):
        print(f"Do step")
        commands = await self.try_get()
        print(f"After step")
        if commands is None:
            print("Got nothing, using a default value")
            commands = {
                "left": {"up": [5, 0, 0], "bottom": [5, 0, 0]},
                "right": {"up": [5, 0, 0], "bottom": [5, 0, 0]},
            }
        else:
            commands = json.loads(commands)
        if commands.get("off"):
            commands = {
                "left": {"fill": [0, 0, 0]},
                "right": {"fill": [0, 0, 0]},
            }
        for side, command in commands.items():
            for position, value in command.items():
                run_command(side, position, value)

initialsleeptime = 1

sleeptime = initialsleeptime

connection = Connection()

async def proxy_espnow():
    import asynciohttp

    async for data in connection.espnow:
        ntfy(f"Received via ESPNow: {data}", channel="iotheart")
        print(f"Got via espnow {data}")
        payload = json.loads(data[1])
        async with asynciohttp.ClientSession() as session:
            async with session.post(f"http://home/iothelper/{payload['path']}", json=payload["json"]) as resp:
                if resp.status != 200:
                    ntfy(f"""{resp.reason}""", channel="iotheart")


async def follow_colors():
    sleeptime = initialsleeptime
    while True:
        try:
            await connection.step()
            sleeptime = initialsleeptime
        except Exception as e:
            sleeptime *= 2
            if sleeptime > 60:
                sleeptime = 60
            try:
                now = time.localtime()
                h = now[3]
                m = now[4]
                ntfy(f"""{e}\nwill wait for {sleeptime}s""", channel="iotheart")
            except Exception as e2:
                print(f"{h:02d}:{m:02d}: Could not notify: {e2}")

        print(f"Waiting for {sleeptime}s")
        machine.lightsleep(sleeptime * 1000)

@runner()
async def run():
    print(f"first {gc.mem_free()}")
    connection.connect()
    connection.connect_kv()

    await asyncio.gather(follow_colors(), proxy_espnow())
Wed Dec  3 18:06:28 CET 2025
Checking the current installation
Reading install.json from iotheart
Writing into main.py in iotheart
Writing into install.json in iotheart
Writing into error.log in iotheart
Writing into resetdeepsleep in iotheart

Notes linking here