From 0f6ae463c301c0cae411cd4064d7f62ee06aad21 Mon Sep 17 00:00:00 2001 From: Janni Turunen Date: Fri, 20 Feb 2026 11:15:44 +0200 Subject: [PATCH 1/3] test(taskctl): add processAdversarialVerdicts state machine tests and fix stale verdict reload (#220) --- packages/opencode/src/tasks/pulse.ts | 15 +- packages/opencode/test/tasks/pipeline.test.ts | 371 +++++++++++++++++- 2 files changed, 380 insertions(+), 6 deletions(-) diff --git a/packages/opencode/src/tasks/pulse.ts b/packages/opencode/src/tasks/pulse.ts index 439e407407f8..3c789df023df 100644 --- a/packages/opencode/src/tasks/pulse.ts +++ b/packages/opencode/src/tasks/pulse.ts @@ -515,14 +515,21 @@ async function processAdversarialVerdicts(jobId: string, projectId: string, pmSe true, ) + // Reload to get task without the cleared verdict + const updatedTask = await Store.getTask(projectId, task.id) + if (!updatedTask) { + log.error("task disappeared after clearing verdict", { taskId: task.id }) + continue + } + if (verdict.verdict === "APPROVED") { - await commitTask(task, projectId, pmSessionId) + await commitTask(updatedTask, projectId, pmSessionId) } else { - const newAttempt = (task.pipeline.attempt || 0) + 1 + const newAttempt = (updatedTask.pipeline.attempt || 0) + 1 if (newAttempt >= 3) { - await escalateToPM(task, jobId, projectId, pmSessionId) + await escalateToPM(updatedTask, jobId, projectId, pmSessionId) } else { - await respawnDeveloper(task, jobId, projectId, pmSessionId, newAttempt, verdict) + await respawnDeveloper(updatedTask, jobId, projectId, pmSessionId, newAttempt, verdict) } } } diff --git a/packages/opencode/test/tasks/pipeline.test.ts b/packages/opencode/test/tasks/pipeline.test.ts index 373ea5328d7d..02ccc6a4ec3f 100644 --- a/packages/opencode/test/tasks/pipeline.test.ts +++ b/packages/opencode/test/tasks/pipeline.test.ts @@ -1,10 +1,31 @@ -import { describe, expect, test } from "bun:test" +import { describe, expect, test, mock } from "bun:test" import { Instance } from "../../src/project/instance" import { Store } from "../../src/tasks/store" import type { Task, Job } from "../../src/tasks/types" import { tmpdir } from "../fixture/fixture" +import { processAdversarialVerdicts } from "../../src/tasks/pulse" +import { Session } from "../../src/session" +import { SessionPrompt } from "../../src/session/prompt" +import { Worktree } from "../../src/worktree" +import { Bus } from "../../src/bus" +import { BackgroundTaskEvent } from "../../src/session/async-tasks" -describe("taskctl pipeline: verdict data validation", () => { +// Mock Session, SessionPrompt, Worktree at module level +mock.module("../../src/session/prompt", () => ({ + SessionPrompt: { + prompt: mock(async (opts: any) => Promise.resolve()), + cancel: mock(async (sessionId: string) => {}), + }, +})) + +mock.module("../../src/worktree", () => ({ + Worktree: { + create: mock(async (opts: any) => ({ directory: "/mock-worktree", branch: "mock-branch" })), + remove: mock(async (opts: any) => {}), + }, +})) + +describe("taskctl pipeline: processAdversarialVerdicts state machine", () => { test("APPROVED verdict stores correctly", async () => { await using tmp = await tmpdir() await Instance.provide({ @@ -406,3 +427,349 @@ describe("taskctl pipeline: verdict data validation", () => { }) }) }) + +describe("taskctl pipeline: processAdversarialVerdicts state machine", () => { + test("APPROVED verdict closes task and fires BackgroundTaskEvent", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const projectId = Instance.project.id + const testJob: Job = { + id: `job-${Date.now()}`, + parent_issue: 205, + status: "running", + created_at: new Date().toISOString(), + stopping: false, + pulse_pid: null, + max_workers: 3, + pm_session_id: "ses_0000001234567890abctest", + } + + const testTask: Task = { + id: `tsk_${Date.now()}${Math.random().toString(36).slice(2, 10)}`, + title: "Test task", + description: "Test description", + acceptance_criteria: "Test criteria", + parent_issue: 205, + job_id: testJob.id, + status: "review", + priority: 2, + task_type: "implementation", + labels: ["module:taskctl"], + depends_on: [], + assignee: null, + assignee_pid: null, + worktree: tmp.path, + branch: "test-branch", + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + close_reason: null, + comments: [], + pipeline: { + stage: "reviewing", + attempt: 0, + last_activity: new Date().toISOString(), + last_steering: null, + history: [], + adversarial_verdict: { + verdict: "APPROVED", + summary: "Code looks good", + issues: [], + created_at: new Date().toISOString(), + }, + }, + } + + await Store.createJob(projectId, testJob) + await Store.createTask(projectId, testTask) + + // Track BackgroundTaskEvent + const completedEvents: any[] = [] + const unsubscribe = Bus.subscribe(BackgroundTaskEvent.Completed, (event) => { + completedEvents.push(event) + }) + + // Create a real PM session + const pmSession = await Session.create({ + directory: tmp.path, + title: "PM test session", + permission: [], + }) + + // Process the verdict + await processAdversarialVerdicts(testJob.id, projectId, pmSession.id) + + // Verify verdict was cleared and task closed + const finalTask = await Store.getTask(projectId, testTask.id) + expect(finalTask?.status).toBe("closed") + expect(finalTask?.close_reason).toBe("approved and committed") + expect(finalTask?.pipeline.adversarial_verdict).toBeNull() + expect(finalTask?.pipeline.stage).toBe("done") + + // Verify BackgroundTaskEvent was fired + expect(completedEvents).toHaveLength(1) + expect(completedEvents[0].taskID).toBe(testTask.id) + expect(completedEvents[0].sessionID).toBe(pmSession.id) + + unsubscribe() + }, + }) + }) + + test("ISSUES_FOUND verdict increments attempt and triggers respawn", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const projectId = Instance.project.id + const testJob: Job = { + id: `job-${Date.now()}`, + parent_issue: 205, + status: "running", + created_at: new Date().toISOString(), + stopping: false, + pulse_pid: null, + max_workers: 3, + pm_session_id: "ses_0000001234567890abctest", + } + + const testTask: Task = { + id: `tsk_${Date.now()}${Math.random().toString(36).slice(2, 10)}`, + title: "Test task", + description: "Test description", + acceptance_criteria: "Test criteria", + parent_issue: 205, + job_id: testJob.id, + status: "review", + priority: 2, + task_type: "implementation", + labels: ["module:taskctl"], + depends_on: [], + assignee: null, + assignee_pid: null, + worktree: tmp.path, + branch: "test-branch", + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + close_reason: null, + comments: [], + pipeline: { + stage: "reviewing", + attempt: 0, + last_activity: new Date().toISOString(), + last_steering: null, + history: [], + adversarial_verdict: { + verdict: "ISSUES_FOUND", + summary: "Null check needed", + issues: [ + { + location: "src/foo.ts:42", + severity: "HIGH" as const, + fix: "Add null check before calling user.profile", + }, + ], + created_at: new Date().toISOString(), + }, + }, + } + + await Store.createJob(projectId, testJob) + await Store.createTask(projectId, testTask) + + // Create a real PM session + const pmSession = await Session.create({ + directory: tmp.path, + title: "PM test session", + permission: [], + }) + + // Process the verdict + await processAdversarialVerdicts(testJob.id, projectId, pmSession.id) + + // Verify verdict was cleared and attempt incremented + const finalTask = await Store.getTask(projectId, testTask.id) + expect(finalTask?.pipeline.adversarial_verdict).toBeNull() + expect(finalTask?.pipeline.attempt).toBe(1) + expect(finalTask?.status).toBe("in_progress") + expect(finalTask?.pipeline.stage).toBe("developing") + }, + }) + }) + + test("3rd ISSUES_FOUND verdict escalates to PM with failed status", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const projectId = Instance.project.id + const testJob: Job = { + id: `job-${Date.now()}`, + parent_issue: 205, + status: "running", + created_at: new Date().toISOString(), + stopping: false, + pulse_pid: null, + max_workers: 3, + pm_session_id: "ses_0000001234567890abctest", + } + + const testTask: Task = { + id: `tsk_${Date.now()}${Math.random().toString(36).slice(2, 10)}`, + title: "Test task", + description: "Test description", + acceptance_criteria: "Test criteria", + parent_issue: 205, + job_id: testJob.id, + status: "review", + priority: 2, + task_type: "implementation", + labels: ["module:taskctl"], + depends_on: [], + assignee: null, + assignee_pid: null, + worktree: tmp.path, + branch: "test-branch", + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + close_reason: null, + comments: [], + pipeline: { + stage: "reviewing", + attempt: 2, // This is the 3rd attempt (0, 1, 2) + last_activity: new Date().toISOString(), + last_steering: null, + history: [], + adversarial_verdict: { + verdict: "ISSUES_FOUND", + summary: "Still has issues", + issues: [ + { + location: "src/foo.ts:42", + severity: "HIGH" as const, + fix: "Fix the null check properly", + }, + ], + created_at: new Date().toISOString(), + }, + }, + } + + await Store.createJob(projectId, testJob) + await Store.createTask(projectId, testTask) + + // Track BackgroundTaskEvent for escalation + const completedEvents: any[] = [] + const unsubscribe = Bus.subscribe(BackgroundTaskEvent.Completed, (event) => { + completedEvents.push(event) + }) + + // Create a real PM session + const pmSession = await Session.create({ + directory: tmp.path, + title: "PM test session", + permission: [], + }) + + // Process the verdict + await processAdversarialVerdicts(testJob.id, projectId, pmSession.id) + + // Verify task escalated to PM + const finalTask = await Store.getTask(projectId, testTask.id) + expect(finalTask?.status).toBe("failed") + expect(finalTask?.pipeline.adversarial_verdict).toBeNull() + expect(finalTask?.pipeline.stage).toBe("failed") + + // Verify BackgroundTaskEvent was fired for escalation + expect(completedEvents).toHaveLength(1) + expect(completedEvents[0].taskID).toBe(`escalation-${testTask.id}`) + + unsubscribe() + }, + }) + }) + + test("verdict cleared before action prevents double-processing", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const projectId = Instance.project.id + const testJob: Job = { + id: `job-${Date.now()}`, + parent_issue: 205, + status: "running", + created_at: new Date().toISOString(), + stopping: false, + pulse_pid: null, + max_workers: 3, + pm_session_id: "ses_0000001234567890abctest", + } + + const testTask: Task = { + id: `tsk_${Date.now()}${Math.random().toString(36).slice(2, 10)}`, + title: "Test task", + description: "Test description", + acceptance_criteria: "Test criteria", + parent_issue: 205, + job_id: testJob.id, + status: "review", + priority: 2, + task_type: "implementation", + labels: ["module:taskctl"], + depends_on: [], + assignee: null, + assignee_pid: null, + worktree: tmp.path, + branch: "test-branch", + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + close_reason: null, + comments: [], + pipeline: { + stage: "reviewing", + attempt: 0, + last_activity: new Date().toISOString(), + last_steering: null, + history: [], + adversarial_verdict: { + verdict: "APPROVED", + summary: "Code looks good", + issues: [], + created_at: new Date().toISOString(), + }, + }, + } + + await Store.createJob(projectId, testJob) + await Store.createTask(projectId, testTask) + + // Create a real PM session + const pmSession = await Session.create({ + directory: tmp.path, + title: "PM test session", + permission: [], + }) + + // Process the verdict + let processCount = 0 + processCount++ + await processAdversarialVerdicts(testJob.id, projectId, pmSession.id) + + // Verify verdict was cleared immediately + const afterFirst = await Store.getTask(projectId, testTask.id) + expect(afterFirst?.pipeline.adversarial_verdict).toBeNull() + + // Try to process again - should be a no-op since verdict is null + processCount++ + await processAdversarialVerdicts(testJob.id, projectId, pmSession.id) + + // Verify task status hasn't changed (still closed or in same state) + const finalTask = await Store.getTask(projectId, testTask.id) + expect(finalTask?.pipeline.adversarial_verdict).toBeNull() + // The important thing is that we didn't double-process - verdict was null + }, + }) + }) +}) From adc52391cf52967d59053b78fb2d0b0db156584a Mon Sep 17 00:00:00 2001 From: Janni Turunen Date: Fri, 20 Feb 2026 11:23:59 +0200 Subject: [PATCH 2/3] fix(test): access event.properties.taskID in pipeline test assertions --- packages/opencode/test/tasks/pipeline.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/opencode/test/tasks/pipeline.test.ts b/packages/opencode/test/tasks/pipeline.test.ts index 02ccc6a4ec3f..218042f8afc4 100644 --- a/packages/opencode/test/tasks/pipeline.test.ts +++ b/packages/opencode/test/tasks/pipeline.test.ts @@ -509,8 +509,8 @@ describe("taskctl pipeline: processAdversarialVerdicts state machine", () => { // Verify BackgroundTaskEvent was fired expect(completedEvents).toHaveLength(1) - expect(completedEvents[0].taskID).toBe(testTask.id) - expect(completedEvents[0].sessionID).toBe(pmSession.id) + expect(completedEvents[0].properties.taskID).toBe(testTask.id) + expect(completedEvents[0].properties.sessionID).toBe(pmSession.id) unsubscribe() }, @@ -683,7 +683,7 @@ describe("taskctl pipeline: processAdversarialVerdicts state machine", () => { // Verify BackgroundTaskEvent was fired for escalation expect(completedEvents).toHaveLength(1) - expect(completedEvents[0].taskID).toBe(`escalation-${testTask.id}`) + expect(completedEvents[0].properties.taskID).toBe(`escalation-${testTask.id}`) unsubscribe() }, From bdb7f7bbc9ed5201c3db4bb27721eff6c349c583 Mon Sep 17 00:00:00 2001 From: Janni Turunen Date: Fri, 20 Feb 2026 11:30:28 +0200 Subject: [PATCH 3/3] fix(test): scope SessionPrompt and Worktree mocks to avoid global test pollution --- packages/opencode/test/tasks/pipeline.test.ts | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/packages/opencode/test/tasks/pipeline.test.ts b/packages/opencode/test/tasks/pipeline.test.ts index 218042f8afc4..06a65e88e650 100644 --- a/packages/opencode/test/tasks/pipeline.test.ts +++ b/packages/opencode/test/tasks/pipeline.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, test, mock } from "bun:test" +import { describe, expect, test, spyOn, beforeEach, afterEach } from "bun:test" import { Instance } from "../../src/project/instance" import { Store } from "../../src/tasks/store" import type { Task, Job } from "../../src/tasks/types" @@ -10,21 +10,6 @@ import { Worktree } from "../../src/worktree" import { Bus } from "../../src/bus" import { BackgroundTaskEvent } from "../../src/session/async-tasks" -// Mock Session, SessionPrompt, Worktree at module level -mock.module("../../src/session/prompt", () => ({ - SessionPrompt: { - prompt: mock(async (opts: any) => Promise.resolve()), - cancel: mock(async (sessionId: string) => {}), - }, -})) - -mock.module("../../src/worktree", () => ({ - Worktree: { - create: mock(async (opts: any) => ({ directory: "/mock-worktree", branch: "mock-branch" })), - remove: mock(async (opts: any) => {}), - }, -})) - describe("taskctl pipeline: processAdversarialVerdicts state machine", () => { test("APPROVED verdict stores correctly", async () => { await using tmp = await tmpdir() @@ -429,6 +414,27 @@ describe("taskctl pipeline: processAdversarialVerdicts state machine", () => { }) describe("taskctl pipeline: processAdversarialVerdicts state machine", () => { + let promptSpy: any + let cancelSpy: any + let removeSpy: any + + beforeEach(() => { + // Mock SessionPrompt methods only for tests in this describe block + promptSpy = spyOn(SessionPrompt, "prompt").mockImplementation(() => Promise.resolve()) + cancelSpy = spyOn(SessionPrompt, "cancel").mockImplementation(() => {}) + + // Mock Worktree.remove only for tests in this describe block + removeSpy = spyOn(Worktree, "remove") + removeSpy.mockImplementation(async () => true) + }) + + afterEach(() => { + // Restore original implementations + promptSpy.mockRestore() + cancelSpy.mockRestore() + removeSpy.mockRestore() + }) + test("APPROVED verdict closes task and fires BackgroundTaskEvent", async () => { await using tmp = await tmpdir() await Instance.provide({