ipc¶
import "github.com/jbcom/radioactive-ralph/internal/ipc"
Package ipc is radioactive-ralph’s repo-service IPC layer.
The repo service listens on a local control-plane endpoint under the repo’s state directory: a Unix domain socket on macOS/Linux and a named pipe on Windows. `radioactive_ralph status`, `radioactive_ralph attach`, `radioactive_ralph stop`, and internal control-path clients exchange newline-delimited JSON messages over the same transport.
The repo service signals liveness by touching an `.alive` file every few seconds. `radioactive_ralph status` checks the file’s mtime before attempting a socket connect: if the service crashed and left a stale socket, the client should report the dead-service state cleanly rather than hang on a connection attempt.
Wire protocol:
Request: {"cmd": "<verb>", "args": {...}}\n
Response: {"ok": true|false, "data": ..., "error": "..."}\n
For commands that stream (attach), the server sends N >= 0 frames of {"event": {...}}\n followed by a terminating {"ok": true}\n.
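The framing above can be sketched as follows. This is an illustration only: `request` and `response` are local copies of the documented JSON shapes, and the helper names `encodeRequest`, `decodeResponse`, and `decodeResponseString` are hypothetical, not part of the package.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// request and response mirror the documented wire shapes. They are local
// stand-ins for this sketch, not the package's own types.
type request struct {
	Cmd  string          `json:"cmd"`
	Args json.RawMessage `json:"args,omitempty"`
}

type response struct {
	Ok    bool            `json:"ok"`
	Data  json.RawMessage `json:"data,omitempty"`
	Error string          `json:"error,omitempty"`
}

// encodeRequest renders one request as a single newline-terminated JSON line.
func encodeRequest(cmd string, args any) (string, error) {
	req := request{Cmd: cmd}
	if args != nil {
		raw, err := json.Marshal(args)
		if err != nil {
			return "", err
		}
		req.Args = raw
	}
	line, err := json.Marshal(req)
	if err != nil {
		return "", err
	}
	return string(line) + "\n", nil
}

// decodeResponse reads exactly one newline-delimited frame and parses it.
func decodeResponse(r *bufio.Reader) (response, error) {
	var resp response
	line, err := r.ReadBytes('\n')
	if err != nil {
		return resp, err
	}
	err = json.Unmarshal(line, &resp)
	return resp, err
}

// decodeResponseString is a convenience wrapper for in-memory input.
func decodeResponseString(s string) (response, error) {
	return decodeResponse(bufio.NewReader(strings.NewReader(s)))
}

func main() {
	line, err := encodeRequest("stop", map[string]bool{"graceful": true})
	if err != nil {
		panic(err)
	}
	fmt.Print(line)

	resp, err := decodeResponseString(`{"ok":false,"error":"boom"}` + "\n")
	if err != nil {
		panic(err)
	}
	fmt.Println(resp.Ok, resp.Error)
}
```

Keeping `args` and `data` as `json.RawMessage` lets the envelope be parsed first and the command-specific payload decoded afterwards.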
Index¶
func Dial(socketPath string, timeout time.Duration) (*Client, error)
func NewServer(opts ServerOptions) (*Server, error)
func ServiceEndpoint(sessionsDir string) (endpoint, heartbeat string)
func SocketAlive(heartbeatPath string, maxAge time.Duration) bool
Constants¶
Command names for the JSON-line protocol.
const (
	CmdStatus       = "status"
	CmdAttach       = "attach"
	CmdEnqueue      = "enqueue"
	CmdStop         = "stop"
	CmdReloadConfig = "reload-config"
)
Variables¶
ErrClosed is a sentinel value; use errors.Is to match.
var ErrClosed error = closedError{}
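A minimal sketch of why errors.Is is the recommended match, assuming a local sentinel `errClosed` that stands in for ErrClosed. The `closedError` message text and the `send` helper are hypothetical placeholders.

```go
package main

import (
	"errors"
	"fmt"
)

// closedError is a local stand-in for the package's unexported sentinel
// type; the message text is a placeholder.
type closedError struct{}

func (closedError) Error() string { return "ipc: connection closed" }

// errClosed plays the role of ipc.ErrClosed in this sketch.
var errClosed error = closedError{}

// send is a hypothetical call that fails because the connection is gone
// and wraps the sentinel with extra context.
func send(cmd string) error {
	return fmt.Errorf("send %q: %w", cmd, errClosed)
}

// isClosed reports whether err is, or wraps, the sentinel.
func isClosed(err error) bool {
	return errors.Is(err, errClosed)
}

func main() {
	err := send("status")
	fmt.Println(isClosed(err))    // the wrapped sentinel still matches
	fmt.Println(err == errClosed) // direct comparison misses wrapped errors
}
```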
func Dial¶
func Dial(socketPath string, timeout time.Duration) (*Client, error)
Dial connects to the repo service at socketPath with the given timeout. Typical usage:
c, err := ipc.Dial(socketPath, 3*time.Second)
if err != nil { ... }
defer c.Close()
status, err := c.Status(ctx)
func NewServer¶
func NewServer(opts ServerOptions) (*Server, error)
NewServer constructs a Server. It does NOT bind the socket — call Start to begin accepting connections.
func ServiceEndpoint¶
func ServiceEndpoint(sessionsDir string) (endpoint, heartbeat string)
ServiceEndpoint returns the local control-plane endpoint plus its heartbeat file for one repo workspace.
func SocketAlive¶
func SocketAlive(heartbeatPath string, maxAge time.Duration) bool
SocketAlive reports whether the heartbeat file at heartbeatPath was touched within maxAge. Clients (`radioactive_ralph status`) call this before attempting a socket connection so they can distinguish “service dead” from “service slow to respond.”
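One way such a check could work is to compare the file's mtime against the current time. The sketch below is an assumption about the approach, not the package's implementation; `heartbeatFresh` and `demo` are hypothetical names.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// heartbeatFresh reports whether the file at path was modified within
// maxAge. A missing or unreadable file counts as not alive.
func heartbeatFresh(path string, maxAge time.Duration) bool {
	info, err := os.Stat(path)
	if err != nil {
		return false
	}
	return time.Since(info.ModTime()) <= maxAge
}

// demo creates a heartbeat file in a temporary directory and checks it
// twice: right after creation, and after backdating its mtime by an hour.
func demo() (bool, bool, error) {
	dir, err := os.MkdirTemp("", "heartbeat")
	if err != nil {
		return false, false, err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, ".alive")
	if err := os.WriteFile(path, nil, 0o600); err != nil {
		return false, false, err
	}
	fresh := heartbeatFresh(path, 30*time.Second)

	old := time.Now().Add(-time.Hour)
	if err := os.Chtimes(path, old, old); err != nil {
		return false, false, err
	}
	stale := heartbeatFresh(path, 30*time.Second)
	return fresh, stale, nil
}

func main() {
	fresh, stale, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println("fresh after touch:", fresh)
	fmt.Println("fresh after backdating:", stale)
}
```

Because the check only reads file metadata, it returns immediately even when a stale socket would make a connect attempt block.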
type Client¶
Client wraps a single connection to a Ralph repo service (a Unix domain socket on macOS/Linux, a named pipe on Windows). Clients are short-lived: construct, send one command, read the reply, close. For streaming commands (attach), the Client stays open until the server closes the stream.
type Client struct {
	// contains filtered or unexported fields
}
func (*Client) Attach¶
func (c *Client) Attach(ctx context.Context, fn func(json.RawMessage) error) error
Attach issues an attach request and streams event frames through fn until the repo service closes the stream or ctx is cancelled. The returned error is nil for a clean end-of-stream.
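The client side of a streaming command can be sketched as a loop over newline-delimited frames. This is an illustration under stated assumptions: `frame`, `readStream`, and `collect` are hypothetical names, context cancellation is left out, and treating a stream that ends without a terminating frame as an error is a choice made here, not documented behavior.

```go
package main

import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
)

// frame is a local union of the two documented frame shapes: a stream
// event ({"event": ...}) or a terminating response ({"ok": ...}).
type frame struct {
	Event json.RawMessage `json:"event"`
	Ok    *bool           `json:"ok"`
	Error string          `json:"error"`
}

// readStream passes each event payload to fn until the terminating
// response arrives. It returns nil on {"ok": true}. bufio.Scanner caps
// line length at 64 KiB by default, which this sketch does not raise.
func readStream(r io.Reader, fn func(json.RawMessage) error) error {
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		var f frame
		if err := json.Unmarshal(sc.Bytes(), &f); err != nil {
			return fmt.Errorf("decode frame: %w", err)
		}
		switch {
		case f.Ok != nil && *f.Ok:
			return nil // clean end of stream
		case f.Ok != nil:
			return errors.New(f.Error) // mid-stream failure
		case f.Event != nil:
			if err := fn(f.Event); err != nil {
				return err
			}
		default:
			return errors.New("unrecognized frame")
		}
	}
	if err := sc.Err(); err != nil {
		return err
	}
	// Assumption: a stream that ends without a terminator is an error.
	return io.ErrUnexpectedEOF
}

// collect gathers the raw event payloads from an in-memory stream.
func collect(input string) ([]string, error) {
	var events []string
	err := readStream(strings.NewReader(input), func(ev json.RawMessage) error {
		events = append(events, string(ev))
		return nil
	})
	return events, err
}

func main() {
	events, err := collect(`{"event":{"n":1}}` + "\n" + `{"ok":true}` + "\n")
	fmt.Println(events, err)
}
```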
func (*Client) Close¶
func (c *Client) Close() error
Close terminates the connection.
func (*Client) Enqueue¶
func (c *Client) Enqueue(ctx context.Context, args EnqueueArgs) (EnqueueReply, error)
Enqueue pushes a task. It returns the resulting task ID (possibly that of an existing task matched by FTS dedup) and whether the task was freshly inserted.
func (*Client) ReloadConfig¶
func (c *Client) ReloadConfig(ctx context.Context) error
ReloadConfig asks the repo service to re-read config.toml.
func (*Client) Status¶
func (c *Client) Status(ctx context.Context) (StatusReply, error)
Status issues a status request and returns the parsed StatusReply.
func (*Client) Stop¶
func (c *Client) Stop(ctx context.Context, args StopArgs) error
Stop issues a stop request. The server closes the socket after replying, so any further call on this Client is expected to fail with ErrClosed.
type EnqueueArgs¶
EnqueueArgs is the client’s payload when pushing work via CmdEnqueue.
type EnqueueArgs struct {
	TaskID      string `json:"task_id"` // optional; service generates UUID if empty
	Description string `json:"description"`
	Priority    int    `json:"priority,omitempty"`
}
type EnqueueReply¶
EnqueueReply tells the client whether a new task was created or a duplicate was collapsed (via FTS dedup in the db layer).
type EnqueueReply struct {
	TaskID   string `json:"task_id"`
	Inserted bool   `json:"inserted"` // false means FTS found a duplicate
}
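To make the enqueue exchange concrete, the sketch below builds a request line from a local copy of EnqueueArgs and interprets a reply. The helper names `enqueueLine` and `describeReply`, the task IDs, and the summary strings are hypothetical. Note that the `task_id` tag has no `omitempty`, so an empty ID is sent as `""`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// enqueueArgs and enqueueReply copy the documented field tags.
type enqueueArgs struct {
	TaskID      string `json:"task_id"`
	Description string `json:"description"`
	Priority    int    `json:"priority,omitempty"`
}

type enqueueReply struct {
	TaskID   string `json:"task_id"`
	Inserted bool   `json:"inserted"`
}

// enqueueLine builds the newline-terminated request for an enqueue command.
func enqueueLine(args enqueueArgs) (string, error) {
	raw, err := json.Marshal(args)
	if err != nil {
		return "", err
	}
	line, err := json.Marshal(struct {
		Cmd  string          `json:"cmd"`
		Args json.RawMessage `json:"args"`
	}{Cmd: "enqueue", Args: raw})
	if err != nil {
		return "", err
	}
	return string(line) + "\n", nil
}

// describeReply decodes one response line and summarizes the outcome.
func describeReply(line string) (string, error) {
	var resp struct {
		Ok    bool            `json:"ok"`
		Data  json.RawMessage `json:"data"`
		Error string          `json:"error"`
	}
	if err := json.Unmarshal([]byte(line), &resp); err != nil {
		return "", err
	}
	if !resp.Ok {
		return "", fmt.Errorf("enqueue failed: %s", resp.Error)
	}
	var reply enqueueReply
	if err := json.Unmarshal(resp.Data, &reply); err != nil {
		return "", err
	}
	if reply.Inserted {
		return "created " + reply.TaskID, nil
	}
	return "duplicate of " + reply.TaskID, nil
}

func main() {
	line, err := enqueueLine(enqueueArgs{Description: "fix flaky test"})
	if err != nil {
		panic(err)
	}
	fmt.Print(line)

	msg, err := describeReply(`{"ok":true,"data":{"task_id":"t-1","inserted":false}}`)
	fmt.Println(msg, err)
}
```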
type Handler¶
Handler handles a single client request. Attach streams events by calling emit repeatedly; other commands return (reply, nil) and the server transmits a single Response frame.
type Handler interface {
	// HandleStatus returns the current repo-service status.
	HandleStatus(ctx context.Context) (StatusReply, error)

	// HandleEnqueue queues a new task, returning the task ID and whether
	// it was a fresh insert or a dedup hit.
	HandleEnqueue(ctx context.Context, args EnqueueArgs) (EnqueueReply, error)

	// HandleStop signals the repo service to shut down. The server closes
	// the IPC socket after sending the response.
	HandleStop(ctx context.Context, args StopArgs) error

	// HandleReloadConfig asks the repo service to re-read config.toml.
	HandleReloadConfig(ctx context.Context) error

	// HandleAttach streams events to the client until either the
	// service exits or the client disconnects. The implementation
	// should return when ctx is cancelled.
	HandleAttach(ctx context.Context, emit func(json.RawMessage) error) error
}
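How a server might route single-shot commands to a Handler can be sketched as a switch on the command name. Everything here is illustrative: `handler` is a reduced two-method stand-in for the documented interface, and `dispatch`, `fail`, `handleLine`, and `fakeHandler` are hypothetical names. The real server's dispatch logic is not shown in this reference.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// statusReply carries a single field to keep the sketch short.
type statusReply struct {
	PID int `json:"pid"`
}

// handler is a reduced stand-in for the documented Handler interface.
type handler interface {
	HandleStatus(ctx context.Context) (statusReply, error)
	HandleReloadConfig(ctx context.Context) error
}

type request struct {
	Cmd  string          `json:"cmd"`
	Args json.RawMessage `json:"args,omitempty"`
}

type response struct {
	Ok    bool            `json:"ok"`
	Data  json.RawMessage `json:"data,omitempty"`
	Error string          `json:"error,omitempty"`
}

// fail builds an error response from err.
func fail(err error) response { return response{Error: err.Error()} }

// dispatch routes one request to the matching handler method and wraps
// the result in a single response frame.
func dispatch(ctx context.Context, h handler, req request) response {
	switch req.Cmd {
	case "status":
		reply, err := h.HandleStatus(ctx)
		if err != nil {
			return fail(err)
		}
		data, err := json.Marshal(reply)
		if err != nil {
			return fail(err)
		}
		return response{Ok: true, Data: data}
	case "reload-config":
		if err := h.HandleReloadConfig(ctx); err != nil {
			return fail(err)
		}
		return response{Ok: true}
	default:
		return fail(fmt.Errorf("unknown command %q", req.Cmd))
	}
}

// fakeHandler is a hypothetical implementation used only for the demo.
type fakeHandler struct{}

func (fakeHandler) HandleStatus(context.Context) (statusReply, error) {
	return statusReply{PID: 4242}, nil
}

func (fakeHandler) HandleReloadConfig(context.Context) error {
	return errors.New("config.toml not found")
}

// handleLine decodes one request line, dispatches it, and returns the
// encoded, newline-terminated response line.
func handleLine(h handler, line string) string {
	var req request
	var resp response
	if err := json.Unmarshal([]byte(line), &req); err != nil {
		resp = fail(fmt.Errorf("bad request: %w", err))
	} else {
		resp = dispatch(context.Background(), h, req)
	}
	out, err := json.Marshal(resp)
	if err != nil {
		return `{"ok":false,"error":"encode failure"}` + "\n"
	}
	return string(out) + "\n"
}

func main() {
	fmt.Print(handleLine(fakeHandler{}, `{"cmd":"status"}`))
	fmt.Print(handleLine(fakeHandler{}, `{"cmd":"reload-config"}`))
}
```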
type Request¶
Request is a single command from a client to the repo service.
type Request struct {
	Cmd  string          `json:"cmd"`
	Args json.RawMessage `json:"args,omitempty"`
}
type Response¶
Response is the single-shot reply shape. For streaming commands the server sends zero or more StreamEvent frames followed by a final Response with Ok=true; a mid-stream error is reported as a Response with Ok=false.
type Response struct {
	Ok    bool            `json:"ok"`
	Data  json.RawMessage `json:"data,omitempty"`
	Error string          `json:"error,omitempty"`
}
type Server¶
Server is the repo-service IPC server. One instance per repo service.
type Server struct {
	// contains filtered or unexported fields
}
func (*Server) Start¶
func (s *Server) Start(heartbeatInterval time.Duration) error
Start binds the socket and begins accepting connections in a background goroutine. Safe to call once. Returns the listener error if bind fails.
func (*Server) Stop¶
func (s *Server) Stop() error
Stop shuts the server down and waits for goroutines to exit.
type ServerOptions¶
ServerOptions configures a Server.
type ServerOptions struct {
	// SocketPath is the local endpoint path to bind. Typically
	// state/<repo>/sessions/service.sock on POSIX hosts.
	SocketPath string

	// HeartbeatPath is the file whose mtime is bumped every
	// HeartbeatInterval by the server. Typically the repo-service
	// heartbeat file next to the endpoint metadata.
	HeartbeatPath string

	// HeartbeatInterval controls how often we refresh the heartbeat.
	// Defaults to 10s when zero.
	HeartbeatInterval time.Duration

	// Handler satisfies the IPC Handler interface.
	Handler Handler

	// Logger receives info/warn/error messages. Defaults to a no-op
	// logger when nil.
	Logger *slog.Logger
}
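The heartbeat behavior described by HeartbeatPath and HeartbeatInterval could be implemented roughly as a ticker loop that bumps the file's mtime. This is a sketch under assumptions, not the server's code: `effectiveInterval`, `touch`, `runHeartbeat`, and `demo` are hypothetical names, and treating negative intervals like zero is a choice made here because time.NewTicker panics on non-positive durations.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// defaultHeartbeatInterval matches the documented 10s default.
const defaultHeartbeatInterval = 10 * time.Second

// effectiveInterval applies the default when the interval is unset.
// Negative values are treated as unset too (an assumption of this sketch).
func effectiveInterval(d time.Duration) time.Duration {
	if d <= 0 {
		return defaultHeartbeatInterval
	}
	return d
}

// touch creates the file if needed and sets its mtime to now.
func touch(path string) error {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0o600)
	if err != nil {
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	now := time.Now()
	return os.Chtimes(path, now, now)
}

// runHeartbeat touches path immediately and then once per interval until
// ctx is cancelled.
func runHeartbeat(ctx context.Context, path string, interval time.Duration) error {
	if err := touch(path); err != nil {
		return err
	}
	ticker := time.NewTicker(effectiveInterval(interval))
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			if err := touch(path); err != nil {
				return err
			}
		}
	}
}

// demo runs the loop briefly in a temporary directory and reports whether
// the heartbeat file exists afterwards.
func demo() (bool, error) {
	dir, err := os.MkdirTemp("", "heartbeat")
	if err != nil {
		return false, err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, ".alive")
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	if err := runHeartbeat(ctx, path, 10*time.Millisecond); err != nil {
		return false, err
	}
	_, err = os.Stat(path)
	return err == nil, err
}

func main() {
	ok, err := demo()
	fmt.Println(ok, err)
}
```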
type StatusReply¶
StatusReply is the data payload for CmdStatus responses.
type StatusReply struct {
	RepoPath      string          `json:"repo_path"`
	PID           int             `json:"pid"`
	Uptime        time.Duration   `json:"uptime_ns"`
	ActiveWorkers int             `json:"active_workers"`
	ReadyTasks    int             `json:"ready_tasks"`
	ApprovalTasks int             `json:"approval_tasks"`
	BlockedTasks  int             `json:"blocked_tasks"`
	RunningTasks  int             `json:"running_tasks"`
	FailedTasks   int             `json:"failed_tasks"`
	ActivePlans   int             `json:"active_plans"`
	Workers       []WorkerSummary `json:"workers,omitempty"`
	LastEventAt   time.Time       `json:"last_event_at,omitempty"`
	HeartbeatAge  time.Duration   `json:"heartbeat_age_ns,omitempty"`
}
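The `_ns` suffixes reflect how encoding/json treats time.Duration: as a plain integer count of nanoseconds. The sketch below decodes and encodes a local `statusReply` that copies only a subset of the documented fields; `summarize`, `encode`, and the sample values are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// statusReply mirrors a subset of the documented StatusReply fields.
type statusReply struct {
	PID          int           `json:"pid"`
	Uptime       time.Duration `json:"uptime_ns"`
	ReadyTasks   int           `json:"ready_tasks"`
	HeartbeatAge time.Duration `json:"heartbeat_age_ns,omitempty"`
}

// summarize decodes a status payload and renders the durations in
// human-readable form via time.Duration's String method.
func summarize(data string) (string, error) {
	var s statusReply
	if err := json.Unmarshal([]byte(data), &s); err != nil {
		return "", err
	}
	return fmt.Sprintf("pid %d, up %s, %d ready, heartbeat %s old",
		s.PID, s.Uptime, s.ReadyTasks, s.HeartbeatAge), nil
}

// encode shows the wire form: durations become integer nanoseconds.
func encode(s statusReply) (string, error) {
	out, err := json.Marshal(s)
	return string(out), err
}

func main() {
	msg, err := summarize(`{"pid":4242,"uptime_ns":90000000000,"ready_tasks":3,"heartbeat_age_ns":2500000000}`)
	fmt.Println(msg, err)

	wire, err := encode(statusReply{PID: 1, Uptime: time.Second})
	fmt.Println(wire, err)
}
```

Clients written in other languages should therefore read `uptime_ns` and `heartbeat_age_ns` as integers and convert to their own duration type.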
type StopArgs¶
StopArgs controls the termination mode for CmdStop.
type StopArgs struct {
	Graceful bool          `json:"graceful"`             // wait for in-flight sessions to finish cleanly
	Timeout  time.Duration `json:"timeout_ns,omitempty"` // overrides default if >0
}
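On the wire, a stop request built from these fields would look like the output of the sketch below. It uses a local copy of StopArgs, and `stopLine` is a hypothetical helper. `timeout_ns` is dropped when zero because of `omitempty`, while `graceful` is always present.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// stopArgs copies the documented StopArgs field tags.
type stopArgs struct {
	Graceful bool          `json:"graceful"`
	Timeout  time.Duration `json:"timeout_ns,omitempty"`
}

// stopLine builds the newline-terminated request for a stop command.
func stopLine(graceful bool, timeout time.Duration) (string, error) {
	args, err := json.Marshal(stopArgs{Graceful: graceful, Timeout: timeout})
	if err != nil {
		return "", err
	}
	line, err := json.Marshal(struct {
		Cmd  string          `json:"cmd"`
		Args json.RawMessage `json:"args"`
	}{Cmd: "stop", Args: args})
	if err != nil {
		return "", err
	}
	return string(line) + "\n", nil
}

func main() {
	line, err := stopLine(true, 30*time.Second)
	if err != nil {
		panic(err)
	}
	fmt.Print(line)
}
```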
type StreamEvent¶
StreamEvent is one frame emitted during a streaming command (e.g. attach).
type StreamEvent struct {
	Event json.RawMessage `json:"event"`
}
type WorkerSummary¶
WorkerSummary is the runtime-facing status for one in-flight worker.
type WorkerSummary struct {
	PlanID            string `json:"plan_id"`
	TaskID            string `json:"task_id"`
	Variant           string `json:"variant"`
	Provider          string `json:"provider,omitempty"`
	ProviderSessionID string `json:"provider_session_id,omitempty"`
	WorktreePath      string `json:"worktree_path,omitempty"`
}