IoT Heart Again, With MicroPython
Fleeting
IoT heart again, with MicroPython on my Wemos/LOLIN D1 (an ESP8266 board).
See how to play with the wemos d1.
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from helpers import lowpower, runner
from comm import ntfy, wifi, setup_espnow
from state import KVAsync
import asyncio
import time
import json
import machine
import neopixel
import network
import esp
import gc
import ntptime
import requests
# Called before any pin or LED strip is set up. What it changes is defined in
# helpers.lowpower, which is not shown here -- presumably a power-saving
# configuration, TODO confirm.
lowpower()
# Disabled snippet kept for reference: it drives the LED on pin 2. Per its own
# inline notes the pin is inverted (off() lights the LED, on() turns it off).
# led = machine.Pin(2, machine.Pin.OUT)
# led.off() # on
# time.sleep(2)
# led.on() # off
# One 5-pixel NeoPixel strip per side: "left" on pin 4, "right" on pin 5.
# "ctrl" is the strip driver; "up" and "bottom" list the pixel indices that
# belong to each group. run_command() looks a position up by these keys.
leds = {
    "left": {
        "ctrl": neopixel.NeoPixel(machine.Pin(4), 5),
        "up": [1, 2, 3],
        "bottom": [0, 4],
    },
    "right": {
        "ctrl": neopixel.NeoPixel(machine.Pin(5), 5),
        "up": [1, 2, 3],
        "bottom": [0, 4],
    }
}
def run_command(name, position, command):
    """Apply one colour command to one side and push it to the strip.

    name: key into ``leds`` selecting the side.
    position: ``"fill"`` to colour every pixel of the strip; any other value
        is used as a key of that side's entry and must map to a list of
        pixel indices.
    command: colour value handed to the NeoPixel driver unchanged (callers
        in this file pass three-element lists).

    A trace line with the current hour and minute, the free heap and the
    arguments is printed before anything is changed.
    """
    hour, minute = time.localtime()[3:5]
    print(f"{hour:02d}:{minute:02d} - {gc.mem_free()}: {name}-{position}: Running command: {command}")
    side = leds[name]
    strip = side["ctrl"]
    if position != "fill":
        for index in side[position]:
            strip[index] = command
    else:
        strip.fill(command)
    # Nothing shows on the LEDs until the buffer is written out.
    strip.write()
class Connection:
    """Hold the two links the script uses: the key/value feed and ESP-NOW."""

    def __init__(self):
        # Filled in by connect(); stays None until then.
        self.espnow = None
        self.kv = KVAsync()

    def connect_kv(self):
        """(Re)start following the ``heart/command`` key on the feed."""
        self.kv.setup_follow("heart/command")

    def connect(self):
        """Bring up ESP-NOW and Wi-Fi, announce the device and set the clock.

        Both sides are blanked first. After the radio is up, the MAC and
        channel are sent through ntfy and the right side is set to
        ``[0, 50, 0]``. An ``OSError`` from the NTP sync is reported through
        ntfy instead of being raised.
        """
        run_command("left", "fill", [0, 0, 0])
        run_command("right", "fill", [0, 0, 0])

        espnow, wlan, _ = setup_espnow(async_=True)
        self.espnow = espnow
        wifi.on()

        mac = wlan.config('mac')
        channel = wlan.config('channel')
        ntfy(f"mac: {mac}, channel: {channel}", channel="iotheart")

        run_command("left", "fill", [0, 0, 0])
        run_command("right", "fill", [0, 50, 0])

        try:
            ntptime.settime()
        except OSError as err:
            ntfy(f"{err}\nwhile trying to set the time", channel="iotheart")

    async def try_get(self):
        """Return the next value from the feed, or ``None`` if there is none.

        ``StopIteration`` restarts the follow via connect_kv(); any other
        exception is printed. Both cases yield ``None``.
        """
        value = None
        try:
            value = await self.kv.next()
        except StopIteration:
            self.connect_kv()
        except Exception as err:
            print(f"{err}")
        return value

    async def step(self):
        """Fetch the next command set and apply it to the LEDs.

        With no value available, every group on both sides is set to
        ``[5, 0, 0]``. Otherwise the value is decoded as JSON; a truthy
        ``"off"`` entry replaces it with a fill of ``[0, 0, 0]`` on both
        sides. The command set is applied as ``{side: {position: colour}}``.
        """
        print("Do step")
        raw = await self.try_get()
        print("After step")

        if raw is not None:
            commands = json.loads(raw)
            if commands.get("off"):
                commands = {
                    "left": {"fill": [0, 0, 0]},
                    "right": {"fill": [0, 0, 0]},
                }
        else:
            print("Got nothing, using a default value")
            commands = {
                "left": {"up": [5, 0, 0], "bottom": [5, 0, 0]},
                "right": {"up": [5, 0, 0], "bottom": [5, 0, 0]},
            }

        for side, positions in commands.items():
            for position, colour in positions.items():
                run_command(side, position, colour)
