IOT Heart Again, With Micropython
Fleeting

IoT heart again, with MicroPython on my Wemos/LOLIN D1 (some ESP8266).
See how to play with the wemos d1.
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from helpers import lowpower, async_retry
from comm import ntfy, wifi, setup_espnow
from state import KVAsync
import asyncio
import time
import json
import machine
import neopixel
import gc
import ntptime
# Project helper imported from helpers; called once at import time, before
# the LED strips are created.
lowpower()

# One 5-pixel NeoPixel strip per side of the heart: "left" on pin 4 and
# "right" on pin 5. "up" and "bottom" list the pixel indices that
# run_command() writes to when asked for that position.
leds = {
    side: {
        "ctrl": neopixel.NeoPixel(machine.Pin(pin), 5),
        "up": [1, 2, 3],
        "bottom": [0, 4],
    }
    for side, pin in (("left", 4), ("right", 5))
}
def run_command(name, position, command):
    """Apply one colour command to a strip and push it to the LEDs.

    name: key into ``leds`` ("left" or "right").
    position: "fill" to colour the whole strip, otherwise a key of the
        strip entry ("up" or "bottom") naming a list of pixel indices.
    command: the colour value handed to the NeoPixel driver as-is.

    A trace line with the current hour/minute and the free heap is printed
    before the strip is touched.
    """
    h, m = time.localtime()[3:5]
    print(f"{h:02d}:{m:02d} - {gc.mem_free()}: {name}-{position}: Running command: {command}")
    strip = leds[name]
    pixels = strip["ctrl"]
    if position != "fill":
        for index in strip[position]:
            pixels[index] = command
    else:
        pixels.fill(command)
    # NOTE(review): the published listing lost its indentation; write() is
    # assumed to run for both branches, since connect() relies on "fill"
    # being displayed — confirm against the original file.
    pixels.write()
class Connection:
    """Owns the key/value follower and the ESP-NOW endpoint of the heart.

    ``kv`` is a ``KVAsync`` created at construction time; ``espnow`` stays
    ``None`` until ``connect()`` has been called.
    """

    def __init__(self):
        self.kv = KVAsync()
        self.espnow = None

    def connect_kv(self):
        """Ask the key/value client to follow the ``heart/command`` key."""
        self.kv.setup_follow("heart/command")

    def connect(self):
        """Set up ESP-NOW and Wi-Fi, announce the device, then set the clock.

        The LEDs are blanked first; after the MAC/channel notification has
        been sent the right side is filled with [0, 50, 0]. A failing NTP
        sync is reported through ntfy instead of being raised.
        """
        for side in ("left", "right"):
            run_command(side, "fill", [0, 0, 0])
        self.espnow, wlan, _ = setup_espnow(async_=True)
        wifi.on()
        ntfy(f"""mac: {wlan.config('mac')}, channel: {wlan.config('channel')}""",
             channel="iotheart")
        run_command("left", "fill", [0, 0, 0])
        run_command("right", "fill", [0, 50, 0])
        try:
            ntptime.settime()
        except OSError as e:
            # The second line of this literal starts at column 0 on purpose:
            # any leading spaces would become part of the message.
            ntfy(f"""{e}
while trying to set the time""", channel="iotheart")

    async def try_get(self):
        """Return the next value from the key/value follower, or ``None``.

        Any failure (including ``StopIteration``) re-registers the follow
        via ``connect_kv()`` and yields ``None``; errors other than
        ``StopIteration`` are printed first.
        """
        try:
            return await self.kv.next()
        except StopIteration:
            pass
        except Exception as e:
            print(f"{e}")
        # Shared recovery path for both handlers above.
        self.connect_kv()
        return None

    async def step(self):
        """Fetch one command set and apply it to the LED strips.

        The fetched value is decoded as JSON into a mapping
        ``{side: {position: colour}}``. A truthy ``"off"`` entry replaces it
        with an all-black fill; when nothing could be fetched a default of
        [5, 0, 0] on every position is used.
        """
        print(f"Do step")
        raw = await self.try_get()
        print(f"After step")
        if raw is not None:
            commands = json.loads(raw)
            if commands.get("off"):
                commands = {
                    "left": {"fill": [0, 0, 0]},
                    "right": {"fill": [0, 0, 0]},
                }
        else:
            print("Got nothing, using a default value")
            commands = {
                "left": {"up": [5, 0, 0], "bottom": [5, 0, 0]},
                "right": {"up": [5, 0, 0], "bottom": [5, 0, 0]},
            }
        for side, zones in commands.items():
            for position, value in zones.items():
                run_command(side, position, value)
