Ralph orchestrator

Ralph is an automated task orchestrator that uses Claude Code to implement tasks from a todo list. It reads tasks, generates code, runs tests, and commits changes — all inside a container.

This module provides a generic ralph function that takes any pre-configured container and wires up the ralph infrastructure: npm packages, git identity, credentials, and patch extraction. The caller is responsible for installing their project's own dependencies in the container before passing it here.

1. Configuration

Ralph is driven by a YAML file that defines the event loop, CLI backend, guardrails, and hats (agents). Each hat triggers on events, performs work, and publishes new events to keep the loop going.

This is a generic template — projects should adapt the builder instructions (steps 3–7) to their own test and build workflow.

event_loop:
  starting_event: "work.start"
  completion_promise: "LOOP_COMPLETE"
  max_iterations: 50
  max_runtime_seconds: 14400
  checkpoint_interval: 3

cli:
  backend: "claude-code"
  prompt_mode: "arg"

core:
  guardrails:
    - "Run pytest after every code change to verify nothing is broken"
    - "NEVER commit if tests fail — fix the issue or skip the task"
    - "Commit after each completed task with a meaningful message"
    - "Never skip a task without adding a note in todo.org explaining why"

hats:
  picker:
    name: "Task Picker"
    description: "Reads todo.org and picks the next eligible task to implement"
    triggers: ["work.start", "work.done"]
    publishes: ["work.picked", "LOOP_COMPLETE"]
    instructions: |
      You are the task picker. Read todo.org and find the next task to work on.

      ## Rules
      1. Read the full todo.org file
      2. Skip tasks that are:
         - Already DONE (have CLOSED or DONE state)
         - Tagged :structure: (these are section headers)
         - Subtasks (level 3+ headings under a skipped parent)
         - Requiring external infrastructure not available in this container (k8s clusters, external services, specific hardware)
         - Requiring human interaction or decisions that cannot be automated
      3. Among eligible tasks, pick the next one that has NOT been attempted yet
      4. If you find an eligible task, emit work.picked with the task heading and file location as payload
      5. If ALL eligible tasks have been processed (done or skipped with a note), emit LOOP_COMPLETE with a summary of what was accomplished

      ## Important
      - Check for notes added by the builder indicating a task was attempted
      - A task with a note saying "Attempted by ralph" or "Skipped by ralph" counts as processed
      - Do NOT emit LOOP_COMPLETE if there are still unprocessed eligible tasks
      - When emitting LOOP_COMPLETE, include a count of tasks done, skipped, and remaining

  builder:
    name: "Builder"
    description: "Implements the picked task, runs tests, commits, and updates todo.org"
    triggers: ["work.picked"]
    publishes: ["work.done"]
    instructions: |
      You are the builder. Implement the task described in the event payload.

      ## Project conventions
      - Source of truth: .org files in src/ and readme.org at the root
      - Generated files: Python in src/lib/*.py, shell in tests/*.sh — NEVER edit these directly
      - To regenerate after editing .org files: ./tangle.sh (all) or ./tangle.sh src/foo.org (one file)
      - Test command: `pytest tests/test_use_cases.py -v` (do NOT use ./test-host.sh, it requires nix)
      - Tests compare dagger call output against cached #+RESULTS blocks in org files

      ## Workflow
      1. Read readme.org and the relevant src/*.org files to understand the project
      2. Understand the task from the event payload
      3. **Red**: If the change affects behavior (not just prose), add or extend a named bash test block in the relevant org file (without a #+RESULTS block yet). Run `./run.sh <file.org>` to execute the block and populate the #+RESULTS. Then change the expected result to what the new behavior should produce (so it mismatches the current output). Tangle with `./tangle.sh` and run `pytest tests/test_use_cases.py -v` to confirm the new test fails. If no test is relevant (e.g. pure documentation), note why in todo.org and move to step 4.
      4. **Green**: Edit the .org source files to implement the change (never the generated .py or .sh files). Run `./run.sh <file.org>` to update #+RESULTS with the actual output. If the output differs from the expected result written in step 3, verify it still matches the intended behavior — update the #+RESULTS only if the new output is correct. Tangle with `./tangle.sh`. Run `pytest tests/test_use_cases.py -v` and iterate until all tests pass (including the new one).
      5. **Review** (critical): This step is essential — steps 3–4 may have drifted from the original intent. Re-read the task description and verify the implementation actually addresses it. Check that the #+RESULTS assertions test the right thing (not just whatever the code happens to produce). Check `git diff` to make sure only relevant files were changed and nothing was left behind. If anything is off, go back to step 3 or 4.
      6. If tests pass and the review is satisfactory, commit with a meaningful message
      7. NEVER commit if tests fail — fix the issue or skip the task
      8. Update todo.org: mark the task DONE with a note describing what was done
      9. If the task cannot be implemented (missing context, external deps, too complex), add a note in todo.org explaining why and mark it as skipped
      10. Emit work.done

      ## Rules
      - NEVER commit if tests fail — fix the issue or skip the task
      - Always run pytest before committing
      - Keep changes focused on the single task
      - Do not modify unrelated code
      - If the diff touches more than ~10 files or adds more than ~200 lines, reconsider — the task may be too broad
      - If tests fail after 3 attempts, `git checkout .` to reset and skip the task with a note
      - If a task is ambiguous, implement the most reasonable interpretation and note what you assumed
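
The diff-size guardrail in the builder's rules can be checked mechanically. A minimal sketch, assuming the text output of git diff --numstat; the helper name and thresholds-as-parameters are illustrative, not part of ralph:

```python
def diff_too_broad(numstat: str, max_files: int = 10, max_added: int = 200) -> bool:
    """Given `git diff --numstat` output, flag diffs beyond the guardrails."""
    rows = [line.split("\t") for line in numstat.splitlines() if line.strip()]
    # numstat columns: added, deleted, path ("-" for binary files, hence isdigit)
    added = sum(int(r[0]) for r in rows if r[0].isdigit())
    return len(rows) > max_files or added > max_added
```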

2. Running ralph in a container

The ralph function orchestrates five stages, each handled by a private helper:

  • _ralph_tooling — install ralph CLI, claude-code, pytest, dagger CLI
  • _ralph_git — configure git identity
  • _ralph_workdir — copy project source and optional config files
  • _ralph_credentials — mount Claude credentials
  • _ralph_run — execute the wrapper script and generate patches

The caller provides a container with their project's dependencies already installed (test frameworks, build tools, etc.). The ralph function adds the ralph layer on top via these stages.

Exit code 2 from ralph means "max iterations reached", which is an expected outcome — not an error.
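
A caller inspecting the returned workdir can map the recorded exit code to an outcome. A sketch, assuming 0 means the loop completed normally; the helper name is hypothetical:

```python
from pathlib import Path

def ralph_outcome(work_dir: str) -> str:
    # The wrapper script writes ralph's exit code to this file (see below).
    rc = int(Path(work_dir, ".ralph-exit-code").read_text().strip())
    if rc == 0:
        return "completed"       # loop emitted LOOP_COMPLETE
    if rc == 2:
        return "max-iterations"  # expected outcome, not an error
    return f"failed (exit {rc})"
```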

The wrapper script starts ralph in the background and optionally polls Consul for stop signals and log-level changes. It captures the exit code for later inspection.

#!/bin/sh
cd "$WORK_DIR"
git rev-parse HEAD > /tmp/ralph-base-commit
echo "[ralph-wrapper] starting ralph..."
eval "ralph $RALPH_ARGS" &
RALPH_PID=$!
echo "[ralph-wrapper] ralph started (PID $RALPH_PID)"

if [ -n "$CONSUL_ADDR" ]; then
    echo "[ralph-wrapper] polling consul at $CONSUL_ADDR key=$CONSUL_KEY"
    prev_level=normal
    while kill -0 $RALPH_PID 2>/dev/null; do
        status=$(curl -sf "$CONSUL_ADDR/v1/kv/$CONSUL_KEY?raw" 2>/dev/null || echo running)
        if [ "$status" = "stop" ]; then
            echo "[ralph-wrapper] stop signal received, sending SIGTERM to ralph (PID $RALPH_PID)..."
            kill -TERM $RALPH_PID 2>/dev/null || true
            for i in $(seq 1 30); do
                kill -0 $RALPH_PID 2>/dev/null || break
                sleep 1
            done
            if kill -0 $RALPH_PID 2>/dev/null; then
                echo "[ralph-wrapper] ralph did not exit after 30s, sending SIGKILL"
                kill -KILL $RALPH_PID 2>/dev/null || true
            fi
            break
        fi
        level=$(curl -sf "$CONSUL_ADDR/v1/kv/$CONSUL_KEY/log-level?raw" 2>/dev/null || echo normal)
        if [ "$level" != "$prev_level" ]; then
            echo "[ralph-wrapper] log-level: $level"
            prev_level=$level
        fi
        sleep 5
    done
fi

echo "[ralph-wrapper] waiting for ralph to exit..."
wait $RALPH_PID
RC=$?
echo "[ralph-wrapper] ralph exited with code $RC"
echo ${RC} > "$WORK_DIR/.ralph-exit-code"
true
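
The wrapper reads Consul KV over its HTTP API; the ?raw query returns the bare value, and a missing key yields a 404 that the curl fallback turns into a default. The same probe in stdlib Python, with an illustrative helper name:

```python
import urllib.error
import urllib.request

def kv_raw(addr: str, key: str, default: str) -> str:
    """GET /v1/kv/<key>?raw from Consul; fall back to default on any error."""
    try:
        with urllib.request.urlopen(f"{addr}/v1/kv/{key}?raw", timeout=5) as r:
            return r.read().decode() or default
    except (urllib.error.URLError, OSError):
        return default
```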
def _ralph_tooling(
    self,
    ctr: dagger.Container,
    home: str,
    dagger_runner_host: str,
) -> dagger.Container:
    """Install ralph CLI, claude-code, pytest, and dagger CLI."""
    q_home = shlex.quote(home)
    ctr = ctr.with_exec(
        [
            "sh", "-c",
            f"npm config set prefix {q_home}/.npm-global"
            " && npm install -g"
            " @ralph-orchestrator/ralph-cli"
            " @anthropic-ai/claude-code",
        ]
    ).with_env_variable(
        "PATH", f"{home}/.npm-global/bin:$PATH", expand=True,
    )
    ctr = ctr.with_exec(
        ["sh", "-c", "pip install --quiet pytest"]
    ).with_exec(
        [
            "sh", "-c",
            f"cd {q_home} && curl -fsSL https://dl.dagger.io/dagger/install.sh"
            " | BIN_DIR=$HOME/.local/bin sh",
        ]
    )
    if dagger_runner_host:
        ctr = ctr.with_env_variable(
            "_EXPERIMENTAL_DAGGER_RUNNER_HOST", dagger_runner_host,
        )
    return ctr

def _ralph_git(
    self,
    ctr: dagger.Container,
    email: str,
    name: str,
) -> dagger.Container:
    """Configure git identity."""
    q_email, q_name = map(shlex.quote, (email, name))
    return ctr.with_exec(
        [
            "sh", "-c",
            f"git config --global user.email {q_email}"
            f" && git config --global user.name {q_name}"
            " && git config --global init.defaultBranch main",
        ]
    )

def _ralph_workdir(
    self,
    ctr: dagger.Container,
    src: dagger.Directory,
    work_dir: str,
    owner: str,
    ralph_yml: dagger.File | None,
    plan_md: dagger.File | None,
    todo_org: dagger.File | None,
) -> dagger.Container:
    """Copy project source and optional config files into workdir."""
    q_work_dir = shlex.quote(work_dir)
    ctr = (
        ctr.with_directory(
            f"{work_dir}/.git", src.directory(".git"), owner=owner,
        )
        .with_directory(work_dir, src, owner=owner)
        .with_exec(["sh", "-c", f"cd {q_work_dir} && git checkout ."])
    )
    if ralph_yml is not None:
        ctr = ctr.with_file(f"{work_dir}/ralph.yml", ralph_yml, owner=owner)
    if plan_md is not None:
        ctr = ctr.with_file(f"{work_dir}/plan.md", plan_md, owner=owner)
    if todo_org is not None:
        ctr = ctr.with_file(f"{work_dir}/todo.org", todo_org, owner=owner)
    return ctr

def _ralph_credentials(
    self,
    ctr: dagger.Container,
    home: str,
    credentials: dagger.Secret,
    owner: str,
) -> dagger.Container:
    """Mount Claude credentials."""
    return (
        ctr
        .with_exec(["mkdir", "-p", f"{home}/.claude"])
        .with_mounted_secret(
            f"{home}/.claude/.credentials.json", credentials, owner=owner,
        )
    )

def _ralph_run(
    self,
    ctr: dagger.Container,
    src: dagger.Directory,
    work_dir: str,
    ralph_args: str,
    consul_addr: str,
    consul_key: str,
) -> dagger.Container:
    """Run ralph wrapper and generate patches."""
    ctr = ctr.with_file(
        "/tmp/ralph-wrapper.sh",
        src.file("src/lib/ralph-wrapper.sh"),
    ).with_env_variable(
        "WORK_DIR", work_dir,
    ).with_env_variable(
        "RALPH_ARGS", ralph_args,
    )
    if consul_addr:
        ctr = ctr.with_env_variable(
            "CONSUL_ADDR", consul_addr,
        ).with_env_variable(
            "CONSUL_KEY", consul_key,
        )
    ctr = ctr.with_exec(["sh", "/tmp/ralph-wrapper.sh"])
    q_work_dir = shlex.quote(work_dir)
    ctr = ctr.with_exec(
        [
            "sh", "-c",
            f"cd {q_work_dir}"
            " && mkdir -p patches"
            " && base=$(cat /tmp/ralph-base-commit)"
            ' && if [ "$(git rev-parse HEAD)" != "$base" ]; then'
            ' git format-patch "$base"..HEAD -o patches;'
            " fi",
        ]
    )
    return ctr

@function
async def ralph(
    self,
    claude_credentials: dagger.Secret,
    # Container
    src: dagger.Directory | None = None,
    ctr: dagger.Container | None = None,
    distro_packages: list[str] = (),
    username: str | None = None,
    # Git identity
    git_email: str = "ralph@localhost",
    git_name: str = "Ralph",
    # Execution
    ralph_args: str = "",
    work_dir: str = "/tmp/ralph-workdir",
    dagger_runner_host: str = "",
    consul_addr: str = "",
    consul_key: str = "ralph/dagger/status",
    # File overrides
    ralph_yml: dagger.File | None = None,
    plan_md: dagger.File | None = None,
    todo_org: dagger.File | None = None,
) -> dagger.Directory:
    """Run ralph orchestrator in a container and return the workdir with patches.

    src is the module source directory; defaults to the current directory.
    """
    if src is None:
        try:
            src = dag.current_module().source()
        except dagger.QueryError:
            src = dag.address(".").directory()
    username = username or self.default_username
    if ctr is None:
        ctr = self.debian_python_user_venv(
            distro_packages=["git", "npm"] + list(distro_packages),
        )
    owner = f"{username}:{username}"
    home = f"/home/{username}"
    ctr = self._ralph_tooling(ctr, home, dagger_runner_host)
    ctr = self._ralph_git(ctr, git_email, git_name)
    ctr = self._ralph_workdir(ctr, src, work_dir, owner, ralph_yml, plan_md, todo_org)
    ctr = self._ralph_credentials(ctr, home, claude_credentials, owner)
    ctr = self._ralph_run(ctr, src, work_dir, ralph_args, consul_addr, consul_key)
    return ctr.directory(work_dir)

The function cannot be fully exercised without real Claude credentials, but we can verify the tangled ralph.yml is valid YAML.

dagger call debian --distro-packages=python3-yaml with-file --path=/tmp/ralph.yml --source=src/ralph.yml with-exec --args="python3","-c","import yaml; d=yaml.safe_load(open('/tmp/ralph.yml')); print(d['event_loop']['starting_event']); print(len(d['hats']))" stdout
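
The same check can be run outside dagger with PyYAML (assumed installed). The document below is an abridged copy of the config above, just enough to exercise the assertions:

```python
import yaml  # PyYAML, assumed installed

doc = """
event_loop:
  starting_event: "work.start"
hats:
  picker: {}
  builder: {}
"""
d = yaml.safe_load(doc)
assert d["event_loop"]["starting_event"] == "work.start"
assert len(d["hats"]) == 2
```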

The log filter is tested by piping a fixture of real ralph output through ralph-log-filter and comparing the filtered result against the expected output.

ralph-log-filter < tests/ralph_log_sample.txt 2>/dev/null | sed 's/\x1b\[[0-9;]*m//g'

[file] Read: /tmp/ralph-workdir/ralph.yml
[tool] Bash: mkdir -p .ralph/agent
[file] Write: /tmp/ralph-workdir/.ralph/agent/scratchpad.md
[event] event: work.picked
[ralph] log-level changed to json
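
The sed step in the test pipeline strips ANSI color codes before comparison; the same filter in Python:

```python
import re

# Matches SGR sequences like \x1b[32m and the reset \x1b[0m.
ANSI = re.compile(r"\x1b\[[0-9;]*m")

line = "\x1b[37m[event]\x1b[0m event: work.picked"
print(ANSI.sub("", line))  # → [event] event: work.picked
```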

3. Log filter

The log filter is a standalone script that colorizes and summarizes ralph/dagger output from stdin. It only uses stdlib modules so it works in containers where clk is not installed.

#!/usr/bin/env python3
"""Filter and colorize ralph/dagger output from stdin."""
import json
import re
import sys

RST = "\033[0m"
TAG_COLORS = {
    "[git]": "\033[33m",
    "[tool]": "\033[36m",
    "[file]": "\033[35m",
    "[grep]": "\033[34m",
    "[PASS]": "\033[32m",
    "[FAIL]": "\033[31m",
    "[think]": "\033[90m",
    "[event]": "\033[37m",
    "[ralph]": "\033[90m",
    "[verbose]": "\033[90m",
}


def out(icon, msg):
    c = TAG_COLORS.get(icon, "")
    print(f"{c}{icon}{RST} {msg}", flush=True)


level = "normal"
for raw in sys.stdin:
    raw = raw.strip()
    m = re.search(r"\[ralph-wrapper\] (.*)", raw)
    if m:
        lm = re.match(r"log-level: (\S+)", m.group(1))
        if lm:
            level = lm.group(1)
            out("[ralph]", f"log-level changed to {level}")
            continue
        out("[ralph]", m.group(1))
        continue
    if level == "json":
        print(raw, flush=True)
        continue
    if level == "think":
        if "thinking" in raw.lower() or "antml:thinking" in raw:
            out("[think]", raw)
            continue
    if "Event emitted" in raw:
        m2 = re.search(r"Event emitted: ([\w.]+)", raw)
        out("[event]", f"event: {m2.group(1)}" if m2 else "event emitted")
        continue
    if "git commit" in raw:
        m = re.search(r"git commit -m .(.{3,120}?)(?:\\n|\\|\"\"|\x27\x27|$)", raw)
        if m:
            out("[git]", f"commit: {m.group(1).strip().strip(chr(34))}")
        else:
            out("[git]", "commit")
        continue
    m = re.search(r"\"name\":\"(Bash|Edit|Write|Glob|Grep|Read)\"", raw)
    if m:
        tool = m.group(1)
        mc = re.search(r"\"command\":\"([^\"]{0,200})", raw)
        mf = re.search(r"\"file_path\":\"([^\"]*)", raw)
        mp = re.search(r"\"pattern\":\"([^\"]{0,80})", raw)
        if mc:
            cmd = mc.group(1).replace("\\n", " ").strip()
            out("[tool]", f"{tool}: {cmd}")
        elif mf:
            out("[file]", f"{tool}: {mf.group(1)}")
        elif mp:
            out("[grep]", f"{tool}: {mp.group(1)}")
        continue
    m = re.search(r"(\d+ passed[^\"\\\\]*)", raw)
    if m:
        out(
            "[PASS]" if "failed" not in m.group(1) else "[FAIL]",
            m.group(1),
        )
        continue
    if re.search(r"FAILED|failed", raw) and "test_use_case" in raw:
        out("[FAIL]", "TEST FAILED")
        continue
    if level == "verbose":
        try:
            j = json.loads(raw)
            t = j.get("type", "?")
            msg = j.get("message", {})
            role = msg.get("role", "")
            content = msg.get("content", "")
            if isinstance(content, list):
                parts = []
                for c in content:
                    if isinstance(c, dict):
                        parts.append(c.get("type", "?"))
                    else:
                        parts.append(str(c)[:80])
                content = ", ".join(parts)
            summary = f"{t}"
            if role:
                summary += f"/{role}"
            if content:
                summary += f": {content}"
            out("[verbose]", summary)
        except (json.JSONDecodeError, AttributeError):
            out("[verbose]", raw)
        continue

4. clk commands

These commands let you run clk ralph run, clk ralph stop, etc. instead of typing long dagger call commands. They are implemented as a Python clk group with subcommands.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Ralph orchestrator CLI commands."""
import os
import re
import shutil
import signal
import subprocess
import sys
from pathlib import Path

import click

from clk.config import config
from clk.decorators import argument, command, group, option
from clk.log import get_logger

LOGGER = get_logger(__name__)

CONSUL_KEY_DEFAULT = "ralph/dagger/status"


@group()
def ralph():
    "Ralph orchestrator commands"


@ralph.command()
@option("--max-iterations", type=int, default=50, help="Maximum number of ralph iterations")
@option("--output-dir", default="ralph-output", help="Output directory for the artifact")
@option(
    "--credentials",
    type=click.Path(exists=True),
    default=os.path.expanduser("~/.claude/.credentials.json"),
    help="Path to claude code credentials",
)
@option("--consul-key", default=CONSUL_KEY_DEFAULT, help="Consul KV key for stop signaling")
def run(max_iterations, output_dir, credentials, consul_key):
    "Run ralph orchestrator in a dagger container"
    project = config.project
    consul_addr = os.environ.get("CONSUL_HTTP_ADDR", "")
    runner_host = os.environ.get("_EXPERIMENTAL_DAGGER_RUNNER_HOST", "")

    ralph_args = f"run --max-iterations {max_iterations}"
    ralph_args += " -p 'Follow the instructions in ralph.yml hats'"

    dagger_args = [
        "call", "ralph",
        "--claude-credentials", f"file:{credentials}",
        "--distro-packages", "emacs-nox",
        "--distro-packages", "curl",
        "--distro-packages", "xz-utils",
        "--ralph-args", ralph_args,
    ]

    if runner_host:
        dagger_args += ["--dagger-runner-host", runner_host]
    if consul_addr:
        dagger_args += ["--consul-addr", consul_addr, "--consul-key", consul_key]

    ralph_yml = Path(project) / "src" / "ralph.yml"
    if ralph_yml.exists():
        dagger_args += ["--ralph-yml", str(ralph_yml)]
    todo_org = Path(project) / "todo.org"
    if todo_org.exists():
        dagger_args += ["--todo-org", str(todo_org)]

    pid_file = Path(project) / ".ralph-pid"
    log_file = Path(project) / ".ralph-log"

    def cleanup():
        pid_file.unlink(missing_ok=True)
        if consul_addr:
            subprocess.run(["consul", "kv", "delete", consul_key], capture_output=True)
            subprocess.run(
                ["consul", "kv", "delete", f"{consul_key}/log-level"],
                capture_output=True,
            )

    def handle_signal(signum, frame):
        cleanup()
        sys.exit(130)

    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    if consul_addr:
        print(f"[ralph.run] setting consul key '{consul_key}' to 'running'")
        subprocess.run(
            ["consul", "kv", "put", consul_key, "running"],
            capture_output=True,
        )
        subprocess.run(
            ["consul", "kv", "put", f"{consul_key}/log-level", "normal"],
            capture_output=True,
        )
    else:
        print("[ralph.run] CONSUL_HTTP_ADDR not set, stop signaling disabled")

    print(f"[ralph.run] starting dagger (output-dir={output_dir}, log={log_file})...")
    pid_file.write_text(str(os.getpid()))

    try:
        dagger_cmd = (
            ["dagger", "--progress=plain"]
            + dagger_args
            + ["export", "--path", output_dir]
        )
        dagger_proc = subprocess.Popen(
            dagger_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            cwd=project,
        )
        filter_proc = subprocess.Popen(
            [sys.executable, str(Path(project) / "tests" / "ralph-log-filter")],
            stdin=subprocess.PIPE,
        )
        with open(log_file, "wb") as lf:
            for chunk in iter(lambda: dagger_proc.stdout.read(4096), b""):
                lf.write(chunk)
                filter_proc.stdin.write(chunk)
                filter_proc.stdin.flush()
        filter_proc.stdin.close()
        filter_proc.wait()
        rc = dagger_proc.wait()
        if rc == 0:
            print("[ralph.run] dagger completed successfully")
        else:
            print(f"[ralph.run] dagger exited with code {rc}")
    finally:
        cleanup()


@ralph.command()
@option("--consul-key", default=CONSUL_KEY_DEFAULT, help="Consul KV key for stop signaling")
def stop(consul_key):
    "Stop a running ralph via Consul and wait for results to be exported"
    consul_addr = os.environ.get("CONSUL_HTTP_ADDR", "")
    if not consul_addr:
        raise click.ClickException("CONSUL_HTTP_ADDR is not set")

    project = config.project
    pid_file = Path(project) / ".ralph-pid"

    result = subprocess.run(
        ["consul", "kv", "get", consul_key], capture_output=True, text=True
    )
    status = result.stdout.strip() if result.returncode == 0 else ""
    print(f"[ralph.stop] consul key '{consul_key}' = '{status or 'empty'}'")
    if status != "running":
        raise click.ClickException("no running ralph found")

    print(f"[ralph.stop] writing 'stop' to consul key '{consul_key}'...")
    subprocess.run(["consul", "kv", "put", consul_key, "stop"], capture_output=True)

    if pid_file.exists():
        pid = pid_file.read_text().strip()
        print(f"[ralph.stop] waiting for dagger (PID {pid}) to finish exporting...")
        subprocess.run(
            ["tail", f"--pid={pid}", "-f", "/dev/null"],
            capture_output=True,
        )
        print("[ralph.stop] dagger process finished")
    else:
        print("[ralph.stop] no .ralph-pid file, cannot wait for dagger")

    print("[ralph.stop] done")


@ralph.command()
@option("--consul-key", default=CONSUL_KEY_DEFAULT, help="Consul KV key for stop signaling")
@argument(
    "level",
    type=click.Choice(["normal", "think", "verbose", "json", "show"]),
    default="show",
    help="Log level. Omit to show current level.",
)
def log_level(consul_key, level):
    """Change the verbosity of a running ralph's output

    Levels:
      normal   filtered, categorized output (default)
      think    normal + claude thinking content
      verbose  show every line, truncated
      json     show every line raw
      show     print the current level without changing it"""
    consul_addr = os.environ.get("CONSUL_HTTP_ADDR", "")
    if not consul_addr:
        raise click.ClickException("CONSUL_HTTP_ADDR is not set")

    if level == "show":
        result = subprocess.run(
            ["consul", "kv", "get", f"{consul_key}/log-level"],
            capture_output=True,
            text=True,
        )
        print(result.stdout.strip() if result.returncode == 0 else "normal")
        return

    subprocess.run(
        ["consul", "kv", "put", f"{consul_key}/log-level", level],
        capture_output=True,
    )
    print(f"[ralph.log-level] set to '{level}'")


@ralph.command()
@option("--output-dir", default="ralph-output", help="Output directory for the artifact")
def gather(output_dir):
    "Apply patches generated by a ralph run to the current branch"
    project = Path(config.project)
    patch_dir = project / output_dir / "patches"

    if not patch_dir.is_dir():
        raise click.ClickException(f"patch directory '{patch_dir}' not found")

    patches = sorted(patch_dir.glob("*.patch"))
    if not patches:
        print(f"No patches to apply in '{patch_dir}'")
        return

    # Back up untracked files that patches may create
    backed_up = []
    for p in patches:
        for line in p.read_text().splitlines():
            if line.startswith("+++ b/"):
                f = project / line[6:]
                if f.exists():
                    result = subprocess.run(
                        ["git", "ls-files", "--error-unmatch", str(f)],
                        capture_output=True,
                        cwd=project,
                    )
                    if result.returncode != 0:
                        shutil.move(str(f), f"{f}.ralph-backup")
                        backed_up.append(f)

    print(f"Applying {len(patches)} patch(es) from '{patch_dir}':")
    for p in patches:
        print(f"  {p.name}")

    result = subprocess.run(
        ["git", "am"] + [str(p) for p in patches], cwd=project
    )
    if result.returncode != 0:
        for f in backed_up:
            backup = Path(f"{f}.ralph-backup")
            if backup.exists():
                shutil.move(str(backup), str(f))
        sys.exit(1)

    for f in backed_up:
        Path(f"{f}.ralph-backup").unlink(missing_ok=True)

    todo_src = project / output_dir / "todo.org"
    if todo_src.exists():
        shutil.copy2(str(todo_src), str(project / "todo.org"))
        print(f"Restored todo.org from {output_dir}/")


@ralph.command()
@option(
    "--log-file",
    type=click.Path(exists=True),
    default=None,
    help="Path to ralph log file (default: .ralph-log in project)",
)
@option(
    "--output",
    type=click.Path(),
    default=None,
    help="Output fixture path (default: tests/ralph_log_sample.txt in project)",
)
def capture_fixture(log_file, output):
    "Extract representative lines from a ralph log to build a test fixture"
    project = Path(config.project)
    log_path = Path(log_file) if log_file else project / ".ralph-log"
    output_path = Path(output) if output else project / "tests" / "ralph_log_sample.txt"

    if not log_path.exists():
        raise click.ClickException(f"log file not found: {log_path}")

    lines = log_path.read_text().splitlines(keepends=True)

    # Patterns for each filter branch, in priority order.
    # We collect the first (shortest) match for each.
    patterns = [
        ("ralph-wrapper-start", r"\[ralph-wrapper\] start"),
        ("ralph-wrapper-other", r"\[ralph-wrapper\] ralph (?:started|exited)"),
        ("ralph-wrapper-level", r"\[ralph-wrapper\] log-level:"),
        ("tool-read", r'"name":"Read"'),
        ("tool-bash", r'"name":"Bash"'),
        ("tool-grep", r'"name":"Grep"'),
        ("tool-edit", r'"name":"Edit"'),
        ("tool-write", r'"name":"Write"'),
        ("event", r"Event emitted"),
        ("git-commit", r"git commit -m"),
        ("test-pass", r"\d+ passed"),
        ("test-fail", r"FAILED.*test_use_case"),
    ]

    max_len = 2000  # skip giant JSON blobs

    selected = {}
    for label, pattern in patterns:
        if label in selected:
            continue
        best = None
        for line in lines:
            if len(line) > max_len:
                continue
            if re.search(pattern, line):
                if best is None or len(line) < len(best):
                    best = line
        if best is not None:
            selected[label] = best

    # Always include one unmatched line (for verbose catch-all)
    for line in lines:
        if len(line) > max_len:
            continue
        matched = any(
            re.search(p, line)
            for _, p in patterns
        )
        if not matched and line.strip():
            selected["unmatched"] = line
            break

    if not selected:
        raise click.ClickException("no matching lines found in log")

    # Write in a stable order. log-level goes last since it changes
    # filter mode and would affect processing of subsequent lines.
    order = [
        p[0] for p in patterns if p[0] != "ralph-wrapper-level"
    ] + ["unmatched", "ralph-wrapper-level"]
    fixture_lines = [selected[k] for k in order if k in selected]

    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w") as f:
        for line in fixture_lines:
            f.write(line if line.endswith("\n") else line + "\n")

    print(f"Wrote {len(fixture_lines)} lines to {output_path}")
    for label in order:
        if label in selected:
            print(f"  {label}: {len(selected[label].strip())} chars")

Author: root

Created: 2026-04-18 Sat 21:16
