Files
vmux/daemon.go
Pierre Martin a79a0e154c feat(02-03): client socket, autostart daemon, switch handler, workspace wiring
- Client struct sends JSON requests to daemon over Unix socket
- EnsureDaemon auto-starts the daemon if not running (retry 50ms x 20)
- Switch handler uses FuzzyMatch + SwitchToWorkspace via i3 IPC
- InitWorkspaceResolver wires BuildTerminalWorkspaceMap + ResolveWorkspace
- sysattr_linux.go for Setsid detach on daemon spawn
2026-03-23 17:51:31 +01:00

403 lines
9.2 KiB
Go

package main
import (
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"sync"
"time"
)
// TrackedSession pairs the latest SessionInfo snapshot of a session with the
// state string the registry needs in order to detect state transitions.
type TrackedSession struct {
	// Info is the snapshot stored by the most recent registry update.
	Info SessionInfo

	// PrevState is the State recorded by the most recent registry update.
	// The next update compares the incoming state against it. It is empty
	// for an entry that has not been updated yet.
	PrevState string
}
// SessionRegistry is an in-memory index of active sessions keyed by session ID.
//
// It also maintains each session's WaitingSince timestamp: the time is
// recorded when a session moves to "Needs Input" and is dropped once the
// session leaves that state.
type SessionRegistry struct {
	// mu guards sessions.
	mu sync.RWMutex

	// sessions maps a session ID to its tracked entry.
	sessions map[string]*TrackedSession
}
// NewRegistry returns an empty SessionRegistry whose session map is
// initialized and ready for use.
func NewRegistry() *SessionRegistry {
	reg := new(SessionRegistry)
	reg.sessions = map[string]*TrackedSession{}
	return reg
}
// Update adds the session described by info to the registry, or replaces the
// stored snapshot when the session ID is already tracked.
//
// WaitingSince is owned by the registry; whatever value the caller supplies
// is overwritten:
//   - entering "Needs Input": set to the current time;
//   - staying in "Needs Input": the previously recorded time is kept;
//   - any other state: cleared to nil.
func (r *SessionRegistry) Update(info SessionInfo) {
	const needsInput = "Needs Input"

	r.mu.Lock()
	defer r.mu.Unlock()

	existing, ok := r.sessions[info.SessionID]
	if !ok {
		existing = &TrackedSession{}
		r.sessions[info.SessionID] = existing
	}

	switch {
	case info.State != needsInput:
		// Clear explicitly so a timestamp passed in by the caller cannot
		// leak into a session that is not waiting.
		info.WaitingSince = nil
	case existing.PrevState == needsInput && existing.Info.WaitingSince != nil:
		// Still waiting: keep the moment the wait began.
		info.WaitingSince = existing.Info.WaitingSince
	default:
		// Transition into the waiting state (or no usable timestamp to keep).
		now := time.Now()
		info.WaitingSince = &now
	}

	existing.Info = info
	existing.PrevState = info.State
}
// List copies the current SessionInfo of every tracked session into a new
// slice and returns it. The result follows map iteration order, so its order
// is unspecified.
func (r *SessionRegistry) List() []SessionInfo {
	r.mu.RLock()
	defer r.mu.RUnlock()

	snapshot := make([]SessionInfo, len(r.sessions))
	next := 0
	for _, tracked := range r.sessions {
		snapshot[next] = tracked.Info
		next++
	}
	return snapshot
}
// RemoveStale drops every tracked session whose ID is absent from activeIDs
// or mapped to false there.
func (r *SessionRegistry) RemoveStale(activeIDs map[string]bool) {
	r.mu.Lock()
	defer r.mu.Unlock()

	for sessionID := range r.sessions {
		if stillActive := activeIDs[sessionID]; stillActive {
			continue
		}
		delete(r.sessions, sessionID)
	}
}
// LabelStore keeps user-assigned session labels in memory and persists them
// to a JSON file.
type LabelStore struct {
	// mu guards labels.
	mu sync.RWMutex

	// labels maps a session ID to its label.
	labels map[string]string

	// path is the JSON file the labels are loaded from and saved to.
	path string
}
// NewLabelStore creates a LabelStore backed by the JSON file at path and loads
// any labels already saved there.
//
// A missing file is not an error and yields an empty store; so does a
// zero-length file or a file containing JSON null. Any other read error is
// returned unchanged, and malformed JSON is reported together with the path.
func NewLabelStore(path string) (*LabelStore, error) {
	ls := &LabelStore{
		labels: make(map[string]string),
		path:   path,
	}

	data, err := os.ReadFile(path)
	if err != nil {
		if os.IsNotExist(err) {
			return ls, nil
		}
		return nil, err
	}

	// A zero-length file carries no labels; treat it like a missing file
	// instead of failing with "unexpected end of JSON input".
	if len(data) == 0 {
		return ls, nil
	}

	if err := json.Unmarshal(data, &ls.labels); err != nil {
		return nil, fmt.Errorf("parse labels file %s: %w", path, err)
	}

	// JSON null unmarshals into a nil map, and a later write to a nil map
	// would panic, so restore an empty map.
	if ls.labels == nil {
		ls.labels = make(map[string]string)
	}
	return ls, nil
}
// Set assigns label to sessionID and persists the whole label map to disk.
//
// If persisting fails the error is returned and the in-memory change is
// rolled back, so a failed Set leaves the store as it was before the call.
func (ls *LabelStore) Set(sessionID, label string) error {
	ls.mu.Lock()
	defer ls.mu.Unlock()

	previous, hadPrevious := ls.labels[sessionID]
	ls.labels[sessionID] = label

	if err := ls.save(); err != nil {
		// Undo the in-memory write: the caller is told the operation failed,
		// so Get must not start returning the new label.
		if hadPrevious {
			ls.labels[sessionID] = previous
		} else {
			delete(ls.labels, sessionID)
		}
		return err
	}
	return nil
}
// Get looks up the label stored for sessionID. It returns the empty string
// when no label has been stored for that session.
func (ls *LabelStore) Get(sessionID string) string {
	ls.mu.RLock()
	label := ls.labels[sessionID]
	ls.mu.RUnlock()
	return label
}
// save encodes the label map as indented JSON and writes it to ls.path,
// creating the parent directory first when it does not exist. It takes no
// lock itself; Set calls it while holding ls.mu.
func (ls *LabelStore) save() error {
	encoded, marshalErr := json.MarshalIndent(ls.labels, "", " ")
	if marshalErr != nil {
		return marshalErr
	}

	if mkdirErr := os.MkdirAll(filepath.Dir(ls.path), 0o755); mkdirErr != nil {
		return mkdirErr
	}

	return os.WriteFile(ls.path, encoded, 0o644)
}
// Daemon is the long-running vmuxd process. It periodically scans the process
// directory for Claude sessions, keeps the session registry up to date, and
// answers requests received over a Unix socket.
type Daemon struct {
	// registry holds the sessions found by the most recent scans.
	registry *SessionRegistry

	// labels stores the user-assigned session labels.
	labels *LabelStore

	// sockPath is the filesystem path of the Unix socket to listen on.
	sockPath string

	// procDir is the directory scanned for Claude processes.
	procDir string

	// claudeDir is the directory passed to the session lookup for each process.
	claudeDir string

	// workspaceResolver maps a Claude PID to a workspace name.
	// nil = no workspace resolution.
	workspaceResolver func(claudePID int) string

	// i3commander is used by the "switch" action; when it is nil the action
	// is rejected.
	i3commander I3Commander

	// pollInterval is the period between background scans.
	pollInterval time.Duration

	// stopCh is closed by Stop to signal shutdown.
	stopCh chan struct{}

	// listener is the Unix socket listener created by Start.
	listener net.Listener
}
// NewDaemon builds a Daemon with a fresh registry, the given paths and label
// store, a 5-second poll interval, and an open stop channel. The daemon does
// nothing until Start is called.
func NewDaemon(sockPath, procDir, claudeDir string, labels *LabelStore) *Daemon {
	d := new(Daemon)
	d.registry = NewRegistry()
	d.labels = labels
	d.sockPath, d.procDir, d.claudeDir = sockPath, procDir, claudeDir
	d.pollInterval = 5 * time.Second
	d.stopCh = make(chan struct{})
	return d
}
// InitWorkspaceResolver installs a workspace resolver backed by the given i3
// tree provider and X11 PID resolver.
//
// If either argument is nil the call is a no-op and the daemon keeps running
// without workspace information. The installed resolver rebuilds the
// terminal-to-workspace map on every call and returns "" when that fails.
func (d *Daemon) InitWorkspaceResolver(treeProvider I3TreeProvider, x11 X11PIDResolver) {
	if treeProvider == nil || x11 == nil {
		return
	}

	resolve := func(claudePID int) string {
		if terminals, err := BuildTerminalWorkspaceMap(treeProvider, x11); err == nil {
			return ResolveWorkspace(d.procDir, claudePID, terminals)
		}
		return ""
	}
	d.workspaceResolver = resolve
}
// Start brings the daemon up. It performs one synchronous scan so the
// registry is populated before any client can connect, clears a stale socket
// file, starts listening on the Unix socket, and then launches the poll and
// accept loops in background goroutines.
//
// It returns an error if the stale-socket check or the listen call fails.
func (d *Daemon) Start() error {
	d.scanOnce(time.Now())

	if cleanErr := d.cleanStaleSocket(); cleanErr != nil {
		return fmt.Errorf("clean stale socket: %w", cleanErr)
	}

	listener, listenErr := net.Listen("unix", d.sockPath)
	if listenErr != nil {
		return fmt.Errorf("listen %s: %w", d.sockPath, listenErr)
	}
	d.listener = listener

	go d.pollLoop()
	go d.acceptLoop()
	return nil
}
// daemonStopMu serializes the check-and-close of a daemon's stop channel so
// that concurrent Stop calls cannot both close it (closing a closed channel
// panics).
var daemonStopMu sync.Mutex