# Base delay, in seconds, that follow_colors() starts from and resets to
# after a successful step.
initialsleeptime = 1
# Module-wide Connection instance shared by the coroutines below.
connection = Connection()
@async_retry(channel="iotheart")
async def post_to_iothelper(payload):
    """POST ``payload["json"]`` to the iothelper endpoint ``payload["path"]``.

    Raises ``Exception`` when the response status is not 200. Retry
    behaviour is supplied by the ``async_retry`` decorator.
    """
    # Imported at call time rather than at module import, as in the original.
    import asynciohttp

    url = f"http://home/iothelper/{payload['path']}"
    async with asynciohttp.ClientSession() as session:
        async with session.post(url, json=payload["json"]) as resp:
            if resp.status == 200:
                return
            raise Exception(f"HTTP {resp.status}: {resp.reason}")
async def proxy_espnow():
    """Forward every message received over ESP-NOW to the iothelper service.

    Each item read from ``connection.espnow`` is announced through ntfy,
    its second element is decoded as JSON and the result is handed to
    ``post_to_iothelper``. Handling is best-effort per message: a failed
    notification, an undecodable payload or a failed POST is printed and
    the loop moves on to the next message, so one bad frame cannot end the
    proxy (and with it the ``gather`` in ``run``).
    """
    async for data in connection.espnow:
        # The notification is only informational; it must not prevent the
        # message from being forwarded.
        try:
            ntfy(f"Received via ESPNow: {data}", channel="iotheart")
        except Exception as e:
            print(f"Could not notify: {e}")
        print(f"Got via espnow {data}")
        try:
            # Decoding happens inside the try: the payload comes from the
            # radio and may be malformed or too short.
            payload = json.loads(data[1])
            await post_to_iothelper(payload)
        except Exception as e:
            print(f"Could not forward ESPNow message: {e}")
async def follow_colors():
    """Run ``connection.step()`` forever, backing off after failures.

    After a failed step the delay doubles (capped at 60 seconds), the error
    is sent through ntfy and the board light-sleeps for that delay. A
    successful step resets the delay to ``initialsleeptime``.
    """
    sleeptime = initialsleeptime
    while True:
        try:
            await connection.step()
        except Exception as e:
            sleeptime = min(sleeptime * 2, 60)
            try:
                h, m = time.localtime()[3:5]
                ntfy(f"""{e}\nwill wait for {sleeptime}s""", channel="iotheart")
            except Exception as e2:
                print(f"{h:02d}:{m:02d}: Could not notify: {e2}")
            # NOTE(review): the published listing lost its indentation; the
            # wait is assumed to belong to the failure path only — confirm
            # against the original file.
            print(f"Waiting for {sleeptime}s")
            machine.lightsleep(sleeptime * 1000)
        else:
            sleeptime = initialsleeptime
async def run():
    """Connect the device, then run the colour follower and the ESP-NOW proxy.

    Prints the free heap first, performs the blocking connection setup and
    finally awaits both long-running coroutines together.
    """
    print(f"first {gc.mem_free()}")
    connection.connect()
    connection.connect_kv()
    workers = (follow_colors(), proxy_espnow())
    await asyncio.gather(*workers)
from heart import run
async def main():
    """Application entry coroutine: await ``run`` imported from ``heart``."""
    await run()
Tue Feb 24 21:50:08 CET 2026
Compiling Python files to .mpy (march=xtensa)...
Staging app.py for compilation...
Staging helpers.py for compilation...
Staging state.py for compilation...
Staging comm.py for compilation...
Staging control.py for compilation...
Staging aioespnow.py for compilation...
Staging asynciohttp.py for compilation...
Staging heart.py for compilation...
Running mpy-cross via Earthly...
Compiled app.py -> app.mpy
Compiled helpers.py -> helpers.mpy
Compiled state.py -> state.mpy
Compiled comm.py -> comm.mpy
Compiled control.py -> control.mpy
Compiled aioespnow.py -> aioespnow.mpy
Compiled asynciohttp.py -> asynciohttp.mpy
Compiled heart.py -> heart.mpy
Checking the current installation
Reading install.json from iotheart
Writing into comm.mpy in iotheart
Writing into heart.mpy in iotheart
Writing into install.json in iotheart
Writing into error.log in iotheart
Writing into resetdeepsleep in iotheart