# Base delay in seconds: follow_colors() starts from it and returns to it
# after every successful step.
initialsleeptime = 1
# NOTE(review): follow_colors() assigns its own local `sleeptime`, so this
# module-level name is not read anywhere in the code shown here.
sleeptime = initialsleeptime
# Single shared Connection used by run(), follow_colors() and proxy_espnow().
connection = Connection()
async def proxy_espnow():
    """Forward every ESP-NOW message to the HTTP helper under ``http://home/iothelper/``.

    Each item from ``connection.espnow`` is announced through ntfy and
    printed. Its second element is then decoded as JSON and must provide
    ``"path"`` (appended to the helper URL) and ``"json"`` (sent as the POST
    body). A response status other than 200 is reported through ntfy.

    No exception is caught here, so a bad payload or a failed request
    propagates to whoever awaits this coroutine.
    """
    # Kept as a function-level import, as in the original.
    import asynciohttp

    async for message in connection.espnow:
        ntfy(f"Received via ESPNow: {message}", channel="iotheart")
        print(f"Got via espnow {message}")
        # presumably message is a (sender, body) pair -- TODO confirm against setup_espnow
        request = json.loads(message[1])
        async with asynciohttp.ClientSession() as session:
            url = f"http://home/iothelper/{request['path']}"
            async with session.post(url, json=request["json"]) as resp:
                failed = resp.status != 200
                if failed:
                    ntfy(f"""{resp.reason}""", channel="iotheart")
async def follow_colors():
    """Apply colour commands forever, backing off when a step fails.

    Each iteration awaits ``connection.step()``. A successful step resets the
    delay to ``initialsleeptime``; a failing one doubles it, capped at 60
    seconds, and reports the error through ntfy. If that notification fails
    too, the failure is printed with the current hour and minute. The
    function never returns.
    """
    sleeptime = initialsleeptime
    while True:
        try:
            await connection.step()
        except Exception as e:
            sleeptime = min(sleeptime * 2, 60)
            # Taken before the notification attempt so that the fallback
            # print below can never hit an unbound name.
            hour, minute = time.localtime()[3:5]
            try:
                ntfy(f"""{e}\nwill wait for {sleeptime}s""", channel="iotheart")
            except Exception as e2:
                print(f"{hour:02d}:{minute:02d}: Could not notify: {e2}")
        else:
            sleeptime = initialsleeptime
        # NOTE(review): the source lost its indentation; the pause is assumed
        # to run on every iteration, not only after a failure -- confirm.
        print(f"Waiting for {sleeptime}s")
        # lightsleep takes milliseconds. It is a blocking call, so other
        # asyncio tasks do not run while the board sleeps.
        machine.lightsleep(sleeptime * 1000)
@runner()
async def run():
    """Connect everything, then run the colour follower and the ESP-NOW proxy together.

    The free heap is printed first. How and when this coroutine gets started
    is up to the ``runner`` decorator from ``helpers``, which is not shown
    here.
    """
    print(f"first {gc.mem_free()}")
    connection.connect()
    connection.connect_kv()
    workers = (follow_colors(), proxy_espnow())
    await asyncio.gather(*workers)
Wed Dec 3 18:06:28 CET 2025
Checking the current installation
Reading install.json from iotheart
Writing into main.py in iotheart
Writing into install.json in iotheart
Writing into error.log in iotheart
Writing into resetdeepsleep in iotheart