plandag¶
import "github.com/jbcom/radioactive-ralph/internal/plandag"
Package plandag owns the SQLite schema, DAG operations, and sync protocol for user-facing plans.
The schema is embedded under schema/*.sql and applied in lexical order by Migrate. The schema_version is tracked in a dedicated table so future versions can detect skew and refuse to open a DB newer than the running binary supports.
Index¶
-
func Open(ctx context.Context, opts Options) (*Store, error)
func (s *Store) AddDep(ctx context.Context, planID, taskID, dependsOn string) error
func (s *Store) ApproveTask(ctx context.Context, planID, taskID string) error
func (s *Store) AttachPlan(ctx context.Context, sessionID, planID string) error
func (s *Store) ClearSessionVariantTask(ctx context.Context, sessionVariantID, status string) error
func (s *Store) CloseSession(ctx context.Context, sessionID string) error
func (s *Store) CreatePlan(ctx context.Context, o CreatePlanOpts) (string, error)
func (s *Store) CreateSession(ctx context.Context, o SessionOpts) (string, error)
func (s *Store) CreateSessionVariant(ctx context.Context, o SessionVariantOpts) (string, error)
func (s *Store) CreateTask(ctx context.Context, o CreateTaskOpts) error
func (s *Store) GetPlan(ctx context.Context, id string) (*Plan, error)
func (s *Store) GetTask(ctx context.Context, planID, id string) (*Task, error)
func (s *Store) HeartbeatSession(ctx context.Context, sessionID string) error
func (s *Store) HeartbeatSessionVariant(ctx context.Context, sessionVariantID string) error
func (s *Store) ListPlans(ctx context.Context, statuses []PlanStatus) ([]Plan, error)
func (s *Store) ListTasks(ctx context.Context, planID string, statuses []TaskStatus) ([]Task, error)
func (s *Store) Ready(ctx context.Context, planID string) ([]Task, error)
func (s *Store) SetPlanStatus(ctx context.Context, id string, status PlanStatus) error
func (s *Store) TaskDeps(ctx context.Context, planID, taskID string) (TaskDepsResult, error)
Variables¶
ErrDuplicateSlug is returned when a plan with (repo_path, slug) already exists. Callers either pick a new slug or update.
var ErrDuplicateSlug = errors.New("plandag: plan with slug already exists in this repo")
ErrNoReadyTask indicates ClaimNextReady found nothing claimable.
var ErrNoReadyTask = errors.New("plandag: no ready task")
func Migrate¶
func Migrate(db *sql.DB) error
Migrate brings db up to currentSchemaVersion by applying any pending *.up.sql migrations in lexical order.
Migrations are idempotent per-version via SQLite’s user_version PRAGMA. Each migration runs inside a transaction.
type CreatePlanOpts¶
CreatePlanOpts configures plan creation. The caller supplies slug + title; ID is generated via Store.uuid().
type CreatePlanOpts struct {
Slug string
Title string
RepoPath string
RepoRemote string
PrimaryVariant string
Confidence int
TagsJSON string
}
type CreateTaskOpts¶
CreateTaskOpts configures task creation.
type CreateTaskOpts struct {
PlanID string
ID string // stable slug (operator-chosen or fixit-emitted)
Description string
Complexity string
Effort string
VariantHint string
ContextBoundary bool
AcceptanceJSON string
ParentTaskID string
}
type Options¶
Options configures Open.
type Options struct {
// DSN is a modernc.org/sqlite DSN, e.g.
// "file:/path/to/plans.db?_pragma=foreign_keys(1)&_pragma=journal_mode(WAL)"
DSN string
// Clock is swappable for tests. Nil defaults to clockwork.NewRealClock().
Clock clockwork.Clock
// UUID is swappable for tests. Nil defaults to uuid.NewV7().
UUID func() string
}
type Plan¶
Plan is the durable row.
type Plan struct {
ID string
Slug string
Title string
RepoPath string
RepoRemote string
Status PlanStatus
PrimaryVariant string
Confidence int
CreatedAt time.Time
UpdatedAt time.Time
LastSessionAt sql.NullTime
TagsJSON string // JSON array
}
type PlanStatus¶
PlanStatus enumerates the lifecycle states a plan can be in.
type PlanStatus string
const (
PlanStatusDraft PlanStatus = "draft"
PlanStatusActive PlanStatus = "active"
PlanStatusPaused PlanStatus = "paused"
PlanStatusDone PlanStatus = "done"
PlanStatusFailedPartial PlanStatus = "failed_partial"
PlanStatusArchived PlanStatus = "archived"
PlanStatusAbandoned PlanStatus = "abandoned"
)
type RepoTaskSummary¶
RepoTaskSummary is a task plus its repo-local plan context and latest event.
type RepoTaskSummary struct {
PlanID string
PlanSlug string
PlanTitle string
PlanStatus PlanStatus
Task Task
LatestEventType string
LatestPayloadJSON string
}
type SessionMode¶
SessionMode identifies attached/headless vs durable repo-service modes.
type SessionMode string
const (
SessionModeAttached SessionMode = "attached"
SessionModeDurable SessionMode = "durable"
)
type SessionOpts¶
SessionOpts configures CreateSession.
type SessionOpts struct {
ID string // optional; caller may pass an existing UUID. Empty → auto.
Mode SessionMode
Transport SessionTransport
PID int
PIDStartTime string
Host string
}
type SessionTransport¶
SessionTransport identifies how the operator/runtime reached the session.
type SessionTransport string
const (
SessionTransportStdio SessionTransport = "stdio"
SessionTransportSocket SessionTransport = "socket"
)
type SessionVariantOpts¶
SessionVariantOpts configures CreateSessionVariant.
type SessionVariantOpts struct {
SessionID string
VariantName string
SubprocessPID int
SubprocessStartTime string
}
type SessionVariantSummary¶
SessionVariantSummary is the current runtime-facing view of one active session_variant row joined to task + plan metadata.
type SessionVariantSummary struct {
ID string
SessionID string
VariantName string
Status string
PlanID string
PlanSlug string
TaskID string
TaskDesc string
StartedAt time.Time
LastHeartbeat time.Time
}
type Store¶
Store is the plandag handle. It wraps a *sql.DB plus deterministic clock + UUID provider (test-swappable).
type Store struct {
// contains filtered or unexported fields
}
func Open¶
func Open(ctx context.Context, opts Options) (*Store, error)
Open returns a migrated, ready-to-use Store.
func (*Store) AddDep¶
func (s *Store) AddDep(ctx context.Context, planID, taskID, dependsOn string) error
AddDep wires task → depends_on for the same plan. Rejects cycles.
func (*Store) ApproveTask¶
func (s *Store) ApproveTask(ctx context.Context, planID, taskID string) error
ApproveTask transitions a task waiting for operator approval back into the pending set.
func (*Store) ApproveTaskWithPayload¶
func (s *Store) ApproveTaskWithPayload(ctx context.Context, planID, taskID string, payload TaskEventPayload) error
ApproveTaskWithPayload transitions a task waiting for operator approval back into the pending set and records the operator action in task history.
func (*Store) AttachPlan¶
func (s *Store) AttachPlan(ctx context.Context, sessionID, planID string) error
AttachPlan records which session is attached to which plan. Used by `radioactive_ralph status` to enumerate active runtime ownership.
func (*Store) ClaimNextReady¶
func (s *Store) ClaimNextReady(ctx context.Context, planID, variant, sessionID, sessionVariantID string) (*Task, error)
ClaimNextReady is the atomic “claim the next ready task for this variant” operation. Returns the claimed task id, or ErrNoReadyTask if none. Uses BEGIN IMMEDIATE + UPDATE … RETURNING so two parallel ralphs never claim the same task.
func (*Store) ClearSessionVariantTask¶
func (s *Store) ClearSessionVariantTask(ctx context.Context, sessionVariantID, status string) error
ClearSessionVariantTask clears the active task from one worker row and marks it idle or terminated.
func (*Store) Close¶
func (s *Store) Close() error
Close releases DB resources.
func (*Store) CloseSession¶
func (s *Store) CloseSession(ctx context.Context, sessionID string) error
CloseSession removes a session row. FK cascades clear its variants and attach rows.
func (*Store) CreatePlan¶
func (s *Store) CreatePlan(ctx context.Context, o CreatePlanOpts) (string, error)
CreatePlan inserts a fresh plan in draft status and returns the newly generated UUID v7 id.
func (*Store) CreateSession¶
func (s *Store) CreateSession(ctx context.Context, o SessionOpts) (string, error)
CreateSession inserts a session row. Returns the session id. The row lifetime matches one attached run or durable repo-service process.
func (*Store) CreateSessionVariant¶
func (s *Store) CreateSessionVariant(ctx context.Context, o SessionVariantOpts) (string, error)
CreateSessionVariant registers a newly-spawned ralph subprocess against a session. Returns the variant row id.
func (*Store) CreateTask¶
func (s *Store) CreateTask(ctx context.Context, o CreateTaskOpts) error
CreateTask inserts a pending task. Callers wire dependencies via AddDep.
func (*Store) DB¶
func (s *Store) DB() *sql.DB
DB returns the underlying *sql.DB for callers that need raw access. Business-logic callers should use Store’s typed methods instead.
func (*Store) GetPlan¶
func (s *Store) GetPlan(ctx context.Context, id string) (*Plan, error)
GetPlan loads a plan by id.
func (*Store) GetTask¶
func (s *Store) GetTask(ctx context.Context, planID, id string) (*Task, error)
GetTask loads one task by (plan_id, id).
func (*Store) HeartbeatSession¶
func (s *Store) HeartbeatSession(ctx context.Context, sessionID string) error
HeartbeatSession refreshes last_heartbeat for a session. Called periodically by the attached run or durable repo service. Reaper uses staleness to detect dead sessions.
func (*Store) HeartbeatSessionVariant¶
func (s *Store) HeartbeatSessionVariant(ctx context.Context, sessionVariantID string) error
HeartbeatSessionVariant refreshes one worker row’s heartbeat.
func (*Store) ListActiveSessionVariants¶
func (s *Store) ListActiveSessionVariants(ctx context.Context, repoPath string, limit int) ([]SessionVariantSummary, error)
ListActiveSessionVariants returns running/idle session variants for a repo.
func (*Store) ListPlans¶
func (s *Store) ListPlans(ctx context.Context, statuses []PlanStatus) ([]Plan, error)
ListPlans returns plans matching filter. Empty filter → active + paused.
func (*Store) ListRepoTaskSummaries¶
func (s *Store) ListRepoTaskSummaries(ctx context.Context, repoPath string, statuses []TaskStatus, limit int) ([]RepoTaskSummary, error)
ListRepoTaskSummaries returns tasks for one repo with enough plan/event context for operator UIs.
func (*Store) ListTaskEvents¶
func (s *Store) ListTaskEvents(ctx context.Context, planID, taskID string, limit int) ([]TaskEvent, error)
ListTaskEvents returns the most recent task events first.
func (*Store) ListTasks¶
func (s *Store) ListTasks(ctx context.Context, planID string, statuses []TaskStatus) ([]Task, error)
ListTasks returns tasks for one plan, optionally filtered by status.
func (*Store) MarkBlocked¶
func (s *Store) MarkBlocked(ctx context.Context, planID, taskID, sessionID string, payload TaskEventPayload) error
MarkBlocked releases a running task into the blocked set so an operator can later requeue or otherwise intervene.
func (*Store) MarkDone¶
func (s *Store) MarkDone(ctx context.Context, planID, taskID, sessionID string, evidenceJSON string) ([]Task, error)
MarkDone transitions a running task to done, logs the event, and returns the set of newly-ready downstream tasks.
func (*Store) MarkFailed¶
func (s *Store) MarkFailed(ctx context.Context, planID, taskID, sessionID, reason string, maxRetries int) (retried bool, err error)
MarkFailed transitions a running task to failed or retries.
func (*Store) MarkFailedWithPayload¶
func (s *Store) MarkFailedWithPayload(ctx context.Context, planID, taskID, sessionID string, payload TaskEventPayload, maxRetries int) (retried bool, err error)
MarkFailedWithPayload transitions a running task to failed or retries while preserving structured payload details in task history.
func (*Store) OperatorFailTask¶
func (s *Store) OperatorFailTask(ctx context.Context, planID, taskID string, payload TaskEventPayload) error
OperatorFailTask force-fails a task and records an operator action.
func (*Store) OperatorHandoffTask¶
func (s *Store) OperatorHandoffTask(ctx context.Context, planID, taskID string, payload TaskEventPayload, variantHint string, requireApproval bool) error
OperatorHandoffTask returns a task to the runnable queue with a new variant hint supplied by the operator.
func (*Store) OperatorRequeueTask¶
func (s *Store) OperatorRequeueTask(ctx context.Context, planID, taskID string, payload TaskEventPayload, variantHint string, requireApproval bool) error
OperatorRequeueTask returns a blocked/failed/approval-gated task to the runnable queue and records the operator action in task history.
func (*Store) OperatorRetryTask¶
func (s *Store) OperatorRetryTask(ctx context.Context, planID, taskID string, payload TaskEventPayload) error
OperatorRetryTask increments retry_count and returns a blocked/failed task to the runnable queue.
func (*Store) Ready¶
func (s *Store) Ready(ctx context.Context, planID string) ([]Task, error)
Ready returns tasks that are ready to run — every dependency is `done` (or `skipped`). Result is ordered by created_at for stable test output.
func (*Store) RequeueTask¶
func (s *Store) RequeueTask(ctx context.Context, planID, taskID, sessionID, reason, variantHint string, requireApproval bool) error
RequeueTask releases a running task back into the DAG, optionally changing the variant hint and/or requiring operator approval before it becomes runnable again.
func (*Store) RequeueTaskWithPayload¶
func (s *Store) RequeueTaskWithPayload(ctx context.Context, planID, taskID, sessionID string, payload TaskEventPayload, variantHint string, requireApproval bool) error
RequeueTaskWithPayload releases a running task back into the DAG and emits a structured audit-log payload describing why it was requeued.
func (*Store) SetPlanStatus¶
func (s *Store) SetPlanStatus(ctx context.Context, id string, status PlanStatus) error
SetPlanStatus updates the plan’s status column.
func (*Store) SetSessionVariantTask¶
func (s *Store) SetSessionVariantTask(ctx context.Context, sessionVariantID, planID, taskID string) error
SetSessionVariantTask updates the currently assigned plan/task for one session_variant row and refreshes its heartbeat.
func (*Store) TaskDeps¶
func (s *Store) TaskDeps(ctx context.Context, planID, taskID string) (TaskDepsResult, error)
TaskDeps returns the up-and-down neighborhood for (planID, taskID) in a single round trip. Used by the TUI task-detail view.
type Task¶
Task is a DAG node.
type Task struct {
ID string
PlanID string
Description string
Complexity string
Effort string
VariantHint string
ContextBoundary bool
AcceptanceJSON string
Status TaskStatus
AssignedVariant string
ClaimedBySession string
ClaimedByVariantID string
RetryCount int
ReclaimCount int
ParentTaskID string
CreatedAt time.Time
UpdatedAt time.Time
}
type TaskDepsResult¶
TaskDepsResult captures both directions of a task’s DAG edges: the tasks this one depends on, and the tasks that depend on this one. Each side is returned as a list of task IDs in ascending lexical order so TUI drilldowns render deterministically.
type TaskDepsResult struct {
DependsOn []string // tasks this task waits for
DependedBy []string // tasks that wait for this task
}
type TaskEvent¶
TaskEvent is one append-only audit-log row for a task.
type TaskEvent struct {
ID int64
PlanID string
TaskID string
EventType string
Variant string
SessionID string
PayloadJSON string
OccurredAt time.Time
}
type TaskEventPayload¶
TaskEventPayload keeps task history payloads structured so the CLI, TUI, and tests can reason about approvals, handoffs, retries, and provider context without string scraping.
type TaskEventPayload struct {
Summary string `json:"summary,omitempty"`
Reason string `json:"reason,omitempty"`
Evidence []string `json:"evidence,omitempty"`
HandoffTo string `json:"handoff_to,omitempty"`
Retryable bool `json:"retryable,omitempty"`
NeedsContext []string `json:"needs_context,omitempty"`
ApprovalRequired bool `json:"approval_required,omitempty"`
Provider string `json:"provider,omitempty"`
ProviderSessionID string `json:"provider_session_id,omitempty"`
OperatorAction string `json:"operator_action,omitempty"`
}
func ParseTaskPayload¶
func ParseTaskPayload(raw string) (TaskEventPayload, error)
ParseTaskPayload decodes one payload_json string into the typed helper.
type TaskStatus¶
TaskStatus enumerates valid task lifecycle states.
type TaskStatus string
const (
TaskStatusPending TaskStatus = "pending"
TaskStatusReady TaskStatus = "ready"
TaskStatusReadyPendingApproval TaskStatus = "ready_pending_approval"
TaskStatusBlocked TaskStatus = "blocked"
TaskStatusRunning TaskStatus = "running"
TaskStatusDone TaskStatus = "done"
TaskStatusFailed TaskStatus = "failed"
TaskStatusSkipped TaskStatus = "skipped"
TaskStatusDecomposed TaskStatus = "decomposed"
)
Generated by gomarkdoc