ipc

import "github.com/jbcom/radioactive-ralph/internal/ipc"

Package ipc is radioactive-ralph’s repo-service IPC layer.

The repo service listens on a local control-plane endpoint under the repo’s state directory: a Unix domain socket on macOS/Linux and a named pipe on Windows. `radioactive_ralph status`, `radioactive_ralph attach`, `radioactive_ralph stop`, and internal control-path clients exchange newline-delimited JSON messages over the same transport.

Heartbeat liveness is signalled via the repo service touching an `.alive` file every few seconds. `radioactive_ralph status` checks the file’s mtime before even attempting a socket connect — if the service crashed and left a stale socket, we want to surface the dead-service state cleanly rather than hang on a connection attempt.

Wire protocol:

Request:  {"cmd": "<verb>", "args": {...}}\n
Response: {"ok": true|false, "data": ..., "error": "..."}\n

For commands that stream (attach), the server sends N >= 0 frames of {“event”: {…}}\n followed by a terminating {“ok”: true}\n.

Index

Constants

Command names for the JSON-line protocol.

const (
    CmdStatus       = "status"
    CmdAttach       = "attach"
    CmdEnqueue      = "enqueue"
    CmdStop         = "stop"
    CmdReloadConfig = "reload-config"
)

Variables

ErrClosed is a sentinel value; use errors.Is to match.

var ErrClosed error = closedError{}

func Dial

func Dial(socketPath string, timeout time.Duration) (*Client, error)

Dial connects to the repo service at socketPath with the given timeout. Typical usage:

c, err := ipc.Dial(socketPath, 3*time.Second)
if err != nil { ... }
defer c.Close()
status, err := c.Status(ctx)

func NewServer

func NewServer(opts ServerOptions) (*Server, error)

NewServer constructs a Server. It does NOT bind the socket — call Start to begin accepting connections.

func ServiceEndpoint

func ServiceEndpoint(sessionsDir string) (endpoint, heartbeat string)

ServiceEndpoint returns the local control-plane endpoint plus its heartbeat file for one repo workspace.

func SocketAlive

func SocketAlive(heartbeatPath string, maxAge time.Duration) bool

SocketAlive reports whether the heartbeat file at path was touched within maxAge. Clients (`radioactive_ralph status`) call this before attempting a socket connection so they can distinguish “service dead” from “service slow to respond.”

type Client

Client wraps a single Unix-socket connection to a Ralph repo service. Clients are short-lived: construct, send one command, read reply, close. For streaming commands (attach), the client instance stays open until the server closes the stream.

type Client struct {
    // contains filtered or unexported fields
}

func (*Client) Attach

func (c *Client) Attach(ctx context.Context, fn func(json.RawMessage) error) error

Attach issues an attach request and streams event frames through fn until the repo service closes the stream or ctx is cancelled. The returned error is nil for a clean end-of-stream.

func (*Client) Close

func (c *Client) Close() error

Close terminates the connection.

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, args EnqueueArgs) (EnqueueReply, error)

Enqueue pushes a task. Returns the resulting task ID (possibly a dedup hit from FTS) and whether the task was freshly inserted.

func (*Client) ReloadConfig

func (c *Client) ReloadConfig(ctx context.Context) error

ReloadConfig asks the repo service to re-read config.toml.

func (*Client) Status

func (c *Client) Status(ctx context.Context) (StatusReply, error)

Status issues a status request and returns the parsed StatusReply.

func (*Client) Stop

func (c *Client) Stop(ctx context.Context, args StopArgs) error

Stop issues a stop request. The server closes the socket after replying; expect the returned error to be ErrClosed on the next call.

type EnqueueArgs

EnqueueArgs is the client’s payload when pushing work via CmdEnqueue.

type EnqueueArgs struct {
    TaskID      string `json:"task_id"` // optional; service generates UUID if empty
    Description string `json:"description"`
    Priority    int    `json:"priority,omitempty"`
}

type EnqueueReply

EnqueueReply tells the client whether a new task was created or a duplicate was collapsed (via FTS dedup in the db layer).

type EnqueueReply struct {
    TaskID   string `json:"task_id"`
    Inserted bool   `json:"inserted"` // false means FTS found a duplicate
}

type Handler

Handler handles a single client request. Attach streams events by calling emit repeatedly; other commands return (reply, nil) and the server transmits a single Response frame.

