Konubinix' site

Streaming the TC001N Thermal Camera With Python on Android

Fleeting

Why a Python driver at all

The Kotlin app already streams. It is a single v1.0.0 release that hard-codes the frame geometry — the 384-vs-392 mismatch that took a patched fork to clear — so doing anything further with it means forking it and living in its build.

Either way is real work. Extending that fork and writing a UVC driver from scratch are both significant amounts of coding, in the same ballpark of effort. When the effort is comparable, the platform is what decides.

And that tips it. A Python runtime is already running on the phone, with usb4a, numpy, opencv and Kivy, and it carries what a Kotlin build cannot: drop a .py on the sdcard and it reloads, or drive it live from the PC over rpyc. That feedback loop is what makes the runtime a joy to work in. So the choice is to code in the runtime that is already mine rather than in someone else’s build.

One fact decides whether that choice is even open. Android’s Java USB Host API exposes bulk and control transfers but not isochronous ones, which is the wall most “UVC from Python” attempts hit. The TC001N’s streaming endpoint, read straight from its descriptors (07 05 81 02 00 02 00), is endpoint 0x81 with attribute 0x02, i.e. bulk. So the camera sits on the right side of that wall, and a pure pyjnius driver is feasible. This note proves it before any app is built.
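To make the reading of those seven descriptor bytes explicit, here is a minimal sketch that decodes them as a standard USB endpoint descriptor. The helper name decode_endpoint is an illustration only and is not part of the driver.

```python
import struct

# Bytes quoted in the note: the TC001N's streaming endpoint descriptor.
RAW = bytes.fromhex("07 05 81 02 00 02 00")

TRANSFER_TYPES = {0: "control", 1: "isochronous", 2: "bulk", 3: "interrupt"}

def decode_endpoint(desc):
    """Decode a standard 7-byte USB endpoint descriptor (hypothetical helper)."""
    length, dtype, address, attrs, max_packet, interval = struct.unpack("<BBBBHB", desc)
    if length != 7 or dtype != 0x05:
        raise ValueError("not an endpoint descriptor")
    return {
        "address": address,
        "direction": "IN" if address & 0x80 else "OUT",   # bit 7 of the address
        "number": address & 0x0F,
        "type": TRANSFER_TYPES[attrs & 0x03],             # low 2 bits of bmAttributes
        "max_packet": max_packet & 0x07FF,
        "interval": interval,
    }

ep = decode_endpoint(RAW)
print(ep)
```

The two fields that matter here are the direction (IN) and the transfer type (bulk): both are reachable through bulkTransfer and UsbRequest, with no isochronous support needed.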

The negotiation we have to reproduce is the standard UVC handshake — claim the interface, PROBE and COMMIT a format, then read frames off the bulk endpoint:

https://ipfs.konubinix.eu/p/bafkreig7hskttejoh4y42hx276mfbuplguy27iuqrbo7z6i75iamisixbm?uvc_handshake.png

The constants are the camera’s own descriptors

Every magic number below was read from the device, not guessed. The UVC node is 2bdf:0102 — the second identity the camera takes after its two-stage USB mode-switch, so the driver must wait for 0102, never 0106. Its VideoControl header lists the VideoStreaming interface as number 1. The uncompressed format index 1 is YUY2, and within it frame index 1 is the 256×392 mode at a default interval of 400000 (in 100 ns units, i.e. 25 fps) — exactly the row count the Kotlin app got wrong. bcdUVC is 1.10, which fixes the PROBE/COMMIT control at 34 bytes.

import struct

from usb4a import usb              # assumed import: usb4a's helper module

VENDOR, PRODUCT = 0x2bdf, 0x0102   # the UVC node, AFTER the 0106->0102 switch
VS_INTERFACE = 1                   # VideoStreaming interface number
FORMAT_INDEX, FRAME_INDEX = 1, 1   # YUY2, 256x392
FRAME_INTERVAL = 400000            # 100ns units -> 25 fps
PROBE_LEN = 34                     # UVC 1.1 PROBE/COMMIT control size
BULK_EP = 0x81                     # streaming endpoint (IN, bulk)
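The numbers above hang together arithmetically. As a quick sanity check, a sketch using only values stated in this note derives the frame rate, the size of one frame and the sustained byte rate the bulk endpoint has to carry:

```python
WIDTH, HEIGHT = 256, 392          # frame index 1, from the descriptors
BYTES_PER_PIXEL = 2               # YUY2 packs 2 bytes per pixel
FRAME_INTERVAL = 400000           # in 100 ns units

fps = 10_000_000 / FRAME_INTERVAL             # 100 ns units per second / interval
frame_bytes = WIDTH * HEIGHT * BYTES_PER_PIXEL
bytes_per_second = frame_bytes * fps

print(fps, frame_bytes, bytes_per_second)
```

That byte rate is where the "about 5 MB/s" figure used further down comes from.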

The request plumbing is the other half of the vocabulary: a class request to the interface, SET_CUR out and GET_CUR in, selecting the PROBE then the COMMIT control.

SET_CUR, GET_CUR = 0x01, 0x81
VS_PROBE_CONTROL, VS_COMMIT_CONTROL = 0x01, 0x02
RT_SET, RT_GET = 0x21, 0xA1        # class | interface, out / in
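For readers who do not have the USB setup packet layout in their head, the following sketch unpacks the two bmRequestType values and shows how the control selector ends up in the high byte of wValue. The function names are made up for illustration.

```python
def describe_request_type(bm):
    """Split a USB bmRequestType byte into direction, type and recipient."""
    direction = "device-to-host" if bm & 0x80 else "host-to-device"
    kind = ("standard", "class", "vendor", "reserved")[(bm >> 5) & 0x03]
    recipient = {0: "device", 1: "interface", 2: "endpoint", 3: "other"}.get(bm & 0x1F, "reserved")
    return direction, kind, recipient

def selector_to_wvalue(selector):
    """UVC puts the control selector in the high byte of wValue; the low byte is zero."""
    return selector << 8

rt_set = describe_request_type(0x21)
rt_get = describe_request_type(0xA1)
probe_value = selector_to_wvalue(0x01)    # VS_PROBE_CONTROL
commit_value = selector_to_wvalue(0x02)   # VS_COMMIT_CONTROL
print(rt_set, rt_get, hex(probe_value), hex(commit_value))
```

This is why the wrappers further down pass VS_PROBE_CONTROL << 8 as the value and the interface number as the index.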

Finding the camera once it has settled

The driver must not act on the first thing that enumerates: the camera appears briefly as 0106 before re-enumerating as the UVC 0102 node. So discovery is a filter for the settled identity, returning nothing until 0102 is on the bus.

def find_camera():
    """The UsbDevice for 2bdf:0102, or None until the mode switch lands."""
    for dev in usb.get_usb_device_list():
        if dev.getVendorId() == VENDOR and dev.getProductId() == PRODUCT:
            return dev
    return None
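Because find_camera returns None until the switch lands, a caller can either ask to be re-run or poll. A minimal polling sketch follows; wait_for and its parameters are hypothetical, and the fake finder stands in for the real find_camera so the loop can be exercised without hardware.

```python
import time

def wait_for(find, timeout=10.0, period=0.5, sleep=time.sleep, clock=time.monotonic):
    """Call find() until it returns something other than None, or give up after timeout seconds."""
    deadline = clock() + timeout
    while True:
        dev = find()
        if dev is not None:
            return dev
        if clock() >= deadline:
            return None
        sleep(period)

# Stand-in for find_camera: the settled device only shows up on the third look.
calls = {"n": 0}
def fake_find():
    calls["n"] += 1
    return "2bdf:0102" if calls["n"] >= 3 else None

found = wait_for(fake_find, sleep=lambda seconds: None)
print(found, calls["n"])
```

On the device the call would be wait_for(find_camera), with the default sleep left in place.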

Getting permission the way the Arduino did

Android gates USB host access behind a runtime dialog. We don’t re-solve that: the Arduino experiment already drove usb4a’s permission flow, where the request fires a dialog and the caller has to come back after the user accepts. We mirror it — hold permission and proceed, or request it and ask to be re-run.

def ensure_permission(dev):
    """True if granted. Otherwise fire the dialog and return False:
    accept on the phone, then call run() again."""
    if usb.has_usb_permission(dev):
        return True
    usb.request_usb_permission(dev)
    return False

With permission in hand we open the device and claim interface 1. The PROBE and COMMIT controls are addressed to the VideoStreaming interface, so it must be claimed first; forceClaim detaches any driver already holding it.

def open_device(dev):
    """Open and claim the VideoStreaming interface. Returns (conn, intf)."""
    conn = usb.get_usb_manager().openDevice(dev)
    if conn is None:
        raise RuntimeError("openDevice returned null (permission? already open?)")
    intf = next((dev.getInterface(i) for i in range(dev.getInterfaceCount())
                 if dev.getInterface(i).getId() == VS_INTERFACE), None)
    if intf is None:
        raise RuntimeError("VideoStreaming interface %d absent" % VS_INTERFACE)
    if not conn.claimInterface(intf, True):
        raise RuntimeError("claimInterface failed")
    return conn, intf

The negotiation, where the Kotlin app died

This is the step that matters. The Kotlin app reached an open camera and then failed format negotiation with uvc_get_stream_ctrl_format_size failed: Invalid mode, because it asked for 256×384 and the TC001N only advertises 256×392. That observed red is the spec for the spike: we ask for the advertised frame and expect the camera to accept it instead of rejecting it.

A PROBE payload pins the format, frame and interval, sets the hint bit that says the interval is fixed, and leaves the rest zero for the device to fill in.

def build_probe():
    buf = bytearray(PROBE_LEN)
    struct.pack_into("<H", buf, 0, 0x0001)        # bmHint: dwFrameInterval fixed
    buf[2], buf[3] = FORMAT_INDEX, FRAME_INDEX
    struct.pack_into("<I", buf, 4, FRAME_INTERVAL)
    return buf

On the way back the camera returns the negotiated control, from which we read the two numbers the streaming loop will need: the per-frame size and the most bytes it will hand over per bulk transfer.

def parse_probe(b):
    if not b or len(b) < 26:
        return None
    f = struct.unpack("<HBBIHHHHHII", b[:26])
    return {"bFormatIndex": f[1], "bFrameIndex": f[2], "dwFrameInterval": f[3],
            "fps": round(1e7 / f[3]) if f[3] else 0,
            "dwMaxVideoFrameSize": f[9], "dwMaxPayloadTransferSize": f[10]}
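The struct format string is easier to trust once each position has a name. This sketch names the eleven fields of the first 26 bytes, packs a made-up reply and decodes it again. The dwMaxPayloadTransferSize value is invented for the example; it is not a measurement from the camera.

```python
import struct

PROBE_FMT = "<HBBIHHHHHII"        # little-endian, no padding: the first 26 bytes
PROBE_FIELDS = ("bmHint", "bFormatIndex", "bFrameIndex", "dwFrameInterval",
                "wKeyFrameRate", "wPFrameRate", "wCompQuality", "wCompWindowSize",
                "wDelay", "dwMaxVideoFrameSize", "dwMaxPayloadTransferSize")

# A hypothetical 34-byte reply; only the first 26 bytes are filled in.
reply = bytearray(34)
struct.pack_into(PROBE_FMT, reply, 0,
                 0x0001, 1, 1, 400000, 0, 0, 0, 0, 0,
                 256 * 392 * 2,   # one YUY2 frame
                 16384)           # invented payload size, for illustration only

decoded = dict(zip(PROBE_FIELDS, struct.unpack_from(PROBE_FMT, reply)))
print(struct.calcsize(PROBE_FMT), decoded)
```

Indices 9 and 10 of the unpacked tuple are the two sizes that parse_probe keeps.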

The control transfers themselves are one wrapper each. Under the hood, the IN direction depends on pyjnius copying the Java byte[] back into the bytearray we passed — the same write-back usbserial4a’s read path relies on; the spike is partly there to confirm that holds here too.

def _ctrl_out(conn, value, data):
    b = bytearray(data)
    return conn.controlTransfer(RT_SET, SET_CUR, value, VS_INTERFACE, b, len(b), 2000)

def _ctrl_in(conn, value, length=PROBE_LEN):
    b = bytearray(length)
    n = conn.controlTransfer(RT_GET, GET_CUR, value, VS_INTERFACE, b, length, 2000)
    return n, (bytes(b[:n]) if n >= 0 else None)

Negotiation is then SET_CUR(PROBE) followed by GET_CUR(PROBE); a non-negative read with our format echoed back is the camera accepting the mode.

def negotiate(conn):
    if _ctrl_out(conn, VS_PROBE_CONTROL << 8, build_probe()) < 0:
        raise RuntimeError("SET_CUR(PROBE) failed")
    n, raw = _ctrl_in(conn, VS_PROBE_CONTROL << 8)
    if n < 0:
        raise RuntimeError("GET_CUR(PROBE) failed -- the 'Invalid mode' wall")
    return parse_probe(raw), raw
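negotiate only checks that the read succeeded. The "format echoed back" half of the criterion could be checked explicitly on the parsed reply. The helper below is a sketch of that check, not something the spike contains, and its name and defaults are assumptions.

```python
def mode_accepted(mode, want_format=1, want_frame=1, want_interval=400000):
    """True when a parsed GET_CUR(PROBE) reply echoes the requested mode.

    `mode` is a dict shaped like parse_probe's result, or None when the
    reply was too short to parse.
    """
    if mode is None:
        return False
    return (mode["bFormatIndex"] == want_format
            and mode["bFrameIndex"] == want_frame
            and mode["dwFrameInterval"] == want_interval)

echoed = {"bFormatIndex": 1, "bFrameIndex": 1, "dwFrameInterval": 400000}
changed = {"bFormatIndex": 1, "bFrameIndex": 2, "dwFrameInterval": 400000}
print(mode_accepted(echoed), mode_accepted(changed), mode_accepted(None))
```

A device is allowed to answer a PROBE with a different mode than the one requested, so a mismatch here would mean "adjusted", not necessarily "failed".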

The payoff: one real frame

Negotiation succeeding is necessary but not sufficient — the proof is bytes moving. COMMIT activates the negotiated mode by writing the same struct back to the COMMIT control, after which a single bulk read should return data led by a short UVC payload header. Seeing that header is the spike going green.

def commit(conn, raw):
    return _ctrl_out(conn, VS_COMMIT_CONTROL << 8, raw)

def grab_one(conn, dev, max_payload):
    ep = next(e for i in range(dev.getInterfaceCount())
              for e in (dev.getInterface(i).getEndpoint(j)
                        for j in range(dev.getInterface(i).getEndpointCount()))
              if e.getAddress() == BULK_EP)
    buf = bytearray(max(max_payload, 16384))
    n = conn.bulkTransfer(ep, buf, len(buf), 2000)
    return n, (bytes(buf[:12]) if n > 0 else None)

Let’s see what this gives. The one-shot ties the steps together and prints the negotiated mode and the first header, releasing the interface whatever happens.

def run():
    dev = find_camera()
    if dev is None:
        print("2bdf:0102 not on the bus -- replug, wait for the mode switch"); return
    if not ensure_permission(dev):
        print("permission requested -- accept on the phone and rerun"); return
    conn, intf = open_device(dev)
    try:
        mode, raw = negotiate(conn)
        print("negotiated:", mode)
        print("COMMIT ->", commit(conn, raw))
        n, header = grab_one(conn, dev, mode["dwMaxPayloadTransferSize"])
        print("bulk %d bytes, header: %s" % (n, header and header.hex(" ")))
    finally:
        conn.releaseInterface(intf); conn.close()

Assembling a full frame

One payload proves the pipe; a picture needs whole frames, which stream back to back as bulk payloads. A synchronous read can’t keep up: at ~5 MB/s the endpoint sits idle between calls and the kernel buffer overflows, dropping payloads and tearing frames. So we drain asynchronously, keeping a pool of UsbRequest objects always in flight (libusb’s URB trick), each handing back its payload data and the bmHeaderInfo byte from its 2-byte header.

def grab_stream_async(conn, dev, want, bulk_ep=BULK_EP, nreq=16, bufsz=16384):
    from jnius import autoclass, cast
    UsbRequest = autoclass("android.hardware.usb.UsbRequest")
    ByteBuffer = autoclass("java.nio.ByteBuffer")
    ep = next(e for i in range(dev.getInterfaceCount())
              for e in (dev.getInterface(i).getEndpoint(j)
                        for j in range(dev.getInterface(i).getEndpointCount()))
              if e.getAddress() == bulk_ep)
    reqs = []
    for _ in range(nreq):
        r = UsbRequest(); r.initialize(conn, ep)
        b = ByteBuffer.allocate(bufsz); r.setClientData(b); r.queue(b)
        reqs.append(r)
    out = bytearray(); infos = []
    while len(out) < want:
        done = conn.requestWait(2000)
        if done is None:
            break
        b = cast("java.nio.ByteBuffer", done.getClientData())
        n = b.position()
        if n > 2:
            chunk = bytearray(n); b.rewind(); b.get(chunk, 0, n)
            infos.append(int(chunk[1]))          # bmHeaderInfo (FID/EOF bits)
            out += chunk[2:]
        b.clear(); done.queue(b)
    for r in reqs:
        r.cancel()
    return bytes(out), infos

No pixel-level guessing is needed to find frame edges: the frame-ID bit (bmHeaderInfo & 1) flips at every boundary, so a frame is one run of constant FID — for this camera 40 payloads of 5018 bytes. We hand back the first complete run’s image bytes.

PAYLOAD = 5018   # measured: de-headered bytes per bulk payload (40 payloads = 1 frame)

def frame_by_fid(out, infos, w=256, h=392):
    img = w * h * 2
    fid = [v & 1 for v in infos]
    tog = [0] + [i for i in range(1, len(fid)) if fid[i] != fid[i - 1]] + [len(fid)]
    for a, b in zip(tog, tog[1:]):
        if 38 <= b - a <= 42 and a * PAYLOAD + img <= len(out):
            return out[a * PAYLOAD:a * PAYLOAD + img]
    raise RuntimeError("no complete frame in capture")
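The FID rule can be exercised without a camera. The sketch below is a variation on the approach above, not the note's own code: it cuts on the FID flip but tracks each payload's real length instead of relying on a fixed PAYLOAD size. The function name and the synthetic payloads are made up for illustration.

```python
def split_frames(payloads):
    """Group raw UVC bulk payloads (header included) into runs of constant FID.

    Returns one bytes object per run, with headers stripped. The first and
    last runs may be partial frames.
    """
    frames, current, fid = [], bytearray(), None
    for p in payloads:
        if len(p) < 2 or p[0] < 2:
            continue                      # too short to carry a header
        header_len, f = p[0], p[1] & 1    # bHeaderLength, FID bit of bmHeaderInfo
        if fid is not None and f != fid:
            frames.append(bytes(current)) # FID flipped: the previous frame is done
            current = bytearray()
        fid = f
        current += p[header_len:]
    if current:
        frames.append(bytes(current))
    return frames

# Synthetic stream: two payloads with FID=0, two with FID=1, then FID=0 again.
stream = [b"\x02\x80AA", b"\x02\x80BB", b"\x02\x81CC", b"\x02\x81DD", b"\x02\x80EE"]
frames = split_frames(stream)
print(frames)
```

A caller would still need to compare each run's length with w * h * 2 before treating it as a complete frame.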

The frame is two stacked 196-row panes of the same scene: an IR-image pane (luma on the even YUYV bytes) and a temperature pane whose byte pairs are 16-bit raw values, raw / 64 − 273.15 °C (the reference’s conversion — uncalibrated, so good for relative readings and false colour, not trustworthy absolute temperatures). The raw values clump into a narrow band, so a linear ramp washes out; we histogram-equalise instead — the automatic-gain trick real thermal cameras use — then false-colour through a black→red→yellow→white palette, upscale, and report min/avg/max from the true °C.

def render_frame(conn, dev, path, w=256, h=392):
    import numpy as np
    from PIL import Image
    out, infos = grab_stream_async(conn, dev, 6 * w * h * 2)
    arr = np.frombuffer(frame_by_fid(out, infos, w, h), np.uint8).reshape(h, w, 2)
    raw = arr[h // 2:, :, 0].astype(np.uint16) | (arr[h // 2:, :, 1].astype(np.uint16) << 8)
    celsius = raw / 64.0 - 273.15
    flat = raw.astype(np.float64).ravel()                  # histogram-equalise (AGC)
    rank = np.empty(flat.size); rank[np.argsort(flat, kind="stable")] = np.arange(flat.size)
    t = (rank / (flat.size - 1)).reshape(raw.shape)
    rgb = np.dstack([np.clip(t * 3, 0, 1), np.clip(t * 3 - 1, 0, 1), np.clip(t * 3 - 2, 0, 1)])
    Image.fromarray((rgb * 255).astype(np.uint8), "RGB").resize((w * 2, h), Image.NEAREST).save(path)
    return {"min": round(float(celsius.min()), 1),
            "avg": round(float(celsius.mean()), 1),
            "max": round(float(celsius.max()), 1)}
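The three steps inside render_frame (conversion, rank-based equalisation, palette) are easier to follow on a handful of values. This is a pure-Python sketch of the same arithmetic, written without numpy so it runs anywhere. The function names are illustrative and the sample values are invented.

```python
def raw_to_celsius(raw):
    """The note's conversion: 1/64 kelvin per count, then kelvin to Celsius."""
    return raw / 64.0 - 273.15

def equalise(values):
    """Map each value to its rank in [0, 1]; ties keep their input order."""
    order = sorted(range(len(values)), key=lambda i: values[i])  # sorted() is stable
    top = max(len(values) - 1, 1)
    ranks = [0.0] * len(values)
    for rank, i in enumerate(order):
        ranks[i] = rank / top
    return ranks

def palette(t):
    """Black -> red -> yellow -> white ramp for t in [0, 1], as 8-bit RGB."""
    clip = lambda x: min(max(x, 0.0), 1.0)
    return tuple(int(clip(3 * t - k) * 255) for k in range(3))

samples = [10, 10, 500, 11]               # invented raw counts
ranks = equalise(samples)
print(raw_to_celsius(19200), ranks, [palette(t) for t in ranks])
```

Note how the outlier 500 no longer compresses the other three values into one colour: ranks spread them evenly regardless of the gaps between them.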

Running it live

This is a hardware spike, so its check is a live run, not an automated suite — the camera has to be plugged into the phone. The runtime’s idiom is to drive it live from the PC over rpyc: call into the module, watch, adjust, call again — the tight loop that is the whole reason for doing this in Python.

import rpyc
c = rpyc.classic.connect(ip, int(port))
spike = c.modules["app.uvc_spike"]      # the module on the device
c.modules.importlib.reload(spike)       # pick up the latest edit
spike.run()

Success is the inverse of the Kotlin failure: negotiate returns fps: 25 with bFrameIndex: 1, and the bulk read returns a frame led by the camera’s 2-byte UVC payload header (02 80, end-of-header set). If instead GET_CUR(PROBE) throws, the mode is still being rejected and the format constants are the place to look. Once a header prints, the risk is retired and the rich Kivy thermal app becomes the reuse job the reasoning assumed — the streaming loop, the YUYV split, and a numpy colormap into the existing Kivy texture.
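To read a header such as 02 80 bit by bit, the following sketch decodes the first two bytes of a payload according to the UVC payload header layout. The helper name is made up, and only the two mandatory bytes are interpreted.

```python
# bmHeaderInfo bits other than FID (bit 0), per the UVC payload header layout.
FLAG_BITS = {0x02: "EOF", 0x04: "PTS", 0x08: "SCR", 0x20: "STI", 0x40: "ERR", 0x80: "EOH"}

def decode_payload_header(payload):
    """Decode bHeaderLength and bmHeaderInfo from the start of a UVC payload."""
    if len(payload) < 2:
        raise ValueError("a UVC payload header is at least 2 bytes")
    header_len, info = payload[0], payload[1]
    return {
        "bHeaderLength": header_len,
        "fid": info & 0x01,
        "flags": sorted(name for bit, name in FLAG_BITS.items() if info & bit),
    }

first = decode_payload_header(bytes.fromhex("02 80"))
last_of_frame = decode_payload_header(bytes.fromhex("02 83"))   # hypothetical example
print(first, last_of_frame)
```

The second example is invented to show what a payload with FID set and end-of-frame flagged would look like.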

What it sees

Let’s see what it gives. Pointed at a ceiling window, here is the temperature pane the driver pulls back — the glass glowing against the cooler ceiling, every pixel a 16-bit reading behind the false colour (absolute °C uncalibrated, so read it for contrast, not as a thermometer):

A live viewer

A still is one grab; a live view has to keep the drain running flat out, because the instant it pauses, payloads drop, which is the bug the async drain just fixed. The screen only needs refreshing a dozen times a second, though, so the two run on separate clocks: a background thread drains and decodes as fast as the camera sends and keeps only the newest frame, while Kivy’s timer shows whatever is latest. Reading frames inside the timer instead would re-open the very gaps that tore the picture.
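The "keep only the newest frame" hand-off between the two clocks can be sketched on its own, without USB or Kivy. The class below is an illustration with a made-up name; it uses an explicit lock and a sequence counter, which is a more defensive variant than a plain attribute assignment.

```python
import threading

class LatestFrame:
    """A one-slot mailbox: writers overwrite, readers see only the newest value."""

    def __init__(self):
        self._lock = threading.Lock()
        self._frame = None
        self._seq = 0

    def put(self, frame):
        with self._lock:
            self._frame = frame
            self._seq += 1

    def get(self):
        """Return (sequence number, newest frame); the frame is None before the first put."""
        with self._lock:
            return self._seq, self._frame

slot = LatestFrame()

def producer(count):
    # Stands in for the drain thread: it never waits for the display.
    for i in range(count):
        slot.put("frame-%d" % i)

t = threading.Thread(target=producer, args=(1000,))
t.start()
t.join()
seq, frame = slot.get()
print(seq, frame)
```

The sequence number lets a display timer skip redrawing when nothing new has arrived since its last tick.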

Colour reuses the still’s AGC false-colour, lifted out so the loop calls it per frame:

import numpy as np

def colorize(raw16):
    flat = raw16.astype(np.float64).ravel()
    rank = np.empty(flat.size); rank[np.argsort(flat, kind="stable")] = np.arange(flat.size)
    t = (rank / (flat.size - 1)).reshape(raw16.shape)
    rgb = np.dstack([np.clip(t * 3, 0, 1), np.clip(t * 3 - 1, 0, 1), np.clip(t * 3 - 2, 0, 1)])
    return (rgb * 255).astype(np.uint8)

The reader differs from the one-shot grab in the one way that matters for a stream: the request pool stays up for the whole session instead of being rebuilt each frame, so there is never a gap to drop into. It cuts frames on the same FID flip as before and keeps only the latest. And because a single thermal frame is noisy, it blends each into a running average before colouring — a few frames of integration, the way the camera’s own app does, trading a little motion smear for a much cleaner picture.

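from jnius import autoclass, cast

UsbRequest = autoclass("android.hardware.usb.UsbRequest")
ByteBuffer = autoclass("java.nio.ByteBuffer")
W, H = 256, 392                    # frame geometry negotiated above

def _reader(view, dev):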
    ep = next(e for i in range(dev.getInterfaceCount())
              for e in (dev.getInterface(i).getEndpoint(j)
                        for j in range(dev.getInterface(i).getEndpointCount()))
              if e.getAddress() == BULK_EP)
    reqs = []
    for _ in range(16):
        r = UsbRequest(); r.initialize(view.conn, ep)
        b = ByteBuffer.allocate(16384); r.setClientData(b); r.queue(b); reqs.append(r)
    cur, acc, avg, IMG = None, bytearray(), None, W * H * 2
    while view.running:
        done = view.conn.requestWait(2000)
        if done is None:
            continue
        b = cast("java.nio.ByteBuffer", done.getClientData())
        n = b.position()
        if n > 2:
            chunk = bytearray(n); b.rewind(); b.get(chunk, 0, n)
            f = chunk[1] & 1
            if cur is None:
                cur = f
            if f != cur:
                if len(acc) >= IMG:
                    a = np.frombuffer(bytes(acc[:IMG]), np.uint8).reshape(H, W, 2)
                    raw16 = (a[H // 2:, :, 0].astype(np.uint16) | (a[H // 2:, :, 1].astype(np.uint16) << 8))
                    avg = raw16.astype(np.float32) if avg is None else avg * 0.75 + raw16 * 0.25
                    view.frame = colorize(avg)
                    c = avg / 64.0 - 273.15
                    view.temps = (float(c.min()), float(c.mean()), float(c.max()))
                acc, cur = bytearray(), f
            acc += chunk[2:]
        b.clear(); done.queue(b)
    for r in reqs:
        r.cancel()
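The 0.75 / 0.25 blend in the reader is an exponential moving average. A small sketch on scalar values shows how quickly it settles after a step change, which is the "motion smear" being traded for less noise. The function name and the sample values are invented.

```python
def blend(avg, new, weight=0.25):
    """One step of the running average: keep (1 - weight) of the past, add weight of the new."""
    if avg is None:
        return float(new)          # the first frame seeds the average
    return avg * (1 - weight) + new * weight

# A pixel that reads 100 for one frame and then jumps to 200.
avg, history = None, []
for sample in [100.0] + [200.0] * 8:
    avg = blend(avg, sample)
    history.append(avg)

print(history)
```

After each frame the remaining gap to the new value shrinks by a quarter, so at 25 fps most of a step is absorbed within a few tenths of a second.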

The camera is a single-owner device — one open handle at a time — and a live stream also pins a thread, a wakelock and the lit screen. So the view claims all of that only while it is visible (start) and gives it back when it leaves (stop); otherwise it would lock the camera away from the rpyc console and the other screens, and keep streaming behind your back. And because the camera enumerates lazily (the 0106→0102 settle) and grants permission asynchronously, start polls once a second until both are ready instead of giving up on enter.

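import threading

from kivy.clock import Clock
from kivy.graphics.texture import Texture
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.image import Image
from kivy.uix.label import Label

# `wl` is the wakelock object; it is assumed to be created elsewhere in the app.

class ThermalView(BoxLayout):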
    def __init__(self, **kw):
        super().__init__(orientation="vertical", **kw)
        self.view_img = Image(allow_stretch=True, keep_ratio=True)
        self.readout = Label(size_hint_y=None, height="36dp", text="opening camera…")
        self.add_widget(self.view_img); self.add_widget(self.readout)
        self.frame = self.temps = self.texture = self.conn = self._ev = self._poll = None
        self.running = False

    def start(self):
        self._poll = Clock.schedule_interval(self._try_open, 1.0)

    def _try_open(self, dt):
        dev = find_camera()
        if dev is None:
            self.readout.text = "waiting for the camera (2bdf:0102)…"; return
        if not ensure_permission(dev):
            self.readout.text = "accept the USB permission dialog…"; return
        self._poll.cancel(); self._poll = None
        self.conn, self.intf = open_device(dev)
        _m, raw = negotiate(self.conn); commit(self.conn, raw)
        self.running = True
        wl.acquire()
        threading.Thread(target=_reader, args=(self, dev), daemon=True).start()
        self._ev = Clock.schedule_interval(self._tick, 1 / 15.0)

    def _tick(self, dt):
        if self.frame is None:
            return
        h, w, _ = self.frame.shape
        if self.texture is None:
            self.texture = Texture.create(size=(w, h), colorfmt="rgb")
            self.texture.flip_vertical()                       # Kivy origin is bottom-left
        self.texture.blit_buffer(self.frame.tobytes(), colorfmt="rgb", bufferfmt="ubyte")
        self.view_img.texture = None; self.view_img.texture = self.texture
        if self.temps:
            self.readout.text = "min %.0f   avg %.0f   max %.0f °C" % self.temps

    def stop(self):
        self.running = False
        for ev in (self._poll, self._ev):
            if ev:
                ev.cancel()
        self._poll = self._ev = None
        if self.conn is not None:
            try:
                self.conn.releaseInterface(self.intf); self.conn.close()
            except Exception:
                pass
            self.conn = None
        if wl.isHeld():
            wl.release()

Wrapped as a launcher screen like the rest, so the home grid’s button can populate it and navigate in:

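from kivy.app import App
from kivy.uix.screenmanager import Screen, ScreenManager, ScreenManagerException

class ThermalScreen(Screen):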
    def __init__(self, **kw):
        super().__init__(**kw)
        self.view = ThermalView()
        self.add_widget(self.view)
        self.bind(on_enter=lambda *_: self.view.start(),
                  on_pre_leave=lambda *_: self.view.stop())

class ThermalApp(App):
    @staticmethod
    def populate(sm):
        try:
            sm.get_screen("thermal"); return
        except ScreenManagerException:
            pass
        sm.add_widget(ThermalScreen(name="thermal"))

    def build(self):
        sm = ScreenManager()
        self.populate(sm)
        return sm

def run():
    ThermalApp().run()