diff --git a/errors.go b/errors.go index 1db85fb..6f609fd 100644 --- a/errors.go +++ b/errors.go @@ -38,12 +38,14 @@ type ErrorTracker struct { errors map[string]*PortError baselines map[string]*portErrorBaseline nextID int + t *Tendrils } -func NewErrorTracker() *ErrorTracker { +func NewErrorTracker(t *Tendrils) *ErrorTracker { return &ErrorTracker{ errors: map[string]*PortError{}, baselines: map[string]*portErrorBaseline{}, + t: t, } } @@ -52,6 +54,13 @@ func (e *ErrorTracker) CheckPort(node *Node, portName string, stats *InterfaceSt return } + changed := e.checkPortLocked(node, portName, stats) + if changed { + e.t.NotifyUpdate() + } +} + +func (e *ErrorTracker) checkPortLocked(node *Node, portName string, stats *InterfaceStats) bool { e.mu.Lock() defer e.mu.Unlock() @@ -79,8 +88,9 @@ func (e *ErrorTracker) CheckPort(node *Node, portName string, stats *InterfaceSt FirstSeen: now, LastUpdated: now, } + return true } - return + return false } inDelta := uint64(0) @@ -92,6 +102,7 @@ func (e *ErrorTracker) CheckPort(node *Node, portName string, stats *InterfaceSt outDelta = stats.OutErrors - baseline.OutErrors } + changed := false if inDelta > 0 || outDelta > 0 { if existing, ok := e.errors[key]; ok { existing.InErrors = stats.InErrors @@ -115,29 +126,49 @@ func (e *ErrorTracker) CheckPort(node *Node, portName string, stats *InterfaceSt LastUpdated: now, } } + changed = true } e.baselines[key].InErrors = stats.InErrors e.baselines[key].OutErrors = stats.OutErrors + + return changed } func (e *ErrorTracker) ClearError(errorID string) { + found := e.clearErrorLocked(errorID) + if found { + e.t.NotifyUpdate() + } +} + +func (e *ErrorTracker) clearErrorLocked(errorID string) bool { e.mu.Lock() defer e.mu.Unlock() for key, err := range e.errors { if err.ID == errorID { delete(e.errors, key) - return + return true } } + return false } func (e *ErrorTracker) ClearAllErrors() { + had := e.clearAllErrorsLocked() + if had { + e.t.NotifyUpdate() + } +} + +func (e *ErrorTracker) clearAllErrorsLocked() bool { e.mu.Lock() defer e.mu.Unlock() + had := len(e.errors) > 0 e.errors = map[string]*PortError{} + return had } func (e *ErrorTracker) GetErrors() []*PortError { diff --git a/http.go b/http.go index f54f83c..0046a3a 100644 --- a/http.go +++ b/http.go @@ -8,6 +8,7 @@ import ( "crypto/x509/pkix" "encoding/json" "encoding/pem" + "fmt" "log" "math/big" "net/http" @@ -40,6 +41,7 @@ func (t *Tendrils) startHTTPServer() { mux := http.NewServeMux() mux.HandleFunc("/api/status", t.handleAPIStatus) + mux.HandleFunc("/api/status/stream", t.handleAPIStatusStream) mux.HandleFunc("/api/config", t.handleAPIConfig) mux.HandleFunc("/api/errors/clear", t.handleClearError) mux.Handle("/", http.FileServer(http.Dir("static"))) @@ -157,6 +159,59 @@ func (t *Tendrils) handleClearError(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } +func (t *Tendrils) handleAPIStatusStream(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming not supported", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + subID, updateCh := t.subscribeSSE() + defer t.unsubscribeSSE(subID) + + sendStatus := func() error { + data, err := json.Marshal(t.GetStatus()) + if err != nil { + return err + } + _, err = fmt.Fprintf(w, "event: status\ndata: %s\n\n", data) + if err != nil { + return err + } + flusher.Flush() + return nil + } + + if err := sendStatus(); err != nil { + return + } + + heartbeat := time.NewTicker(3 * time.Second) + defer heartbeat.Stop() + + for { + select { + case <-r.Context().Done(): + return + case <-updateCh: + if err := sendStatus(); err != nil { + return + } + case <-heartbeat.C: + _, err := fmt.Fprintf(w, ": heartbeat\n\n") + if err != nil { + return + } + flusher.Flush() + } + } +} + func (t *Tendrils) getNodes() []*Node { t.nodes.mu.RLock() defer t.nodes.mu.RUnlock() diff --git a/nodes.go b/nodes.go index b22c633..765192b 100644 --- a/nodes.go +++ b/nodes.go @@ -48,11 +48,18 @@ func (n *Nodes) Shutdown() { } func (n *Nodes) Update(target *Node, mac net.HardwareAddr, ips []net.IP, ifaceName, nodeName, source string) { + changed := n.updateLocked(target, mac, ips, ifaceName, nodeName, source) + if changed { + n.t.NotifyUpdate() + } +} + +func (n *Nodes) updateLocked(target *Node, mac net.HardwareAddr, ips []net.IP, ifaceName, nodeName, source string) bool { n.mu.Lock() defer n.mu.Unlock() if mac == nil && target == nil && len(ips) == 0 { - return + return false } targetID, isNew := n.resolveTargetNode(target, mac, ips, nodeName) @@ -65,6 +72,8 @@ func (n *Nodes) Update(target *Node, mac net.HardwareAddr, ips []net.IP, ifaceNa if hasNewIP(added) { n.triggerPoll(node) } + + return isNew || len(added) > 0 } func (n *Nodes) resolveTargetNode(target *Node, mac net.HardwareAddr, ips []net.IP, nodeName string) (int, bool) { diff --git a/static/index.html b/static/index.html index 7e10f29..b090f70 100644 --- a/static/index.html +++ b/static/index.html @@ -15,6 +15,42 @@ } #error { color: #f66; padding: 20px; } + #connection-status { + position: fixed; + top: 10px; + left: 10px; + z-index: 1000; + display: flex; + align-items: center; + gap: 6px; + padding: 6px 12px; + background: #222; + border-radius: 6px; + border: 1px solid #444; + font-size: 11px; + } + + #connection-status .dot { + width: 8px; + height: 8px; + border-radius: 50%; + background: #666; + } + + #connection-status.connected .dot { + background: #4f4; + } + + #connection-status.disconnected .dot { + background: #f44; + animation: pulse-dot 1s infinite; + } + + @keyframes pulse-dot { + 0%, 100% { opacity: 1; } + 50% { opacity: 0.4; } + } + #container { display: flex; flex-direction: column; @@ -394,6 +430,10 @@ +
+
+ Connecting... +
@@ -720,22 +760,51 @@ async function clearError(id) { await fetch('/api/errors/clear?id=' + encodeURIComponent(id), { method: 'POST' }); - init(); } async function clearAllErrors() { await fetch('/api/errors/clear?all=true', { method: 'POST' }); - init(); } - async function init() { + let currentConfig = null; + + function setConnectionStatus(connected) { + const el = document.getElementById('connection-status'); + const textEl = el.querySelector('.text'); + if (connected) { + el.className = 'connected'; + textEl.textContent = 'Connected'; + } else { + el.className = 'disconnected'; + textEl.textContent = 'Disconnected'; + } + } + + function connectSSE() { + const evtSource = new EventSource('/api/status/stream'); + + evtSource.addEventListener('status', async (event) => { + const data = JSON.parse(event.data); + if (!currentConfig) { + const configResp = await fetch('/api/config'); + currentConfig = await configResp.json(); + } + render(data, currentConfig); + }); + + evtSource.onopen = () => { + setConnectionStatus(true); + }; + + evtSource.onerror = () => { + setConnectionStatus(false); + evtSource.close(); + setTimeout(connectSSE, 2000); + }; + } + + function render(data, config) { anonCounter = 0; - const [statusResp, configResp] = await Promise.all([ - fetch('/api/status'), - fetch('/api/config') - ]); - const data = await statusResp.json(); - const config = await configResp.json(); const nodes = data.nodes || []; const links = data.links || []; @@ -974,9 +1043,7 @@ updateErrorPanel(); } - init().catch(e => { - document.getElementById('error').textContent = e.message; - }); + connectSSE(); function setMode(mode) { if (mode === 'dante') { diff --git a/tendrils.go b/tendrils.go index 8d8bbaa..4769018 100644 --- a/tendrils.go +++ b/tendrils.go @@ -6,6 +6,7 @@ import ( "net" "os" "os/signal" + "sync" "syscall" "time" ) @@ -37,6 +38,10 @@ type Tendrils struct { errors *ErrorTracker config *Config + sseSubsMu sync.RWMutex + sseSubsNext int + sseSubs map[int]chan struct{} + Interface string ConfigFile string DisableARP bool @@ -68,12 +73,46 @@ func New() *Tendrils { activeInterfaces: map[string]context.CancelFunc{}, artnet: NewArtNetNodes(), danteFlows: NewDanteFlows(), - errors: NewErrorTracker(), + sseSubs: map[int]chan struct{}{}, } t.nodes = NewNodes(t) + t.errors = NewErrorTracker(t) return t } +func (t *Tendrils) NotifyUpdate() { + t.sseSubsMu.RLock() + defer t.sseSubsMu.RUnlock() + + for _, ch := range t.sseSubs { + select { + case ch <- struct{}{}: + default: + } + } +} + +func (t *Tendrils) subscribeSSE() (int, chan struct{}) { + t.sseSubsMu.Lock() + defer t.sseSubsMu.Unlock() + + t.sseSubsNext++ + id := t.sseSubsNext + ch := make(chan struct{}, 1) + t.sseSubs[id] = ch + return id, ch +} + +func (t *Tendrils) unsubscribeSSE(id int) { + t.sseSubsMu.Lock() + defer t.sseSubsMu.Unlock() + + if ch, ok := t.sseSubs[id]; ok { + close(ch) + delete(t.sseSubs, id) + } +} + func (t *Tendrils) Run() { ctx, cancel := context.WithCancel(context.Background()) defer cancel()