PM-trigger via Gitea: pm-task.sh + webhook routing on pm:plan label + idempotency

This commit is contained in:
danny8632
2026-05-12 07:24:25 +00:00
parent b3ab9639af
commit dc61d92eb1
4 changed files with 232 additions and 22 deletions

View File

@@ -19,11 +19,13 @@ ISSUE="${2:-}"
[[ -z "$REPO" || -z "$ISSUE" ]] && { echo "usage: $0 <owner>/<repo> <issue-id>" >&2; exit 64; } [[ -z "$REPO" || -z "$ISSUE" ]] && { echo "usage: $0 <owner>/<repo> <issue-id>" >&2; exit 64; }
# Serialize: only one dev-task active at a time (Max-plan rate-limit pool). # Serialize: only one dev-task active at a time (Max-plan rate-limit pool).
# Wait up to 60s for the lock; if still held, exit 75 so the webhook retries. # Wait up to LOCK_WAIT seconds (default 30 min — enough for one queued task
# behind a max-wallclock task). If still held, exit 75.
LOCK_FILE="${LOCK_FILE:-/var/agent/dev-task.lock}" LOCK_FILE="${LOCK_FILE:-/var/agent/dev-task.lock}"
LOCK_WAIT="${LOCK_WAIT:-1800}"
exec 200>"$LOCK_FILE" exec 200>"$LOCK_FILE"
if ! flock -w 60 200; then if ! flock -w "$LOCK_WAIT" 200; then
echo "[dev-task] another dev-task is running; backing off" >&2 echo "[dev-task] queue full after ${LOCK_WAIT}s wait; backing off" >&2
exit 75 exit 75
fi fi

190
pm/pm-task.sh Executable file
View File

@@ -0,0 +1,190 @@
#!/usr/bin/env bash
# pm-task.sh <owner>/<repo> <parent-issue-id>
#
# Webhook-triggered: an issue acquired the label "pm:plan". Reads the parent
# issue body as a free-form goal, asks claude to decompose it into 1 to 5
# structured agent-issues, creates each as a child issue with label
# "agent:assign", and comments on the parent linking to the children.
#
# Env knobs:
#   MODEL                default sonnet
#   GITEA_URL            default https://gitea.dannyhaslund.dk
#   GITEA_TOKEN_FILE     default /etc/agent/gitea-token
#   PM_PLAN_LABEL_ID     default 16 (the pm:plan label id on the target repo)
#   PM_PLANNED_LABEL_ID  default 17 (the pm:planned label id)
#   AGENT_LABEL_ID       default 15 (the agent:assign label id)
#
# Start-up validation exits: 64 bad usage, 65 token file missing or empty,
# 78 non-numeric label id.
set -euo pipefail

REPO="${1:-}"
PARENT="${2:-}"
# The parent id ends up in API URLs and file names, so only digits are accepted.
if [[ -z "$REPO" || ! "$PARENT" =~ ^[0-9]+$ ]]; then
  echo "usage: $0 <owner>/<repo> <parent-issue-id>" >&2
  exit 64
fi

MODEL="${MODEL:-sonnet}"
GITEA_URL="${GITEA_URL:-https://gitea.dannyhaslund.dk}"
GITEA_TOKEN_FILE="${GITEA_TOKEN_FILE:-/etc/agent/gitea-token}"
PM_PLAN_LABEL_ID="${PM_PLAN_LABEL_ID:-16}"
PM_PLANNED_LABEL_ID="${PM_PLANNED_LABEL_ID:-17}"
AGENT_LABEL_ID="${AGENT_LABEL_ID:-15}"

# Label ids are handed to `jq --argjson` further down. Rejecting a non-numeric
# value here makes a misconfigured environment fail before claude is invoked.
for label_var in PM_PLAN_LABEL_ID PM_PLANNED_LABEL_ID AGENT_LABEL_ID; do
  if [[ ! "${!label_var}" =~ ^[0-9]+$ ]]; then
    echo "$label_var must be a numeric label id (got '${!label_var}')" >&2
    exit 78
  fi
