---
title: internal/ipc
description: Go API reference for the ipc package.
---

# ipc

```go
import "github.com/jbcom/radioactive-ralph/internal/ipc"
```

Package ipc is radioactive\-ralph's supervisor\-client IPC layer.

The supervisor listens on a Unix domain socket under its workspace's sessions/ directory. \`radioactive\_ralph status\`, \`radioactive\_ralph attach\`, \`radioactive\_ralph enqueue\`, \`radioactive\_ralph stop\`, and \`radioactive\_ralph reload\-config\` connect to the same socket and exchange newline\-delimited JSON messages.

Heartbeat liveness is signalled by the supervisor touching an \`.alive\` file every few seconds. \`radioactive\_ralph status\` checks the file's mtime before even attempting a socket connect: if the supervisor crashed and left a stale socket, we want to surface the dead\-daemon state cleanly rather than hang on a connection attempt.

Wire protocol:

```
Request:  {"cmd": "", "args": {...}}\n
Response: {"ok": true|false, "data": ..., "error": "..."}\n
```

For commands that stream \(attach\), the server sends N \>= 0 frames of \{"event": \{...\}\}\\n followed by a terminating \{"ok": true\}\\n.
## Index

- [Constants](<#constants>)
- [Variables](<#variables>)
- [func Dial\(socketPath string, timeout time.Duration\) \(\*Client, error\)](<#Dial>)
- [func NewServer\(opts ServerOptions\) \(\*Server, error\)](<#NewServer>)
- [func SocketAlive\(heartbeatPath string, maxAge time.Duration\) bool](<#SocketAlive>)
- [type Client](<#Client>)
  - [func \(c \*Client\) Attach\(ctx context.Context, fn func\(json.RawMessage\) error\) error](<#Client.Attach>)
  - [func \(c \*Client\) Close\(\) error](<#Client.Close>)
  - [func \(c \*Client\) Enqueue\(ctx context.Context, args EnqueueArgs\) \(EnqueueReply, error\)](<#Client.Enqueue>)
  - [func \(c \*Client\) ReloadConfig\(ctx context.Context\) error](<#Client.ReloadConfig>)
  - [func \(c \*Client\) Status\(ctx context.Context\) \(StatusReply, error\)](<#Client.Status>)
  - [func \(c \*Client\) Stop\(ctx context.Context, args StopArgs\) error](<#Client.Stop>)
- [type EnqueueArgs](<#EnqueueArgs>)
- [type EnqueueReply](<#EnqueueReply>)
- [type Handler](<#Handler>)
- [type Request](<#Request>)
- [type Response](<#Response>)
- [type Server](<#Server>)
  - [func \(s \*Server\) Start\(heartbeatInterval time.Duration\) error](<#Server.Start>)
  - [func \(s \*Server\) Stop\(\) error](<#Server.Stop>)
- [type ServerOptions](<#ServerOptions>)
- [type StatusReply](<#StatusReply>)
- [type StopArgs](<#StopArgs>)
- [type StreamEvent](<#StreamEvent>)

## Constants

Command names for the JSON\-line protocol.

```go
const (
	CmdStatus       = "status"
	CmdAttach       = "attach"
	CmdEnqueue      = "enqueue"
	CmdStop         = "stop"
	CmdReloadConfig = "reload-config"
)
```

## Variables

ErrClosed is a sentinel value; use errors.Is to match.

```go
var ErrClosed error = closedError{}
```

## func [Dial]()

```go
func Dial(socketPath string, timeout time.Duration) (*Client, error)
```

Dial connects to the supervisor at socketPath with the given timeout. Typical usage:

```
c, err := ipc.Dial(socketPath, 3*time.Second)
if err != nil {
	...
}
defer c.Close()
status, err := c.Status(ctx)
```

## func [NewServer]()

```go
func NewServer(opts ServerOptions) (*Server, error)
```

NewServer constructs a Server. It does NOT bind the socket; call Start to begin accepting connections.

## func [SocketAlive]()

```go
func SocketAlive(heartbeatPath string, maxAge time.Duration) bool
```

SocketAlive reports whether the heartbeat file at heartbeatPath was touched within maxAge. Clients \(\`radioactive\_ralph status\`\) call this before attempting a socket connection so they can distinguish "supervisor dead" from "supervisor slow to respond."

## type [Client]()

Client wraps a single Unix\-socket connection to a Ralph supervisor. Clients are short\-lived: construct, send one command, read reply, close. For streaming commands \(attach\), the client instance stays open until the server closes the stream.

```go
type Client struct {
	// contains filtered or unexported fields
}
```

### func \(\*Client\) [Attach]()

```go
func (c *Client) Attach(ctx context.Context, fn func(json.RawMessage) error) error
```

Attach issues an attach request and streams event frames through fn until the supervisor closes the stream or ctx is cancelled. The returned error is nil for a clean end\-of\-stream.

### func \(\*Client\) [Close]()

```go
func (c *Client) Close() error
```

Close terminates the connection.

### func \(\*Client\) [Enqueue]()

```go
func (c *Client) Enqueue(ctx context.Context, args EnqueueArgs) (EnqueueReply, error)
```

Enqueue pushes a task. Returns the resulting task ID \(possibly a dedup hit from FTS\) and whether the task was freshly inserted.

### func \(\*Client\) [ReloadConfig]()

```go
func (c *Client) ReloadConfig(ctx context.Context) error
```

ReloadConfig asks the supervisor to re\-read config.toml.

### func \(\*Client\) [Status]()

```go
func (c *Client) Status(ctx context.Context) (StatusReply, error)
```

Status issues a status request and returns the parsed StatusReply.
### func \(\*Client\) [Stop]()

```go
func (c *Client) Stop(ctx context.Context, args StopArgs) error
```

Stop issues a stop request. The server closes the socket after replying, so expect any later call on this Client to return ErrClosed.

## type [EnqueueArgs]()

EnqueueArgs is the client's payload when pushing work via CmdEnqueue.

```go
type EnqueueArgs struct {
	TaskID      string `json:"task_id"` // optional; supervisor generates UUID if empty
	Description string `json:"description"`
	Priority    int    `json:"priority,omitempty"`
}
```

## type [EnqueueReply]()

EnqueueReply tells the client whether a new task was created or a duplicate was collapsed \(via FTS dedup in the db layer\).

```go
type EnqueueReply struct {
	TaskID   string `json:"task_id"`
	Inserted bool   `json:"inserted"` // false means FTS found a duplicate
}
```

## type [Handler]()

Handler handles a single client request. Attach streams events by calling emit repeatedly; other commands return \(reply, nil\) and the server transmits a single Response frame.

```go
type Handler interface {
	// HandleStatus returns the current supervisor status.
	HandleStatus(ctx context.Context) (StatusReply, error)

	// HandleEnqueue queues a new task, returning the task ID and whether
	// it was a fresh insert or a dedup hit.
	HandleEnqueue(ctx context.Context, args EnqueueArgs) (EnqueueReply, error)

	// HandleStop signals the supervisor to shut down. The server closes
	// the IPC socket after sending the response.
	HandleStop(ctx context.Context, args StopArgs) error

	// HandleReloadConfig asks the supervisor to re-read config.toml.
	HandleReloadConfig(ctx context.Context) error

	// HandleAttach streams events to the client until either the
	// supervisor exits or the client disconnects. The implementation
	// should return when ctx is cancelled.
	HandleAttach(ctx context.Context, emit func(json.RawMessage) error) error
}
```

## type [Request]()

Request is a single command from a client to the supervisor.
```go
type Request struct {
	Cmd  string          `json:"cmd"`
	Args json.RawMessage `json:"args,omitempty"`
}
```

## type [Response]()

Response is the single\-shot reply shape. For streaming commands the server sends zero or more StreamEvent frames followed by a final Response with Ok=true; a mid\-stream error ends the stream with a Response with Ok=false instead.

```go
type Response struct {
	Ok    bool            `json:"ok"`
	Data  json.RawMessage `json:"data,omitempty"`
	Error string          `json:"error,omitempty"`
}
```

## type [Server]()

Server is the Unix\-socket IPC server. One instance per supervisor process \(one per variant per repo\).

```go
type Server struct {
	// contains filtered or unexported fields
}
```

### func \(\*Server\) [Start]()

```go
func (s *Server) Start(heartbeatInterval time.Duration) error
```

Start binds the socket and begins accepting connections in a background goroutine. Safe to call once. Returns the listener error if bind fails.

### func \(\*Server\) [Stop]()

```go
func (s *Server) Stop() error
```

Stop shuts the server down and waits for goroutines to exit.

## type [ServerOptions]()

ServerOptions configures a Server.

```go
type ServerOptions struct {
	// SocketPath is the path to bind. Typically workspace/sessions/.sock.
	SocketPath string

	// HeartbeatPath is the file whose mtime is bumped every
	// HeartbeatInterval by the server. Typically workspace/sessions/.alive.
	HeartbeatPath string

	// HeartbeatInterval controls how often we refresh the heartbeat.
	// Defaults to 10s when zero.
	HeartbeatInterval time.Duration

	// Handler satisfies the IPC Handler interface.
	Handler Handler

	// Logger receives info/warn/error messages. Defaults to a no-op
	// logger when nil.
	Logger *slog.Logger
}
```

## type [StatusReply]()

StatusReply is the data payload for CmdStatus responses. Keeping this a plain struct rather than map\[string\]any makes versioning/migration explicit: callers know at compile\-time what fields to expect.
```go
type StatusReply struct {
	Variant        string        `json:"variant"`
	PID            int           `json:"pid"`
	Uptime         time.Duration `json:"uptime_ns"`
	ActiveSessions int           `json:"active_sessions"`
	QueuedTasks    int           `json:"queued_tasks"`
	RunningTasks   int           `json:"running_tasks"`
	LastEventAt    time.Time     `json:"last_event_at,omitempty"`
	HeartbeatAge   time.Duration `json:"heartbeat_age_ns,omitempty"`
}
```

## type [StopArgs]()

StopArgs controls the termination mode for CmdStop.

```go
type StopArgs struct {
	Graceful bool          `json:"graceful"`             // wait for in-flight sessions to finish cleanly
	Timeout  time.Duration `json:"timeout_ns,omitempty"` // overrides default if >0
}
```

## type [StreamEvent]()

StreamEvent is one frame emitted during a streaming command \(e.g. attach\).

```go
type StreamEvent struct {
	Event json.RawMessage `json:"event"`
}
```

Generated by [gomarkdoc]()