Also restart daemon on make watch so new binary is used. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
471 lines
11 KiB
Go
471 lines
11 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// TrackedSession holds a session's info alongside state-transition tracking.
|
|
type TrackedSession struct {
|
|
Info SessionInfo
|
|
PrevState string
|
|
}
|
|
|
|
// SessionRegistry maintains an in-memory index of active sessions.
|
|
// It tracks WaitingSince transitions: when a session moves to "Needs Input",
|
|
// the timestamp is recorded; when it leaves that state, it is cleared.
|
|
type SessionRegistry struct {
|
|
mu sync.RWMutex
|
|
sessions map[string]*TrackedSession
|
|
}
|
|
|
|
func NewRegistry() *SessionRegistry {
|
|
return &SessionRegistry{
|
|
sessions: make(map[string]*TrackedSession),
|
|
}
|
|
}
|
|
|
|
// Update adds or refreshes a session in the registry.
|
|
// WaitingSince is set when transitioning to "Needs Input" and cleared otherwise.
|
|
func (r *SessionRegistry) Update(info SessionInfo) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
existing, ok := r.sessions[info.SessionID]
|
|
if !ok {
|
|
existing = &TrackedSession{}
|
|
r.sessions[info.SessionID] = existing
|
|
}
|
|
|
|
isWaiting := info.State == "Needs Input"
|
|
wasWaiting := existing.PrevState == "Needs Input"
|
|
|
|
if isWaiting && !wasWaiting {
|
|
now := time.Now()
|
|
info.WaitingSince = &now
|
|
} else if isWaiting && wasWaiting {
|
|
// Keep existing timestamp
|
|
info.WaitingSince = existing.Info.WaitingSince
|
|
}
|
|
// If not waiting, WaitingSince stays nil (default)
|
|
|
|
existing.Info = info
|
|
existing.PrevState = info.State
|
|
}
|
|
|
|
// List returns a snapshot of all tracked sessions, sorted by workspace number.
|
|
func (r *SessionRegistry) List() []SessionInfo {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
result := make([]SessionInfo, 0, len(r.sessions))
|
|
for _, ts := range r.sessions {
|
|
result = append(result, ts.Info)
|
|
}
|
|
sort.Slice(result, func(i, j int) bool {
|
|
wi, _ := strconv.Atoi(result[i].Workspace)
|
|
wj, _ := strconv.Atoi(result[j].Workspace)
|
|
return wi < wj
|
|
})
|
|
return result
|
|
}
|
|
|
|
// RemoveStale removes sessions whose SessionID is not in activeIDs.
|
|
func (r *SessionRegistry) RemoveStale(activeIDs map[string]bool) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
for id := range r.sessions {
|
|
if !activeIDs[id] {
|
|
delete(r.sessions, id)
|
|
}
|
|
}
|
|
}
|
|
|
|
// LabelStore persists user-assigned labels in a JSON file.
//
// The file holds a single JSON object mapping session ID to label. The
// in-memory map mirrors what was last written successfully.
type LabelStore struct {
	mu     sync.RWMutex      // guards labels
	labels map[string]string // session ID -> label
	path   string            // location of the JSON file
}

// NewLabelStore loads labels from path. Returns an empty store if the file
// does not exist.
//
// Any other read error, or a file that is not valid JSON for a string map, is
// returned to the caller. A file whose content is the JSON value null is
// accepted and treated as an empty store.
func NewLabelStore(path string) (*LabelStore, error) {
	ls := &LabelStore{
		labels: make(map[string]string),
		path:   path,
	}

	data, err := os.ReadFile(path)
	if err != nil {
		if os.IsNotExist(err) {
			return ls, nil
		}
		return nil, err
	}

	if err := json.Unmarshal(data, &ls.labels); err != nil {
		return nil, err
	}
	// Unmarshalling JSON null sets the map to nil; writing to a nil map
	// panics, so restore an empty map before handing the store out.
	if ls.labels == nil {
		ls.labels = make(map[string]string)
	}

	return ls, nil
}

