
Spec-driven development

Claude's recent /loop and routines move coding agents from one-shot CLI assistants toward scheduled, event-driven workers. The public description says loops repeat prompts inside a session, while routines run in Claude Code's cloud on schedules, API calls, or GitHub events. I do not know Anthropic's internals, but from the description it sounds like a Temporal-shaped workflow layer: persisted intent, wakeups, retries, and resumable sessions. It is also perfectly possible to replicate multiple worktrees, loops, and routines with a small agent-written runner.

spec  | Markdown requirements       | Durable intent, constraints, and the next bounded obligation.
loop  | Restate on a remote machine | Continuously processes specs, resumes work, retries failures, and fans out worktrees.
agent | Implementation, tests, logs | The model edits one implementation slice and records exactly what happened.

Specs move into a remote Restate workflow, and Restate keeps turning them into implementation passes.

I have been using loops and specs in Foundry for a while, but with Restate instead of Temporal. The local scripts around the repository do the unglamorous work: snapshot the current state, find the next bounded obligation in the spec, generate a precise prompt, run the coding agent, collect output, and leave a trail that can survive interruption.
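One pass of that loop can be sketched in stdlib-only Python. Names like `next_obligation` and `build_prompt` are illustrative, not Foundry's actual API; the real runner shells out to the coding agent where this sketch stops.

```python
import datetime as dt
import json
import re


def next_obligation(spec_text: str):
    """Return the first unchecked `- [ ]` item in a Markdown spec."""
    for line in spec_text.splitlines():
        m = re.match(r"\s*- \[ \] (.+)", line)
        if m:
            return m.group(1).strip()
    return None


def build_prompt(obligation: str, repo_state: str) -> str:
    """One narrow contract: a single obligation plus a repo snapshot."""
    return (
        "Implement exactly this spec item and nothing else:\n\n"
        f"{obligation}\n\nCurrent repo state:\n{repo_state}\n\n"
        "Report the verification you ran and any remaining gap."
    )


def record_pass(trail: list, obligation: str, status: str) -> None:
    """Append a JSONL entry so the trail survives interruption."""
    trail.append(json.dumps({
        "time": dt.datetime.now(dt.timezone.utc).isoformat(),
        "obligation": obligation,
        "status": status,
    }, sort_keys=True))


spec = "# Spec\n- [x] shipped item\n- [ ] retry transient upload failures\n"
obligation = next_obligation(spec)
```

In the real loop the prompt goes to the agent subprocess and its exit status, output, and patch are what get recorded.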

That is the practical version of spec-driven development. The spec is not only prose for humans. It is the source of durable work items. The loop does not ask the model to remember the whole project or invent priority from scratch; it gives the model one narrow contract, a live repo, and a verification expectation. If the pass fails, the failure becomes input for the next pass instead of disappearing into a terminal scrollback.

This is different from prompt-driven development. A prompt can be a useful instruction, but it is usually disposable. A spec is durable. It has scope, constraints, acceptance criteria, and a place in the product contract. When the loop reads from the spec, the agent is not just following a clever instruction; it is advancing a named obligation that can be audited later. That matters for systems work because correctness is rarely visible in one file. The useful artifact is the chain from requirement to patch to verification.
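That chain can be made literal as a small record per pass. The field names and paths below are hypothetical, but the shape is the point: every claim links to a concrete patch and the check that exercised it.

```python
from dataclasses import asdict, dataclass
import json


@dataclass
class AuditLink:
    spec_claim: str    # the named obligation this pass advanced
    patch_path: str    # the bounded patch produced for it
    verification: str  # the check that exercised the changed behavior
    passed: bool


link = AuditLink(
    spec_claim="uploads retry on transient 5xx",
    patch_path="changes/0042-upload-retry.patch",
    verification="pytest tests/test_upload_retry.py",
    passed=True,
)
# One JSON line per pass; the trail, not the session, is the audit artifact.
record = json.dumps(asdict(link), sort_keys=True)
```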

The surrounding automation is deliberately plain. One part gathers context without changing durable progress. Another part chooses or prepares a small unit of work. Another runs the agent and captures logs, prompts, patches, status, and failure output. A proof pass can then exercise real workflows and attach evidence. None of that requires the model to be consistent over days. The model can be interrupted, replaced, upgraded, or wrong; the loop still has enough state to decide what to do next.
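The "decide what to do next" part is deliberately dumb. A sketch, assuming a simple durable-state dict whose keys are illustrative: the decision reads only persisted state, never the model's memory.

```python
def decide_next(state: dict) -> str:
    """Choose the next action from durable state alone; the model is not consulted."""
    if state.get("interrupted"):
        return "resume-last-step"
    last = state.get("last_result") or {}
    if last.get("exitCode", 0) != 0:
        # A failed pass becomes input for the next one, not lost scrollback.
        return "retry-with-failure-context"
    if state.get("pending_obligations"):
        return "run-next-obligation"
    return "idle"
```

Because every branch depends only on recorded state, the model can be interrupted, replaced, or wrong mid-pass and the loop still knows what to do.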

It also changes review. Instead of asking whether an agent was impressive, I can ask whether a specific spec claim moved forward, whether the produced patch is narrow, whether verification actually covers the changed behavior, and whether the remaining gap is recorded. That makes automation useful even when the answer is "not done yet." The loop still produces a better next input.
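Those review questions can even be expressed as a mechanical gate. The threshold and field names here are illustrative policy, not part of Foundry:

```python
def review_gate(cycle: dict, max_changed_files: int = 5) -> dict:
    """Evaluate one cycle against the four review questions."""
    checks = {
        "claim_advanced": bool(cycle.get("claim_advanced")),
        "patch_narrow": cycle.get("changed_file_count", 0) <= max_changed_files,
        "verification_covers_change": bool(cycle.get("verified")),
        # "not done yet" is fine as long as the remaining gap is recorded
        "gap_recorded": bool(cycle.get("done")) or bool(cycle.get("recorded_gap")),
    }
    return {"ok": all(checks.values()), "checks": checks}
```

An unfinished cycle with a recorded gap passes the gate; a sprawling patch with no verification does not, however impressive the session looked.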

Restate fits this shape because the workflow owns retry, recovery, and wakeups while the agent owns reading code, editing files, and reporting verification. In other words, the agent is replaceable. The durable system is the combination of spec, workflow state, logs, patches, commits, and evidence. Instead of a heroic session, the unit of progress becomes an auditable, repeatable cycle. That is what makes long-running agent work boring enough to trust.
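The core trick behind that division of labor is durable execution: journal each step's result so a restarted process replays completed steps instead of re-running them. This is not Restate's actual SDK, just a toy illustrating the shape:

```python
class Journal:
    """Persist each step's result; on restart, completed steps replay."""

    def __init__(self, entries=None):
        self.entries = dict(entries or {})

    def run(self, name, fn):
        if name in self.entries:   # already journaled: replay, do not re-run
            return self.entries[name]
        result = fn()              # first execution of this step
        self.entries[name] = result
        return result


executions = []

def step(tag):
    executions.append(tag)
    return f"done:{tag}"


j = Journal()
j.run("audit", lambda: step("audit"))
j.run("implement", lambda: step("implement"))

# Simulate a crash: a fresh process resumes from the persisted journal.
resumed = Journal(j.entries)
resumed.run("audit", lambda: step("audit"))             # replayed, not re-executed
verify = resumed.run("verify", lambda: step("verify"))  # only new work runs
```

In the real system the journal lives in Restate, the steps are audit/implement/verify passes, and "replay" is what makes interruption a non-event.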

foundry_restate_codex_loop.py
#!/usr/bin/env python3
"""Standalone durable Codex loop for Foundry specs.

This script has two modes:

1. Local mode, available with the repository's normal Python:
   it runs audit/implementation cycles, streams all child output, and writes a
   durable local state/log trail under .foundry-restate-loop/.

2. Restate service mode, available when running Python >= 3.11 with
   restate_sdk[serde] and hypercorn installed:
   it exposes the same cycle as a Restate Workflow so Restate can resume,
   schedule, and inspect long-running Codex/spec implementation work.

It never edits .foundry-progress directly. Spec progress goes through
scripts/foundry_spec_runner.py status, list, run, prompt, and logs.
"""

from __future__ import annotations

import argparse
import contextlib
import datetime as dt
import difflib
import hashlib
import html
import json
import os
from pathlib import Path
import shlex
import shutil
import signal
import subprocess
import sys
import threading
import time
from typing import Any, Dict, Iterable, List, Optional, Tuple


ROOT = Path(__file__).resolve().parents[1]
STATE_DIR = ROOT / ".foundry-restate-loop"
LOG_DIR = STATE_DIR / "logs"
RESTATE_LOG_DIR = STATE_DIR / "restate-logs"
PROMPT_DIR = STATE_DIR / "prompts"
AUDIT_DIR = STATE_DIR / "audits"
CHANGE_DIR = STATE_DIR / "changes"
WORKTREE_DIR = STATE_DIR / "worktrees"
STATE_FILE = STATE_DIR / "state.json"
DECISIONS_FILE = STATE_DIR / "decisions.jsonl"
EVENTS_FILE = STATE_DIR / "events.jsonl"
INTERRUPTED_FILE = STATE_DIR / "last-interrupted.json"
STOP_FILE = STATE_DIR / "stop-request.json"
RESTATE_SERVER_FILE = STATE_DIR / "serve-restate.json"
DEFAULT_RESTATE_DATA_DIR = ROOT / "restate-data"

DEFAULT_CODEX_CMD = "codex exec -"
DEFAULT_VERIFY_CMD = "python3 -m py_compile scripts/foundry_restate_codex_loop.py"
DEFAULT_REMOTE_VERIFY_HOST = "selectel-day"
STATE_OUTPUT_PREVIEW_CHARS = 4000
MAX_SNAPSHOT_TEXT_BYTES = 2_000_000
DEFAULT_DIFF_LIMIT = 60000
CHANGE_FORMATS = ("text", "json", "patch")
CODEX_STREAM_MODES = ("auto", "raw", "assistant", "quiet")
DEFAULT_ISOLATED_BASE = "HEAD"
DEFAULT_COMMIT_BRANCH_PREFIX = "codex"
SPINNER_FRAMES = ("|", "/", "-", "\\")

ACTIVE: Dict[str, Any] = {
    "cycle": None,
    "step": None,
    "pid": None,
    "prompt_path": None,
    "log_path": None,
    "codex_running": False,
}
STOP_REQUESTED = False


def utc_now() -> str:
    return dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat()


def stamp() -> str:
    return dt.datetime.now(dt.timezone.utc).replace(microsecond=0).strftime("%Y%m%dT%H%M%SZ")


def slug(value: str, max_len: int = 56) -> str:
    chars: List[str] = []
    prev_dash = False
    for ch in value.lower():
        if ch.isalnum():
            chars.append(ch)
            prev_dash = False
        elif not prev_dash:
            chars.append("-")
            prev_dash = True
    result = "".join(chars).strip("-")
    return result[:max_len].strip("-") or "item"


def rel(path: Optional[Path]) -> Optional[str]:
    if path is None:
        return None
    try:
        return str(path.relative_to(ROOT))
    except ValueError:
        return str(path)


def ensure_dirs() -> None:
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    RESTATE_LOG_DIR.mkdir(parents=True, exist_ok=True)
    PROMPT_DIR.mkdir(parents=True, exist_ok=True)
    AUDIT_DIR.mkdir(parents=True, exist_ok=True)
    CHANGE_DIR.mkdir(parents=True, exist_ok=True)
    WORKTREE_DIR.mkdir(parents=True, exist_ok=True)


def atomic_write_json(path: Path, value: Dict[str, Any]) -> None:
    ensure_dirs()
    tmp = path.with_name(f".{path.name}.{os.getpid()}.tmp")
    tmp.write_text(json.dumps(value, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    tmp.replace(path)


def append_jsonl(path: Path, value: Dict[str, Any]) -> None:
    ensure_dirs()
    with path.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(value, sort_keys=True) + "\n")
        fh.flush()
        with contextlib.suppress(OSError):
            os.fsync(fh.fileno())


def event_fields_text(fields: Dict[str, Any]) -> str:
    parts: List[str] = []
    for key, value in fields.items():
        if value is None:
            continue
        if isinstance(value, (dict, list, tuple)):
            text = json.dumps(value, sort_keys=True)
        else:
            text = str(value)
        parts.append(f"{key}={shlex.quote(text)}")
    return " ".join(parts)


def record_loop_event(event: str, cycle: Optional[int] = None, **fields: Any) -> Dict[str, Any]:
    payload: Dict[str, Any] = {"time": utc_now(), "event": event}
    if cycle is not None:
        payload["cycle"] = cycle
    payload.update({key: value for key, value in fields.items() if value is not None})
    append_jsonl(EVENTS_FILE, payload)
    return payload


def log_loop_event(args: Optional[argparse.Namespace], event: str, message: str, cycle: Optional[int] = None, **fields: Any) -> Dict[str, Any]:
    payload = record_loop_event(event, cycle=cycle, **fields)
    if args is not None and not bool(getattr(args, "event_log", True)):
        return payload
    visible_fields = {key: value for key, value in payload.items() if key not in {"time", "event", "cycle"}}
    prefix = f"[foundry-loop] {event}"
    if cycle is not None:
        prefix += f" cycle={cycle}"
    suffix = event_fields_text(visible_fields)
    line = f"{prefix} {message}"
    if suffix:
        line += f" | {suffix}"
    print(line, file=sys.stderr, flush=True)
    return payload


def tail_jsonl(path: Path, lines: int) -> List[str]:
    if not path.exists():
        return []
    try:
        return path.read_text(encoding="utf-8", errors="replace").splitlines()[-lines:]
    except OSError as exc:
        return [json.dumps({"time": utc_now(), "event": "read-error", "path": rel(path), "error": str(exc)}, sort_keys=True)]


def read_json(path: Path) -> Dict[str, Any]:
    if not path.exists():
        return {}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError, UnicodeDecodeError) as exc:
        return {"error": str(exc)}
    return data if isinstance(data, dict) else {"error": "state root is not an object"}


def trim_text(value: str, max_chars: int) -> str:
    if len(value) <= max_chars:
        return value
    head = max_chars // 2
    tail = max_chars - head
    return value[:head] + f"\n\n[... omitted {len(value) - max_chars} characters ...]\n\n" + value[-tail:]


def compact_result(result: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    if result is None:
        return None
    compact: Dict[str, Any] = {}
    for key in (
        "step",
        "exitCode",
        "startedAt",
        "completedAt",
        "logPath",
        "cwd",
        "auditInput",
        "auditReport",
        "auditHtml",
        "promptPath",
        "skipped",
    ):
        if key in result:
            compact[key] = result[key]
    argv = result.get("argv")
    if isinstance(argv, list):
        compact["command"] = shell_join(str(part) for part in argv)
    output = result.get("outputTail")
    if output:
        compact["outputPreview"] = trim_text(str(output), STATE_OUTPUT_PREVIEW_CHARS)
    return compact


def shell_join(argv: Iterable[str]) -> str:
    return " ".join(shlex.quote(part) for part in argv)


def codex_argv(args: argparse.Namespace) -> List[str]:
    argv = shlex.split(args.codex_cmd)
    if not argv:
        return argv
    extra: List[str] = []
    if getattr(args, "dangerously_bypass_approvals_and_sandbox", False):
        extra.append("--dangerously-bypass-approvals-and-sandbox")
    if getattr(args, "search", False):
        extra.append("--search")
    extra = [flag for flag in extra if flag not in argv]
    if not extra:
        return argv
    if Path(argv[0]).name == "codex":
        return [argv[0], *extra, *argv[1:]]
    return argv


def effective_codex_cmd(args: argparse.Namespace) -> str:
    return shell_join(codex_argv(args))


def repo_path(value: str) -> Path:
    path = Path(value).expanduser()
    if not path.is_absolute():
        path = ROOT / path
    return path


def work_root(args: Optional[argparse.Namespace] = None) -> Path:
    if args is not None and getattr(args, "_work_root", None):
        return Path(str(args._work_root))
    return ROOT


def read_prompt_file(path_text: str) -> str:
    path = repo_path(path_text)
    if not path.exists():
        raise SystemExit(f"prompt file does not exist: {path}")
    if not path.is_file():
        raise SystemExit(f"prompt path is not a file: {path}")
    return path.read_text(encoding="utf-8")


def operator_prompt(args: argparse.Namespace) -> str:
    parts: List[str] = []
    prompt_file = getattr(args, "prompt_file", None)
    if prompt_file:
        parts.append(read_prompt_file(str(prompt_file)).strip())
    prompt = getattr(args, "prompt", None)
    if prompt:
        parts.append(str(prompt).strip())
    return "\n\n".join(part for part in parts if part).strip()


def operator_prompt_source(args: argparse.Namespace) -> Optional[str]:
    sources: List[str] = []
    prompt_file = getattr(args, "prompt_file", None)
    if prompt_file:
        sources.append(f"file:{rel(repo_path(str(prompt_file)))}")
    if getattr(args, "prompt", None):
        sources.append("inline")
    return ",".join(sources) if sources else None


def verification_policy(args: argparse.Namespace) -> Dict[str, Any]:
    return {
        "remoteHost": str(getattr(args, "remote_verify_host", None) or DEFAULT_REMOTE_VERIFY_HOST),
        "allowLocalVerification": bool(getattr(args, "allow_local_verification", False)),
        "localVerificationReason": getattr(args, "local_verification_reason", None),
    }


def verification_policy_prompt(args: argparse.Namespace) -> str:
    policy = verification_policy(args)
    remote_host = policy["remoteHost"]
    if policy["allowLocalVerification"]:
        reason = policy.get("localVerificationReason") or (
            f"Remote SSH to `{remote_host}` is blocked or unavailable in this execution environment."
        )
        return f"""Verification policy:

- Prefer the repo-required remote verification path on `{remote_host}` when the check is relevant and SSH is available.
- This run explicitly allows local verification fallback if SSH to `{remote_host}` is blocked by sandbox, permission, or network restrictions.
- When falling back locally, run the narrowest relevant local checks that exercise the changed code path.
- In the completion response, state the exact remote verification blocker and mark acceptance as local-only pending remote proof.
- Local fallback reason: {reason}
"""
    return f"""Verification policy:

- Prefer the repo-required remote verification path on `{remote_host}` when the check is relevant.
- Do not silently downgrade remote-required verification to local-only checks.
- If SSH to `{remote_host}` is blocked, report the blocker and leave remote verification pending; local checks may still be run as supporting evidence.
"""


def normalize_changes_format(value: str) -> str:
    if value not in CHANGE_FORMATS:
        raise SystemExit(f"changes format must be one of: {', '.join(CHANGE_FORMATS)}")
    return value


def normalize_codex_stream(value: str) -> str:
    if value not in CODEX_STREAM_MODES:
        raise SystemExit(f"codex stream must be one of: {', '.join(CODEX_STREAM_MODES)}")
    return value


def normalize_run_args(args: argparse.Namespace) -> argparse.Namespace:
    if getattr(args, "plan_only", False) and getattr(args, "implement_only", False):
        raise SystemExit("--plan-only and --implement-only are mutually exclusive")
    if getattr(args, "plan_only", False) and getattr(args, "commit", False):
        raise SystemExit("--commit cannot be used with --plan-only")
    if getattr(args, "plan_only", False):
        args.skip_audit = False
    if getattr(args, "implement_only", False):
        args.skip_audit = True
    dangerous_codex = bool(getattr(args, "dangerously_bypass_approvals_and_sandbox", False))
    if getattr(args, "isolated_worktree", None) is None:
        args.isolated_worktree = bool(getattr(args, "changes_only", False) or getattr(args, "commit", False) or dangerous_codex)
    if getattr(args, "commit", False) and not bool(getattr(args, "isolated_worktree", False)):
        raise SystemExit("--commit requires --isolated-worktree or --changes-only so unrelated current-checkout changes are not committed")
    if dangerous_codex and not bool(getattr(args, "isolated_worktree", False)):
        raise SystemExit("--dangerously-bypass-approvals-and-sandbox requires isolated worktree mode; remove --no-isolated-worktree")
    args.remote_verify_host = str(getattr(args, "remote_verify_host", None) or DEFAULT_REMOTE_VERIFY_HOST)
    args.changes_format = normalize_changes_format(str(getattr(args, "changes_format", "text")))
    args.codex_stream = normalize_codex_stream(str(getattr(args, "codex_stream", "auto")))
    return args


def animation_enabled(args: Optional[argparse.Namespace] = None) -> bool:
    if args is not None and hasattr(args, "animation"):
        return bool(args.animation)
    return True


def file_sha256(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


def read_file_record(path: Path) -> Dict[str, Any]:
    try:
        data = path.read_bytes()
    except OSError as exc:
        return {"error": str(exc), "size": None, "sha256": None, "text": None}
    record: Dict[str, Any] = {
        "size": len(data),
        "sha256": file_sha256(data),
        "text": None,
    }
    if len(data) <= MAX_SNAPSHOT_TEXT_BYTES and b"\0" not in data[:4096]:
        try:
            record["text"] = data.decode("utf-8")
        except UnicodeDecodeError:
            record["text"] = None
    return record


def workspace_snapshot(root: Path = ROOT) -> Dict[str, Dict[str, Any]]:
    proc = subprocess.run(
        ["git", "ls-files", "-z", "--cached", "--others", "--exclude-standard"],
        cwd=root,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.decode("utf-8", "replace").strip() or "git ls-files failed")
    snapshot: Dict[str, Dict[str, Any]] = {}
    for raw in proc.stdout.split(b"\0"):
        if not raw:
            continue
        path_text = raw.decode("utf-8", "surrogateescape")
        full_path = root / path_text
        if full_path.is_file():
            snapshot[path_text] = read_file_record(full_path)
    return snapshot


def diff_lines(path: str, before: Optional[str], after: Optional[str]) -> List[str]:
    before_lines = [] if before is None else before.splitlines()
    after_lines = [] if after is None else after.splitlines()
    return list(
        difflib.unified_diff(
            before_lines,
            after_lines,
            fromfile=f"a/{path}",
            tofile=f"b/{path}",
            lineterm="",
        )
    )


def build_patch_text(changed: List[Tuple[str, str]], before: Dict[str, Dict[str, Any]], after: Dict[str, Dict[str, Any]]) -> Tuple[str, List[str]]:
    patch_blocks: List[str] = []
    omitted: List[str] = []
    for _status, path in changed:
        before_text = before.get(path, {}).get("text")
        after_text = after.get(path, {}).get("text")
        if before_text is None and after_text is None:
            omitted.append(path)
            continue
        lines = diff_lines(path, before_text, after_text)
        if lines:
            patch_blocks.append("\n".join(lines))
    patch_text = "\n\n".join(patch_blocks)
    if patch_text:
        patch_text += "\n"
    return patch_text, omitted


def build_change_report(cycle: int, before: Dict[str, Dict[str, Any]], after: Dict[str, Dict[str, Any]], max_chars: int) -> Dict[str, Any]:
    changed: List[Tuple[str, str]] = []
    for path in sorted(set(before) | set(after)):
        if path not in before:
            changed.append(("A", path))
        elif path not in after:
            changed.append(("D", path))
        elif before[path].get("sha256") != after[path].get("sha256"):
            changed.append(("M", path))
    patch_text, patch_omitted = build_patch_text(changed, before, after)
    diff_text = trim_text(patch_text, max_chars)
    diff_truncated = len(patch_text) > max_chars

    sections = [
        "# Foundry Codex Cycle Changes",
        "",
        f"- Generated at: `{utc_now()}`",
        f"- Cycle: `{cycle}`",
        f"- Changed files: `{len(changed)}`",
        "",
    ]
    if not changed:
        sections.append("(no workspace changes from this cycle)")
    else:
        sections.append("## Summary")
        sections.append("")
        for status, path in changed:
            sections.append(f"- {status} `{path}`")
        if patch_omitted:
            sections.append("")
            sections.append("Patch omitted binary or large files:")
            for path in patch_omitted:
                sections.append(f"- `{path}`")
        sections.extend(["", "## Diff", ""])
        emitted = 0
        truncated = False
        for status, path in changed:
            before_text = before.get(path, {}).get("text")
            after_text = after.get(path, {}).get("text")
            if before_text is None and after_text is None:
                block = f"### {status} {path}\n\n(binary or large file; diff omitted)\n\n"
            else:
                lines = diff_lines(path, before_text, after_text)
                block = "```diff\n" + "\n".join(lines) + "\n```\n\n"
            if emitted + len(block) > max_chars:
                remaining = max_chars - emitted
                if remaining > 0:
                    sections.append(block[:remaining])
                sections.append(f"\n[diff truncated at {max_chars} characters]\n")
                truncated = True
                break
            sections.append(block)
            emitted += len(block)
        if truncated:
            sections.append("Full file contents remain in the workspace; this report is bounded.")

    report = "\n".join(sections).rstrip() + "\n"
    return {
        "changedFiles": [{"status": status, "path": path} for status, path in changed],
        "changedFileCount": len(changed),
        "diff": diff_text,
        "diffTruncated": diff_truncated,
        "patch": patch_text,
        "patchOmittedFiles": patch_omitted,
        "report": report,
    }


def write_change_report(cycle: int, before: Dict[str, Dict[str, Any]], max_chars: int, patch_file: Optional[str], root: Path = ROOT) -> Dict[str, Any]:
    after = workspace_snapshot(root)
    result = build_change_report(cycle, before, after, max_chars)
    # Use a single timestamp so the report and patch filenames always match,
    # even if the two writes straddle a second boundary.
    ts = stamp()
    path = CHANGE_DIR / f"{ts}-cycle-{cycle:05d}-changes.md"
    path.write_text(result["report"], encoding="utf-8")
    cycle_patch_path = CHANGE_DIR / f"{ts}-cycle-{cycle:05d}-changes.patch"
    cycle_patch_path.write_text(result["patch"], encoding="utf-8")
    selected_patch_path = cycle_patch_path
    if patch_file:
        selected_patch_path = repo_path(patch_file)
        selected_patch_path.parent.mkdir(parents=True, exist_ok=True)
        selected_patch_path.write_text(result["patch"], encoding="utf-8")
    result["reportPath"] = rel(path)
    result["patchPath"] = rel(selected_patch_path)
    result["patchArtifactPath"] = rel(cycle_patch_path)
    result["outputPreview"] = trim_text(result["report"], STATE_OUTPUT_PREVIEW_CHARS)
    return result


def patch_path_from_result(change_result: Dict[str, Any]) -> Optional[Path]:
    patch_path = change_result.get("patchPath")
    if not isinstance(patch_path, str) or not patch_path:
        return None
    return repo_path(patch_path)


def patch_check_summary(check_result: Dict[str, Any]) -> str:
    status = "passed" if check_result.get("patchApplies") else "failed"
    if check_result.get("skipped"):
        status = "skipped"
    details = [
        "## Patch Apply Check",
        "",
        f"- status: `{status}`",
        f"- exit_code: `{check_result.get('exitCode')}`",
    ]
    if check_result.get("logPath"):
        details.append(f"- log: `{check_result.get('logPath')}`")
    output = str(check_result.get("outputTail") or "").strip()
    if output:
        details.extend(["", "```text", trim_text(output, 4000), "```"])
    return "\n".join(details) + "\n"


def update_change_report_with_patch_check(change_result: Dict[str, Any]) -> None:
    report = str(change_result.get("report", "")).rstrip() + "\n\n" + patch_check_summary(change_result)
    change_result["report"] = report
    change_result["outputPreview"] = trim_text(report, STATE_OUTPUT_PREVIEW_CHARS)
    report_path = change_result.get("reportPath")
    if isinstance(report_path, str) and report_path:
        repo_path(report_path).write_text(report, encoding="utf-8")


def run_patch_apply_check(cycle: int, args: argparse.Namespace, change_result: Dict[str, Any]) -> Dict[str, Any]:
    patch_path = patch_path_from_result(change_result)
    log_loop_event(args, "patch-check-start", "checking generated patch", cycle=cycle, patchPath=rel(patch_path) if patch_path else None)
    if patch_path is None:
        result = {
            "patchApplies": False,
            "patchApplyCheckSkipped": True,
            "exitCode": 2,
            "outputTail": "patch path is missing",
        }
    elif not str(change_result.get("patch", "")).strip():
        result = {
            "patchApplies": True,
            "patchApplyCheckSkipped": True,
            "exitCode": 0,
            "outputTail": "empty patch",
        }
    else:
        check = run_logged(
            cycle,
            "patch-apply-check",
            ["git", "apply", "--check", str(patch_path)],
            stream_output=not args.changes_only,
            cwd=ROOT,
            animate=animation_enabled(args),
        )
        result = {
            "patchApplies": int(check.get("exitCode") or 0) == 0,
            "patchApplyCheckSkipped": False,
            "exitCode": check.get("exitCode"),
            "logPath": check.get("logPath"),
            "outputTail": check.get("outputTail"),
        }
    result["patchComplete"] = not bool(change_result.get("patchOmittedFiles"))
    change_result["patchApplies"] = result["patchApplies"]
    change_result["patchApplyCheckSkipped"] = result["patchApplyCheckSkipped"]
    change_result["patchApplyCheckExitCode"] = result["exitCode"]
    change_result["patchApplyCheckLogPath"] = result.get("logPath")
    change_result["patchApplyCheckOutput"] = trim_text(str(result.get("outputTail", "")), 4000)
    change_result["patchComplete"] = result["patchComplete"]
    update_change_report_with_patch_check(change_result)
    log_loop_event(
        args,
        "patch-check-complete",
        "patch check complete",
        cycle=cycle,
        patchApplies=result.get("patchApplies"),
        skipped=result.get("patchApplyCheckSkipped"),
        exitCode=result.get("exitCode"),
        logPath=result.get("logPath"),
    )
    return result


def commit_branch_basis(args: argparse.Namespace) -> str:
    prompt_file = getattr(args, "prompt_file", None)
    if prompt_file:
        path = Path(str(prompt_file))
        parts = [part for part in (path.parent.name, path.stem) if part and part != "."]
        return "-".join(parts)
    prompt = str(getattr(args, "prompt", "") or "").strip()
    if prompt:
        return prompt.splitlines()[0]
    choice = str(getattr(args, "choice", "") or "").strip()
    if choice:
        return choice.splitlines()[0]
    return "spec-loop"


def branch_exists(branch: str) -> bool:
    proc = subprocess.run(
        ["git", "show-ref", "--verify", "--quiet", f"refs/heads/{branch}"],
        cwd=ROOT,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        check=False,
    )
    return proc.returncode == 0


def generated_commit_branch(args: argparse.Namespace, cycle: int) -> str:
    explicit = getattr(args, "commit_branch", None)
    if explicit:
        return str(explicit).strip()
    prefix = str(getattr(args, "commit_branch_prefix", None) or DEFAULT_COMMIT_BRANCH_PREFIX)
    branch = f"{slug(prefix, 24)}/{slug(commit_branch_basis(args), 64)}-cycle-{cycle:05d}"
    if branch_exists(branch):
        branch = f"{branch}-{stamp().lower()}"
    return branch


def default_commit_message(args: argparse.Namespace, cycle: int) -> str:
    explicit = getattr(args, "commit_message", None)
    if explicit:
        return str(explicit).strip()
    prompt_file = getattr(args, "prompt_file", None)
    if prompt_file:
        title = Path(str(prompt_file)).stem.replace("-", " ")
    else:
        title = slug(commit_branch_basis(args), 48).replace("-", " ")
    title = " ".join(title.split()) or "selected Foundry spec gap"
    return f"Implement {title}"


def commit_summary(commit_result: Dict[str, Any]) -> str:
    status = "committed" if commit_result.get("committed") else "skipped" if commit_result.get("skipped") else "failed"
    details = [
        "## Git Commit",
        "",
        f"- status: `{status}`",
        f"- branch: `{commit_result.get('branch')}`",
        f"- commit: `{commit_result.get('commitSha')}`",
        f"- exit_code: `{commit_result.get('exitCode')}`",
    ]
    reason = commit_result.get("reason")
    if reason:
        details.append(f"- reason: `{reason}`")
    for key in ("branchLogPath", "stageLogPath", "commitLogPath"):
        if commit_result.get(key):
            details.append(f"- {key}: `{commit_result.get(key)}`")
    output = str(commit_result.get("outputTail") or "").strip()
    if output:
        details.extend(["", "```text", trim_text(output, 4000), "```"])
    return "\n".join(details) + "\n"


def update_change_report_with_commit(change_result: Dict[str, Any], commit_result: Dict[str, Any]) -> None:
    report = str(change_result.get("report", "")).rstrip() + "\n\n" + commit_summary(commit_result)
    change_result["report"] = report
    change_result["outputPreview"] = trim_text(report, STATE_OUTPUT_PREVIEW_CHARS)
    report_path = change_result.get("reportPath")
    if isinstance(report_path, str) and report_path:
        repo_path(report_path).write_text(report, encoding="utf-8")


def run_git_commit(cycle: int, args: argparse.Namespace, change_result: Dict[str, Any], impl_result: Dict[str, Any], verify_result: Optional[Dict[str, Any]], root: Path) -> Dict[str, Any]:
    result: Dict[str, Any] = {
        "committed": False,
        "skipped": True,
        "exitCode": 0,
        "branch": None,
        "commitSha": None,
        "message": None,
        "reason": None,
    }
    if not getattr(args, "commit", False):
        result["reason"] = "commit disabled"
        return result
    log_loop_event(args, "commit-start", "commit requested", cycle=cycle, worktree=rel(root), changedFiles=change_result.get("changedFileCount"))
    if not getattr(args, "_inside_isolated", False):
        result.update({"skipped": False, "exitCode": 2, "reason": "--commit requires an isolated worktree"})
        update_change_report_with_commit(change_result, result)
        log_loop_event(args, "commit-failed", "commit rejected", cycle=cycle, reason=result.get("reason"))
        return result
    if STOP_REQUESTED:
        result["reason"] = "interrupted"
        update_change_report_with_commit(change_result, result)
        log_loop_event(args, "commit-skipped", "commit skipped", cycle=cycle, reason=result.get("reason"))
        return result
    if int(impl_result.get("exitCode") or 0) != 0:
        result["reason"] = f"implementation exited {impl_result.get('exitCode')}"
        update_change_report_with_commit(change_result, result)
        log_loop_event(args, "commit-skipped", "commit skipped", cycle=cycle, reason=result.get("reason"))
        return result
    if verify_result is not None and int(verify_result.get("exitCode") or 0) != 0:
        result["reason"] = f"verification exited {verify_result.get('exitCode')}"
        update_change_report_with_commit(change_result, result)
        log_loop_event(args, "commit-skipped", "commit skipped", cycle=cycle, reason=result.get("reason"))
        return result
    if int(change_result.get("changedFileCount") or 0) == 0:
        result["reason"] = "no changes"
        update_change_report_with_commit(change_result, result)
        log_loop_event(args, "commit-skipped", "commit skipped", cycle=cycle, reason=result.get("reason"))
        return result

    branch = generated_commit_branch(args, cycle)
    message = default_commit_message(args, cycle)
    result.update({"branch": branch, "message": message, "skipped": False})
    log_loop_event(args, "commit-branch-selected", "selected commit branch", cycle=cycle, branch=branch, commitMessage=message)

    if getattr(args, "commit_branch", None) and branch_exists(branch):
        result.update({"exitCode": 2, "reason": "explicit commit branch already exists"})
        update_change_report_with_commit(change_result, result)
        log_loop_event(args, "commit-failed", "commit branch already exists", cycle=cycle, branch=branch, reason=result.get("reason"))
        return result

    log_loop_event(args, "commit-branch-create", "creating commit branch", cycle=cycle, branch=branch, worktree=rel(root))
    branch_result = run_logged(
        cycle,
        "commit-branch",
        ["git", "switch", "-c", branch],
        stream_output=not args.changes_only,
        cwd=root,
        animate=animation_enabled(args),
    )
    result["branchLogPath"] = branch_result.get("logPath")
    if int(branch_result.get("exitCode") or 0) != 0:
        result.update({"exitCode": branch_result.get("exitCode"), "reason": "failed to create commit branch", "outputTail": branch_result.get("outputTail")})
        update_change_report_with_commit(change_result, result)
        log_loop_event(args, "commit-failed", "failed to create commit branch", cycle=cycle, branch=branch, exitCode=result.get("exitCode"), logPath=result.get("branchLogPath"))
        return result
    log_loop_event(args, "commit-branch-created", "commit branch created", cycle=cycle, branch=branch, logPath=result.get("branchLogPath"))

    log_loop_event(args, "commit-stage-start", "staging isolated worktree changes", cycle=cycle, branch=branch)
    stage_result = run_logged(
        cycle,
        "commit-stage",
        ["git", "add", "-A"],
        stream_output=not args.changes_only,
        cwd=root,
        animate=animation_enabled(args),
    )
    result["stageLogPath"] = stage_result.get("logPath")
    if int(stage_result.get("exitCode") or 0) != 0:
        result.update({"exitCode": stage_result.get("exitCode"), "reason": "failed to stage changes", "outputTail": stage_result.get("outputTail")})
        update_change_report_with_commit(change_result, result)
        log_loop_event(args, "commit-failed", "failed to stage changes", cycle=cycle, branch=branch, exitCode=result.get("exitCode"), logPath=result.get("stageLogPath"))
        return result
    log_loop_event(args, "commit-stage-complete", "staged isolated worktree changes", cycle=cycle, branch=branch, logPath=result.get("stageLogPath"))

    # `git diff --cached --quiet` exits 0 when the index matches HEAD, i.e. nothing is staged.
    diff_proc = subprocess.run(["git", "diff", "--cached", "--quiet"], cwd=root, check=False)
    if diff_proc.returncode == 0:
        result.update({"skipped": True, "exitCode": 0, "reason": "no staged changes"})
        update_change_report_with_commit(change_result, result)
        log_loop_event(args, "commit-skipped", "no staged changes", cycle=cycle, branch=branch)
        return result

    body = f"Foundry Codex cycle: {cycle}\nPrompt source: {operator_prompt_source(args) or 'interactive'}"
    log_loop_event(args, "commit-create", "creating git commit", cycle=cycle, branch=branch, commitMessage=message)
    commit_result = run_logged(
        cycle,
        "commit",
        ["git", "commit", "-m", message, "-m", body],
        stream_output=not args.changes_only,
        cwd=root,
        animate=animation_enabled(args),
    )
    result["commitLogPath"] = commit_result.get("logPath")
    result["outputTail"] = commit_result.get("outputTail")
    result["exitCode"] = commit_result.get("exitCode")
    if int(commit_result.get("exitCode") or 0) != 0:
        result["reason"] = "git commit failed"
        update_change_report_with_commit(change_result, result)
        log_loop_event(args, "commit-failed", "git commit failed", cycle=cycle, branch=branch, exitCode=result.get("exitCode"), logPath=result.get("commitLogPath"))
        return result

    sha_proc = subprocess.run(["git", "rev-parse", "HEAD"], cwd=root, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False)
    result.update(
        {
            "committed": True,
            "skipped": False,
            "commitSha": sha_proc.stdout.strip() if sha_proc.returncode == 0 else None,
            "reason": None,
        }
    )
    update_change_report_with_commit(change_result, result)
    log_loop_event(args, "commit-complete", "commit created", cycle=cycle, branch=branch, commitSha=result.get("commitSha"), logPath=result.get("commitLogPath"))
    return result


def attach_commit_result(change_result: Dict[str, Any], commit_result: Dict[str, Any]) -> None:
    change_result["committed"] = bool(commit_result.get("committed"))
    change_result["commitSkipped"] = bool(commit_result.get("skipped"))
    change_result["commitBranch"] = commit_result.get("branch")
    change_result["commitSha"] = commit_result.get("commitSha")
    change_result["commitMessage"] = commit_result.get("message")
    change_result["commitExitCode"] = commit_result.get("exitCode")
    change_result["commitReason"] = commit_result.get("reason")
    change_result["commitLogPath"] = commit_result.get("commitLogPath")
    for key in ("branchLogPath", "stageLogPath", "commitLogPath"):
        log_path = commit_result.get(key)
        if isinstance(log_path, str) and log_path:
            change_result.setdefault("logPaths", []).append(log_path)


def write_interrupted(signum: int) -> None:
    state = {
        "time": utc_now(),
        "signal": signum,
        "cycle": ACTIVE.get("cycle"),
        "step": ACTIVE.get("step"),
        "activeChildPid": ACTIVE.get("pid"),
        "promptPath": rel(Path(ACTIVE["prompt_path"])) if ACTIVE.get("prompt_path") else None,
        "logPath": rel(Path(ACTIVE["log_path"])) if ACTIVE.get("log_path") else None,
        "codexRunning": bool(ACTIVE.get("codex_running")),
    }
    atomic_write_json(INTERRUPTED_FILE, state)
    append_jsonl(DECISIONS_FILE, {"event": "interrupted", **state})
    record_loop_event("interrupted", cycle=state.get("cycle"), signal=signum, step=state.get("step"), activeChildPid=state.get("activeChildPid"), logPath=state.get("logPath"), codexRunning=state.get("codexRunning"))


def signal_handler(signum: int, _frame: Any) -> None:
    global STOP_REQUESTED
    STOP_REQUESTED = True
    write_interrupted(signum)
    pid = ACTIVE.get("pid")
    if isinstance(pid, int):
        with contextlib.suppress(ProcessLookupError, PermissionError):
            os.kill(pid, signal.SIGTERM)


def install_signal_handlers() -> None:
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)


def load_state() -> Dict[str, Any]:
    state = read_json(STATE_FILE)
    if not state:
        state = {
            "createdAt": utc_now(),
            "cycle": 0,
            "status": "new",
            "lastDecision": None,
            "lastAudit": None,
            "lastImplementation": None,
            "lastVerification": None,
            "lastChanges": None,
        }
    return state


def next_cycle_from_state() -> int:
    state = load_state()
    return int(state.get("cycle") or 0) + 1


def save_state(state: Dict[str, Any]) -> None:
    state["updatedAt"] = utc_now()
    atomic_write_json(STATE_FILE, state)


def has_stop_request() -> bool:
    return STOP_FILE.exists()


def command_log_path(cycle: int, step: str) -> Path:
    return LOG_DIR / f"{stamp()}-cycle-{cycle:05d}-{slug(step)}.log"


def safe_mtime(path: Path) -> float:
    try:
        return path.stat().st_mtime
    except OSError:
        return 0.0


def read_tail_lines(path: Path, lines: int) -> List[str]:
    try:
        return path.read_text(encoding="utf-8", errors="replace").splitlines()[-lines:]
    except OSError as exc:
        return [f"[failed to read {path}: {exc}]"]


def print_log_tails(paths: List[Path], lines: int, stream: Any = None) -> None:
    if stream is None:
        stream = sys.stdout
    for path in paths:
        print(f"\n--- {rel(path)} ---", file=stream)
        print("\n".join(read_tail_lines(path, lines)), file=stream)


def is_text_log(path: Path) -> bool:
    if not path.is_file():
        return False
    try:
        sample = path.read_bytes()[:4096]
    except OSError:
        return False
    if b"\0" in sample:
        return False
    try:
        sample.decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True


def restate_data_log_candidates(data_dir: Path) -> List[Path]:
    if not data_dir.exists():
        return []
    candidates: List[Path] = []
    for path in data_dir.glob("**/LOG*"):
        if path.is_file() and is_text_log(path):
            candidates.append(path)
    return candidates


def restate_service_log_candidates(data_dir: Path) -> List[Path]:
    candidates: List[Path] = []
    for pattern in ("*.log", "*.out", "*.err", "*.jsonl", "*.txt"):
        candidates.extend(path for path in RESTATE_LOG_DIR.glob(pattern) if path.is_file() and is_text_log(path))
    candidates.extend(restate_data_log_candidates(data_dir))
    seen: Dict[str, Path] = {}
    for path in candidates:
        seen[str(path.resolve())] = path
    return sorted(seen.values(), key=safe_mtime, reverse=True)


def emit_restate_logs(log_count: int, line_count: int, data_dir_text: str, stream: Any = None) -> None:
    if log_count <= 0:
        return
    if stream is None:
        stream = sys.stdout
    restate_data_dir = repo_path(data_dir_text)
    restate_logs = restate_service_log_candidates(restate_data_dir)
    if not restate_logs:
        print(f"\nno Restate logs found under {rel(RESTATE_LOG_DIR)} or {rel(restate_data_dir)}", file=stream)
        return
    print(f"\nlatest Restate logs from {rel(RESTATE_LOG_DIR)} and {rel(restate_data_dir)}:", file=stream)
    print_log_tails(restate_logs[:log_count], line_count, stream=stream)


def terminal_width() -> int:
    return max(40, shutil.get_terminal_size(fallback=(100, 24)).columns)


def fit_status_line(value: str) -> str:
    width = terminal_width() - 1
    text = value.replace("\n", " ")
    if len(text) <= width:
        return text
    if width <= 6:
        return text[:width]
    return text[: width - 3] + "..."


def short_log_path(log_path: Path) -> str:
    return f"logs/{log_path.name}"


def spinner_message(cycle: int, step: str, started: float) -> str:
    elapsed = int(time.monotonic() - started)
    return f"cycle {cycle} {step} running {elapsed}s"


def start_spinner(cycle: int, step: str, log_path: Path, enabled: bool) -> Tuple[threading.Event, Optional[threading.Thread]]:
    stop = threading.Event()
    if not enabled:
        return stop, None

    started = time.monotonic()
    if not sys.stderr.isatty():
        print(f"[working] {spinner_message(cycle, step, started)} | log {short_log_path(log_path)}", file=sys.stderr, flush=True)
        return stop, None

    def animate() -> None:
        idx = 0
        while not stop.is_set():
            frame = SPINNER_FRAMES[idx % len(SPINNER_FRAMES)]
            sys.stderr.write("\r\x1b[2K" + fit_status_line(f"{frame} {spinner_message(cycle, step, started)}"))
            sys.stderr.flush()
            idx += 1
            stop.wait(0.12)
        sys.stderr.write("\r\x1b[2K" + fit_status_line(f"done cycle {cycle} {step} | log {short_log_path(log_path)}") + "\n")
        sys.stderr.flush()

    thread = threading.Thread(target=animate, daemon=True)
    thread.start()
    return stop, thread


def stop_spinner(stop: threading.Event, thread: Optional[threading.Thread]) -> None:
    stop.set()
    if thread is not None:
        thread.join(timeout=1)


class CodexAssistantStreamFilter:
    """Extract Codex-authored text from the raw Codex CLI transcript."""

    HIDDEN_MARKERS = {"user", "exec", "system", "tool"}
    ASSISTANT_MARKERS = {"codex", "assistant"}

    def __init__(self, stream: Any) -> None:
        self.stream = stream
        self.section = "hidden"
        self.hide_next_token_line = False
        self.started = False

    def process(self, line: str) -> None:
        marker = line.strip()
        if marker in self.ASSISTANT_MARKERS:
            self.section = "assistant"
            if self.started:
                print("", file=self.stream, flush=True)
            return
        if marker in self.HIDDEN_MARKERS:
            self.section = "hidden"
            return
        if marker == "tokens used":
            self.section = "hidden"
            self.hide_next_token_line = True
            return
        if self.hide_next_token_line:
            self.hide_next_token_line = False
            self.section = "assistant"
            return
        if self.section == "assistant":
            print(line, end="", file=self.stream, flush=True)
            self.started = True


def effective_codex_stream(codex_running: bool, stream_output: bool, codex_stream: str) -> str:
    if not codex_running:
        return "raw" if stream_output else "quiet"
    if codex_stream == "auto":
        return "raw" if stream_output else "quiet"
    return codex_stream


def run_logged(
    cycle: int,
    step: str,
    argv: List[str],
    stdin: Optional[str] = None,
    codex_running: bool = False,
    timeout_seconds: Optional[int] = None,
    stream_output: bool = True,
    cwd: Optional[Path] = None,
    animate: bool = True,
    codex_stream: str = "auto",
) -> Dict[str, Any]:
    ensure_dirs()
    run_cwd = cwd or ROOT
    log_path = command_log_path(cycle, step)
    started = utc_now()
    ACTIVE.update(
        {
            "cycle": cycle,
            "step": step,
            "pid": None,
            "log_path": str(log_path),
            "codex_running": codex_running,
        }
    )
    stream_mode = effective_codex_stream(codex_running, stream_output, codex_stream)
    raw_stream = stream_mode == "raw"
    assistant_stream = stream_mode == "assistant"
    terminal_stream = sys.stdout if stream_output else sys.stderr
    if raw_stream:
        print(f"\n=== cycle {cycle}: {step} ===")
        print(f"$ {shell_join(argv)}")
        if run_cwd != ROOT:
            print(f"cwd: {run_cwd}")
        print(f"log: {rel(log_path)}")
    elif assistant_stream:
        print(f"\n=== cycle {cycle}: {step} Codex response ===", file=terminal_stream)
        print(f"raw log: {rel(log_path)}", file=terminal_stream, flush=True)
    spinner_stop: Optional[threading.Event] = None
    spinner_thread: Optional[threading.Thread] = None
    assistant_filter = CodexAssistantStreamFilter(terminal_stream) if assistant_stream else None
    with log_path.open("w", encoding="utf-8") as log:
        log.write(f"$ {shell_join(argv)}\n")
        log.write(f"cwd: {run_cwd}\n")
        log.write(f"started_at: {started}\n\n")
        log.flush()
        proc = subprocess.Popen(
            argv,
            cwd=run_cwd,
            stdin=subprocess.PIPE if stdin is not None else subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
        )
        if stream_mode == "quiet":
            spinner_stop, spinner_thread = start_spinner(cycle, step, log_path, animate)
        ACTIVE["pid"] = proc.pid
        if stdin is not None and proc.stdin is not None:
            with contextlib.suppress(BrokenPipeError):
                proc.stdin.write(stdin)
            with contextlib.suppress(BrokenPipeError):
                proc.stdin.close()
        output_parts: List[str] = []
        assert proc.stdout is not None
        # Use a monotonic clock for the deadline so wall-clock adjustments cannot skew it.
        deadline = time.monotonic() + timeout_seconds if timeout_seconds else None
        try:
            for line in proc.stdout:
                if raw_stream:
                    print(line, end="")
                elif assistant_filter is not None:
                    assistant_filter.process(line)
                output_parts.append(line)
                log.write(line)
                log.flush()
                # The timeout is only checked when a new output line arrives, so a
                # child that hangs silently can outlive the deadline until it writes.
                if deadline is not None and time.monotonic() > deadline:
                    log.write(f"\n[timeout after {timeout_seconds}s]\n")
                    with contextlib.suppress(ProcessLookupError):
                        proc.terminate()
                    break
        finally:
            proc.stdout.close()
        rc = proc.wait()
        completed = utc_now()
        log.write(f"\ncompleted_at: {completed}\n")
        log.write(f"[exit_code] {rc}\n")
        log.flush()
    if spinner_stop is not None:
        stop_spinner(spinner_stop, spinner_thread)
        if spinner_thread is None and not stream_output and animate:
            print(f"[done] cycle {cycle} {step} exit {rc} | log {short_log_path(log_path)}", file=sys.stderr, flush=True)
    result = {
        "step": step,
        "argv": argv,
        "exitCode": rc,
        "startedAt": started,
        "completedAt": completed,
        "logPath": rel(log_path),
        "cwd": rel(run_cwd),
        "outputTail": trim_text("".join(output_parts), 30000),
    }
    ACTIVE.update({"pid": None, "codex_running": False})
    return result


def run_capture(
    cycle: int,
    name: str,
    argv: List[str],
    max_chars: int = 60000,
    stream_output: bool = True,
    cwd: Optional[Path] = None,
    animate: bool = True,
) -> Dict[str, Any]:
    result = run_logged(cycle, name, argv, stream_output=stream_output, cwd=cwd, animate=animate)
    result["outputTail"] = trim_text(str(result.get("outputTail", "")), max_chars)
    return result


def collect_snapshot(cycle: int, audit_limit: int, log_lines: int, stream_output: bool = True, root: Path = ROOT, animate: bool = True) -> str:
    py = sys.executable or "python3"
    commands = [
        run_capture(cycle, "git-status", ["git", "status", "--short", "--branch"], 40000, stream_output, root, animate),
        run_capture(cycle, "git-diff-stat", ["git", "diff", "--stat"], 40000, stream_output, root, animate),
        run_capture(cycle, "spec-status", [py, "scripts/foundry_spec_runner.py", "status"], 30000, stream_output, root, animate),
        run_capture(
            cycle,
            "spec-pending",
            [py, "scripts/foundry_spec_runner.py", "list", "--status", "pending", "--limit", str(audit_limit)],
            70000,
            stream_output,
            root,
            animate,
        ),
        run_capture(
            cycle,
            "spec-failed",
            [py, "scripts/foundry_spec_runner.py", "list", "--status", "failed", "--limit", str(audit_limit)],
            70000,
            stream_output,
            root,
            animate,
        ),
        run_capture(
            cycle,
            "spec-logs-latest",
            [py, "scripts/foundry_spec_runner.py", "logs", "--latest", "--lines", str(log_lines)],
            50000,
            stream_output,
            root,
            animate,
        ),
        run_capture(cycle, "test-status", [py, "scripts/foundry_test_runner.py", "status"], 40000, stream_output, root, animate),
        run_capture(
            cycle,
            "planning-markers",
            [
                "rg",
                "-n",
                "Status:|Tasks:|Deliverables:|Acceptance:|Deferred|Unsupported|out of scope|gap|TODO|TBD",
                "specs",
                "docs",
                "-g",
                "*.md",
            ],
            90000,
            stream_output,
            root,
            animate,
        ),
        run_capture(
            cycle,
            "code-risk-markers",
            [
                "rg",
                "-n",
                r"TODO|FIXME|panic!|unwrap\(|expect\(",
                "crates",
                "scripts",
                "-g",
                "!docs/generated/**",
            ],
            90000,
            stream_output,
            root,
            animate,
        ),
    ]
    sections = [
        "# Foundry Codex Loop Snapshot",
        "",
        f"- Generated at: `{utc_now()}`",
        f"- Cycle: `{cycle}`",
        f"- Repository: `{root}`",
        "",
        "Progress state was gathered through official runner commands. Do not edit `.foundry-progress` manually.",
        "",
    ]
    for command in commands:
        sections.extend(
            [
                f"## {command['step']}",
                "",
                f"`$ {shell_join(command['argv'])}`",
                "",
                f"exit_code: `{command['exitCode']}`",
                "",
                "```text",
                str(command.get("outputTail", "")).strip() or "(no output)",
                "```",
                "",
            ]
        )
    return "\n".join(sections)


def audit_prompt(snapshot: str, scope: Optional[str]) -> str:
    scope_text = f"\nOperator scope: {scope}\n" if scope else ""
    return f"""You are Codex performing a read-only Foundry planning audit.

Do not modify files. Do not mark spec-runner tasks complete or failed. Do not start long-running tests.
Use AGENTS.md and specs/foundry-rust-ydb-vm-kubernetes-architecture-spec.md as the source of truth.
Use the snapshot below and inspect repository files if needed.
{scope_text}
Return:

# Foundry Implementation Gap Audit

## Executive Priority
3-6 bullets.

## Highest Priority Implementation Tasks
A table with: priority, task, why now, spec source, evidence, verification.

## Spec And Docs Changes Required
Bullets.

## Code Changes Required
Bullets grouped by component or crate.

## Testing And Proof Gaps
Bullets naming the claim, missing evidence, and narrowest proof command.

## Suggested Next Choice
One concise recommendation for the next implementation cycle.

Snapshot:

{snapshot}
"""


def run_audit(cycle: int, args: argparse.Namespace) -> Dict[str, Any]:
    if args.skip_audit:
        now = utc_now()
        return {
            "step": "codex-audit",
            "skipped": True,
            "exitCode": 0,
            "startedAt": now,
            "completedAt": now,
            "outputTail": "audit skipped by operator request",
            "auditInput": None,
            "auditReport": None,
            "auditHtml": None,
        }
    root = work_root(args)
    snapshot = collect_snapshot(
        cycle,
        args.audit_limit,
        args.log_lines,
        stream_output=not args.changes_only,
        root=root,
        animate=animation_enabled(args),
    )
    audit_input = AUDIT_DIR / f"{stamp()}-cycle-{cycle:05d}-audit-input.md"
    audit_input.write_text(snapshot, encoding="utf-8")
    ACTIVE["prompt_path"] = str(audit_input)
    prompt = audit_prompt(snapshot, args.scope)
    argv = codex_argv(args)
    result = run_logged(
        cycle,
        "codex-audit",
        argv,
        stdin=prompt,
        codex_running=True,
        stream_output=not args.changes_only,
        cwd=root,
        animate=animation_enabled(args),
        codex_stream=args.codex_stream,
    )
    report_path = AUDIT_DIR / f"{stamp()}-cycle-{cycle:05d}-audit-report.md"
    report_path.write_text(str(result.get("outputTail", "")), encoding="utf-8")
    html_path = AUDIT_DIR / f"{stamp()}-cycle-{cycle:05d}-audit.html"
    html_path.write_text(render_audit_html(snapshot, str(result.get("outputTail", ""))), encoding="utf-8")
    result.update(
        {
            "auditInput": rel(audit_input),
            "auditReport": rel(report_path),
            "auditHtml": rel(html_path),
        }
    )
    ACTIVE["prompt_path"] = None
    return result


def render_audit_html(snapshot: str, report: str) -> str:
    return f"""<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <title>Foundry Restate Codex Audit</title>
  <style>
    body {{ font-family: system-ui, sans-serif; margin: 2rem; line-height: 1.4; }}
    pre {{ background: #f6f8fa; border: 1px solid #ddd; overflow: auto; padding: 0.75rem; white-space: pre-wrap; }}
    section {{ border-top: 1px solid #ddd; margin-top: 1rem; padding-top: 1rem; }}
  </style>
</head>
<body>
  <h1>Foundry Restate Codex Audit</h1>
  <section><h2>Report</h2><pre>{html.escape(report)}</pre></section>
  <section><h2>Snapshot</h2><pre>{html.escape(snapshot)}</pre></section>
</body>
</html>
"""


def latest_audit_context(max_chars: int = 70000) -> str:
    # safe_mtime tolerates reports that vanish between glob and stat.
    reports = sorted(AUDIT_DIR.glob("*-audit-report.md"), key=safe_mtime, reverse=True)
    if not reports:
        return "No audit report is available yet."
    text = reports[0].read_text(encoding="utf-8", errors="replace")
    return f"Latest audit report: {rel(reports[0])}\n\n{trim_text(text, max_chars)}"


def prompt_audit_context(audit_result: Dict[str, Any]) -> str:
    if audit_result.get("skipped"):
        return "Audit skipped for this cycle. Use the operator prompt as the work request and inspect repository files as needed."
    return latest_audit_context()


def implementation_prompt(choice: str, cycle: int, audit_result: Dict[str, Any], root: Path, args: argparse.Namespace) -> Tuple[str, Path]:
    prompt = f"""You are Codex working in the Foundry repository at {root}.

Follow AGENTS.md and the architecture specs. Implement the operator-selected next work below.
Keep changes scoped, update specs/docs when behavior changes, preserve existing user work, and run the narrowest relevant verification.
Do not edit `.foundry-progress` manually; use scripts/foundry_spec_runner.py for spec-driven progress.

{verification_policy_prompt(args)}

Operator-selected work:

{choice}

Audit result refs:

- audit input: {audit_result.get("auditInput")}
- audit report: {audit_result.get("auditReport")}
- audit html: {audit_result.get("auditHtml")}

Audit context:

{prompt_audit_context(audit_result)}

Expected completion response:

- summarize changes
- list files changed
- list verification run
- mention blockers, if any
- recommend the next follow-up question for the loop
"""
    path = PROMPT_DIR / f"{stamp()}-cycle-{cycle:05d}-implementation-prompt.md"
    path.write_text(prompt, encoding="utf-8")
    return prompt, path


def choose_next(args: argparse.Namespace, cycle: int, audit_result: Dict[str, Any]) -> str:
    selected_prompt = operator_prompt(args)
    if args.choice and args.choice.lower() in {"next", "n", "auto", "non-interactive", "quit", "q", "stop", "exit"}:
        return args.choice
    if selected_prompt and args.choice:
        return f"{args.choice}\n\n{selected_prompt}"
    if selected_prompt:
        return selected_prompt
    if args.choice:
        return args.choice
    if args.auto:
        return "next"
    print("\nWhat next?")
    print("  enter / next  -> run next spec-runner task")
    print("  auto          -> run next spec-runner task non-interactively")
    print("  text          -> custom Codex implementation prompt")
    print("  quit          -> stop")
    try:
        value = input("> ").strip()
    except EOFError:
        return "quit"
    return value or "next"


def run_next_spec_task(cycle: int, args: argparse.Namespace, non_interactive: bool) -> Dict[str, Any]:
    root = work_root(args)
    argv = [
        sys.executable or "python3",
        "scripts/foundry_spec_runner.py",
        "run",
        "--limit",
        str(args.task_limit),
        "--codex-cmd",
        effective_codex_cmd(args),
        "--show-logs",
    ]
    if non_interactive:
        argv.extend(["--non-interactive", "--auto-complete-on-zero"])
    return run_logged(
        cycle,
        "implement-spec-runner-task",
        argv,
        codex_running=True,
        stream_output=not args.changes_only,
        cwd=root,
        animate=animation_enabled(args),
        codex_stream=args.codex_stream,
    )


def run_custom_task(cycle: int, args: argparse.Namespace, choice: str, audit_result: Dict[str, Any]) -> Dict[str, Any]:
    root = work_root(args)
    prompt, prompt_path = implementation_prompt(choice, cycle, audit_result, root, args)
    ACTIVE["prompt_path"] = str(prompt_path)
    argv = codex_argv(args)
    result = run_logged(
        cycle,
        "implement-custom-task",
        argv,
        stdin=prompt,
        codex_running=True,
        stream_output=not args.changes_only,
        cwd=root,
        animate=animation_enabled(args),
        codex_stream=args.codex_stream,
    )
    result["promptPath"] = rel(prompt_path)
    ACTIVE["prompt_path"] = None
    return result


def run_verification(
    cycle: int,
    verify_cmd: Optional[str],
    stream_output: bool = True,
    cwd: Optional[Path] = None,
    animate: bool = True,
) -> Optional[Dict[str, Any]]:
    if not verify_cmd:
        return None
    return run_logged(cycle, "verification", shlex.split(verify_cmd), stream_output=stream_output, cwd=cwd, animate=animate)


def result_log_paths(*results: Optional[Dict[str, Any]]) -> List[str]:
    paths: List[str] = []
    for result in results:
        if not result:
            continue
        path = result.get("logPath")
        if isinstance(path, str) and path:
            paths.append(path)
    return paths


def format_changes_output(change_result: Dict[str, Any], changes_format: str) -> str:
    if changes_format == "json":
        payload = {
            "changedFileCount": change_result.get("changedFileCount", 0),
            "changedFiles": change_result.get("changedFiles", []),
            "diff": change_result.get("diff", ""),
            "diffTruncated": change_result.get("diffTruncated", False),
            "patchPath": change_result.get("patchPath"),
            "patchArtifactPath": change_result.get("patchArtifactPath"),
            "patchOmittedFiles": change_result.get("patchOmittedFiles", []),
            "reportPath": change_result.get("reportPath"),
            "logPaths": change_result.get("logPaths", []),
            "exitCode": change_result.get("exitCode", 0),
            "isolatedWorktree": change_result.get("isolatedWorktree"),
            "patchApplies": change_result.get("patchApplies"),
            "patchApplyCheckSkipped": change_result.get("patchApplyCheckSkipped"),
            "patchApplyCheckExitCode": change_result.get("patchApplyCheckExitCode"),
            "patchApplyCheckLogPath": change_result.get("patchApplyCheckLogPath"),
            "patchComplete": change_result.get("patchComplete"),
            "committed": change_result.get("committed"),
            "commitSkipped": change_result.get("commitSkipped"),
            "commitBranch": change_result.get("commitBranch"),
            "commitSha": change_result.get("commitSha"),
            "commitMessage": change_result.get("commitMessage"),
            "commitExitCode": change_result.get("commitExitCode"),
            "commitReason": change_result.get("commitReason"),
            "commitLogPath": change_result.get("commitLogPath"),
            "verificationPolicy": change_result.get("verificationPolicy"),
        }
        return json.dumps(payload, indent=2, sort_keys=True) + "\n"
    if changes_format == "patch":
        return str(change_result.get("patch", ""))
    return str(change_result.get("report", ""))


def compact_change_result(change_result: Dict[str, Any]) -> Dict[str, Any]:
    return {
        "changedFileCount": change_result.get("changedFileCount", 0),
        "changedFiles": change_result.get("changedFiles", []),
        "diff": change_result.get("diff", ""),
        "diffTruncated": change_result.get("diffTruncated", False),
        "patchPath": change_result.get("patchPath"),
        "patchArtifactPath": change_result.get("patchArtifactPath"),
        "patchOmittedFiles": change_result.get("patchOmittedFiles", []),
        "reportPath": change_result.get("reportPath"),
        "logPaths": change_result.get("logPaths", []),
        "exitCode": change_result.get("exitCode", 0),
        "isolatedWorktree": change_result.get("isolatedWorktree"),
        "patchApplies": change_result.get("patchApplies"),
        "patchApplyCheckSkipped": change_result.get("patchApplyCheckSkipped"),
        "patchApplyCheckExitCode": change_result.get("patchApplyCheckExitCode"),
        "patchApplyCheckLogPath": change_result.get("patchApplyCheckLogPath"),
        "patchComplete": change_result.get("patchComplete"),
        "committed": change_result.get("committed"),
        "commitSkipped": change_result.get("commitSkipped"),
        "commitBranch": change_result.get("commitBranch"),
        "commitSha": change_result.get("commitSha"),
        "commitMessage": change_result.get("commitMessage"),
        "commitExitCode": change_result.get("commitExitCode"),
        "commitReason": change_result.get("commitReason"),
        "commitLogPath": change_result.get("commitLogPath"),
        "verificationPolicy": change_result.get("verificationPolicy"),
    }


def cycle_result_payload(
    cycle: int,
    decision: Dict[str, Any],
    audit_result: Optional[Dict[str, Any]],
    impl_result: Optional[Dict[str, Any]],
    verify_result: Optional[Dict[str, Any]],
    change_result: Dict[str, Any],
    include_details: bool,
    args: Optional[argparse.Namespace] = None,
) -> Dict[str, Any]:
    """Assemble the per-cycle result payload returned to loop callers."""
    # Prefer the change report's exit code; fall back to the implementation step's.
    exit_code = int(change_result.get("exitCode", (impl_result or {}).get("exitCode") or 0) or 0)
    payload = {
        "cycle": cycle,
        "decision": decision,
        "changedFiles": change_result.get("changedFiles", []),
        "changedFileCount": change_result.get("changedFileCount", 0),
        "diff": change_result.get("diff", ""),
        "diffTruncated": change_result.get("diffTruncated", False),
        "patchPath": change_result.get("patchPath"),
        "patchArtifactPath": change_result.get("patchArtifactPath"),
        "patchOmittedFiles": change_result.get("patchOmittedFiles", []),
        "reportPath": change_result.get("reportPath"),
        "logPaths": change_result.get("logPaths", []),
        "exitCode": exit_code,
        "patchApplies": change_result.get("patchApplies"),
        "patchApplyCheckSkipped": change_result.get("patchApplyCheckSkipped"),
        "patchApplyCheckExitCode": change_result.get("patchApplyCheckExitCode"),
        "patchApplyCheckLogPath": change_result.get("patchApplyCheckLogPath"),
        "patchComplete": change_result.get("patchComplete"),
        "committed": change_result.get("committed"),
        "commitSkipped": change_result.get("commitSkipped"),
        "commitBranch": change_result.get("commitBranch"),
        "commitSha": change_result.get("commitSha"),
        "commitMessage": change_result.get("commitMessage"),
        "commitExitCode": change_result.get("commitExitCode"),
        "commitReason": change_result.get("commitReason"),
        "commitLogPath": change_result.get("commitLogPath"),
        "verificationPolicy": verification_policy(args) if args is not None else change_result.get("verificationPolicy"),
        "changes": compact_change_result(change_result),
    }
    if include_details:
        payload.update(
            {
                "audit": audit_result,
                "implementation": impl_result,
                "verification": verify_result,
            }
        )
    return payload


def isolated_worktree_path(cycle: int) -> Path:
    """Return a timestamped worktree path for this cycle that does not exist yet."""
    base = WORKTREE_DIR / f"{stamp()}-cycle-{cycle:05d}"
    if not base.exists():
        return base
    idx = 2
    while True:
        candidate = WORKTREE_DIR / f"{base.name}-{idx}"
        if not candidate.exists():
            return candidate
        idx += 1


def create_isolated_worktree(cycle: int, args: argparse.Namespace) -> Tuple[Path, str, Dict[str, Any]]:
    ensure_dirs()
    base_ref = str(getattr(args, "isolated_base", None) or DEFAULT_ISOLATED_BASE)
    path = isolated_worktree_path(cycle)
    log_loop_event(args, "isolated-worktree-create", "creating isolated worktree", cycle=cycle, base=base_ref, path=rel(path))
    result = run_logged(
        cycle,
        "isolated-worktree-add",
        ["git", "worktree", "add", "--detach", str(path), base_ref],
        stream_output=not args.changes_only,
        cwd=ROOT,
        animate=animation_enabled(args),
    )
    if int(result.get("exitCode") or 0) != 0:
        log_loop_event(args, "isolated-worktree-failed", "failed to create isolated worktree", cycle=cycle, base=base_ref, path=rel(path), exitCode=result.get("exitCode"), logPath=result.get("logPath"))
        raise SystemExit(f"failed to create isolated worktree at {path}; see {result.get('logPath')}")
    log_loop_event(args, "isolated-worktree-created", "isolated worktree ready", cycle=cycle, base=base_ref, path=rel(path), logPath=result.get("logPath"))
    return path, base_ref, result


def remove_isolated_worktree(cycle: int, path: Path, args: argparse.Namespace, stream_output: bool, animate: bool) -> None:
    if not path.exists():
        log_loop_event(args, "isolated-worktree-remove-skip", "isolated worktree already absent", cycle=cycle, path=rel(path))
        return
    log_loop_event(args, "isolated-worktree-remove", "removing isolated worktree", cycle=cycle, path=rel(path))
    result = run_logged(
        cycle,
        "isolated-worktree-remove",
        ["git", "worktree", "remove", "--force", str(path)],
        stream_output=stream_output,
        cwd=ROOT,
        animate=animate,
    )
    if int(result.get("exitCode") or 0) != 0:
        with contextlib.suppress(OSError):
            shutil.rmtree(path)
        log_loop_event(args, "isolated-worktree-remove-fallback", "forced filesystem cleanup after git worktree remove failed", cycle=cycle, path=rel(path), exitCode=result.get("exitCode"), logPath=result.get("logPath"))
    else:
        log_loop_event(args, "isolated-worktree-removed", "isolated worktree removed", cycle=cycle, path=rel(path), logPath=result.get("logPath"))


def isolated_cycle_args(args: argparse.Namespace, path: Path) -> argparse.Namespace:
    """Clone args for a cycle that runs inside the isolated worktree at ``path``."""
    child = argparse.Namespace(**vars(args))
    child._work_root = str(path)
    child._inside_isolated = True
    child._suppress_changes_output = True
    if getattr(child, "implement_only", False):
        child.skip_audit = True
    if child.verify_cmd == DEFAULT_VERIFY_CMD and not (path / "scripts/foundry_restate_codex_loop.py").exists():
        # The default verify command needs the loop script; skip verification in worktrees that lack it.
        child.verify_cmd = None
    return child


def annotate_isolated_result(result: Dict[str, Any], path: Path, base_ref: str, kept: bool, add_result: Dict[str, Any]) -> None:
    """Attach isolated-worktree metadata to the cycle result and to persisted state."""
    metadata = {
        "path": str(path),
        "base": base_ref,
        "kept": kept,
        "addLogPath": add_result.get("logPath"),
    }
    result["isolatedWorktree"] = metadata
    changes = result.get("changes")
    if isinstance(changes, dict):
        changes["isolatedWorktree"] = metadata
    state = load_state()
    last_changes = state.get("lastChanges")
    if isinstance(last_changes, dict):
        last_changes["isolatedWorktree"] = metadata
        save_state(state)


def run_isolated_cycle(cycle: int, args: argparse.Namespace) -> Dict[str, Any]:
    """Run one cycle inside a throwaway git worktree, removing it afterwards unless kept."""
    path, base_ref, add_result = create_isolated_worktree(cycle, args)
    kept = bool(getattr(args, "keep_isolated_worktree", False))
    log_loop_event(args, "isolated-cycle-start", "running cycle inside isolated worktree", cycle=cycle, path=rel(path), base=base_ref, kept=kept)
    try:
        child = isolated_cycle_args(args, path)
        result = run_cycle(cycle, child)
        annotate_isolated_result(result, path, base_ref, kept, add_result)
        log_loop_event(args, "isolated-cycle-complete", "isolated cycle complete", cycle=cycle, path=rel(path), exitCode=result.get("exitCode"), kept=kept)
        if args.changes_only and result.get("changesOutput"):
            if args.changes_format == "json" and isinstance(result.get("changes"), dict):
                print(format_changes_output(result["changes"], "json"), end="")
            else:
                print(str(result["changesOutput"]), end="")
        return result
    finally:
        if not kept:
            remove_isolated_worktree(cycle, path, args=args, stream_output=not args.changes_only, animate=animation_enabled(args))
        else:
            log_loop_event(args, "isolated-worktree-kept", "kept isolated worktree for inspection", cycle=cycle, path=rel(path))


def run_cycle(cycle: int, args: argparse.Namespace) -> Dict[str, Any]:
    if getattr(args, "isolated_worktree", False) and not getattr(args, "_inside_isolated", False):
        return run_isolated_cycle(cycle, args)
    root = work_root(args)
    log_loop_event(
        args,
        "cycle-start",
        "starting cycle",
        cycle=cycle,
        root=rel(root),
        changesOnly=getattr(args, "changes_only", False),
        isolated=getattr(args, "isolated_worktree", False),
        insideIsolated=getattr(args, "_inside_isolated", False),
        commit=getattr(args, "commit", False),
        promptSource=operator_prompt_source(args),
        remoteVerifyHost=getattr(args, "remote_verify_host", DEFAULT_REMOTE_VERIFY_HOST),
        allowLocalVerification=getattr(args, "allow_local_verification", False),
        codexDangerouslyBypassApprovalsAndSandbox=getattr(args, "dangerously_bypass_approvals_and_sandbox", False),
        codexSearch=getattr(args, "search", False),
    )
    before_changes = workspace_snapshot(root)
    state = load_state()
    state.update({"status": "running", "cycle": cycle, "activeStep": "audit"})
    save_state(state)
    audit_result = run_audit(cycle, args)
    log_loop_event(args, "audit-complete", "audit step complete", cycle=cycle, skipped=audit_result.get("skipped"), exitCode=audit_result.get("exitCode"), auditReport=audit_result.get("auditReport"))
    state.update({"activeStep": "choose", "lastAudit": compact_result(audit_result)})
    save_state(state)
    if getattr(args, "plan_only", False):
        decision = {
            "time": utc_now(),
            "cycle": cycle,
            "choice": "plan-only",
            "auditReport": audit_result.get("auditReport"),
            "auditInput": audit_result.get("auditInput"),
            "promptSource": operator_prompt_source(args),
        }
        append_jsonl(DECISIONS_FILE, {"event": "decision", **decision})
        change_result = write_change_report(cycle, before_changes, args.diff_limit, args.patch_file, root)
        change_result["verificationPolicy"] = verification_policy(args)
        log_loop_event(args, "changes-written", "change report written", cycle=cycle, changedFiles=change_result.get("changedFileCount"), reportPath=change_result.get("reportPath"), patchPath=change_result.get("patchPath"))
        change_result["logPaths"] = result_log_paths(audit_result)
        change_result["exitCode"] = int(audit_result.get("exitCode") or 0)
        if args.patch_check:
            run_patch_apply_check(cycle, args, change_result)
        state.update(
            {
                "status": "plan-complete",
                "activeStep": None,
                "lastDecision": decision,
                "lastImplementation": None,
                "lastVerification": None,
                "lastChanges": {
                    "changedFileCount": change_result["changedFileCount"],
                    "changedFiles": change_result["changedFiles"],
                    "reportPath": change_result["reportPath"],
                    "patchPath": change_result["patchPath"],
                    "diffTruncated": change_result["diffTruncated"],
                    "logPaths": change_result["logPaths"],
                    "exitCode": change_result["exitCode"],
                    "patchApplies": change_result.get("patchApplies"),
                    "patchApplyCheckSkipped": change_result.get("patchApplyCheckSkipped"),
                    "patchApplyCheckLogPath": change_result.get("patchApplyCheckLogPath"),
                    "verificationPolicy": change_result.get("verificationPolicy"),
                    "outputPreview": change_result["outputPreview"],
                },
            }
        )
        save_state(state)
        log_loop_event(args, "cycle-complete", "plan-only cycle complete", cycle=cycle, exitCode=change_result.get("exitCode"), changedFiles=change_result.get("changedFileCount"))
        if args.changes_only:
            changes_output = format_changes_output(change_result, args.changes_format)
            if not getattr(args, "_suppress_changes_output", False):
                print(changes_output, end="")
            payload = cycle_result_payload(cycle, decision, audit_result, None, None, change_result, include_details=True, args=args)
            payload["changesOutput"] = changes_output
            return payload
        return cycle_result_payload(cycle, decision, audit_result, None, None, change_result, include_details=True, args=args)
    if STOP_REQUESTED:
        decision = {
            "time": utc_now(),
            "cycle": cycle,
            "choice": "interrupted",
            "auditReport": audit_result.get("auditReport"),
            "auditInput": audit_result.get("auditInput"),
            "exitCode": audit_result.get("exitCode"),
        }
        append_jsonl(DECISIONS_FILE, {"event": "cycle-stopped-after-audit", **decision})
        state.update({"status": "interrupted", "activeStep": None, "lastDecision": decision})
        save_state(state)
        return {"cycle": cycle, "stopped": True, "decision": decision, "exitCode": int(audit_result.get("exitCode") or 130)}

    choice = choose_next(args, cycle, audit_result)
    decision = {
        "time": utc_now(),
        "cycle": cycle,
        "choice": choice,
        "auditReport": audit_result.get("auditReport"),
        "auditInput": audit_result.get("auditInput"),
        "promptSource": operator_prompt_source(args),
    }
    append_jsonl(DECISIONS_FILE, {"event": "decision", **decision})
    log_loop_event(args, "decision", "operator decision selected", cycle=cycle, promptSource=decision.get("promptSource"), choicePreview=trim_text(choice.replace("\n", " "), 160))
    if choice.lower() in {"quit", "q", "stop", "exit"}:
        state.update({"status": "stopped", "activeStep": None, "lastDecision": decision})
        save_state(state)
        return {"cycle": cycle, "stopped": True, "decision": decision, "exitCode": 0}

    state.update({"activeStep": "implementation", "lastDecision": decision})
    save_state(state)
    if choice.lower() in {"next", "n"}:
        impl_result = run_next_spec_task(cycle, args, non_interactive=False)
    elif choice.lower() in {"auto", "non-interactive"}:
        impl_result = run_next_spec_task(cycle, args, non_interactive=True)
    else:
        impl_result = run_custom_task(cycle, args, choice, audit_result)
    if STOP_REQUESTED:
        decision["choice"] = "interrupted"
    decision["exitCode"] = impl_result.get("exitCode")
    decision["logPath"] = impl_result.get("logPath")
    decision["prompt"] = impl_result.get("promptPath")
    append_jsonl(DECISIONS_FILE, {"event": "implementation-complete", **decision})
    log_loop_event(args, "implementation-complete", "implementation step complete", cycle=cycle, exitCode=impl_result.get("exitCode"), logPath=impl_result.get("logPath"), promptPath=impl_result.get("promptPath"))

    verify_result = None
    if STOP_REQUESTED:
        log_loop_event(args, "verification-skipped", "verification skipped after stop request", cycle=cycle)
    else:
        log_loop_event(
            args,
            "verification-start",
            "starting verification",
            cycle=cycle,
            verifyCmd=args.verify_cmd,
            remoteVerifyHost=getattr(args, "remote_verify_host", DEFAULT_REMOTE_VERIFY_HOST),
            allowLocalVerification=getattr(args, "allow_local_verification", False),
        )
        verify_result = run_verification(
            cycle,
            args.verify_cmd,
            stream_output=not args.changes_only,
            cwd=root,
            animate=animation_enabled(args),
        )
        log_loop_event(args, "verification-complete", "verification complete", cycle=cycle, exitCode=(verify_result or {}).get("exitCode"), logPath=(verify_result or {}).get("logPath"))
    change_result = write_change_report(cycle, before_changes, args.diff_limit, args.patch_file, root)
    change_result["verificationPolicy"] = verification_policy(args)
    change_result["logPaths"] = result_log_paths(audit_result, impl_result, verify_result)
    change_result["exitCode"] = int(impl_result.get("exitCode") or 0)
    log_loop_event(args, "changes-written", "change report written", cycle=cycle, changedFiles=change_result.get("changedFileCount"), reportPath=change_result.get("reportPath"), patchPath=change_result.get("patchPath"), patchArtifactPath=change_result.get("patchArtifactPath"))
    if args.patch_check:
        patch_check_result = run_patch_apply_check(cycle, args, change_result)
        if patch_check_result.get("logPath"):
            change_result["logPaths"].append(str(patch_check_result["logPath"]))
    if getattr(args, "commit", False):
        commit_result = run_git_commit(cycle, args, change_result, impl_result, verify_result, root)
        attach_commit_result(change_result, commit_result)
        if int(commit_result.get("exitCode") or 0) != 0:
            change_result["exitCode"] = int(commit_result.get("exitCode") or 0)
    state.update(
        {
            "status": "interrupted" if STOP_REQUESTED else "cycle-complete",
            "activeStep": None,
            "lastImplementation": compact_result(impl_result),
            "lastVerification": compact_result(verify_result),
            "lastChanges": {
                "changedFileCount": change_result["changedFileCount"],
                "changedFiles": change_result["changedFiles"],
                "reportPath": change_result["reportPath"],
                "patchPath": change_result["patchPath"],
                "diffTruncated": change_result["diffTruncated"],
                "logPaths": change_result["logPaths"],
                "exitCode": change_result["exitCode"],
                "patchApplies": change_result.get("patchApplies"),
                "patchApplyCheckSkipped": change_result.get("patchApplyCheckSkipped"),
                "patchApplyCheckLogPath": change_result.get("patchApplyCheckLogPath"),
                "committed": change_result.get("committed"),
                "commitSkipped": change_result.get("commitSkipped"),
                "commitBranch": change_result.get("commitBranch"),
                "commitSha": change_result.get("commitSha"),
                "commitMessage": change_result.get("commitMessage"),
                "commitExitCode": change_result.get("commitExitCode"),
                "commitReason": change_result.get("commitReason"),
                "commitLogPath": change_result.get("commitLogPath"),
                "verificationPolicy": change_result.get("verificationPolicy"),
                "outputPreview": change_result["outputPreview"],
            },
        }
    )
    save_state(state)
    log_loop_event(args, "cycle-complete", "cycle complete", cycle=cycle, exitCode=change_result.get("exitCode"), changedFiles=change_result.get("changedFileCount"), committed=change_result.get("committed"), commitBranch=change_result.get("commitBranch"), commitSha=change_result.get("commitSha"))
    if args.changes_only:
        changes_output = format_changes_output(change_result, args.changes_format)
        if not getattr(args, "_suppress_changes_output", False):
            print(changes_output, end="")
        payload = cycle_result_payload(cycle, decision, audit_result, impl_result, verify_result, change_result, include_details=False, args=args)
        payload["changesOutput"] = changes_output
        return payload
    return cycle_result_payload(cycle, decision, audit_result, impl_result, verify_result, change_result, include_details=True, args=args)


def run_local(args: argparse.Namespace) -> int:
    install_signal_handlers()
    ensure_dirs()
    normalize_run_args(args)
    if getattr(args, "print_codex_argv", False):
        print(effective_codex_cmd(args))
        return 0
    operator_prompt(args)
    state = load_state()
    start_cycle = int(state.get("cycle") or 0) + 1
    max_cycles = args.cycles
    cycle = start_cycle
    rc = 0
    log_loop_event(
        args,
        "loop-start",
        "local loop starting",
        cycle=start_cycle,
        cycles=max_cycles,
        once=args.once,
        changesOnly=args.changes_only,
        isolated=args.isolated_worktree,
        commit=args.commit,
        promptSource=operator_prompt_source(args),
        remoteVerifyHost=getattr(args, "remote_verify_host", DEFAULT_REMOTE_VERIFY_HOST),
        allowLocalVerification=getattr(args, "allow_local_verification", False),
        codexDangerouslyBypassApprovalsAndSandbox=getattr(args, "dangerously_bypass_approvals_and_sandbox", False),
        codexSearch=getattr(args, "search", False),
    )
    if not args.changes_only:
        print(f"state: {rel(STATE_FILE)}")
        print(f"decisions: {rel(DECISIONS_FILE)}")
        print(f"events: {rel(EVENTS_FILE)}")
        print(f"logs: {rel(LOG_DIR)}")
    while max_cycles == 0 or cycle < start_cycle + max_cycles:
        if STOP_REQUESTED or has_stop_request():
            if has_stop_request():
                stream = sys.stderr if args.changes_only else sys.stdout
                print(f"stop request found: {rel(STOP_FILE)}", file=stream)
                print("clear it with: scripts/foundry_restate_codex_loop.py clear-stop", file=stream)
                log_loop_event(args, "loop-stop-marker", "stop marker found", cycle=cycle, stopFile=rel(STOP_FILE))
            break
        result = run_cycle(cycle, args)
        emit_restate_logs(
            int(getattr(args, "restate_logs", 0) or 0),
            int(getattr(args, "restate_log_lines", 80) or 80),
            str(getattr(args, "restate_data_dir", DEFAULT_RESTATE_DATA_DIR)),
            stream=sys.stderr if args.changes_only else sys.stdout,
        )
        rc = int(result.get("exitCode") or 0)
        if result.get("stopped"):
            log_loop_event(args, "loop-stopped", "cycle requested stop", cycle=cycle, exitCode=rc)
            break
        if rc != 0 and args.fail_fast:
            log_loop_event(args, "loop-fail-fast", "stopping after non-zero exit", cycle=cycle, exitCode=rc)
            break
        if args.once:
            log_loop_event(args, "loop-once-complete", "single-cycle run complete", cycle=cycle, exitCode=rc)
            break
        if args.sleep_seconds > 0:
            if not args.changes_only:
                print(f"\nsleeping {args.sleep_seconds}s before next cycle")
            time.sleep(args.sleep_seconds)
        cycle += 1
    state = load_state()
    if STOP_REQUESTED:
        state["status"] = "interrupted"
    else:
        # Keep whatever status the last cycle recorded; default to "stopped".
        state.setdefault("status", "stopped")
    save_state(state)
    log_loop_event(args, "loop-exit", "local loop exited", cycle=cycle, exitCode=rc, status=state.get("status"))
    return rc


def status_cmd(args: argparse.Namespace) -> int:
    state = load_state()
    print(json.dumps(state, indent=2, sort_keys=True))
    if STOP_FILE.exists():
        print(f"\nstop request is present: {rel(STOP_FILE)}")
    if RESTATE_SERVER_FILE.exists():
        print(f"\nlatest Restate service metadata from {rel(RESTATE_SERVER_FILE)}:")
        print(json.dumps(read_json(RESTATE_SERVER_FILE), indent=2, sort_keys=True))
    if DECISIONS_FILE.exists():
        print(f"\nlatest decisions from {rel(DECISIONS_FILE)}:")
        lines = DECISIONS_FILE.read_text(encoding="utf-8", errors="replace").splitlines()[-args.lines :]
        for line in lines:
            print(line)
    if args.events and EVENTS_FILE.exists():
        print(f"\nlatest loop events from {rel(EVENTS_FILE)}:")
        for line in tail_jsonl(EVENTS_FILE, args.events):
            print(line)
    if args.logs:
        logs = sorted(LOG_DIR.glob("*.log"), key=lambda path: path.stat().st_mtime, reverse=True)
        print_log_tails(logs[: args.logs], args.lines)
    if args.restate_logs:
        emit_restate_logs(args.restate_logs, args.lines, args.restate_data_dir)
    return 0


def restate_logs_cmd(args: argparse.Namespace) -> int:
    emit_restate_logs(args.logs, args.lines, args.restate_data_dir)
    return 0


def events_cmd(args: argparse.Namespace) -> int:
    lines = tail_jsonl(EVENTS_FILE, args.lines)
    if not lines:
        print(f"no loop events found at {rel(EVENTS_FILE)}")
        return 0
    for line in lines:
        print(line)
    return 0


def stop_local_cmd(_args: argparse.Namespace) -> int:
    ensure_dirs()
    atomic_write_json(STOP_FILE, {"time": utc_now(), "reason": "operator requested stop"})
    record_loop_event("stop-requested", stopFile=rel(STOP_FILE), reason="operator requested stop")
    print(f"wrote {rel(STOP_FILE)}")
    print("clear it with: scripts/foundry_restate_codex_loop.py clear-stop")
    return 0


def clear_stop_cmd(_args: argparse.Namespace) -> int:
    if STOP_FILE.exists():
        STOP_FILE.unlink()
        record_loop_event("stop-cleared", stopFile=rel(STOP_FILE))
        print(f"removed {rel(STOP_FILE)}")
    else:
        record_loop_event("stop-clear-noop", stopFile=rel(STOP_FILE))
        print(f"no stop request found at {rel(STOP_FILE)}")
    return 0


def require_restate() -> Any:
    if sys.version_info < (3, 11):
        raise SystemExit("Restate Python SDK requires Python >= 3.11. Run serve-restate with python3.11.")
    try:
        import restate  # type: ignore[import-not-found]
    except ImportError as exc:
        raise SystemExit("Install Restate Python SDK first: python3.11 -m pip install 'restate_sdk[serde]' hypercorn") from exc
    return restate


def build_restate_app() -> Any:
    """Build the Restate app that exposes the spec-loop workflow."""
    restate = require_restate()
    workflow = restate.Workflow("FoundryCodexSpecLoop")

    @workflow.main()
    async def run(ctx: Any, req: Dict[str, Any]) -> Dict[str, Any]:
        if request_requires_mutation(req) and not request_bool(req, "allowMutation", "allow_mutation", default=False):
            return {
                "workflow": ctx.key(),
                "denied": True,
                "reason": "allowMutation true is required for Restate requests that can edit files",
                "exitCode": 2,
            }
        if request_requires_dangerous_codex(req) and not request_bool(req, "allowDangerousCodex", "allow_dangerous_codex", default=False):
            return {
                "workflow": ctx.key(),
                "denied": True,
                "reason": "allowDangerousCodex true is required when dangerouslyBypassApprovalsAndSandbox is requested",
                "exitCode": 2,
            }
        cycles_raw = request_value(req, "cycles", default=1)
        cycles = int(cycles_raw)
        sleep_seconds = max(0, int(request_value(req, "sleepSeconds", "sleep_seconds", default=0) or 0))
        requested_start = request_value(req, "startCycle", "start_cycle")
        base_cycle = await ctx.run_typed(
            "determine first local cycle",
            lambda: int(requested_start) if requested_start is not None else next_cycle_from_state(),
        )
        results: List[Dict[str, Any]] = []
        idx = 0
        while cycles == 0 or idx < cycles:
            local_args = namespace_from_restate_request(req)
            cycle = int(base_cycle) + idx
            stop_requested = await ctx.run_typed(f"foundry stop check {cycle}", has_stop_request)
            if stop_requested:
                return {"workflow": ctx.key(), "stopped": True, "results": results}
            result = await ctx.run_typed(
                f"foundry codex cycle {cycle}",
                lambda: run_cycle(cycle, local_args),
            )
            ctx.set("lastResult", result)
            results.append(result)
            if result.get("stopped"):
                break
            if int(result.get("exitCode") or 0) != 0 and local_args.fail_fast:
                break
            idx += 1
            if sleep_seconds > 0 and (cycles == 0 or idx < cycles):
                await ctx.sleep(delta=dt.timedelta(seconds=sleep_seconds))
        return {"workflow": ctx.key(), "results": results}

    @workflow.handler()
    async def status(ctx: Any, _req: Dict[str, Any]) -> Dict[str, Any]:
        return await ctx.get("lastResult", type_hint=dict) or {"status": "no result yet"}

    return restate.app([workflow])


def request_value(req: Dict[str, Any], *names: str, default: Any = None) -> Any:
    """Return the first non-None value in ``req`` among the given key aliases."""
    for name in names:
        if name in req and req[name] is not None:
            return req[name]
    return default


def request_bool(req: Dict[str, Any], *names: str, default: bool = False) -> bool:
    """Coerce a request value to bool, accepting common truthy strings."""
    value = request_value(req, *names, default=default)
    if isinstance(value, str):
        return value.lower() in {"1", "true", "yes", "on"}
    return bool(value)


def request_requires_mutation(req: Dict[str, Any]) -> bool:
    """A request can edit files unless it is plan-only or an explicit quit."""
    if request_bool(req, "planOnly", "plan_only", default=False):
        return False
    choice = request_value(req, "choice")
    if isinstance(choice, str) and choice.lower() in {"quit", "q", "stop", "exit"}:
        return False
    return True


def request_requires_dangerous_codex(req: Dict[str, Any]) -> bool:
    return request_bool(
        req,
        "dangerouslyBypassApprovalsAndSandbox",
        "dangerously_bypass_approvals_and_sandbox",
        default=False,
    )


def namespace_from_restate_request(req: Dict[str, Any]) -> argparse.Namespace:
    """Translate a Restate request payload into the argparse namespace that run_cycle expects."""
    prompt = request_value(req, "prompt")
    prompt_file = request_value(req, "promptFile", "prompt_file")
    has_prompt = bool(prompt or prompt_file)
    choice = request_value(req, "choice")
    args = argparse.Namespace(
        codex_cmd=str(request_value(req, "codexCmd", "codex_cmd", default=DEFAULT_CODEX_CMD)),
        dangerously_bypass_approvals_and_sandbox=request_bool(
            req,
            "dangerouslyBypassApprovalsAndSandbox",
            "dangerously_bypass_approvals_and_sandbox",
            default=False,
        ),
        search=request_bool(req, "search", "codexSearch", "codex_search", default=False),
        audit_limit=int(request_value(req, "auditLimit", "audit_limit", default=20)),
        log_lines=int(request_value(req, "logLines", "log_lines", default=80)),
        scope=request_value(req, "scope"),
        task_limit=int(request_value(req, "taskLimit", "task_limit", default=1)),
        choice=choice,
        prompt=prompt,
        prompt_file=prompt_file,
        auto=request_bool(req, "auto", default=not (choice or has_prompt)),
        verify_cmd=request_value(req, "verifyCmd", "verify_cmd", default=DEFAULT_VERIFY_CMD),
        remote_verify_host=str(request_value(req, "remoteVerifyHost", "remote_verify_host", default=DEFAULT_REMOTE_VERIFY_HOST)),
        allow_local_verification=request_bool(req, "allowLocalVerification", "allow_local_verification", default=False),
        local_verification_reason=request_value(req, "localVerificationReason", "local_verification_reason"),
        fail_fast=request_bool(req, "failFast", "fail_fast", default=True),
        skip_audit=request_bool(req, "skipAudit", "skip_audit", default=True),
        plan_only=request_bool(req, "planOnly", "plan_only", default=False),
        implement_only=request_bool(req, "implementOnly", "implement_only", default=False),
        isolated_worktree=request_bool(
            req,
            "isolatedWorktree",
            "isolated_worktree",
            default=request_bool(req, "changesOnly", "changes_only", default=False),
        ),
        isolated_base=str(request_value(req, "isolatedBase", "isolated_base", default=DEFAULT_ISOLATED_BASE)),
        keep_isolated_worktree=request_bool(req, "keepIsolatedWorktree", "keep_isolated_worktree", default=False),
        changes_only=request_bool(req, "changesOnly", "changes_only", default=False),
        changes_format=normalize_changes_format(str(request_value(req, "changesFormat", "changes_format", default="json"))),
        patch_file=request_value(req, "patchFile", "patch_file"),
        patch_check=request_bool(req, "patchCheck", "patch_check", default=True),
        diff_limit=int(request_value(req, "diffLimit", "diff_limit", default=DEFAULT_DIFF_LIMIT)),
        animation=request_bool(req, "animation", default=True),
        codex_stream=str(request_value(req, "codexStream", "codex_stream", default="auto")),
        event_log=request_bool(req, "eventLog", "event_log", default=True),
        commit=request_bool(req, "commit", default=False),
        commit_branch=request_value(req, "commitBranch", "commit_branch"),
        commit_branch_prefix=str(request_value(req, "commitBranchPrefix", "commit_branch_prefix", default=DEFAULT_COMMIT_BRANCH_PREFIX)),
        commit_message=request_value(req, "commitMessage", "commit_message"),
        cycles=1,
        once=True,
        sleep_seconds=0,
    )
    return normalize_run_args(args)
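# Illustrative Restate request bodies this loader accepts. Both camelCase and
# snake_case keys are read by request_value/request_bool; the prompt values
# below are examples only (the prompt file path is the one referenced in the
# status error message elsewhere in this script):
#   {"prompt": "implement the next bounded obligation", "implementOnly": true, "changesOnly": true}
#   {"prompt_file": "prompts/spec-loop/02-kubernetes-api-cli-surface.md", "changes_format": "json"}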


def serve_restate(args: argparse.Namespace) -> int:
    """Serve the Restate workflow endpoint via Hypercorn, with durable access/error logs."""
    ensure_dirs()
    app = build_restate_app()
    try:
        import asyncio
        import hypercorn.asyncio  # type: ignore[import-not-found]
        import hypercorn.config  # type: ignore[import-not-found]
    except ImportError as exc:
        raise SystemExit("Install Hypercorn first: python3.11 -m pip install hypercorn") from exc
    conf = hypercorn.config.Config()
    conf.bind = [f"{args.host}:{args.port}"]
    access_log_path = repo_path(args.access_log_file) if args.access_log_file else RESTATE_LOG_DIR / f"{stamp()}-serve-restate-access.log"
    error_log_path = repo_path(args.error_log_file) if args.error_log_file else RESTATE_LOG_DIR / f"{stamp()}-serve-restate-error.log"
    access_log_path.parent.mkdir(parents=True, exist_ok=True)
    error_log_path.parent.mkdir(parents=True, exist_ok=True)
    conf.accesslog = str(access_log_path)
    conf.errorlog = str(error_log_path)
    conf.loglevel = str(args.log_level).upper()
    metadata = {
        "time": utc_now(),
        "host": args.host,
        "port": args.port,
        "accessLog": rel(access_log_path),
        "errorLog": rel(error_log_path),
        "logLevel": conf.loglevel,
    }
    atomic_write_json(RESTATE_SERVER_FILE, metadata)
    append_jsonl(DECISIONS_FILE, {"event": "serve-restate-start", **metadata})
    record_loop_event("serve-restate-start", **metadata)
    print(f"Serving Restate workflow on http://{args.host}:{args.port}")
    print(f"Restate access log: {rel(access_log_path)}")
    print(f"Restate error log: {rel(error_log_path)}")
    print("Show logs with: scripts/foundry_restate_codex_loop.py status --restate-logs 3")
    print("Register it with Restate, for example:")
    print(f"  restate deployments register http://localhost:{args.port}")
    asyncio.run(hypercorn.asyncio.serve(app, conf))
    return 0


def build_parser() -> argparse.ArgumentParser:
    """Build the top-level CLI parser with one subparser per command."""
    parser = argparse.ArgumentParser(description="Standalone Restate/local Codex loop over Foundry specs")
    sub = parser.add_subparsers(dest="command", required=True)

    local = sub.add_parser("local", help="run audit/implement cycles locally with durable logs")
    local.add_argument("--codex-cmd", default=DEFAULT_CODEX_CMD)
    local.add_argument("--print-codex-argv", action="store_true", help="print the effective child Codex command and exit")
    local.add_argument("--dangerously-bypass-approvals-and-sandbox", action="store_true", help="pass Codex's dangerous no-approval/no-sandbox flag to child Codex runs")
    local.add_argument("--search", action="store_true", help="pass Codex's live web search flag to child Codex runs")
    local.add_argument("--audit-limit", type=int, default=20)
    local.add_argument("--log-lines", type=int, default=80)
    local.add_argument("--scope")
    local.add_argument("--task-limit", type=int, default=1)
    local.add_argument("--choice", help="next, auto, quit, or custom feature text")
    local.add_argument("--prompt", help="inline implementation prompt for this cycle")
    local.add_argument("--prompt-file", help="file containing the implementation prompt")
    local.add_argument("--auto", action="store_true", help="choose auto when no explicit choice is provided")
    local.add_argument("--verify-cmd", default=DEFAULT_VERIFY_CMD)
    local.add_argument("--remote-verify-host", default=DEFAULT_REMOTE_VERIFY_HOST, help="remote host Codex should prefer for Foundry verification")
    local.add_argument("--allow-local-verification", action="store_true", help="allow local verification fallback when SSH to the remote verification host is blocked")
    local.add_argument("--local-verification-reason", help="reason to include when local verification fallback is allowed")
    local.add_argument("--cycles", type=int, default=1, help="0 means repeat until stopped")
    local.add_argument("--once", action="store_true", help="stop after one cycle regardless of --cycles")
    local.add_argument("--sleep-seconds", type=int, default=0)
    local.add_argument("--fail-fast", action="store_true")
    local.add_argument("--skip-audit", dest="skip_audit", action="store_true", default=True, help="skip the read-only audit and run the selected implementation prompt directly; this is the default")
    local.add_argument("--audit", dest="skip_audit", action="store_false", help="run the read-only planning audit before choosing implementation work")
    local.add_argument("--plan-only", action="store_true", help="run only the planning audit and stop before implementation")
    local.add_argument("--implement-only", action="store_true", help="skip audit and run only the selected implementation prompt")
    local.add_argument("--isolated-worktree", dest="isolated_worktree", action="store_true", default=None, help="run each cycle in a temporary git worktree and report only that worktree patch; this is implied by --changes-only")
    local.add_argument("--no-isolated-worktree", dest="isolated_worktree", action="store_false", help="run --changes-only directly in the current checkout")
    local.add_argument("--isolated-base", default=DEFAULT_ISOLATED_BASE, help="git ref used as the isolated worktree base")
    local.add_argument("--keep-isolated-worktree", action="store_true", help="keep the temporary worktree after the cycle for inspection")
    local.add_argument("--changes-only", action="store_true", help="suppress child logs and print only the files changed by each cycle")
    local.add_argument("--changes-format", choices=CHANGE_FORMATS, default="text")
    local.add_argument("--patch-file", help="additional path to write the cycle patch")
    local.add_argument("--no-patch-check", dest="patch_check", action="store_false", default=True, help="skip git apply --check validation of the generated patch")
    local.add_argument("--diff-limit", type=int, default=DEFAULT_DIFF_LIMIT, help="maximum characters of per-cycle diff to print/store")
    local.add_argument("--no-animation", dest="animation", action="store_false", default=True, help="disable stderr progress animation while child commands run")
    local.add_argument("--quiet-events", dest="event_log", action="store_false", default=True, help="write events.jsonl but do not mirror loop events to stderr")
    local.add_argument("--codex-stream", choices=CODEX_STREAM_MODES, default="auto", help="terminal stream mode for Codex child output")
    local.add_argument("--stream-codex-response", dest="codex_stream", action="store_const", const="assistant", help="stream Codex-authored response text while suppressing raw exec/tool output")
    local.add_argument("--commit", action="store_true", help="commit isolated worktree changes on a prompt-derived branch after a successful cycle")
    local.add_argument("--commit-branch", help="explicit branch name for --commit; must not already exist")
    local.add_argument("--commit-branch-prefix", default=DEFAULT_COMMIT_BRANCH_PREFIX, help="prefix for generated commit branches")
    local.add_argument("--commit-message", help="git commit subject for --commit")
    local.add_argument("--restate-logs", nargs="?", const=3, type=int, default=0, help="tail N Restate service/storage logs after each cycle; omit N to show 3")
    local.add_argument("--restate-log-lines", type=int, default=80, help="lines per Restate log to show after each cycle")
    local.add_argument("--restate-data-dir", default=str(DEFAULT_RESTATE_DATA_DIR), help="Restate data directory to scan for text LOG files")
    local.set_defaults(func=run_local)

    once = sub.add_parser("once", help="run one audit/implement cycle")
    once.add_argument("--codex-cmd", default=DEFAULT_CODEX_CMD)
    once.add_argument("--print-codex-argv", action="store_true")
    once.add_argument("--dangerously-bypass-approvals-and-sandbox", action="store_true")
    once.add_argument("--search", action="store_true")
    once.add_argument("--audit-limit", type=int, default=20)
    once.add_argument("--log-lines", type=int, default=80)
    once.add_argument("--scope")
    once.add_argument("--task-limit", type=int, default=1)
    once.add_argument("--choice")
    once.add_argument("--prompt")
    once.add_argument("--prompt-file")
    once.add_argument("--auto", action="store_true")
    once.add_argument("--verify-cmd", default=DEFAULT_VERIFY_CMD)
    once.add_argument("--remote-verify-host", default=DEFAULT_REMOTE_VERIFY_HOST)
    once.add_argument("--allow-local-verification", action="store_true")
    once.add_argument("--local-verification-reason")
    once.add_argument("--fail-fast", action="store_true")
    once.add_argument("--skip-audit", dest="skip_audit", action="store_true", default=True)
    once.add_argument("--audit", dest="skip_audit", action="store_false")
    once.add_argument("--plan-only", action="store_true")
    once.add_argument("--implement-only", action="store_true")
    once.add_argument("--isolated-worktree", dest="isolated_worktree", action="store_true", default=None)
    once.add_argument("--no-isolated-worktree", dest="isolated_worktree", action="store_false")
    once.add_argument("--isolated-base", default=DEFAULT_ISOLATED_BASE)
    once.add_argument("--keep-isolated-worktree", action="store_true")
    once.add_argument("--changes-only", action="store_true")
    once.add_argument("--changes-format", choices=CHANGE_FORMATS, default="text")
    once.add_argument("--patch-file")
    once.add_argument("--no-patch-check", dest="patch_check", action="store_false", default=True)
    once.add_argument("--diff-limit", type=int, default=DEFAULT_DIFF_LIMIT)
    once.add_argument("--no-animation", dest="animation", action="store_false", default=True)
    once.add_argument("--quiet-events", dest="event_log", action="store_false", default=True)
    once.add_argument("--codex-stream", choices=CODEX_STREAM_MODES, default="auto")
    once.add_argument("--stream-codex-response", dest="codex_stream", action="store_const", const="assistant")
    once.add_argument("--commit", action="store_true")
    once.add_argument("--commit-branch")
    once.add_argument("--commit-branch-prefix", default=DEFAULT_COMMIT_BRANCH_PREFIX)
    once.add_argument("--commit-message")
    once.add_argument("--restate-logs", nargs="?", const=3, type=int, default=0)
    once.add_argument("--restate-log-lines", type=int, default=80)
    once.add_argument("--restate-data-dir", default=str(DEFAULT_RESTATE_DATA_DIR))
    once.set_defaults(func=run_local, cycles=1, once=True, sleep_seconds=0)

    status = sub.add_parser("status", help="show local loop state and recent logs")
    status.add_argument("--lines", type=int, default=30)
    status.add_argument("--logs", type=int, default=1)
    status.add_argument("--events", type=int, default=10, help="number of loop lifecycle events to show; use 0 to disable")
    status.add_argument("--restate-logs", nargs="?", const=3, type=int, default=1, help="number of Restate service/storage logs to tail; omit N to show 3; use 0 to disable")
    status.add_argument("--restate-data-dir", default=str(DEFAULT_RESTATE_DATA_DIR), help="Restate data directory to scan for text LOG files")
    status.set_defaults(func=status_cmd)

    restate_logs = sub.add_parser("restate-logs", help="tail Restate service and storage logs")
    restate_logs.add_argument("--lines", type=int, default=80)
    restate_logs.add_argument("--logs", "--restate-logs", dest="logs", nargs="?", const=3, type=int, default=3)
    restate_logs.add_argument("--restate-data-dir", default=str(DEFAULT_RESTATE_DATA_DIR), help="Restate data directory to scan for text LOG files")
    restate_logs.set_defaults(func=restate_logs_cmd)

    events = sub.add_parser("events", help="tail loop lifecycle events")
    events.add_argument("--lines", type=int, default=80)
    events.set_defaults(func=events_cmd)

    stop_local = sub.add_parser("stop-local", help="write a local stop request marker")
    stop_local.set_defaults(func=stop_local_cmd)

    clear_stop = sub.add_parser("clear-stop", help="remove the local stop request marker")
    clear_stop.set_defaults(func=clear_stop_cmd)

    serve = sub.add_parser("serve-restate", help="serve the Restate Workflow endpoint")
    serve.add_argument("--host", default="0.0.0.0")
    serve.add_argument("--port", type=int, default=9080)
    serve.add_argument("--log-level", default="INFO", help="Hypercorn log level")
    serve.add_argument("--access-log-file", help="path for Restate service access logs")
    serve.add_argument("--error-log-file", help="path for Restate service error logs")
    serve.set_defaults(func=serve_restate)

    return parser


# Subcommand names recognized by normalize_argv.
COMMANDS = {"local", "once", "status", "restate-logs", "events", "stop-local", "clear-stop", "serve-restate"}
# Bare flags that imply the read-only restate-logs subcommand when no subcommand is given.
LOG_ONLY_OPTIONS = {"--restate-logs", "--logs", "--lines", "--restate-data-dir"}
# Flags rejected by the read-only status subcommand.
IMPLEMENTATION_ONLY_OPTIONS = {
    "--prompt",
    "--prompt-file",
    "--implement-only",
    "--plan-only",
    "--isolated-worktree",
    "--no-isolated-worktree",
    "--changes-only",
    "--changes-format",
    "--patch-file",
    "--codex-cmd",
    "--print-codex-argv",
    "--dangerously-bypass-approvals-and-sandbox",
    "--search",
    "--codex-stream",
    "--stream-codex-response",
    "--commit",
    "--commit-branch",
    "--commit-branch-prefix",
    "--commit-message",
    "--quiet-events",
    "--verify-cmd",
    "--remote-verify-host",
    "--allow-local-verification",
    "--local-verification-reason",
}


def normalize_argv(argv: List[str]) -> List[str]:
    """Prepend the restate-logs command for bare log-only flags; reject implementation flags on status."""
    if not argv or argv[0] in COMMANDS or argv[0] in {"-h", "--help"}:
        if argv and argv[0] == "status":
            bad = [arg for arg in argv[1:] if arg.split("=", 1)[0] in IMPLEMENTATION_ONLY_OPTIONS]
            if bad:
                raise SystemExit(
                    "status is read-only and does not accept implementation flags: "
                    + ", ".join(bad)
                    + "\nUse `once` or `local` for implementation cycles, for example:\n"
                    + "  scripts/foundry_restate_codex_loop.py once --prompt-file prompts/spec-loop/02-kubernetes-api-cli-surface.md --implement-only --changes-only --restate-logs 3\n"
                    + "Use `status --logs 3 --restate-logs 3` only for inspection."
                )
        return argv
    for arg in argv:
        option = arg.split("=", 1)[0]
        if option in LOG_ONLY_OPTIONS:
            return ["restate-logs", *argv]
    return argv
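# Illustrative normalizations (derived from the rules above; the flag values
# are examples, not required invocations):
#   ["--restate-logs", "3"]     -> ["restate-logs", "--restate-logs", "3"]
#   ["--lines=40"]              -> ["restate-logs", "--lines=40"]
#   ["status", "--logs", "3"]   -> unchanged (status accepts log flags)
#   ["status", "--prompt", "x"] -> SystemExit (implementation flag on read-only status)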


def main() -> int:
    """CLI entry point: normalize argv, then dispatch to the selected subcommand."""
    args = build_parser().parse_args(normalize_argv(sys.argv[1:]))
    return int(args.func(args))


if __name__ == "__main__":
    raise SystemExit(main())