Konubinix' opinionated web of thoughts

Plant Moisture Sensor

Fleeting

on the server

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import logging
import os
from datetime import datetime, timedelta

import requests
from helpers.otel import metrics
from quart import Blueprint, request

# Blueprint on which the "/basilic" route of this module is registered.
blueprint = Blueprint("basilic", __name__)
logger = logging.getLogger(__name__)
meter = metrics.get_meter(__name__)

# Gauge that record() sets to the value reported by the sensor.
basilic_metric = meter.create_gauge(
    name="basilic", description="Basilic gauge metric", unit="1"
)


# Daily (hour, minute) slots at which the sensor is asked to report again,
# listed in chronological order.
schedules = [(8, 30)]  # , (12, 0), (18, 0), (22, 0)]
# Readings strictly above this value are notified with "low" priority,
# the others with "high" priority. Overridable with BASILIC_THRESHOLD.
threshold = int(os.environ.get("BASILIC_THRESHOLD", "200"))

if os.environ.get("BASILIC_DEBUG") == "true":
    # Debug mode: one slot every half hour, around the clock.
    # range(24) (not 23) so that the 23:00 and 23:30 slots are not skipped.
    # Add 15, 45, 50 or 55 to the minutes below for a denser schedule.
    schedules = [
        (hour, minute)
        for hour in range(24)
        for minute in (0, 30)
    ]


def _next_wakeup(now, margin=timedelta(hours=1)):
    """Return the first scheduled datetime that is at least `margin` after `now`.

    The (hour, minute) slots of the module-level `schedules` are projected on
    today and on the two following days, so that the lookup stays correct when
    `now + margin` falls after midnight and regardless of the order of the
    slots.

    Raises:
        ValueError: if `schedules` is empty.
    """
    earliest = now + margin
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    candidates = (
        midnight + timedelta(days=day, hours=hour, minutes=minute)
        for day in range(3)
        for hour, minute in schedules
    )
    return min(moment for moment in candidates if moment >= earliest)


@blueprint.route("/basilic")
async def record():
    """Record a sensor reading and tell the caller how long to wait.

    The reading is taken from the `value` query parameter, stored in the
    `basilic` gauge and forwarded as a notification whose priority is "low"
    when the value is above `threshold` and "high" otherwise.

    Returns:
        The number of seconds to wait before the next reading, as a string:
        the time remaining until the next scheduled slot for a "low" priority
        reading, 600 for a "high" priority one. When `value` is missing or is
        not an integer, an error message with HTTP status 400.
    """
    # called with http://home:9911/basilic?value={value}
    raw_value = request.args.get("value")
    try:
        value = int(raw_value)
    except (TypeError, ValueError):
        # TypeError: parameter absent (None); ValueError: not an integer.
        logger.warning("Rejecting invalid value %r", raw_value)
        return "invalid or missing 'value' query parameter", 400

    basilic_metric.set(value)
    priority = "low" if value > threshold else "high"
    # NOTE: requests.post is synchronous, so this coroutine holds the event
    # loop until the notification server answers (or the timeout expires).
    requests.post(
        "http://home:9705/basilic",
        headers={
            "title": "mp",
            "priority": priority,
        },
        data=str(value),
        timeout=30,
    )

    now = datetime.now()
    if priority == "low":
        # Slots less than one hour away are skipped: let's pretend we are
        # actually past that date.
        found = _next_wakeup(now)
        wait_seconds = (found - now).total_seconds()
    else:
        # Ask for another reading soon, whatever the schedule says.
        wait_seconds = 600
        found = now + timedelta(seconds=wait_seconds)
    logger.info("Waiting for %s (at %s)", wait_seconds, found)
    return str(wait_seconds)

on the MCU

import time
from machine import Pin, ADC

import requests
from comm import wifi, ntfy

from helpers import deep_sleep, dump_exception, lowpower, runner

# Board start-up. Both helpers come from project modules not shown here;
# presumably they reduce power draw and reset the Wi-Fi state — TODO confirm.
lowpower()
wifi.reset()

# Input pin: when it reads truthy, run() turns Wi-Fi on up front and wait()
# uses time.sleep instead of deep_sleep.
debug = Pin(5, Pin.IN)
# ADC channel 0; run() samples it through sensor.read.
sensor = ADC(0)
# latching = LatchingRelay(
#     4, # d2
#     0, # d3
# )
# Output pin that run() turns on just before sampling and off right after
# (presumably it powers the sensor — TODO confirm).
switch = Pin(1, Pin.OUT)

def wait(waiting_seconds):
    """Pause for `waiting_seconds` seconds.

    Uses deep_sleep unless the debug pin reads truthy, in which case a plain
    time.sleep is used instead.
    """
    if not debug.value():
        deep_sleep(waiting_seconds)
        return
    time.sleep(waiting_seconds)

def read_stable_value(read_fn, window_size=5, threshold=1, max_iterations=50):
    """Sample `read_fn` until the sliding mean of the readings settles.

    A window of `window_size` readings is filled first. Then, at most
    `max_iterations` times, the oldest reading is replaced by a new one and
    the mean of the window is compared with the previous mean.

    Args:
        read_fn: callable without argument returning one numeric reading.
        window_size: number of readings averaged together.
        threshold: largest difference between two consecutive means that
            still counts as stable.
        max_iterations: number of window shifts tried before giving up.

    Returns:
        A tuple (value, fallback). `value` is the last computed mean truncated
        to an int. `fallback` is False when two consecutive means were within
        `threshold` of each other, True when `max_iterations` was exhausted
        without that happening.
    """

    def average(values):
        return sum(values) / len(values)

    samples = [read_fn() for _ in range(window_size)]
    last_mean = average(samples)

    for _ in range(max_iterations):
        # Slide the window: drop the oldest reading, append a fresh one.
        samples = samples[1:] + [read_fn()]
        mean = average(samples)
        if abs(mean - last_mean) <= threshold:
            return int(mean), False
        last_mean = mean

    return int(last_mean), True

@runner(debugpin=debug)
def run():
    """Sample the sensor, upload the value and sleep, in an endless loop.

    Each round turns the switch pin on, reads a stabilised value from the ADC,
    turns the switch off, sends the value to the server and waits for the
    number of seconds given in the server's answer. Any exception is dumped
    to "error2.log" and followed by a 3600 second wait.
    """
    try:
        if debug.value():
            wifi.on()
        while True:
            switch.on()
            try:
                value, fallback = read_stable_value(sensor.read)
            finally:
                # Turn the switch off even when sampling fails, so that it
                # does not stay on during the error wait below.
                switch.off()
            print(f"Uploading the value {value}")
            with wifi:
                if fallback:
                    ntfy("Used a fallback value since the sensor never converged")
                # The server answers with a number of seconds, possibly
                # written as a float, hence int(float(...)).
                waiting_seconds = int(float(requests.get(f"http://home:9911/basilic?value={value}").text))
            print(f"going to sleep for {waiting_seconds}")
            wait(waiting_seconds)
    except Exception as e:
        dump_exception(e, "error2.log")
        waiting_seconds = 3600
        print(f"dumped this error, waiting for {waiting_seconds}s before restarting")
        wait(waiting_seconds)
Tue Dec  9 09:22:14 CET 2025
Checking the current installation
Reading install.json from basilic
Writing into main.py in basilic
Writing into install.json in basilic
Writing into error.log in basilic
Writing into resetdeepsleep in basilic

installation