// Set assigns a label to a session and persists to disk.
//
// If writing the file fails, the in-memory map is restored to its previous
// content and the error is returned, so Get never reports a label that was
// not persisted.
func (ls *LabelStore) Set(sessionID, label string) error {
	ls.mu.Lock()
	defer ls.mu.Unlock()

	// Tolerate a store that was built without NewLabelStore.
	if ls.labels == nil {
		ls.labels = make(map[string]string)
	}

	previous, existed := ls.labels[sessionID]
	ls.labels[sessionID] = label

	if err := ls.save(); err != nil {
		// Roll back so memory and disk stay in agreement.
		if existed {
			ls.labels[sessionID] = previous
		} else {
			delete(ls.labels, sessionID)
		}
		return err
	}
	return nil
}

// Get returns the label for a session, or empty string if not found.
func (ls *LabelStore) Get(sessionID string) string {
	ls.mu.RLock()
	defer ls.mu.RUnlock()

	return ls.labels[sessionID]
}

// save writes the whole label map to ls.path as indented JSON, creating the
// parent directory if needed. The caller must hold ls.mu.
func (ls *LabelStore) save() error {
	data, err := json.MarshalIndent(ls.labels, "", " ")
	if err != nil {
		return err
	}

	dir := filepath.Dir(ls.path)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}

	return os.WriteFile(ls.path, data, 0o644)
}
|
|
|
|
// Daemon is the long-running vmuxd process. It polls /proc for Claude sessions,
|
|
// maintains the registry, and serves requests over a Unix socket.
|
|
type Daemon struct {
|
|
registry *SessionRegistry
|
|
labels *LabelStore
|
|
sockPath string
|
|
procDir string
|
|
claudeDir string
|
|
workspaceResolver func(claudePID int) string // nil = no workspace resolution
|
|
i3commander I3Commander
|
|
pollInterval time.Duration
|
|
stopCh chan struct{}
|
|
listener net.Listener
|
|
hookPort int
|
|
httpServer *http.Server
|
|
lastHookTime time.Time
|
|
mu sync.Mutex // protects lastHookTime
|
|
}
|
|
|
|
// NewDaemon creates a daemon ready to start.
|
|
func NewDaemon(sockPath, procDir, claudeDir string, labels *LabelStore) *Daemon {
|
|
return &Daemon{
|
|
registry: NewRegistry(),
|
|
labels: labels,
|
|
sockPath: sockPath,
|
|
procDir: procDir,
|
|
claudeDir: claudeDir,
|
|
pollInterval: 5 * time.Second,
|
|
stopCh: make(chan struct{}),
|
|
hookPort: 3119,
|
|
}
|
|
}
|
|
|
|
// InitWorkspaceResolver sets up i3+X11 workspace resolution.
|
|
// If i3 or X11 is unavailable, logs a warning and continues without workspace info.
|
|
func (d *Daemon) InitWorkspaceResolver(treeProvider I3TreeProvider, x11 X11PIDResolver) {
|
|
if treeProvider == nil || x11 == nil {
|
|
return
|
|
}
|
|
d.workspaceResolver = func(claudePID int) string {
|
|
termMap, err := BuildTerminalWorkspaceMap(treeProvider, x11)
|
|
if err != nil {
|
|
return ""
|
|
}
|
|
return ResolveWorkspace(d.procDir, claudePID, termMap)
|
|
}
|
|
}
|
|
|
|
// startHookServer starts an HTTP server on hookPort to receive Claude Code hooks.
|
|
// If the port is busy, logs a warning and returns nil (graceful degradation).
|
|
func (d *Daemon) startHookServer() error {
|
|
if d.hookPort == 0 {
|
|
return nil
|
|
}
|
|
|
|
addr := fmt.Sprintf("127.0.0.1:%d", d.hookPort)
|
|
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("/hook", d.handleHook)
|
|
|
|
d.httpServer = &http.Server{
|
|
Addr: addr,
|
|
Handler: mux,
|
|
ReadTimeout: 5 * time.Second,
|
|
WriteTimeout: 5 * time.Second,
|
|
}
|
|
|
|
ln, err := net.Listen("tcp", addr)
|
|
if err != nil {
|
|
log.Printf("hook server: port %d busy, continuing without hooks: %v", d.hookPort, err)
|
|
d.httpServer = nil
|
|
return nil
|
|
}
|
|
|
|
go d.httpServer.Serve(ln)
|
|
return nil
|
|
}
|
|
|
|
// currentPollInterval returns 20s when hooks are active (last hook < 60s ago),
|
|
// 5s otherwise.
|
|
func (d *Daemon) currentPollInterval() time.Duration {
|
|
d.mu.Lock()
|
|
lastHook := d.lastHookTime
|
|
d.mu.Unlock()
|
|
|
|
if !lastHook.IsZero() && time.Since(lastHook) < 60*time.Second {
|
|
return 20 * time.Second
|
|
}
|
|
return 5 * time.Second
|
|
}
|
|
|
|
// Start runs the daemon: initial scan, hook server, then listens on the Unix socket
|
|
// and polls for sessions in the background.
|
|
func (d *Daemon) Start() error {
|
|
// Synchronous initial scan before accepting connections
|
|
d.scanOnce(time.Now())
|
|
|
|
// Start hook server (graceful degradation if port busy)
|
|
if err := d.startHookServer(); err != nil {
|
|
return fmt.Errorf("start hook server: %w", err)
|
|
}
|
|
|
|
if err := d.cleanStaleSocket(); err != nil {
|
|
return fmt.Errorf("clean stale socket: %w", err)
|
|
}
|
|
|
|
ln, err := net.Listen("unix", d.sockPath)
|
|
if err != nil {
|
|
return fmt.Errorf("listen %s: %w", d.sockPath, err)
|
|
}
|
|
d.listener = ln
|
|
|
|
// Poll loop in background
|
|
go d.pollLoop()
|
|
|
|
// Accept loop
|
|
go d.acceptLoop()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop shuts down the daemon gracefully.
|
|
func (d *Daemon) Stop() {
|
|
select {
|
|
case <-d.stopCh:
|
|
return // already stopped
|
|
default:
|
|
close(d.stopCh)
|
|
}
|
|
if d.httpServer != nil {
|
|
d.httpServer.Close()
|
|
}
|
|
if d.listener != nil {
|
|
d.listener.Close()
|
|
}
|
|
}
|
|
|
|
// Wait blocks until the daemon stops.
|
|
func (d *Daemon) Wait() {
|
|
<-d.stopCh
|
|
}
|
|
|
|
func (d *Daemon) acceptLoop() {
|
|
for {
|
|
conn, err := d.listener.Accept()
|
|
if err != nil {
|
|
select {
|
|
case <-d.stopCh:
|
|
return
|
|
default:
|
|
continue
|
|
}
|
|
}
|
|
go d.handleConnection(conn)
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) pollLoop() {
|
|
for {
|
|
select {
|
|
case <-d.stopCh:
|
|
return
|
|
case t := <-time.After(d.currentPollInterval()):
|
|
d.scanOnce(t)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Daemon) scanOnce(now time.Time) {
|
|
procs, err := FindClaudeProcesses(d.procDir)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
activeIDs := make(map[string]bool)
|
|
|
|
for _, proc := range procs {
|
|
_, messages, err := FindSessionForProcess(d.claudeDir, proc)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
state := DetectState(messages, now)
|
|
preview := ExtractPreview(messages)
|
|
|
|
var sessionID, gitBranch string
|
|
for _, msg := range messages {
|
|
if msg.SessionID != "" {
|
|
sessionID = msg.SessionID
|
|
}
|
|
if msg.GitBranch != "" {
|
|
gitBranch = msg.GitBranch
|
|
}
|
|
}
|
|
|
|
if sessionID == "" {
|
|
continue
|
|
}
|
|
|
|
workspace := ""
|
|
if d.workspaceResolver != nil {
|
|
workspace = d.workspaceResolver(proc.PID)
|
|
}
|
|
|
|
label := d.labels.Get(sessionID)
|
|
|
|
info := SessionInfo{
|
|
PID: proc.PID,
|
|
SessionID: sessionID,
|
|
Cwd: proc.Cwd,
|
|
GitBranch: gitBranch,
|
|
State: state.String(),
|
|
Preview: preview,
|
|
Workspace: workspace,
|
|
Label: label,
|
|
}
|
|
|
|
d.registry.Update(info)
|
|
activeIDs[sessionID] = true
|
|
}
|
|
|
|
d.registry.RemoveStale(activeIDs)
|
|
}
|
|
|
|
func (d *Daemon) handleConnection(conn net.Conn) {
|
|
defer conn.Close()
|
|
|
|
var req Request
|
|
if err := json.NewDecoder(conn).Decode(&req); err != nil {
|
|
writeResponse(conn, Response{Error: "invalid request: " + err.Error()})
|
|
return
|
|
}
|
|
|
|
switch req.Action {
|
|
case "list":
|
|
sessions := d.registry.List()
|
|
writeResponse(conn, Response{OK: true, Sessions: sessions})
|
|
|
|
case "label":
|
|
var args LabelArgs
|
|
if err := json.Unmarshal(req.Args, &args); err != nil {
|
|
writeResponse(conn, Response{Error: "invalid label args: " + err.Error()})
|
|
return
|
|
}
|
|
// Resolve SessionID via fuzzy match if it's not a UUID
|
|
sessionID := args.SessionID
|
|
sessions := d.registry.List()
|
|
match := FuzzyMatch(sessionID, sessions)
|
|
if match != nil {
|
|
sessionID = match.SessionID
|
|
}
|
|
if err := d.labels.Set(sessionID, args.Label); err != nil {
|
|
writeResponse(conn, Response{Error: "set label: " + err.Error()})
|
|
return
|
|
}
|
|
d.registry.mu.Lock()
|
|
if ts, ok := d.registry.sessions[sessionID]; ok {
|
|
ts.Info.Label = args.Label
|
|
}
|
|
d.registry.mu.Unlock()
|
|
writeResponse(conn, Response{OK: true})
|
|
|
|
case "switch":
|
|
var args SwitchArgs
|
|
if err := json.Unmarshal(req.Args, &args); err != nil {
|
|
writeResponse(conn, Response{Error: "invalid switch args: " + err.Error()})
|
|
return
|
|
}
|
|
sessions := d.registry.List()
|
|
match := FuzzyMatch(args.Query, sessions)
|
|
if match == nil {
|
|
writeResponse(conn, Response{Error: "no session matching: " + args.Query})
|
|
return
|
|
}
|
|
if match.Workspace == "" {
|
|
writeResponse(conn, Response{Error: "no workspace for session " + match.SessionID})
|
|
return
|
|
}
|
|
if d.i3commander == nil {
|
|
writeResponse(conn, Response{Error: "i3 commander not available"})
|
|
return
|
|
}
|
|
if err := SwitchToWorkspace(d.i3commander, match.Workspace); err != nil {
|
|
writeResponse(conn, Response{Error: "switch workspace: " + err.Error()})
|
|
return
|
|
}
|
|
writeResponse(conn, Response{OK: true})
|
|
|
|
case "stop":
|
|
writeResponse(conn, Response{OK: true})
|
|
d.Stop()
|
|
|
|
default:
|
|
writeResponse(conn, Response{Error: "unknown action: " + req.Action})
|
|
}
|
|
}
|
|
|
|
func writeResponse(conn net.Conn, resp Response) {
|
|
json.NewEncoder(conn).Encode(resp)
|
|
}
|
|
|
|
// cleanStaleSocket removes a leftover socket file if no process is listening on it.
|
|
func (d *Daemon) cleanStaleSocket() error {
|
|
if _, err := os.Stat(d.sockPath); os.IsNotExist(err) {
|
|
return nil
|
|
}
|
|
|
|
// Try connecting to check if another daemon is running
|
|
conn, err := net.DialTimeout("unix", d.sockPath, 500*time.Millisecond)
|
|
if err == nil {
|
|
conn.Close()
|
|
return fmt.Errorf("another daemon is already listening on %s", d.sockPath)
|
|
}
|
|
|
|
// Stale socket, remove it
|
|
return os.Remove(d.sockPath)
|
|
}
|