ipc¶
import "github.com/jbcom/radioactive-ralph/internal/ipc"
Package ipc is radioactive-ralph’s supervisor-client IPC layer.
The supervisor listens on a Unix domain socket under its workspace’s sessions/ directory. `radioactive_ralph status`, `radioactive_ralph attach`, `radioactive_ralph enqueue`, `radioactive_ralph stop`, and `radioactive_ralph reload-config` connect to the same socket and exchange newline-delimited JSON messages.
The supervisor signals liveness by touching an `.alive` file at a fixed interval (10s by default). `radioactive_ralph status` checks the file’s mtime before attempting a socket connect: if the supervisor crashed and left a stale socket, the client should report the dead-daemon state cleanly rather than hang on a connection attempt.
Wire protocol:
Request: {"cmd": "<verb>", "args": {...}}\n
Response: {"ok": true|false, "data": ..., "error": "..."}\n
For commands that stream (attach), the server sends N >= 0 frames of {"event": {...}}\n followed by a terminating {"ok": true}\n.
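The framing above can be sketched with nothing but the standard library. This is an illustrative sketch, not the package's implementation: `encodeFrame` and `decodeResponse` are hypothetical helper names, the `Request` and `Response` types are local copies of the documented shapes, and a `strings.Reader` stands in for the socket.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// Request and Response are local copies of the documented wire shapes.
type Request struct {
	Cmd  string          `json:"cmd"`
	Args json.RawMessage `json:"args,omitempty"`
}

type Response struct {
	Ok    bool            `json:"ok"`
	Data  json.RawMessage `json:"data,omitempty"`
	Error string          `json:"error,omitempty"`
}

// encodeFrame (hypothetical helper) marshals v and appends the newline
// that terminates every frame.
func encodeFrame(v any) ([]byte, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return append(b, '\n'), nil
}

// decodeResponse (hypothetical helper) parses one frame. A trailing
// newline is harmless because JSON decoding skips whitespace.
func decodeResponse(line []byte) (Response, error) {
	var resp Response
	err := json.Unmarshal(line, &resp)
	return resp, err
}

func main() {
	frame, err := encodeFrame(Request{Cmd: "status"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("request frame: %s", frame)

	// Stand-in for bytes read from the supervisor socket.
	wire := strings.NewReader("{\"ok\":true,\"data\":{\"pid\":4242}}\n")
	sc := bufio.NewScanner(wire)
	for sc.Scan() {
		resp, err := decodeResponse(sc.Bytes())
		if err != nil {
			panic(err)
		}
		fmt.Println(resp.Ok, string(resp.Data))
	}
}
```

Because each frame is one line, a line-oriented reader is enough to find frame boundaries; the payload itself must therefore never contain a raw newline, which `json.Marshal` guarantees by escaping it.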
Index¶
func Dial(socketPath string, timeout time.Duration) (*Client, error)
func NewServer(opts ServerOptions) (*Server, error)
func SocketAlive(heartbeatPath string, maxAge time.Duration) bool
Constants¶
Command names for the JSON-line protocol.
const (
    CmdStatus       = "status"
    CmdAttach       = "attach"
    CmdEnqueue      = "enqueue"
    CmdStop         = "stop"
    CmdReloadConfig = "reload-config"
)
Variables¶
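ErrClosed is the sentinel error reported once the connection has been closed (for example, on the next call after a successful Stop). Match it with errors.Is.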
var ErrClosed error = closedError{}
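A minimal sketch of why errors.Is is the right way to match the sentinel. The `closedError` type here is a local stand-in for the package's unexported type, its message text is an assumption, and `send` and `isClosed` are hypothetical helpers that only exist for this illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// closedError is a stand-in for the package's unexported sentinel type.
// The message text is an assumption.
type closedError struct{}

func (closedError) Error() string { return "ipc: connection closed" }

var ErrClosed error = closedError{}

// send (hypothetical) fails on a closed connection and wraps the
// sentinel with extra context.
func send(cmd string) error {
	return fmt.Errorf("send %q: %w", cmd, ErrClosed)
}

// isClosed reports whether err is, or wraps, ErrClosed.
func isClosed(err error) bool {
	return errors.Is(err, ErrClosed)
}

func main() {
	err := send("status")
	fmt.Println(isClosed(err))    // true: errors.Is unwraps the chain
	fmt.Println(err == ErrClosed) // false: direct comparison misses the wrapper
}
```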
func Dial¶
func Dial(socketPath string, timeout time.Duration) (*Client, error)
Dial connects to the supervisor at socketPath with the given timeout. Typical usage:
c, err := ipc.Dial(socketPath, 3*time.Second)
if err != nil { ... }
defer c.Close()
status, err := c.Status(ctx)
func NewServer¶
func NewServer(opts ServerOptions) (*Server, error)
NewServer constructs a Server. It does NOT bind the socket — call Start to begin accepting connections.
func SocketAlive¶
func SocketAlive(heartbeatPath string, maxAge time.Duration) bool
SocketAlive reports whether the heartbeat file at heartbeatPath was touched within maxAge. Clients (`radioactive_ralph status`) call this before attempting a socket connection so they can distinguish “supervisor dead” from “supervisor slow to respond.”
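One plausible way to implement such a check is to compare the file's modification time against the current time. The sketch below assumes that approach; `heartbeatFresh` and `demo` are hypothetical names, and treating a missing file as "not alive" is also an assumption rather than documented behaviour.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// heartbeatFresh (hypothetical) reports whether the file at path was
// modified within maxAge. A missing or unreadable file counts as stale.
func heartbeatFresh(path string, maxAge time.Duration) bool {
	info, err := os.Stat(path)
	if err != nil {
		return false
	}
	return time.Since(info.ModTime()) <= maxAge
}

// demo checks a fresh file, a backdated file, and a missing file.
func demo() (fresh, stale, missing bool, err error) {
	dir, err := os.MkdirTemp("", "ralph-heartbeat")
	if err != nil {
		return false, false, false, err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "demo.alive")
	if err = os.WriteFile(path, nil, 0o600); err != nil {
		return false, false, false, err
	}
	fresh = heartbeatFresh(path, 30*time.Second)

	// Backdate the mtime to simulate a supervisor that stopped touching it.
	old := time.Now().Add(-time.Minute)
	if err = os.Chtimes(path, old, old); err != nil {
		return false, false, false, err
	}
	stale = heartbeatFresh(path, 30*time.Second)

	missing = heartbeatFresh(filepath.Join(dir, "missing.alive"), 30*time.Second)
	return fresh, stale, missing, nil
}

func main() {
	fmt.Println(demo())
}
```

A reasonable maxAge is a small multiple of the heartbeat interval, so that one delayed touch does not make a healthy supervisor look dead.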
type Client¶
Client wraps a single Unix-socket connection to a Ralph supervisor. Clients are short-lived: construct, send one command, read reply, close. For streaming commands (attach), the client instance stays open until the server closes the stream.
type Client struct {
    // contains filtered or unexported fields
}
func (*Client) Attach¶
func (c *Client) Attach(ctx context.Context, fn func(json.RawMessage) error) error
Attach issues an attach request and streams event frames through fn until the supervisor closes the stream or ctx is cancelled. The returned error is nil for a clean end-of-stream.
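The read side of a streaming command could look roughly like the loop below. It is a sketch under assumptions: `frame`, `readStream`, and `collect` are hypothetical names, a `strings.Reader` replaces the socket, context cancellation is left out for brevity, and a stream that ends without a terminating response is reported as `io.ErrUnexpectedEOF` by choice.

```go
package main

import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
)

// frame (hypothetical) can hold either a stream event or the
// terminating response; Ok is a pointer so "absent" differs from false.
type frame struct {
	Event json.RawMessage `json:"event"`
	Ok    *bool           `json:"ok"`
	Error string          `json:"error"`
}

// readStream passes each event to fn until the terminating frame.
func readStream(r io.Reader, fn func(json.RawMessage) error) error {
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		var f frame
		if err := json.Unmarshal(sc.Bytes(), &f); err != nil {
			return fmt.Errorf("bad frame: %w", err)
		}
		switch {
		case len(f.Event) > 0:
			if err := fn(f.Event); err != nil {
				return err
			}
		case f.Ok != nil && *f.Ok:
			return nil // clean end-of-stream
		case f.Ok != nil:
			return errors.New(f.Error) // mid-stream failure
		default:
			return errors.New("frame has neither event nor ok")
		}
	}
	if err := sc.Err(); err != nil {
		return err
	}
	return io.ErrUnexpectedEOF // stream ended without a terminator
}

// collect gathers the raw events of a stream as strings.
func collect(stream string) ([]string, error) {
	var events []string
	err := readStream(strings.NewReader(stream), func(ev json.RawMessage) error {
		events = append(events, string(ev))
		return nil
	})
	return events, err
}

func main() {
	events, err := collect("{\"event\":{\"n\":1}}\n{\"event\":{\"n\":2}}\n{\"ok\":true}\n")
	fmt.Println(events, err)
}
```

Note that `bufio.Scanner` rejects lines longer than 64 KiB by default; a client expecting large events would need to raise that limit with `Scanner.Buffer` or read with `bufio.Reader` instead.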
func (*Client) Close¶
func (c *Client) Close() error
Close terminates the connection.
func (*Client) Enqueue¶
func (c *Client) Enqueue(ctx context.Context, args EnqueueArgs) (EnqueueReply, error)
Enqueue pushes a task. It returns the resulting task ID (possibly that of an existing task matched by full-text search (FTS) deduplication) and whether the task was freshly inserted.
func (*Client) ReloadConfig¶
func (c *Client) ReloadConfig(ctx context.Context) error
ReloadConfig asks the supervisor to re-read config.toml.
func (*Client) Status¶
func (c *Client) Status(ctx context.Context) (StatusReply, error)
Status issues a status request and returns the parsed StatusReply.
func (*Client) Stop¶
func (c *Client) Stop(ctx context.Context, args StopArgs) error
Stop issues a stop request. The server closes the socket after replying, so any further call on this Client is expected to return ErrClosed.
type EnqueueArgs¶
EnqueueArgs is the client’s payload when pushing work via CmdEnqueue.
type EnqueueArgs struct {
    TaskID      string `json:"task_id"` // optional; supervisor generates UUID if empty
    Description string `json:"description"`
    Priority    int    `json:"priority,omitempty"`
}
type EnqueueReply¶
EnqueueReply tells the client whether a new task was created or a duplicate was collapsed (via FTS dedup in the db layer).
type EnqueueReply struct {
    TaskID   string `json:"task_id"`
    Inserted bool   `json:"inserted"` // false means FTS found a duplicate
}
type Handler¶
Handler handles a single client request. Attach streams events by calling emit repeatedly; other commands return (reply, nil) and the server transmits a single Response frame.
type Handler interface {
    // HandleStatus returns the current supervisor status.
    HandleStatus(ctx context.Context) (StatusReply, error)
    // HandleEnqueue queues a new task, returning the task ID and whether
    // it was a fresh insert or a dedup hit.
    HandleEnqueue(ctx context.Context, args EnqueueArgs) (EnqueueReply, error)
    // HandleStop signals the supervisor to shut down. The server closes
    // the IPC socket after sending the response.
    HandleStop(ctx context.Context, args StopArgs) error
    // HandleReloadConfig asks the supervisor to re-read config.toml.
    HandleReloadConfig(ctx context.Context) error
    // HandleAttach streams events to the client until either the
    // supervisor exits or the client disconnects. The implementation
    // should return when ctx is cancelled.
    HandleAttach(ctx context.Context, emit func(json.RawMessage) error) error
}
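To illustrate how a server might route one-shot commands to a Handler, here is a hedged sketch. It is not the package's dispatcher: `oneShotHandler` is a trimmed, hypothetical subset of the interface, `reply`, `dispatch`, `handleLine`, and `fakeHandler` are invented for the example, and the fixed "task-1" ID stands in for the UUID a real supervisor would generate.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

type Request struct {
	Cmd  string          `json:"cmd"`
	Args json.RawMessage `json:"args,omitempty"`
}

type Response struct {
	Ok    bool            `json:"ok"`
	Data  json.RawMessage `json:"data,omitempty"`
	Error string          `json:"error,omitempty"`
}

type EnqueueArgs struct {
	TaskID      string `json:"task_id"`
	Description string `json:"description"`
	Priority    int    `json:"priority,omitempty"`
}

type EnqueueReply struct {
	TaskID   string `json:"task_id"`
	Inserted bool   `json:"inserted"`
}

// oneShotHandler is a trimmed, hypothetical subset of Handler.
type oneShotHandler interface {
	HandleEnqueue(ctx context.Context, args EnqueueArgs) (EnqueueReply, error)
	HandleReloadConfig(ctx context.Context) error
}

// reply turns a handler result into the single Response frame.
func reply(data any, err error) Response {
	if err != nil {
		return Response{Error: err.Error()}
	}
	if data == nil {
		return Response{Ok: true}
	}
	raw, err := json.Marshal(data)
	if err != nil {
		return Response{Error: err.Error()}
	}
	return Response{Ok: true, Data: raw}
}

// dispatch routes a request to the matching handler method.
func dispatch(ctx context.Context, h oneShotHandler, req Request) Response {
	switch req.Cmd {
	case "enqueue":
		var args EnqueueArgs
		if err := json.Unmarshal(req.Args, &args); err != nil {
			return reply(nil, fmt.Errorf("bad enqueue args: %w", err))
		}
		out, err := h.HandleEnqueue(ctx, args)
		return reply(out, err)
	case "reload-config":
		return reply(nil, h.HandleReloadConfig(ctx))
	default:
		return reply(nil, fmt.Errorf("unknown command %q", req.Cmd))
	}
}

// fakeHandler is a stub; a real supervisor would generate a UUID.
type fakeHandler struct{}

func (fakeHandler) HandleEnqueue(_ context.Context, a EnqueueArgs) (EnqueueReply, error) {
	id := a.TaskID
	if id == "" {
		id = "task-1"
	}
	return EnqueueReply{TaskID: id, Inserted: true}, nil
}

func (fakeHandler) HandleReloadConfig(context.Context) error { return nil }

// handleLine decodes one request line and returns the encoded response.
func handleLine(line string) string {
	var req Request
	var resp Response
	if err := json.Unmarshal([]byte(line), &req); err != nil {
		resp = reply(nil, err)
	} else {
		resp = dispatch(context.Background(), fakeHandler{}, req)
	}
	out, _ := json.Marshal(resp)
	return string(out)
}

func main() {
	fmt.Println(handleLine(`{"cmd":"enqueue","args":{"description":"fix flaky test"}}`))
	fmt.Println(handleLine(`{"cmd":"dance"}`))
}
```

Keeping Args as json.RawMessage lets the dispatcher decode the payload only after it knows which command it is looking at.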
type Request¶
Request is a single command from a client to the supervisor.
type Request struct {
    Cmd  string          `json:"cmd"`
    Args json.RawMessage `json:"args,omitempty"`
}
type Response¶
Response is the single-shot reply shape. For streaming commands the server sends multiple Event frames followed by a final Response with Ok=true; mid-stream errors send a Response with Ok=false.
type Response struct {
    Ok    bool            `json:"ok"`
    Data  json.RawMessage `json:"data,omitempty"`
    Error string          `json:"error,omitempty"`
}
type Server¶
Server is the Unix-socket IPC server. One instance per supervisor process (one per variant per repo).
type Server struct {
    // contains filtered or unexported fields
}
func (*Server) Start¶
func (s *Server) Start(heartbeatInterval time.Duration) error
Start binds the socket and begins accepting connections in a background goroutine. Call it at most once per Server. If the bind fails, Start returns the listener error.
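The heartbeat side of Start could be a background loop along these lines. This is a sketch under assumptions, not the server's code: `touch`, `runHeartbeat`, and `demo` are hypothetical names, and creating the file on first use is a guess at reasonable behaviour.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// touch (hypothetical) bumps the file's mtime, creating it if needed.
func touch(path string) error {
	now := time.Now()
	err := os.Chtimes(path, now, now)
	if errors.Is(err, os.ErrNotExist) {
		f, cerr := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0o600)
		if cerr != nil {
			return cerr
		}
		return f.Close()
	}
	return err
}

// runHeartbeat touches path immediately and then on every tick until
// ctx is cancelled.
func runHeartbeat(ctx context.Context, path string, interval time.Duration) error {
	if err := touch(path); err != nil {
		return err
	}
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-t.C:
			if err := touch(path); err != nil {
				return err
			}
		}
	}
}

// demo runs the loop briefly and reports whether the file looks fresh.
func demo() (bool, error) {
	dir, err := os.MkdirTemp("", "ralph-heartbeat")
	if err != nil {
		return false, err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "demo.alive")
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	if err := runHeartbeat(ctx, path, 10*time.Millisecond); err != nil {
		return false, err
	}
	info, err := os.Stat(path)
	if err != nil {
		return false, err
	}
	return time.Since(info.ModTime()) < 5*time.Second, nil
}

func main() {
	fmt.Println(demo())
}
```

Touching once before the first tick means the file is fresh as soon as the server is up, so a client that checks right after startup does not see a stale heartbeat.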
func (*Server) Stop¶
func (s *Server) Stop() error
Stop shuts the server down and waits for goroutines to exit.
type ServerOptions¶
ServerOptions configures a Server.
type ServerOptions struct {
    // SocketPath is the path to bind. Typically workspace/sessions/<variant>.sock.
    SocketPath string
    // HeartbeatPath is the file whose mtime is bumped every
    // HeartbeatInterval by the server. Typically workspace/sessions/<variant>.alive.
    HeartbeatPath string
    // HeartbeatInterval controls how often we refresh the heartbeat.
    // Defaults to 10s when zero.
    HeartbeatInterval time.Duration
    // Handler satisfies the IPC Handler interface.
    Handler Handler
    // Logger receives info/warn/error messages. Defaults to a no-op
    // logger when nil.
    Logger *slog.Logger
}
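The two documented defaults (10s interval, no-op logger) could be applied as in the sketch below. The `options` type is a trimmed local stand-in for ServerOptions and `withDefaults` is a hypothetical helper; building the no-op logger from a text handler that writes to `io.Discard` is one way to do it and is an assumption about the implementation. It needs Go 1.21 or later for `log/slog`.

```go
package main

import (
	"fmt"
	"io"
	"log/slog"
	"time"
)

// options is a trimmed stand-in for ServerOptions.
type options struct {
	HeartbeatInterval time.Duration
	Logger            *slog.Logger
}

// withDefaults (hypothetical) fills in zero-valued fields.
func withDefaults(o options) options {
	if o.HeartbeatInterval == 0 {
		o.HeartbeatInterval = 10 * time.Second
	}
	if o.Logger == nil {
		// A logger whose output is discarded behaves as a no-op.
		o.Logger = slog.New(slog.NewTextHandler(io.Discard, nil))
	}
	return o
}

func main() {
	o := withDefaults(options{})
	fmt.Println(o.HeartbeatInterval) // 10s
	o.Logger.Info("this message is discarded")
}
```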
type StatusReply¶
StatusReply is the data payload for CmdStatus responses. Keeping this a plain struct rather than a map[string]any makes versioning and migration explicit: callers know at compile time which fields to expect.
type StatusReply struct {
    Variant        string        `json:"variant"`
    PID            int           `json:"pid"`
    Uptime         time.Duration `json:"uptime_ns"`
    ActiveSessions int           `json:"active_sessions"`
    QueuedTasks    int           `json:"queued_tasks"`
    RunningTasks   int           `json:"running_tasks"`
    LastEventAt    time.Time     `json:"last_event_at,omitempty"`
    HeartbeatAge   time.Duration `json:"heartbeat_age_ns,omitempty"`
}
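As a quick illustration of how this struct looks on the wire, the sketch below encodes a local copy of StatusReply with `encoding/json`. The helpers `encode`, `decode`, and `hasField` and the sample values are hypothetical. Two standard-library behaviours are worth knowing here: time.Duration is encoded as an integer count of nanoseconds, which matches the `_ns` key names, and `omitempty` does not omit a zero time.Time because it is a struct.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// StatusReply is a local copy of the documented struct.
type StatusReply struct {
	Variant        string        `json:"variant"`
	PID            int           `json:"pid"`
	Uptime         time.Duration `json:"uptime_ns"`
	ActiveSessions int           `json:"active_sessions"`
	QueuedTasks    int           `json:"queued_tasks"`
	RunningTasks   int           `json:"running_tasks"`
	LastEventAt    time.Time     `json:"last_event_at,omitempty"`
	HeartbeatAge   time.Duration `json:"heartbeat_age_ns,omitempty"`
}

// encode marshals r to a JSON string.
func encode(r StatusReply) string {
	b, err := json.Marshal(r)
	if err != nil {
		panic(err)
	}
	return string(b)
}

// decode parses a JSON document back into a StatusReply.
func decode(doc string) (StatusReply, error) {
	var r StatusReply
	err := json.Unmarshal([]byte(doc), &r)
	return r, err
}

// hasField reports whether the JSON object doc has a top-level key.
func hasField(doc, key string) bool {
	var m map[string]json.RawMessage
	if err := json.Unmarshal([]byte(doc), &m); err != nil {
		return false
	}
	_, ok := m[key]
	return ok
}

func main() {
	// Sample values are made up for the example.
	doc := encode(StatusReply{Variant: "demo", PID: 4242, Uptime: 90 * time.Second, QueuedTasks: 3})
	fmt.Println(doc)
	fmt.Println(hasField(doc, "heartbeat_age_ns")) // zero duration is omitted
	fmt.Println(hasField(doc, "last_event_at"))    // zero time.Time is still emitted
}
```

A client that wants to treat "no events yet" specially can therefore check `LastEventAt.IsZero()` after decoding rather than relying on the key being absent.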
type StopArgs¶
StopArgs controls the termination mode for CmdStop.
type StopArgs struct {
    Graceful bool          `json:"graceful"`             // wait for in-flight sessions to finish cleanly
    Timeout  time.Duration `json:"timeout_ns,omitempty"` // overrides default if >0
}
type StreamEvent¶
StreamEvent is one frame emitted during a streaming command (e.g. attach).
type StreamEvent struct {
    Event json.RawMessage `json:"event"`
}
Generated by gomarkdoc