done
unset label_var

[[ -r "$GITEA_TOKEN_FILE" ]] || { echo "missing $GITEA_TOKEN_FILE" >&2; exit 65; }
# $(<file) reads the file without forking cat; trailing newlines are stripped.
GITEA_TOKEN="$(<"$GITEA_TOKEN_FILE")"
[[ -n "$GITEA_TOKEN" ]] || { echo "empty $GITEA_TOKEN_FILE" >&2; exit 65; }
# Per-issue flock so duplicate webhook deliveries (Gitea fires both 'opened'
# and 'label_updated' for an issue opened with a label) don't double-plan.
# The lock is non-blocking: a second run for the same issue exits 0 right away
# instead of queueing behind the first.
#
# Env knobs used here:
#   LOCK_DIR    default /var/lib/agent-pm         (per-issue lock files)
#   PM_LOG_DIR  default /var/log/agent-webhook    (per-run log files)
LOCK_DIR="${LOCK_DIR:-/var/lib/agent-pm}"
PM_LOG_DIR="${PM_LOG_DIR:-/var/log/agent-webhook}"

# "/" cannot appear in a file name, so owner/repo becomes owner_repo.
task_slug="${REPO//\//_}-${PARENT}"

mkdir -p "$LOCK_DIR"
LOCK_FILE="$LOCK_DIR/${task_slug}.lock"
# fd 201 is never closed below, so the lock is held for the rest of the run.
exec 201>"$LOCK_FILE"
if ! flock -n 201; then
  echo "[pm-task] another pm-task already running for $REPO#$PARENT; exiting" >&2
  exit 0
fi

mkdir -p "$PM_LOG_DIR"
LOG_FILE="$PM_LOG_DIR/pm-task-${task_slug}-$(date -u +%Y%m%dT%H%M%S).log"
# log <message...>
# Prefix the message with a UTC [HH:MM:SS] stamp, print it on stdout and
# append the same line to $LOG_FILE.
log() {
  local stamp
  stamp="$(date -u +%H:%M:%S)"
  printf '[%s] %s\n' "$stamp" "$*" | tee -a "$LOG_FILE"
}
# api <curl-args...>
# curl wrapper for the Gitea REST API. Fails on HTTP errors (-f), is silent
# except for error messages (-sS), and sends a JSON Content-Type plus the
# token from $GITEA_TOKEN. All arguments (method, -d body, URL) are passed
# through to curl unchanged; the response body goes to stdout.
#
# The Authorization header is supplied through --config on a process
# substitution instead of a -H argument, so the token is not part of curl's
# argument list (where `ps` would show it).
api() {
  curl -fsS \
    --config <(printf 'header = "Authorization: token %s"\n' "$GITEA_TOKEN") \
    -H "Content-Type: application/json" \
    "$@"
}
# ---------- 1. Fetch parent issue + idempotency check ----------
log "Fetching parent $REPO#$PARENT"
parent_json="$(api "$GITEA_URL/api/v1/repos/$REPO/issues/$PARENT")"
# `// ""` keeps a missing title/body from reaching the prompt as the literal
# string "null".
parent_title="$(jq -r '.title // ""' <<<"$parent_json")"
goal_body="$(jq -r '.body // ""' <<<"$parent_json")"

# Compare label names exactly inside jq. A word-match grep over the joined
# names would also accept labels such as "pm:planned-v2", and `echo | grep -q`
# can fail spuriously under pipefail. `jq -e` exits 0 only when the filter
# yields true; `.labels[]?` tolerates an issue without a labels array.
if jq -e 'any(.labels[]?; .name == "pm:planned")' <<<"$parent_json" >/dev/null; then
  log "Parent already has pm:planned label — refusing to re-plan. Remove pm:planned first if you want to re-plan."
  exit 0
