feat(02-01): add Unix socket server, poll loop, and stop handler

- Daemon struct with Start/Stop/Wait lifecycle
- Unix socket server handling list/label/stop actions
- Poll loop scanning /proc every 5s
- Stale socket cleanup before listen
- Connection dispatch with JSON encoding
- Tests with -race: StartStop, ListOverSocket, LabelOverSocket, UnknownAction
This commit is contained in:
Pierre Martin
2026-03-23 17:45:57 +01:00
parent 9427dd3eda
commit 4b142a79b5
2 changed files with 368 additions and 3 deletions

219
daemon.go
View File

@@ -2,6 +2,8 @@ package main
import (
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"sync"
@@ -139,3 +141,220 @@ func (ls *LabelStore) save() error {
return os.WriteFile(ls.path, data, 0o644)
}
// Daemon is the long-running vmuxd process. It polls /proc for Claude sessions,
// maintains the registry, and serves requests over a Unix socket.
type Daemon struct {
registry *SessionRegistry
labels *LabelStore
sockPath string
procDir string
claudeDir string
workspaceResolver func(claudePID int) string // nil = no workspace resolution
pollInterval time.Duration
stopCh chan struct{}
listener net.Listener
}
// NewDaemon creates a daemon ready to start.
func NewDaemon(sockPath, procDir, claudeDir string, labels *LabelStore) *Daemon {
return &Daemon{
registry: NewRegistry(),
labels: labels,
sockPath: sockPath,
procDir: procDir,
claudeDir: claudeDir,
pollInterval: 5 * time.Second,
stopCh: make(chan struct{}),
}
}
// Start runs the daemon: initial scan, then listens on the Unix socket
// and polls for sessions in the background.
func (d *Daemon) Start() error {
// Synchronous initial scan before accepting connections
d.scanOnce(time.Now())
if err := d.cleanStaleSocket(); err != nil {
return fmt.Errorf("clean stale socket: %w", err)
}
ln, err := net.Listen("unix", d.sockPath)
if err != nil {
return fmt.Errorf("listen %s: %w", d.sockPath, err)
}
d.listener = ln
// Poll loop in background
go d.pollLoop()
// Accept loop
go d.acceptLoop()
return nil
}
// Stop shuts down the daemon gracefully.
func (d *Daemon) Stop() {
select {
case <-d.stopCh:
return // already stopped
default:
close(d.stopCh)
}
if d.listener != nil {
d.listener.Close()
}
}
// Wait blocks until the daemon stops.
func (d *Daemon) Wait() {
<-d.stopCh
}
func (d *Daemon) acceptLoop() {
for {
conn, err := d.listener.Accept()
if err != nil {
select {
case <-d.stopCh:
return
default:
continue
}
}
go d.handleConnection(conn)
}
}
func (d *Daemon) pollLoop() {
ticker := time.NewTicker(d.pollInterval)
defer ticker.Stop()
for {
select {
case <-d.stopCh:
return
case t := <-ticker.C:
d.scanOnce(t)
}
}
}
func (d *Daemon) scanOnce(now time.Time) {
procs, err := FindClaudeProcesses(d.procDir)
if err != nil {
return
}
activeIDs := make(map[string]bool)
for _, proc := range procs {
_, messages, err := FindSessionForProcess(d.claudeDir, proc)
if err != nil {
continue
}
state := DetectState(messages, now)
preview := ExtractPreview(messages)
var sessionID, gitBranch string
for _, msg := range messages {
if msg.SessionID != "" {
sessionID = msg.SessionID
}
if msg.GitBranch != "" {
gitBranch = msg.GitBranch
}
}
if sessionID == "" {
continue
}
workspace := ""
if d.workspaceResolver != nil {
workspace = d.workspaceResolver(proc.PID)
}
label := d.labels.Get(sessionID)
info := SessionInfo{
PID: proc.PID,
SessionID: sessionID,
Cwd: proc.Cwd,
GitBranch: gitBranch,
State: state.String(),
Preview: preview,
Workspace: workspace,
Label: label,
}
d.registry.Update(info)
activeIDs[sessionID] = true
}
d.registry.RemoveStale(activeIDs)
}
func (d *Daemon) handleConnection(conn net.Conn) {
defer conn.Close()
var req Request
if err := json.NewDecoder(conn).Decode(&req); err != nil {
writeResponse(conn, Response{Error: "invalid request: " + err.Error()})
return
}
switch req.Action {
case "list":
sessions := d.registry.List()
writeResponse(conn, Response{OK: true, Sessions: sessions})
case "label":
var args LabelArgs
if err := json.Unmarshal(req.Args, &args); err != nil {
writeResponse(conn, Response{Error: "invalid label args: " + err.Error()})
return
}
if err := d.labels.Set(args.SessionID, args.Label); err != nil {
writeResponse(conn, Response{Error: "set label: " + err.Error()})
return
}
// Update registry with new label
d.registry.mu.Lock()
if ts, ok := d.registry.sessions[args.SessionID]; ok {
ts.Info.Label = args.Label
}
d.registry.mu.Unlock()
writeResponse(conn, Response{OK: true})
case "stop":
writeResponse(conn, Response{OK: true})
d.Stop()
default:
writeResponse(conn, Response{Error: "unknown action: " + req.Action})
}
}
func writeResponse(conn net.Conn, resp Response) {
json.NewEncoder(conn).Encode(resp)
}
// cleanStaleSocket removes a leftover socket file if no process is listening on it.
func (d *Daemon) cleanStaleSocket() error {
if _, err := os.Stat(d.sockPath); os.IsNotExist(err) {
return nil
}
// Try connecting to check if another daemon is running
conn, err := net.DialTimeout("unix", d.sockPath, 500*time.Millisecond)
if err == nil {
conn.Close()
return fmt.Errorf("another daemon is already listening on %s", d.sockPath)
}
// Stale socket, remove it
return os.Remove(d.sockPath)
}