type Handler interface {
    // HandleStatus returns the current repo-service status.
    HandleStatus(ctx context.Context) (StatusReply, error)

    // HandleEnqueue queues a new task, returning the task ID and whether
    // it was a fresh insert or a dedup hit.
    HandleEnqueue(ctx context.Context, args EnqueueArgs) (EnqueueReply, error)

    // HandleStop signals the repo service to shut down. The server closes
    // the IPC socket after sending the response.
    HandleStop(ctx context.Context, args StopArgs) error

    // HandleReloadConfig asks the repo service to re-read config.toml.
    HandleReloadConfig(ctx context.Context) error

    // HandleAttach streams events to the client until either the
    // service exits or the client disconnects. The implementation
    // should return when ctx is cancelled.
    HandleAttach(ctx context.Context, emit func(json.RawMessage) error) error
}

type Request

Request is a single command from a client to the repo service.

type Request struct {
    Cmd  string          `json:"cmd"`
    Args json.RawMessage `json:"args,omitempty"`
}

type Response

Response is the single-shot reply shape. For streaming commands the server sends multiple Event frames followed by a final Response with Ok=true; mid-stream errors send a Response with Ok=false.

type Response struct {
    Ok    bool            `json:"ok"`
    Data  json.RawMessage `json:"data,omitempty"`
    Error string          `json:"error,omitempty"`
}

type Server

Server is the repo-service IPC server. One instance per repo service.

type Server struct {
    // contains filtered or unexported fields
}

func (*Server) Start

func (s *Server) Start(heartbeatInterval time.Duration) error

Start binds the socket and begins accepting connections in a background goroutine. Safe to call once. Returns the listener error if bind fails.

func (*Server) Stop

func (s *Server) Stop() error

Stop shuts the server down and waits for goroutines to exit.

type ServerOptions

ServerOptions configures a Server.

type ServerOptions struct {
    // SocketPath is the local endpoint path to bind. Typically
    // state/<repo>/sessions/service.sock on POSIX hosts.
    SocketPath string

    // HeartbeatPath is the file whose mtime is bumped every
    // HeartbeatInterval by the server. Typically the repo-service
    // heartbeat file next to the endpoint metadata.
    HeartbeatPath string

    // HeartbeatInterval controls how often we refresh the heartbeat.
    // Defaults to 10s when zero.
    HeartbeatInterval time.Duration

    // Handler satisfies the IPC Handler interface.
    Handler Handler

    // Logger receives info/warn/error messages. Defaults to a no-op
    // logger when nil.
    Logger *slog.Logger
}

type StatusReply

StatusReply is the data payload for CmdStatus responses.

type StatusReply struct {
    RepoPath      string          `json:"repo_path"`
    PID           int             `json:"pid"`
    Uptime        time.Duration   `json:"uptime_ns"`
    ActiveWorkers int             `json:"active_workers"`
    ReadyTasks    int             `json:"ready_tasks"`
    ApprovalTasks int             `json:"approval_tasks"`
    BlockedTasks  int             `json:"blocked_tasks"`
    RunningTasks  int             `json:"running_tasks"`
    FailedTasks   int             `json:"failed_tasks"`
    ActivePlans   int             `json:"active_plans"`
    Workers       []WorkerSummary `json:"workers,omitempty"`
    LastEventAt   time.Time       `json:"last_event_at,omitempty"`
    HeartbeatAge  time.Duration   `json:"heartbeat_age_ns,omitempty"`
}

type StopArgs

StopArgs controls the termination mode for CmdStop.

type StopArgs struct {
    Graceful bool          `json:"graceful"`             // wait for in-flight sessions to finish cleanly
    Timeout  time.Duration `json:"timeout_ns,omitempty"` // overrides default if >0
}

type StreamEvent

StreamEvent is one frame emitted during a streaming command (e.g. attach).

type StreamEvent struct {
    Event json.RawMessage `json:"event"`
}

type WorkerSummary

WorkerSummary is the runtime-facing status for one in-flight worker.

type WorkerSummary struct {
    PlanID            string `json:"plan_id"`
    TaskID            string `json:"task_id"`
    Variant           string `json:"variant"`
    Provider          string `json:"provider,omitempty"`
    ProviderSessionID string `json:"provider_session_id,omitempty"`
    WorktreePath      string `json:"worktree_path,omitempty"`
}

Generated by gomarkdoc