ipc

import "github.com/jbcom/radioactive-ralph/internal/ipc"

Package ipc is radioactive-ralph’s supervisor-client IPC layer.

The supervisor listens on a Unix domain socket under its workspace’s sessions/ directory. `radioactive_ralph status`, `radioactive_ralph attach`, `radioactive_ralph enqueue`, `radioactive_ralph stop`, and `radioactive_ralph reload-config` connect to the same socket and exchange newline-delimited JSON messages.

Heartbeat liveness is signalled via the supervisor touching an `.alive` file every few seconds. `radioactive_ralph status` checks the file’s mtime before even attempting a socket connect — if the supervisor crashed and left a stale socket, we want to surface the dead-daemon state cleanly rather than hang on a connection attempt.

Wire protocol:

Request:  {"cmd": "<verb>", "args": {...}}\n
Response: {"ok": true|false, "data": ..., "error": "..."}\n

For commands that stream (attach), the server sends N >= 0 frames of {“event”: {…}}\n followed by a terminating {“ok”: true}\n.

Index

Constants

Command names for the JSON-line protocol.

const (
    CmdStatus       = "status"
    CmdAttach       = "attach"
    CmdEnqueue      = "enqueue"
    CmdStop         = "stop"
    CmdReloadConfig = "reload-config"
)

Variables

ErrClosed is a sentinel value; use errors.Is to match.

var ErrClosed error = closedError{}

func Dial

func Dial(socketPath string, timeout time.Duration) (*Client, error)

Dial connects to the supervisor at socketPath with the given timeout. Typical usage:

c, err := ipc.Dial(socketPath, 3*time.Second)
if err != nil { ... }
defer c.Close()
status, err := c.Status(ctx)

func NewServer

func NewServer(opts ServerOptions) (*Server, error)

NewServer constructs a Server. It does NOT bind the socket — call Start to begin accepting connections.

func SocketAlive

func SocketAlive(heartbeatPath string, maxAge time.Duration) bool

SocketAlive reports whether the heartbeat file at path was touched within maxAge. Clients (`radioactive_ralph status`) call this before attempting a socket connection so they can distinguish “supervisor dead” from “supervisor slow to respond.”

type Client

Client wraps a single Unix-socket connection to a Ralph supervisor. Clients are short-lived: construct, send one command, read reply, close. For streaming commands (attach), the client instance stays open until the server closes the stream.

type Client struct {
    // contains filtered or unexported fields
}

func (*Client) Attach

func (c *Client) Attach(ctx context.Context, fn func(json.RawMessage) error) error

Attach issues an attach request and streams event frames through fn until the supervisor closes the stream or ctx is cancelled. The returned error is nil for a clean end-of-stream.

func (*Client) Close

func (c *Client) Close() error

Close terminates the connection.

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, args EnqueueArgs) (EnqueueReply, error)

Enqueue pushes a task. Returns the resulting task ID (possibly a dedup hit from FTS) and whether the task was freshly inserted.

func (*Client) ReloadConfig

func (c *Client) ReloadConfig(ctx context.Context) error

ReloadConfig asks the supervisor to re-read config.toml.

func (*Client) Status

func (c *Client) Status(ctx context.Context) (StatusReply, error)

Status issues a status request and returns the parsed StatusReply.

func (*Client) Stop

func (c *Client) Stop(ctx context.Context, args StopArgs) error

Stop issues a stop request. The server closes the socket after replying; expect the returned error to be ErrClosed on the next call.

type EnqueueArgs

EnqueueArgs is the client’s payload when pushing work via CmdEnqueue.

type EnqueueArgs struct {
    TaskID      string `json:"task_id"` // optional; supervisor generates UUID if empty
    Description string `json:"description"`
    Priority    int    `json:"priority,omitempty"`
}

type EnqueueReply

EnqueueReply tells the client whether a new task was created or a duplicate was collapsed (via FTS dedup in the db layer).