fi
# ---------- 2. Probe repo state ----------
# Cheap context: list the top-level file names on the default branch so the
# PM doesn't propose duplicating things that already exist.
default_branch="$(api "$GITEA_URL/api/v1/repos/$REPO" | jq -r .default_branch)"

# The contents listing is best-effort context. If the request fails (presumably
# what an empty repo produces — TODO confirm) fall back to the placeholder
# instead of letting `set -e` abort the whole run.
contents_json="$(api "$GITEA_URL/api/v1/repos/$REPO/contents?ref=$default_branch" 2>/dev/null)" \
  || contents_json='[]'
# Cap at 25 names inside jq: piping to `head` under pipefail can report a
# failure (SIGPIPE) and wrongly append the fallback to a valid list.
file_list="$(jq -r 'if type == "array" and length > 0 then .[:25][].name else "(empty repo)" end' \
  <<<"$contents_json" 2>/dev/null)" || file_list="(empty repo)"

# Open PRs as context (may be relevant to whether work is in flight).
# An empty list is reported as "(none)" explicitly; jq succeeding with no
# output would otherwise leave this section blank.
open_prs="$(
  api "$GITEA_URL/api/v1/repos/$REPO/pulls?state=open&limit=10" \
    | jq -r 'if length == 0 then " (none)" else .[] | " #\(.number) — \(.title) (branch \(.head.ref))" end'
)" || open_prs=" (none)"

REPO_STATE="default branch: $default_branch
top-level files on $default_branch:
$(printf '%s\n' "$file_list" | sed 's/^/ - /')
open PRs:
$open_prs"
# ---------- 3. Decompose via claude ----------
# JSON Schema for the structured reply: an object with an "issues" array of
# 1 to 5 entries, each carrying title, goal, done_criteria and model, plus
# optional hints. Quoted heredoc: nothing inside is expanded.
SCHEMA="$(cat <<'JSON'
{
"type": "object",
"required": ["issues"],
"properties": {
"issues": {
"type": "array",
"minItems": 1,
"maxItems": 5,
"items": {
"type": "object",
"required": ["title", "goal", "done_criteria", "model"],
"properties": {
"title": { "type": "string", "maxLength": 80 },
"goal": { "type": "string", "minLength": 20 },
"done_criteria": { "type": "array", "minItems": 1, "items": { "type": "string" } },
"hints": { "type": "string" },
"model": { "type": "string", "enum": ["sonnet", "opus"] }
}
}
}
}
}
JSON
)"

# Prompt text. Unquoted heredoc: $PARENT, $REPO, $parent_title, $goal_body and
# $REPO_STATE are substituted; the substituted text is not expanded again.
PROMPT="$(cat <<PROMPT_EOF
You are the project manager for an autonomous coding empire. The human just opened parent issue #$PARENT in repo '$REPO' with the following goal. Decompose it into 1 to 5 structured agent-issues that, implemented in order, achieve the goal.
# Parent issue title
$parent_title
# Parent issue body (the goal)
$goal_body
# Current repo state
$REPO_STATE
# Constraints on the issues you produce
- Each issue must fit in <30 minutes of autonomous agent time on Sonnet.
- Done criteria must be specific and mechanically verifiable (file exists,
endpoint returns X, HTTP code, test passes). No taste calls.
- Order issues by dependency (foundation first).
- Use 'opus' only for tasks requiring design tradeoffs; routine
scaffolding/CRUD/refactors are 'sonnet'.
- Prefer fewer larger issues over many trivial ones, as long as each fits in
30 min.
- Avoid issues that require human judgment ("make it look good").
- Do NOT call any tools — produce JSON from the prompt text alone.
Return only the JSON matching the schema.
PROMPT_EOF
)"
TMP="$(mktemp)"
trap 'rm -f "$TMP"' EXIT
log "Decomposing via claude (model=$MODEL)"

