db

import "github.com/jbcom/radioactive-ralph/internal/db"

Package db is radioactive-ralph’s per-repo SQLite event log.

Every interesting thing the supervisor does is recorded as an event here: session spawns, user messages injected into managed Claude subprocesses, stream-json events received, session deaths, resumes, commits, PRs, errors. The database is an append-only log the supervisor replays on startup to rebuild in-memory state.

SQLite is opened with journal_mode=WAL so the supervisor (single writer) and status/attach readers (many readers) can coexist without blocking one another. foreign_keys=ON enforces referential integrity between sessions and spend. busy_timeout=5000 makes a connection wait up to five seconds for a lock, so brief writer contention during a reader checkpoint doesn’t surface as an error that crashes the supervisor.
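As a rough illustration of these three settings, the sketch below builds a DSN using the `_pragma` query parameter that modernc.org/sqlite accepts. The helper name `buildDSN` and the exact DSN layout are assumptions made for this example; the text above does not say how Open applies its pragmas.

```go
package main

import "fmt"

// buildDSN returns a modernc.org/sqlite DSN that applies the three pragmas
// on every new connection. The function name and the DSN layout are
// illustrative assumptions, not the package's actual code.
func buildDSN(path string) string {
	return "file:" + path +
		"?_pragma=journal_mode(WAL)" + // readers and the single writer do not block each other
		"&_pragma=foreign_keys(1)" + // enforce references between sessions and spend
		"&_pragma=busy_timeout(5000)" // wait up to 5000 ms for a lock before failing
}

func main() {
	fmt.Println(buildDSN("state.db"))
}
```

Putting the pragmas in the DSN rather than running them once after opening means every connection in the pool gets them, which matters for per-connection settings such as foreign_keys and busy_timeout.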

Dedup uses SQLite’s built-in FTS5 virtual table over task descriptions. Full semantic search (sqlite-vec) was scoped out of M2 because it requires an embeddings pipeline we don’t want to own yet.

Constants

TaskStatus values map to the tasks.status column.

const (
    TaskQueued  = "queued"
    TaskRunning = "running"
    TaskBlocked = "blocked"
    TaskDone    = "done"
    TaskFailed  = "failed"
)

DriverName is the modernc.org/sqlite driver identifier. Exposed so tests can open in-memory databases directly.

const DriverName = "sqlite"

type DB

DB is the package’s wrapper around *sql.DB with the supervisor’s helper methods attached. Construct via Open.

type DB struct {
    // contains filtered or unexported fields
}

func Open

func Open(ctx context.Context, path string) (*DB, error)

Open opens the SQLite database at path (creating it if absent), applies pending migrations, and returns a *DB ready for use.

path is typically xdg.Paths.StateDB. An empty string opens an in-memory database, which is what tests want.

func (*DB) AccumulateSpend

func (d *DB) AccumulateSpend(ctx context.Context, sessionUUID, model string, input, output, cached int64) error

AccumulateSpend upserts token counts for a (session, model) pair. Input/output/cached values are additive — each call adds to the existing totals.
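The additive behaviour can be pictured with a small in-memory stand-in. The `ledger`, `spendKey` and `totals` types are hypothetical names for this sketch only; the real method writes to the spend table, presumably with an SQL upsert.

```go
package main

import "fmt"

// spendKey identifies one (session, model) pair, mirroring the upsert key.
type spendKey struct{ sessionUUID, model string }

// totals holds the running token counts for one pair.
type totals struct{ input, output, cached int64 }

// ledger is a hypothetical in-memory stand-in for the spend table.
type ledger map[spendKey]totals

// accumulate adds the given counts to whatever is already stored.
func (l ledger) accumulate(sessionUUID, model string, input, output, cached int64) {
	k := spendKey{sessionUUID, model}
	t := l[k] // zero value when the pair has not been seen yet
	t.input += input
	t.output += output
	t.cached += cached
	l[k] = t
}

func main() {
	l := ledger{}
	l.accumulate("s1", "model-a", 100, 20, 5)
	l.accumulate("s1", "model-a", 50, 10, 0)
	fmt.Println(l[spendKey{"s1", "model-a"}]) // {150 30 5}
}
```

Because the counts are added rather than overwritten, callers should pass per-message deltas, not running totals.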

func (*DB) ActiveSessions

func (d *DB) ActiveSessions(ctx context.Context) ([]Session, error)

ActiveSessions returns sessions that haven’t exited yet.

func (*DB) Append

func (d *DB) Append(ctx context.Context, e Event) (int64, error)

Append inserts an event. PayloadParsed is JSON-encoded if non-nil.
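A minimal sketch of the encoding rule, assuming a hypothetical helper named `encodePayload`; the event kind and field names in the sample payload are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodePayload mirrors the documented rule: a nil payload produces no JSON,
// anything else is marshalled. The helper name is hypothetical.
func encodePayload(v any) ([]byte, error) {
	if v == nil {
		return nil, nil
	}
	return json.Marshal(v)
}

func main() {
	raw, err := encodePayload(map[string]any{"kind": "session.spawn", "pid": 4242})
	fmt.Println(string(raw), err) // {"kind":"session.spawn","pid":4242} <nil>
}
```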

func (*DB) ClaimTask

func (d *DB) ClaimTask(ctx context.Context, taskID, worktreePath, sessionUUID string) error

ClaimTask marks a queued task as running and sets the worktree + session.

func (*DB) Close

func (d *DB) Close() error

Close releases the underlying connection pool.

func (*DB) Conn

func (d *DB) Conn() *sql.DB

Conn returns the underlying *sql.DB. Exposed for advanced callers (integration tests that want to assert schema directly); most supervisor code should use the typed methods on DB.

func (*DB) EnqueueTask

func (d *DB) EnqueueTask(ctx context.Context, id, description string, priority int) (string, bool, error)

EnqueueTask inserts a new task and returns its ID. If a near-duplicate description already exists with status in (queued, running), the existing task ID is returned and a new row is NOT inserted. Dedup uses FTS5’s MATCH against the normalised description.
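The normalisation rule is not spelled out here, so the following is only one plausible approach: lowercase the description, split on anything that is not a letter or digit, and quote each token so FTS5 treats it as a literal string. The function name `matchQuery` is an assumption.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// matchQuery turns a free-text description into an FTS5 MATCH expression.
// Each token is lowercased and double-quoted so FTS5 reads it as a literal
// string rather than query syntax; adjacent quoted strings are ANDed.
// This normalisation is an assumption, not the package's documented rule.
func matchQuery(description string) string {
	tokens := strings.FieldsFunc(strings.ToLower(description), func(r rune) bool {
		return !unicode.IsLetter(r) && !unicode.IsDigit(r)
	})
	for i, t := range tokens {
		tokens[i] = `"` + t + `"`
	}
	return strings.Join(tokens, " ")
}

func main() {
	fmt.Println(matchQuery("Fix the Login-Bug!"))  // "fix" "the" "login" "bug"
	fmt.Println(matchQuery("fix   the login bug")) // "fix" "the" "login" "bug"
}
```

Both descriptions yield the same expression, which is what lets a MATCH lookup find the earlier task. An empty result is not a valid MATCH expression, so a caller using this approach would skip the lookup for descriptions with no tokens.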

func (*DB) FinishTask

func (d *DB) FinishTask(ctx context.Context, taskID string, success bool) error

FinishTask marks a task as done or failed.
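Together, ClaimTask and FinishTask describe a small lifecycle: queued, then running, then done or failed. The in-memory sketch below models it with a hypothetical `task` type. It assumes that claiming a task that is not queued returns an error, which the descriptions above do not state.

```go
package main

import (
	"errors"
	"fmt"
)

// Status strings copied from the TaskStatus constants.
const (
	TaskQueued  = "queued"
	TaskRunning = "running"
	TaskDone    = "done"
	TaskFailed  = "failed"
)

var errNotQueued = errors.New("task is not queued")

// task is a hypothetical in-memory stand-in for one tasks row.
type task struct{ status, worktreePath, claimedBy string }

// claim moves queued -> running and records who took the task.
func (t *task) claim(worktreePath, sessionUUID string) error {
	if t.status != TaskQueued {
		return errNotQueued // assumed behaviour, not documented
	}
	t.status, t.worktreePath, t.claimedBy = TaskRunning, worktreePath, sessionUUID
	return nil
}

// finish records the outcome as done or failed.
func (t *task) finish(success bool) {
	if success {
		t.status = TaskDone
	} else {
		t.status = TaskFailed
	}
}

func main() {
	t := &task{status: TaskQueued}
	err := t.claim("/tmp/wt-1", "s1")
	fmt.Println(err, t.status)
	err = t.claim("/tmp/wt-2", "s2") // second claim is rejected in this sketch
	fmt.Println(err)
	t.finish(true)
	fmt.Println(t.status)
}
```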

func (*DB) InsertSession

func (d *DB) InsertSession(ctx context.Context, s Session) error

InsertSession records a newly-spawned managed session.

func (*DB) ListTasks

func (d *DB) ListTasks(ctx context.Context, statuses ...string) ([]Task, error)

ListTasks returns tasks matching the given statuses, most recent first. Pass no statuses to return tasks in every status.
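A variadic status filter like this usually becomes an SQL IN clause with one placeholder per value. The sketch below shows that shape; the table name, the column names, and the choice of created_at for "most recent first" are assumptions about the schema.

```go
package main

import (
	"fmt"
	"strings"
)

// listQuery builds a SELECT with one placeholder per status. With no
// statuses the WHERE clause is omitted, so every task is returned.
// Table and column names are assumptions about the schema.
func listQuery(statuses ...string) (string, []any) {
	q := "SELECT id FROM tasks"
	args := make([]any, 0, len(statuses))
	if len(statuses) > 0 {
		marks := strings.TrimSuffix(strings.Repeat("?, ", len(statuses)), ", ")
		q += " WHERE status IN (" + marks + ")"
		for _, s := range statuses {
			args = append(args, s)
		}
	}
	return q + " ORDER BY created_at DESC", args
}

func main() {
	q, args := listQuery("queued", "running")
	fmt.Println(q, args)
	q, args = listQuery()
	fmt.Println(q, args)
}
```

On the caller side this corresponds to `ListTasks(ctx, db.TaskQueued, db.TaskRunning)` for a filtered listing and `ListTasks(ctx)` for everything.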

func (*DB) MarkSessionExited

func (d *DB) MarkSessionExited(ctx context.Context, uuid, reason string) error

MarkSessionExited records why and when a session ended.

func (*DB) Path

func (d *DB) Path() string

Path returns the on-disk path of the database (empty string for in-memory).

func (*DB) Replay

func (d *DB) Replay(ctx context.Context, afterID int64, fn func(Event) error) error

Replay iterates events in ID order (which is also approximately timestamp order) and yields them via the callback. The callback’s returned error aborts iteration and is propagated to the caller.

afterID lets the caller resume from a previous replay’s last-seen ID. Pass 0 to iterate from the beginning.
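The callback contract and the resume pattern can be sketched against an in-memory slice. The `replay` function below is a hypothetical stand-in, not the package's code, and it assumes afterID is exclusive (only events with a larger ID are yielded).

```go
package main

import (
	"errors"
	"fmt"
)

// Event keeps only the fields this sketch needs.
type Event struct {
	ID   int64
	Kind string
}

// replay is a hypothetical in-memory stand-in with the same callback
// contract as Replay: events with ID > afterID, in ID order, stopping at
// the first error the callback returns. events must be sorted by ID.
func replay(events []Event, afterID int64, fn func(Event) error) error {
	for _, e := range events {
		if e.ID <= afterID {
			continue
		}
		if err := fn(e); err != nil {
			return err
		}
	}
	return nil
}

var errStop = errors.New("stop")

func main() {
	log := []Event{{1, "spawn"}, {2, "message"}, {3, "exit"}}

	var lastSeen int64
	// First pass: stop at event 2, remembering how far we got.
	err := replay(log, 0, func(e Event) error {
		lastSeen = e.ID
		if e.ID == 2 {
			return errStop
		}
		return nil
	})
	fmt.Println(err, lastSeen) // stop 2

	// Second pass: resume after the last-seen ID.
	_ = replay(log, lastSeen, func(e Event) error {
		fmt.Println(e.ID, e.Kind) // 3 exit
		return nil
	})
}
```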

func (*DB) SpendBySession

func (d *DB) SpendBySession(ctx context.Context, sessionUUID string) ([]Spend, error)

SpendBySession returns all (model, spend) entries for one session.

type Event

Event is the structured form of an entry in the events table. PayloadRaw holds the exact bytes received from stream-json (nil if the event originated inside the supervisor).

type Event struct {
    ID            int64
    Timestamp     time.Time
    Stream        string
    Kind          string
    Actor         string
    PayloadParsed any // marshalled to JSON before insert
    PayloadRaw    []byte
}

type Session

Session is the row shape of the sessions table.

type Session struct {
    UUID         string
    Variant      string
    WorktreePath string
    PID          int
    Model        string
    Stage        string
    StartedAt    time.Time
    ExitedAt     *time.Time
    ExitReason   string
}

type Spend

Spend is the row shape of the spend table.

type Spend struct {
    SessionUUID  string
    Model        string
    InputTokens  int64
    OutputTokens int64
    CachedInput  int64
}

type Task

Task is a queued unit of work.

type Task struct {
    ID           string
    Description  string
    Priority     int
    Status       string
    WorktreePath string
    ClaimedBy    string
    CreatedAt    time.Time
    UpdatedAt    time.Time
}
