Add SSE endpoint for real-time status updates

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ian Gulliver
2026-01-25 18:49:39 -08:00
parent a96eb7db8c
commit a94f816f3d
5 changed files with 218 additions and 17 deletions

View File

@@ -38,12 +38,14 @@ type ErrorTracker struct {
errors map[string]*PortError errors map[string]*PortError
baselines map[string]*portErrorBaseline baselines map[string]*portErrorBaseline
nextID int nextID int
t *Tendrils
} }
func NewErrorTracker() *ErrorTracker { func NewErrorTracker(t *Tendrils) *ErrorTracker {
return &ErrorTracker{ return &ErrorTracker{
errors: map[string]*PortError{}, errors: map[string]*PortError{},
baselines: map[string]*portErrorBaseline{}, baselines: map[string]*portErrorBaseline{},
t: t,
} }
} }
@@ -52,6 +54,13 @@ func (e *ErrorTracker) CheckPort(node *Node, portName string, stats *InterfaceSt
return return
} }
changed := e.checkPortLocked(node, portName, stats)
if changed {
e.t.NotifyUpdate()
}
}
func (e *ErrorTracker) checkPortLocked(node *Node, portName string, stats *InterfaceStats) bool {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() defer e.mu.Unlock()
@@ -79,8 +88,9 @@ func (e *ErrorTracker) CheckPort(node *Node, portName string, stats *InterfaceSt
FirstSeen: now, FirstSeen: now,
LastUpdated: now, LastUpdated: now,
} }
return true
} }
return return false
} }
inDelta := uint64(0) inDelta := uint64(0)
@@ -92,6 +102,7 @@ func (e *ErrorTracker) CheckPort(node *Node, portName string, stats *InterfaceSt
outDelta = stats.OutErrors - baseline.OutErrors outDelta = stats.OutErrors - baseline.OutErrors
} }
changed := false
if inDelta > 0 || outDelta > 0 { if inDelta > 0 || outDelta > 0 {
if existing, ok := e.errors[key]; ok { if existing, ok := e.errors[key]; ok {
existing.InErrors = stats.InErrors existing.InErrors = stats.InErrors
@@ -115,29 +126,49 @@ func (e *ErrorTracker) CheckPort(node *Node, portName string, stats *InterfaceSt
LastUpdated: now, LastUpdated: now,
} }
} }
changed = true
} }
e.baselines[key].InErrors = stats.InErrors e.baselines[key].InErrors = stats.InErrors
e.baselines[key].OutErrors = stats.OutErrors e.baselines[key].OutErrors = stats.OutErrors
return changed
} }
func (e *ErrorTracker) ClearError(errorID string) { func (e *ErrorTracker) ClearError(errorID string) {
found := e.clearErrorLocked(errorID)
if found {
e.t.NotifyUpdate()
}
}
func (e *ErrorTracker) clearErrorLocked(errorID string) bool {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() defer e.mu.Unlock()
for key, err := range e.errors { for key, err := range e.errors {
if err.ID == errorID { if err.ID == errorID {
delete(e.errors, key) delete(e.errors, key)
return return true
} }
} }
return false
} }
func (e *ErrorTracker) ClearAllErrors() { func (e *ErrorTracker) ClearAllErrors() {
had := e.clearAllErrorsLocked()
if had {
e.t.NotifyUpdate()
}
}
func (e *ErrorTracker) clearAllErrorsLocked() bool {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() defer e.mu.Unlock()
had := len(e.errors) > 0
e.errors = map[string]*PortError{} e.errors = map[string]*PortError{}
return had
} }
func (e *ErrorTracker) GetErrors() []*PortError { func (e *ErrorTracker) GetErrors() []*PortError {

55
http.go
View File

@@ -8,6 +8,7 @@ import (
"crypto/x509/pkix" "crypto/x509/pkix"
"encoding/json" "encoding/json"
"encoding/pem" "encoding/pem"
"fmt"
"log" "log"
"math/big" "math/big"
"net/http" "net/http"
@@ -40,6 +41,7 @@ func (t *Tendrils) startHTTPServer() {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/api/status", t.handleAPIStatus) mux.HandleFunc("/api/status", t.handleAPIStatus)
mux.HandleFunc("/api/status/stream", t.handleAPIStatusStream)
mux.HandleFunc("/api/config", t.handleAPIConfig) mux.HandleFunc("/api/config", t.handleAPIConfig)
mux.HandleFunc("/api/errors/clear", t.handleClearError) mux.HandleFunc("/api/errors/clear", t.handleClearError)
mux.Handle("/", http.FileServer(http.Dir("static"))) mux.Handle("/", http.FileServer(http.Dir("static")))
@@ -157,6 +159,59 @@ func (t *Tendrils) handleClearError(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }
func (t *Tendrils) handleAPIStatusStream(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
subID, updateCh := t.subscribeSSE()
defer t.unsubscribeSSE(subID)
sendStatus := func() error {
data, err := json.Marshal(t.GetStatus())
if err != nil {
return err
}
_, err = fmt.Fprintf(w, "event: status\ndata: %s\n\n", data)
if err != nil {
return err
}
flusher.Flush()
return nil
}
if err := sendStatus(); err != nil {
return
}
heartbeat := time.NewTicker(3 * time.Second)
defer heartbeat.Stop()
for {
select {
case <-r.Context().Done():
return
case <-updateCh:
if err := sendStatus(); err != nil {
return
}
case <-heartbeat.C:
_, err := fmt.Fprintf(w, ": heartbeat\n\n")
if err != nil {
return
}
flusher.Flush()
}
}
}
func (t *Tendrils) getNodes() []*Node { func (t *Tendrils) getNodes() []*Node {
t.nodes.mu.RLock() t.nodes.mu.RLock()
defer t.nodes.mu.RUnlock() defer t.nodes.mu.RUnlock()

View File

@@ -48,11 +48,18 @@ func (n *Nodes) Shutdown() {
} }
func (n *Nodes) Update(target *Node, mac net.HardwareAddr, ips []net.IP, ifaceName, nodeName, source string) { func (n *Nodes) Update(target *Node, mac net.HardwareAddr, ips []net.IP, ifaceName, nodeName, source string) {
changed := n.updateLocked(target, mac, ips, ifaceName, nodeName, source)
if changed {
n.t.NotifyUpdate()
}
}
func (n *Nodes) updateLocked(target *Node, mac net.HardwareAddr, ips []net.IP, ifaceName, nodeName, source string) bool {
n.mu.Lock() n.mu.Lock()
defer n.mu.Unlock() defer n.mu.Unlock()
if mac == nil && target == nil && len(ips) == 0 { if mac == nil && target == nil && len(ips) == 0 {
return return false
} }
targetID, isNew := n.resolveTargetNode(target, mac, ips, nodeName) targetID, isNew := n.resolveTargetNode(target, mac, ips, nodeName)
@@ -65,6 +72,8 @@ func (n *Nodes) Update(target *Node, mac net.HardwareAddr, ips []net.IP, ifaceNa
if hasNewIP(added) { if hasNewIP(added) {
n.triggerPoll(node) n.triggerPoll(node)
} }
return isNew || len(added) > 0
} }
func (n *Nodes) resolveTargetNode(target *Node, mac net.HardwareAddr, ips []net.IP, nodeName string) (int, bool) { func (n *Nodes) resolveTargetNode(target *Node, mac net.HardwareAddr, ips []net.IP, nodeName string) (int, bool) {

View File

@@ -15,6 +15,42 @@
} }
#error { color: #f66; padding: 20px; } #error { color: #f66; padding: 20px; }
#connection-status {
position: fixed;
top: 10px;
left: 10px;
z-index: 1000;
display: flex;
align-items: center;
gap: 6px;
padding: 6px 12px;
background: #222;
border-radius: 6px;
border: 1px solid #444;
font-size: 11px;
}
#connection-status .dot {
width: 8px;
height: 8px;
border-radius: 50%;
background: #666;
}
#connection-status.connected .dot {
background: #4f4;
}
#connection-status.disconnected .dot {
background: #f44;
animation: pulse-dot 1s infinite;
}
@keyframes pulse-dot {
0%, 100% { opacity: 1; }
50% { opacity: 0.4; }
}
#container { #container {
display: flex; display: flex;
flex-direction: column; flex-direction: column;
@@ -394,6 +430,10 @@
</style> </style>
</head> </head>
<body> <body>
<div id="connection-status" class="disconnected">
<div class="dot"></div>
<span class="text">Connecting...</span>
</div>
<div id="mode-selector"> <div id="mode-selector">
<button id="mode-network" class="active">Network</button> <button id="mode-network" class="active">Network</button>
<button id="mode-dante">Dante</button> <button id="mode-dante">Dante</button>
@@ -720,22 +760,51 @@
async function clearError(id) { async function clearError(id) {
await fetch('/api/errors/clear?id=' + encodeURIComponent(id), { method: 'POST' }); await fetch('/api/errors/clear?id=' + encodeURIComponent(id), { method: 'POST' });
init();
} }
async function clearAllErrors() { async function clearAllErrors() {
await fetch('/api/errors/clear?all=true', { method: 'POST' }); await fetch('/api/errors/clear?all=true', { method: 'POST' });
init();
} }
async function init() { let currentConfig = null;
function setConnectionStatus(connected) {
const el = document.getElementById('connection-status');
const textEl = el.querySelector('.text');
if (connected) {
el.className = 'connected';
textEl.textContent = 'Connected';
} else {
el.className = 'disconnected';
textEl.textContent = 'Disconnected';
}
}
function connectSSE() {
const evtSource = new EventSource('/api/status/stream');
evtSource.addEventListener('status', async (event) => {
const data = JSON.parse(event.data);
if (!currentConfig) {
const configResp = await fetch('/api/config');
currentConfig = await configResp.json();
}
render(data, currentConfig);
});
evtSource.onopen = () => {
setConnectionStatus(true);
};
evtSource.onerror = () => {
setConnectionStatus(false);
evtSource.close();
setTimeout(connectSSE, 2000);
};
}
function render(data, config) {
anonCounter = 0; anonCounter = 0;
const [statusResp, configResp] = await Promise.all([
fetch('/api/status'),
fetch('/api/config')
]);
const data = await statusResp.json();
const config = await configResp.json();
const nodes = data.nodes || []; const nodes = data.nodes || [];
const links = data.links || []; const links = data.links || [];
@@ -974,9 +1043,7 @@
updateErrorPanel(); updateErrorPanel();
} }
init().catch(e => { connectSSE();
document.getElementById('error').textContent = e.message;
});
function setMode(mode) { function setMode(mode) {
if (mode === 'dante') { if (mode === 'dante') {

View File

@@ -6,6 +6,7 @@ import (
"net" "net"
"os" "os"
"os/signal" "os/signal"
"sync"
"syscall" "syscall"
"time" "time"
) )
@@ -37,6 +38,10 @@ type Tendrils struct {
errors *ErrorTracker errors *ErrorTracker
config *Config config *Config
sseSubsMu sync.RWMutex
sseSubsNext int
sseSubs map[int]chan struct{}
Interface string Interface string
ConfigFile string ConfigFile string
DisableARP bool DisableARP bool
@@ -68,12 +73,46 @@ func New() *Tendrils {
activeInterfaces: map[string]context.CancelFunc{}, activeInterfaces: map[string]context.CancelFunc{},
artnet: NewArtNetNodes(), artnet: NewArtNetNodes(),
danteFlows: NewDanteFlows(), danteFlows: NewDanteFlows(),
errors: NewErrorTracker(), sseSubs: map[int]chan struct{}{},
} }
t.nodes = NewNodes(t) t.nodes = NewNodes(t)
t.errors = NewErrorTracker(t)
return t return t
} }
func (t *Tendrils) NotifyUpdate() {
t.sseSubsMu.RLock()
defer t.sseSubsMu.RUnlock()
for _, ch := range t.sseSubs {
select {
case ch <- struct{}{}:
default:
}
}
}
func (t *Tendrils) subscribeSSE() (int, chan struct{}) {
t.sseSubsMu.Lock()
defer t.sseSubsMu.Unlock()
t.sseSubsNext++
id := t.sseSubsNext
ch := make(chan struct{}, 1)
t.sseSubs[id] = ch
return id, ch
}
func (t *Tendrils) unsubscribeSSE(id int) {
t.sseSubsMu.Lock()
defer t.sseSubsMu.Unlock()
if ch, ok := t.sseSubs[id]; ok {
close(ch)
delete(t.sseSubs, id)
}
}
func (t *Tendrils) Run() { func (t *Tendrils) Run() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()