# Capture claude's exit status instead of letting `set -e` end the script:
# a failed run must still reach the branch below that reports on the parent.
claude_rc=0
claude -p "$PROMPT" \
  --model "$MODEL" \
  --output-format json \
  --json-schema "$SCHEMA" \
  --allowedTools "" \
  < /dev/null > "$TMP" 2>>"$LOG_FILE" || claude_rc=$?

# Accept only a non-empty array. Unparsable output, a missing key, null or []
# all leave issues_json empty and are handled as a failed decomposition.
issues_json=""
if (( claude_rc == 0 )); then
  issues_json="$(jq -c '.structured_output.issues | select(type == "array" and length > 0)' \
    "$TMP" 2>>"$LOG_FILE")" || issues_json=""
else
  log "claude exited with status $claude_rc"
fi

if [[ -z "$issues_json" ]]; then
  log "ERROR: claude did not return structured_output.issues"
  # Best-effort notification; the run fails with exit 1 either way.
  api -X POST -d "$(jq -nc --arg b "agent PM: failed to decompose. See log \`$LOG_FILE\` and remove + re-add the pm:plan label to retry." '{body:$b}')" \
    "$GITEA_URL/api/v1/repos/$REPO/issues/$PARENT/comments" >/dev/null \
    || log "WARN: could not post failure comment on $REPO#$PARENT"
  exit 1
fi

count="$(jq 'length' <<<"$issues_json")"
log "claude proposed $count child issue(s)"
# ---------- 4. Create child issues with agent:assign label ----------
# Reads $issues_json (a JSON array) and fills the `created` array with the
# numbers of the issues that were opened, in order.
created=()
# One compact JSON object per line, read on fd 3 so commands inside the loop
# keep their own stdin.
while IFS= read -r -u 3 issue; do
  title="$(jq -r .title <<<"$issue")"
  goal_t="$(jq -r .goal <<<"$issue")"
  # Render each done criterion as a markdown checkbox line.
  done_b="$(jq -r '.done_criteria | map("- [ ] " + .) | join("\n")' <<<"$issue")"
  hints="$(jq -r '.hints // ""' <<<"$issue")"
  model_t="$(jq -r '.model // "sonnet"' <<<"$issue")"
  body="$(printf 'Parent: #%s\n\n## Goal\n\n%s\n\n## Done criteria\n\n%s\n\n## Hints\n\n%s\n\n## Model\n\n%s\n' \
    "$PARENT" "$goal_t" "$done_b" "${hints:-(none)}" "$model_t")"
  resp="$(api -X POST -d "$(jq -nc \
    --arg t "$title" --arg b "$body" --argjson lid "$AGENT_LABEL_ID" \
    '{title:$t, body:$b, labels:[$lid]}')" \
    "$GITEA_URL/api/v1/repos/$REPO/issues")"
  num="$(jq -r .number <<<"$resp")"
  # Refuse to record a reply without an issue number; it would otherwise show
  # up as "#null" in the parent comment.
  if [[ ! "$num" =~ ^[0-9]+$ ]]; then
    log "ERROR: create-issue response for '$title' carried no issue number"
    exit 1
  fi
  created+=("$num")
  log " created #$num — $title"
done 3< <(jq -c '.[]' <<<"$issues_json")

# Do not go on to mark the parent as planned when nothing was created.
if (( ${#created[@]} == 0 )); then
  log "ERROR: no child issues were created"
  exit 1
fi
# ---------- 5. Update parent: remove pm:plan, add pm:planned, comment ----------
parent_api="$GITEA_URL/api/v1/repos/$REPO/issues/$PARENT"

# Dropping pm:plan is best-effort: output is discarded and failure is ignored.
api -X DELETE "$parent_api/labels/$PM_PLAN_LABEL_ID" >/dev/null 2>&1 || true
# Adding pm:planned is not: a failure here ends the run under `set -e`.
api -X POST -d "$(jq -nc --argjson lid "$PM_PLANNED_LABEL_ID" '{labels:[$lid]}')" \
  "$parent_api/labels" >/dev/null

# One markdown bullet per created child, each ending in a real newline.
child_list=""
for child in "${created[@]}"; do
  child_list+="- #${child}"$'\n'
done
parent_comment="$(printf 'agent PM: decomposed into %d child issue(s):\n\n%s\nEach is labeled `agent:assign` and will be picked up by a dev agent (one at a time).' "${#created[@]}" "$child_list")"

api -X POST -d "$(jq -nc --arg b "$parent_comment" '{body:$b}')" \
  "$parent_api/comments" >/dev/null
log "Done. created=${created[*]}"

View File

@@ -11,6 +11,7 @@ RestartSec=3
StandardOutput=journal StandardOutput=journal
StandardError=journal StandardError=journal
Environment=PYTHONUNBUFFERED=1 Environment=PYTHONUNBUFFERED=1
Environment=PATH=/root/.local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
[Install] [Install]
WantedBy=multi-user.target WantedBy=multi-user.target

View File

@@ -28,7 +28,9 @@ LISTEN_PORT = int(os.environ.get("LISTEN_PORT", "18790"))
SECRET_FILE = Path(os.environ.get("WEBHOOK_SECRET_FILE", "/etc/agent/webhook-secret")) SECRET_FILE = Path(os.environ.get("WEBHOOK_SECRET_FILE", "/etc/agent/webhook-secret"))
DEV_HOST = os.environ.get("DEV_HOST", "agent@192.168.1.29") DEV_HOST = os.environ.get("DEV_HOST", "agent@192.168.1.29")
SSH_KEY = os.environ.get("SSH_KEY", "/root/.ssh/id_ed25519_pve") SSH_KEY = os.environ.get("SSH_KEY", "/root/.ssh/id_ed25519_pve")
LABEL = os.environ.get("AGENT_LABEL", "agent:assign") DEV_LABEL = os.environ.get("AGENT_LABEL", "agent:assign")
PM_LABEL = os.environ.get("PM_LABEL", "pm:plan")
PM_TASK_BIN = os.environ.get("PM_TASK_BIN", "/usr/local/bin/pm-task.sh")
LOG_DIR = Path(os.environ.get("HANDLER_LOG_DIR", "/var/log/agent-webhook")) LOG_DIR = Path(os.environ.get("HANDLER_LOG_DIR", "/var/log/agent-webhook"))
logging.basicConfig( logging.basicConfig(
@@ -47,27 +49,29 @@ def verify_signature(body: bytes, sig_header: str) -> bool:
return hmac.compare_digest(mac, sig_header) return hmac.compare_digest(mac, sig_header)
def should_dispatch(event: str, payload: dict) -> tuple[bool, str | None, int | None]: def route(event: str, payload: dict) -> tuple[str, str, int] | None:
"""Decide if this delivery should kick off a dev-task. Returns (yes, repo, issue#).""" """Decide which task to dispatch. Returns (kind, repo, issue#) or None."""
if event != "issues": if event != "issues":
return False, None, None return None
action = payload.get("action", "") action = payload.get("action", "")
if action not in ("opened", "label_updated"):
return None
issue = payload.get("issue", {}) issue = payload.get("issue", {})
repo = payload.get("repository", {}).get("full_name") repo = payload.get("repository", {}).get("full_name")
number = issue.get("number") number = issue.get("number")
labels = [l.get("name", "") for l in issue.get("labels", [])] labels = [l.get("name", "") for l in issue.get("labels", [])]
if action == "opened" and LABEL in labels: # PM takes priority over dev: a parent issue should be planned before any dev work fires
return True, repo, number if PM_LABEL in labels:
if action == "label_updated" and LABEL in labels: return ("pm", repo, number)
# Gitea fires label_updated for both add and remove; we only care about presence if DEV_LABEL in labels:
return True, repo, number return ("dev", repo, number)
return False, None, None return None
def dispatch(repo: str, issue: int): def dispatch_dev(repo: str, issue: int):
"""SSH into dev-01 and fire dev-task in the background.""" """SSH into dev-01 and fire dev-task in the background."""
LOG_DIR.mkdir(parents=True, exist_ok=True) LOG_DIR.mkdir(parents=True, exist_ok=True)
fn = LOG_DIR / f"dispatch-{repo.replace('/', '_')}-issue-{issue}-{int(time.time())}.log" fn = LOG_DIR / f"dispatch-dev-{repo.replace('/', '_')}-issue-{issue}-{int(time.time())}.log"
log.info("dispatching dev-task for %s#%d -> %s", repo, issue, fn) log.info("dispatching dev-task for %s#%d -> %s", repo, issue, fn)
cmd = [ cmd = [
"ssh", "-i", SSH_KEY, "ssh", "-i", SSH_KEY,
@@ -80,7 +84,18 @@ def dispatch(repo: str, issue: int):
] ]
with open(fn, "wb") as f: with open(fn, "wb") as f:
rc = subprocess.run(cmd, stdout=f, stderr=subprocess.STDOUT).returncode rc = subprocess.run(cmd, stdout=f, stderr=subprocess.STDOUT).returncode
log.info("dispatch ssh rc=%d", rc) log.info("dispatch-dev ssh rc=%d", rc)
def dispatch_pm(repo: str, issue: int):
    """Run PM_TASK_BIN for ``repo`` / ``issue`` as a local subprocess.

    The child's stdout and stderr are both written to a fresh
    ``dispatch-pm-*.log`` file under LOG_DIR, and its return code is logged
    once it finishes. The call blocks until the subprocess exits.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    fn = LOG_DIR / f"dispatch-pm-{repo.replace('/', '_')}-issue-{issue}-{int(time.time())}.log"
    log.info("dispatching pm-task for %s#%d -> %s", repo, issue, fn)
    with open(fn, "wb") as sink:
        finished = subprocess.run(
            [PM_TASK_BIN, repo, str(issue)],
            stdout=sink,
            stderr=subprocess.STDOUT,
        )
    log.info("dispatch-pm rc=%d", finished.returncode)
class WebhookHandler(http.server.BaseHTTPRequestHandler): class WebhookHandler(http.server.BaseHTTPRequestHandler):
@@ -99,11 +114,13 @@ class WebhookHandler(http.server.BaseHTTPRequestHandler):
except json.JSONDecodeError: except json.JSONDecodeError:
self.send_response(400); self.end_headers(); self.wfile.write(b"bad json\n"); return self.send_response(400); self.end_headers(); self.wfile.write(b"bad json\n"); return
ok, repo, issue = should_dispatch(ev, payload) r = route(ev, payload)
if ok and repo and issue is not None: if r is not None:
threading.Thread(target=dispatch, args=(repo, issue), daemon=True).start() kind, repo, issue = r
target = dispatch_pm if kind == "pm" else dispatch_dev
threading.Thread(target=target, args=(repo, issue), daemon=True).start()
self.send_response(202); self.end_headers() self.send_response(202); self.end_headers()
self.wfile.write(b"dispatched\n") self.wfile.write(f"dispatched {kind}\n".encode())
else: else:
self.send_response(200); self.end_headers() self.send_response(200); self.end_headers()
self.wfile.write(b"ignored\n") self.wfile.write(b"ignored\n")
@@ -123,6 +140,6 @@ class ThreadedServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
if __name__ == "__main__": if __name__ == "__main__":
log.info("agent-webhook listening on %s:%d, target=%s, label=%s", log.info("agent-webhook listening on %s:%d, dev=%s dev_label=%s pm_label=%s",
LISTEN_HOST, LISTEN_PORT, DEV_HOST, LABEL) LISTEN_HOST, LISTEN_PORT, DEV_HOST, DEV_LABEL, PM_LABEL)
ThreadedServer((LISTEN_HOST, LISTEN_PORT), WebhookHandler).serve_forever() ThreadedServer((LISTEN_HOST, LISTEN_PORT), WebhookHandler).serve_forever()