diff --git a/packages/opencode/src/tasks/pulse.ts b/packages/opencode/src/tasks/pulse.ts index 6d8ef32f9a8e..911344ad95c5 100644 --- a/packages/opencode/src/tasks/pulse.ts +++ b/packages/opencode/src/tasks/pulse.ts @@ -47,17 +47,13 @@ export function startPulse(jobId: string, projectId: string, pmSessionId: string } const startJob = async (): Promise => { - const existingPid = await readLockPid(jobId, projectId).catch(() => null) - if (existingPid && isPidAlive(existingPid)) { - log.error("job already running", { jobId, existingPid }) + await writeLockFile(jobId, projectId, process.pid) + const lockPid = await readLockPid(jobId, projectId).catch(() => null) + if (lockPid !== process.pid) { + log.error("lost lock file race, aborting start", { jobId, lockPid, myPid: process.pid }) return } - if (existingPid && !isPidAlive(existingPid)) { - log.warn("overwriting stale lock file", { jobId, oldPid: existingPid }) - } - writeLockFile(jobId, projectId, process.pid).catch((e) => - log.error("failed to write lock file", { jobId, error: String(e) }), - ) + await Store.updateJob(projectId, jobId, { pulse_pid: process.pid }) } startJob() @@ -119,8 +115,8 @@ export async function resurrectionScan(jobId: string, projectId: string): Promis const alive = pidAlive || sessionAlive if (!alive) { - if (task.pipeline.stage === "developing") { - // Developer finished before restart — advance to reviewing, preserve worktree/branch + if (task.pipeline.stage === "developing" || task.pipeline.stage === "adversarial-running") { + // Developer or adversarial finished before restart — advance to reviewing, preserve worktree/branch await Store.updateTask(projectId, task.id, { assignee: null, assignee_pid: null, @@ -128,10 +124,12 @@ export async function resurrectionScan(jobId: string, projectId: string): Promis }, true) await Store.addComment(projectId, task.id, { author: "system", - message: "Resurrected: developer session ended before restart. Advanced to reviewing.", + message: task.pipeline.stage === "developing" + ? "Resurrected: developer session ended before restart. Advanced to reviewing." + : "Resurrected: adversarial session ended before restart. Returned to reviewing.", created_at: new Date().toISOString(), }) - log.info("resurrected developing task to reviewing", { taskId: task.id, jobId }) + log.info("resurrected task to reviewing", { taskId: task.id, jobId, fromStage: task.pipeline.stage }) } else { // Other stages — reset to idle (existing behavior) let worktreeRemoved = false @@ -237,7 +235,7 @@ function isPidAlive(pid: number): boolean { } } -export { isPidAlive, writeLockFile, removeLockFile, readLockPid, processAdversarialVerdicts, spawnAdversarial, scheduleReadyTasks, heartbeatActiveAgents } +export { isPidAlive, writeLockFile, removeLockFile, readLockPid, checkTimeouts, processAdversarialVerdicts, spawnAdversarial, scheduleReadyTasks, heartbeatActiveAgents } async function scheduleReadyTasks(jobId: string, projectId: string, pmSessionId: string): Promise { const job = await Store.getJob(projectId, jobId) @@ -416,7 +414,7 @@ async function checkTimeouts(jobId: string, projectId: string): Promise { const allTasks = await Store.listTasks(projectId) const jobTasks = allTasks.filter((t) => t.job_id === jobId) const now = Date.now() - const ADVERSARIAL_TIMEOUT_MS = 60 * 60 * 1000 + const ADVERSARIAL_TIMEOUT_MS = 30 * 60 * 1000 const SESSION_MESSAGE_TIMEOUT_MS = 30 * 60 * 1000 for (const task of jobTasks) { @@ -508,7 +506,7 @@ async function checkTimeouts(jobId: string, projectId: string): Promise { await Store.addComment(projectId, task.id, { author: "system", - message: "Adversarial agent timed out after 60 minutes. Will retry on next Pulse tick.", + message: "Adversarial agent timed out after 30 minutes. Will retry on next Pulse tick.", created_at: new Date().toISOString(), }) } diff --git a/packages/opencode/src/tasks/tool.ts b/packages/opencode/src/tasks/tool.ts index 2fa8b6f0aaf4..00a36123c9bc 100644 --- a/packages/opencode/src/tasks/tool.ts +++ b/packages/opencode/src/tasks/tool.ts @@ -875,7 +875,7 @@ if (params.command === "start") { assignee: null, assignee_pid: null, pipeline: { ...task.pipeline, stage: "done" }, - }) + }, true) await Store.addComment(projectId, params.taskId, { author: "system", @@ -935,7 +935,7 @@ if (params.command === "start") { adversarial_verdict: null, last_activity: null, }, - }) + }, true) await Store.addComment(projectId, params.taskId, { author: "system", diff --git a/packages/opencode/test/tasks/tool.test.ts b/packages/opencode/test/tasks/tool.test.ts new file mode 100644 index 000000000000..910213007b7e --- /dev/null +++ b/packages/opencode/test/tasks/tool.test.ts @@ -0,0 +1,182 @@ +import { describe, test, expect, beforeEach, afterEach } from "bun:test" +import { Store } from "../../src/tasks/store" +import { Global } from "../../src/global" +import path from "path" +import fs from "fs/promises" + +const TEST_PROJECT_ID = "test-tool-project" +const TEST_JOB_ID = "job-test-tool-123" +const TEST_TASK_ID = "task-tool-1" + +describe("tool.ts - taskctl commands", () => { + let originalDataPath: string + let testDataDir: string + + beforeEach(async () => { + originalDataPath = Global.Path.data + testDataDir = path.join("/tmp", "opencode-tool-test-" + Math.random().toString(36).slice(2)) + await fs.mkdir(testDataDir, { recursive: true }) + + process.env.OPENCODE_TEST_HOME = testDataDir + await Global.init() + + await Store.createJob(TEST_PROJECT_ID, { + id: TEST_JOB_ID, + parent_issue: 123, + status: "running", + created_at: new Date().toISOString(), + stopping: false, + pulse_pid: null, + max_workers: 1, + pm_session_id: "pm-session", + }) + }) + + afterEach(async () => { + await fs.rm(testDataDir, { recursive: true, force: true }).catch(() => {}) + if (originalDataPath) { + delete process.env.OPENCODE_TEST_HOME + } + }) + + describe("retry command", () => { + test("retry resets adversarial-running task without throwing", async () => { + const worktreePath = path.join(testDataDir, "worktree-retry") + await fs.mkdir(worktreePath, { recursive: true }) + + const task: any = { + id: TEST_TASK_ID, + job_id: TEST_JOB_ID, + status: "failed", + priority: 1, + task_type: "implementation", + parent_issue: 123, + labels: [], + depends_on: [], + assignee: "ses-session123", + assignee_pid: 12345, + worktree: worktreePath, + branch: "feature/test", + title: "Test task", + description: "Test", + acceptance_criteria: "Test", + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + close_reason: null, + comments: [], + pipeline: { + stage: "adversarial-running", + attempt: 1, + last_activity: new Date().toISOString(), + last_steering: null, + history: [], + adversarial_verdict: { + verdict: "ISSUES_FOUND", + summary: "Test issues", + issues: [{ location: "file.ts", severity: "HIGH", fix: "Fix this" }], + created_at: new Date().toISOString(), + }, + }, + } + + await Store.createTask(TEST_PROJECT_ID, task) + + await expect(async () => { + const result = await Store.updateTask(TEST_PROJECT_ID, TEST_TASK_ID, { + status: "open", + assignee: null, + assignee_pid: null, + worktree: null, + branch: null, + pipeline: { + ...task.pipeline, + stage: "idle", + attempt: 1, + adversarial_verdict: null, + last_activity: null, + }, + }, true) + + await Store.addComment(TEST_PROJECT_ID, TEST_TASK_ID, { + author: "system", + message: `Retried by PM. Task reset to open. Pulse will reschedule on next tick.`, + created_at: new Date().toISOString(), + }) + }).not.toThrow() + + const updated = await Store.getTask(TEST_PROJECT_ID, TEST_TASK_ID) + expect(updated?.status).toBe("open") + expect(updated?.assignee).toBeNull() + expect(updated?.worktree).toBeNull() + expect(updated?.branch).toBeNull() + expect(updated?.pipeline.stage).toBe("idle") + expect(updated?.pipeline.attempt).toBe(1) + expect(updated?.pipeline.adversarial_verdict).toBeNull() + }) + }) + + describe("override command", () => { + test("override --skip on adversarial-running task succeeds without throwing", async () => { + const worktreePath = path.join(testDataDir, "worktree-override") + await fs.mkdir(worktreePath, { recursive: true }) + + const task: any = { + id: TEST_TASK_ID, + job_id: TEST_JOB_ID, + status: "failed", + priority: 1, + task_type: "implementation", + parent_issue: 123, + labels: [], + depends_on: [], + assignee: "ses-session456", + assignee_pid: 67890, + worktree: worktreePath, + branch: "feature/override-test", + title: "Test task", + description: "Test", + acceptance_criteria: "Test", + created_at: new Date().toISOString(), + updated_at: new Date().toISOString(), + close_reason: null, + comments: [], + pipeline: { + stage: "adversarial-running", + attempt: 1, + last_activity: new Date().toISOString(), + last_steering: null, + history: [], + adversarial_verdict: null, + }, + } + + await Store.createTask(TEST_PROJECT_ID, task) + + await expect(async () => { + await Store.updateTask(TEST_PROJECT_ID, TEST_TASK_ID, { + status: "closed", + close_reason: "skipped by PM", + worktree: null, + branch: null, + assignee: null, + assignee_pid: null, + pipeline: { ...task.pipeline, stage: "done" }, + }, true) + + await Store.addComment(TEST_PROJECT_ID, TEST_TASK_ID, { + author: "system", + message: "Skipped by PM override. Dependent tasks are now unblocked.", + created_at: new Date().toISOString(), + }) + }).not.toThrow() + + const updated = await Store.getTask(TEST_PROJECT_ID, TEST_TASK_ID) + expect(updated?.status).toBe("closed") + expect(updated?.close_reason).toBe("skipped by PM") + expect(updated?.worktree).toBeNull() + expect(updated?.branch).toBeNull() + expect(updated?.assignee).toBeNull() + expect(updated?.pipeline.stage).toBe("done") + }) + }) +}) \ No newline at end of file