// Stop shuts the daemon down: it closes the stop channel, which ends the poll
// and accept loops and releases Wait, and then closes the listener.
//
// Stop is idempotent and safe to call from several goroutines at once, for
// example from two "stop" requests handled on different connections.
func (d *Daemon) Stop() {
	daemonStopMu.Lock()
	select {
	case <-d.stopCh:
		daemonStopMu.Unlock()
		return // already stopped
	default:
		close(d.stopCh)
	}
	daemonStopMu.Unlock()

	if d.listener != nil {
		// The daemon is going away; a close error leaves nothing to recover.
		_ = d.listener.Close()
	}
}
// Wait blocks the calling goroutine until the daemon's stop channel has been
// closed, which Stop does.
func (d *Daemon) Wait() {
	stopped := d.stopCh
	<-stopped
}
// acceptLoop accepts connections on the daemon's listener and serves each one
// in its own goroutine. It returns once the daemon has been stopped.
//
// When Accept fails while the daemon is still running, the loop waits before
// retrying, doubling the delay from 5ms up to 1s, so that a persistent error
// (for example file-descriptor exhaustion) does not turn into a busy loop.
// The delay is reset after the next successful Accept.
func (d *Daemon) acceptLoop() {
	const (
		minBackoff = 5 * time.Millisecond
		maxBackoff = time.Second
	)
	var backoff time.Duration

	for {
		conn, err := d.listener.Accept()
		if err == nil {
			backoff = 0
			go d.handleConnection(conn)
			continue
		}

		if backoff == 0 {
			backoff = minBackoff
		} else {
			backoff *= 2
		}
		if backoff > maxBackoff {
			backoff = maxBackoff
		}

		// Stop closes stopCh before closing the listener, so an Accept error
		// caused by shutdown takes the first case immediately.
		timer := time.NewTimer(backoff)
		select {
		case <-d.stopCh:
			timer.Stop()
			return
		case <-timer.C:
		}
	}
}
// pollLoop rescans for sessions once per pollInterval, passing each tick's
// timestamp to scanOnce, until the stop channel is closed.
func (d *Daemon) pollLoop() {
	tick := time.NewTicker(d.pollInterval)
	defer tick.Stop()

	for {
		select {
		case now := <-tick.C:
			d.scanOnce(now)
		case <-d.stopCh:
			return
		}
	}
}
// scanOnce performs a single discovery pass. For every Claude process found
// under procDir it loads the session messages, derives state and preview,
// takes the last non-empty session ID and git branch seen in the messages,
// resolves workspace and label, and stores the result in the registry.
// Sessions not seen during this pass are then removed from the registry.
//
// If the process listing itself fails, the registry is left untouched.
// Processes whose session cannot be loaded, or whose messages carry no
// session ID, are skipped.
func (d *Daemon) scanOnce(now time.Time) {
	procs, err := FindClaudeProcesses(d.procDir)
	if err != nil {
		return
	}

	seen := map[string]bool{}
	for _, proc := range procs {
		_, messages, findErr := FindSessionForProcess(d.claudeDir, proc)
		if findErr != nil {
			continue
		}

		state := DetectState(messages, now)
		preview := ExtractPreview(messages)

		// Later messages win: keep the last non-empty value of each field.
		var id, branch string
		for _, m := range messages {
			if m.SessionID != "" {
				id = m.SessionID
			}
			if m.GitBranch != "" {
				branch = m.GitBranch
			}
		}
		if id == "" {
			continue
		}

		var workspace string
		if resolve := d.workspaceResolver; resolve != nil {
			workspace = resolve(proc.PID)
		}
		label := d.labels.Get(id)

		d.registry.Update(SessionInfo{
			PID:       proc.PID,
			SessionID: id,
			Cwd:       proc.Cwd,
			GitBranch: branch,
			State:     state.String(),
			Preview:   preview,
			Workspace: workspace,
			Label:     label,
		})
		seen[id] = true
	}

	d.registry.RemoveStale(seen)
}
// handleConnection serves exactly one JSON request on conn, writes one JSON
// response, and closes the connection.
//
// Supported actions:
//   - "list":   returns a snapshot of all tracked sessions.
//   - "label":  stores a label for a session and mirrors it into the registry.
//   - "switch": fuzzy-matches a session and switches i3 to its workspace.
//   - "stop":   acknowledges the request and then stops the daemon.
//
// Reading the request and writing the response are each bounded by a
// deadline, so a peer that connects and then stalls cannot hold the goroutine
// and its file descriptor open indefinitely.
func (d *Daemon) handleConnection(conn net.Conn) {
	const ioTimeout = 5 * time.Second

	defer conn.Close()

	// reply sends resp under a fresh write deadline. The deadline is armed
	// right before the write so that time spent handling the action does not
	// count against it.
	reply := func(resp Response) {
		// Best effort: if the deadline cannot be set the write still proceeds.
		_ = conn.SetWriteDeadline(time.Now().Add(ioTimeout))
		writeResponse(conn, resp)
	}

	// Best effort: if the deadline cannot be set the read still proceeds.
	_ = conn.SetReadDeadline(time.Now().Add(ioTimeout))

	var req Request
	if err := json.NewDecoder(conn).Decode(&req); err != nil {
		reply(Response{Error: "invalid request: " + err.Error()})
		return
	}

	switch req.Action {
	case "list":
		sessions := d.registry.List()
		reply(Response{OK: true, Sessions: sessions})

	case "label":
		var args LabelArgs
		if err := json.Unmarshal(req.Args, &args); err != nil {
			reply(Response{Error: "invalid label args: " + err.Error()})
			return
		}
		if err := d.labels.Set(args.SessionID, args.Label); err != nil {
			reply(Response{Error: "set label: " + err.Error()})
			return
		}
		// Mirror the new label into the registry so "list" shows it before
		// the next scan runs.
		d.registry.mu.Lock()
		if ts, ok := d.registry.sessions[args.SessionID]; ok {
			ts.Info.Label = args.Label
		}
		d.registry.mu.Unlock()
		reply(Response{OK: true})

	case "switch":
		var args SwitchArgs
		if err := json.Unmarshal(req.Args, &args); err != nil {
			reply(Response{Error: "invalid switch args: " + err.Error()})
			return
		}
		sessions := d.registry.List()
		match := FuzzyMatch(args.Query, sessions)
		if match == nil {
			reply(Response{Error: "no session matching: " + args.Query})
			return
		}
		if match.Workspace == "" {
			reply(Response{Error: "no workspace for session " + match.SessionID})
			return
		}
		if d.i3commander == nil {
			reply(Response{Error: "i3 commander not available"})
			return
		}
		if err := SwitchToWorkspace(d.i3commander, match.Workspace); err != nil {
			reply(Response{Error: "switch workspace: " + err.Error()})
			return
		}
		reply(Response{OK: true})

	case "stop":
		// Acknowledge first; Stop closes the listener afterwards.
		reply(Response{OK: true})
		d.Stop()

	default:
		reply(Response{Error: "unknown action: " + req.Action})
	}
}
// writeResponse encodes resp as one JSON value followed by a newline and
// writes it to conn. Encoding and write errors are deliberately discarded:
// the function has no channel through which to report them.
func writeResponse(conn net.Conn, resp Response) {
	encoder := json.NewEncoder(conn)
	_ = encoder.Encode(resp)
}
// cleanStaleSocket prepares sockPath for a fresh listener.
//
// It returns nil when nothing exists at sockPath, or when a leftover socket
// file with no listener behind it was removed. It returns an error when:
//   - sockPath cannot be inspected;
//   - sockPath exists but is not a socket (the file is left untouched);
//   - another daemon accepts a connection on it;
//   - the stale socket file cannot be removed.
func (d *Daemon) cleanStaleSocket() error {
	fi, err := os.Stat(d.sockPath)
	if err != nil {
		if os.IsNotExist(err) {
			return nil
		}
		return err
	}

	// Never delete something that is not a socket: a wrong or misconfigured
	// path must not cost the user an unrelated file.
	if fi.Mode()&os.ModeSocket == 0 {
		return fmt.Errorf("%s exists and is not a socket", d.sockPath)
	}

	// Try connecting to check whether another daemon is running.
	conn, err := net.DialTimeout("unix", d.sockPath, 500*time.Millisecond)
	if err == nil {
		conn.Close()
		return fmt.Errorf("another daemon is already listening on %s", d.sockPath)
	}

	// Stale socket: remove it. If it disappeared in the meantime the goal is
	// already achieved.
	if err := os.Remove(d.sockPath); err != nil && !os.IsNotExist(err) {
		return err
	}
	return nil
}