type EnqueueReply struct {
    TaskID   string `json:"task_id"`
    Inserted bool   `json:"inserted"` // false means FTS found a duplicate
}

type Handler

Handler handles a single client request. Attach streams events by calling emit repeatedly; other commands return (reply, nil) and the server transmits a single Response frame.

type Handler interface {
    // HandleStatus returns the current supervisor status.
    HandleStatus(ctx context.Context) (StatusReply, error)

    // HandleEnqueue queues a new task, returning the task ID and whether
    // it was a fresh insert or a dedup hit.
    HandleEnqueue(ctx context.Context, args EnqueueArgs) (EnqueueReply, error)

    // HandleStop signals the supervisor to shut down. The server closes
    // the IPC socket after sending the response.
    HandleStop(ctx context.Context, args StopArgs) error

    // HandleReloadConfig asks the supervisor to re-read config.toml.
    HandleReloadConfig(ctx context.Context) error

    // HandleAttach streams events to the client until either the
    // supervisor exits or the client disconnects. The implementation
    // should return when ctx is cancelled.
    HandleAttach(ctx context.Context, emit func(json.RawMessage) error) error
}

type Request

Request is a single command from a client to the supervisor.

type Request struct {
    Cmd  string          `json:"cmd"`
    Args json.RawMessage `json:"args,omitempty"`
}

type Response

Response is the single-shot reply shape. For streaming commands the server sends multiple Event frames followed by a final Response with Ok=true; mid-stream errors send a Response with Ok=false.

type Response struct {
    Ok    bool            `json:"ok"`
    Data  json.RawMessage `json:"data,omitempty"`
    Error string          `json:"error,omitempty"`
}

type Server

Server is the Unix-socket IPC server. One instance per supervisor process (one per variant per repo).

type Server struct {
    // contains filtered or unexported fields
}

func (*Server) Start

func (s *Server) Start(heartbeatInterval time.Duration) error

Start binds the socket and begins accepting connections in a background goroutine. Safe to call once. Returns the listener error if bind fails.

func (*Server) Stop

func (s *Server) Stop() error

Stop shuts the server down and waits for goroutines to exit.

type ServerOptions

ServerOptions configures a Server.

type ServerOptions struct {
    // SocketPath is the path to bind. Typically workspace/sessions/<variant>.sock.
    SocketPath string

    // HeartbeatPath is the file whose mtime is bumped every
    // HeartbeatInterval by the server. Typically workspace/sessions/<variant>.alive.
    HeartbeatPath string

    // HeartbeatInterval controls how often we refresh the heartbeat.
    // Defaults to 10s when zero.
    HeartbeatInterval time.Duration

    // Handler satisfies the IPC Handler interface.
    Handler Handler

    // Logger receives info/warn/error messages. Defaults to a no-op
    // logger when nil.
    Logger *slog.Logger
}

type StatusReply

StatusReply is the data payload for CmdStatus responses. Keeping this a plain struct rather than map[string]any makes versioning/migration explicit — callers know at compile-time what fields to expect.

type StatusReply struct {
    Variant        string        `json:"variant"`
    PID            int           `json:"pid"`
    Uptime         time.Duration `json:"uptime_ns"`
    ActiveSessions int           `json:"active_sessions"`
    QueuedTasks    int           `json:"queued_tasks"`
    RunningTasks   int           `json:"running_tasks"`
    LastEventAt    time.Time     `json:"last_event_at,omitempty"`
    HeartbeatAge   time.Duration `json:"heartbeat_age_ns,omitempty"`
}

type StopArgs

StopArgs controls the termination mode for CmdStop.

type StopArgs struct {
    Graceful bool          `json:"graceful"`             // wait for in-flight sessions to finish cleanly
    Timeout  time.Duration `json:"timeout_ns,omitempty"` // overrides default if >0
}

type StreamEvent

StreamEvent is one frame emitted during a streaming command (e.g. attach).

type StreamEvent struct {
    Event json.RawMessage `json:"event"`
}

Generated by gomarkdoc