Refactor error tracking to be edge-triggered with node setters

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ian Gulliver
2026-01-28 23:06:26 -08:00
parent ed9a0cd60d
commit 99083ecde5
7 changed files with 150 additions and 243 deletions

313
errors.go
View File

@@ -8,7 +8,6 @@ import (
)
const (
ErrorTypeStartup = "startup"
ErrorTypeNew = "new"
ErrorTypeUnreachable = "unreachable"
ErrorTypeHighUtilization = "high_utilization"
@@ -16,7 +15,7 @@ const (
type Error struct {
ID string `json:"id"`
NodeID string `json:"node_id"`
NodeID string `json:"node_id"`
NodeName string `json:"node_name"`
Type string `json:"type"`
Port string `json:"port,omitempty"`
@@ -29,86 +28,105 @@ type Error struct {
LastUpdated time.Time `json:"last_updated,omitempty"`
}
type errorBaseline struct {
InErrors uint64
OutErrors uint64
HasData bool
}
type ErrorTracker struct {
mu sync.RWMutex
errors map[string]*Error
baselines map[string]*errorBaseline
suppressedUnreachable map[string]bool
unreachableNodes map[string]bool
nextID int
t *Tendrils
mu sync.RWMutex
errors map[string]*Error
nextID int
t *Tendrils
}
func NewErrorTracker(t *Tendrils) *ErrorTracker {
return &ErrorTracker{
errors: map[string]*Error{},
baselines: map[string]*errorBaseline{},
suppressedUnreachable: map[string]bool{},
unreachableNodes: map[string]bool{},
t: t,
errors: map[string]*Error{},
t: t,
}
}
func (e *ErrorTracker) CheckPort(node *Node, portName string, stats *InterfaceStats) {
if stats == nil {
return
}
changed := e.checkPortLocked(node, portName, stats)
if changed {
e.t.NotifyUpdate()
}
}
func (e *ErrorTracker) CheckUtilization(node *Node, portName string, stats *InterfaceStats) {
if stats == nil || stats.Speed == 0 {
return
}
changed := e.checkUtilizationLocked(node, portName, stats)
if changed {
e.t.NotifyUpdate()
}
}
func (e *ErrorTracker) checkUtilizationLocked(node *Node, portName string, stats *InterfaceStats) bool {
func (e *ErrorTracker) AddUnreachable(node *Node) {
e.mu.Lock()
defer e.mu.Unlock()
maxBytesRate := stats.InBytesRate
if stats.OutBytesRate > maxBytesRate {
maxBytesRate = stats.OutBytesRate
key := "unreachable:" + node.ID
if _, exists := e.errors[key]; exists {
return
}
speedBytes := float64(stats.Speed) / 8.0
utilization := (maxBytesRate / speedBytes) * 100.0
now := time.Now()
e.nextID++
e.errors[key] = &Error{
ID: fmt.Sprintf("err-%d", e.nextID),
NodeID: node.ID,
NodeName: node.DisplayName(),
Type: ErrorTypeUnreachable,
FirstSeen: now,
LastUpdated: now,
}
e.t.NotifyUpdate()
}
func (e *ErrorTracker) RemoveUnreachable(node *Node) {
e.mu.Lock()
defer e.mu.Unlock()
key := "unreachable:" + node.ID
if _, exists := e.errors[key]; exists {
delete(e.errors, key)
e.t.NotifyUpdate()
}
}
func (e *ErrorTracker) AddPortError(node *Node, portName string, stats *InterfaceStats, inDelta, outDelta uint64) {
e.mu.Lock()
defer e.mu.Unlock()
key := node.ID + ":" + portName
now := time.Now()
if existing, ok := e.errors[key]; ok {
existing.InErrors = stats.InErrors
existing.OutErrors = stats.OutErrors
existing.InDelta += inDelta
existing.OutDelta += outDelta
existing.LastUpdated = now
} else {
e.nextID++
e.errors[key] = &Error{
ID: fmt.Sprintf("err-%d", e.nextID),
NodeID: node.ID,
NodeName: node.DisplayName(),
Port: portName,
Type: ErrorTypeNew,
InErrors: stats.InErrors,
OutErrors: stats.OutErrors,
InDelta: inDelta,
OutDelta: outDelta,
FirstSeen: now,
LastUpdated: now,
}
}
e.t.NotifyUpdate()
}
func (e *ErrorTracker) AddUtilizationError(node *Node, portName string, utilization float64) {
e.mu.Lock()
defer e.mu.Unlock()
key := "util:" + node.ID + ":" + portName
now := time.Now()
if utilization < 70.0 {
return false
}
if existing, ok := e.errors[key]; ok {
if utilization > existing.Utilization {
existing.Utilization = utilization
existing.LastUpdated = now
return true
e.t.NotifyUpdate()
}
return false
return
}
e.nextID++
e.errors[key] = &Error{
ID: fmt.Sprintf("err-%d", e.nextID),
NodeID: node.ID,
NodeID: node.ID,
NodeName: node.DisplayName(),
Port: portName,
Type: ErrorTypeHighUtilization,
@@ -116,126 +134,30 @@ func (e *ErrorTracker) checkUtilizationLocked(node *Node, portName string, stats
FirstSeen: now,
LastUpdated: now,
}
return true
}
func (e *ErrorTracker) checkPortLocked(node *Node, portName string, stats *InterfaceStats) bool {
e.mu.Lock()
defer e.mu.Unlock()
key := node.ID + ":" + portName
baseline := e.baselines[key]
now := time.Now()
if baseline == nil || !baseline.HasData {
e.baselines[key] = &errorBaseline{
InErrors: stats.InErrors,
OutErrors: stats.OutErrors,
HasData: true,
}
if stats.InErrors > 0 || stats.OutErrors > 0 {
e.nextID++
e.errors[key] = &Error{
ID: fmt.Sprintf("err-%d", e.nextID),
NodeID: node.ID,
NodeName: node.DisplayName(),
Port: portName,
Type: ErrorTypeStartup,
InErrors: stats.InErrors,
OutErrors: stats.OutErrors,
FirstSeen: now,
LastUpdated: now,
}
return true
}
return false
}
inDelta := uint64(0)
outDelta := uint64(0)
if stats.InErrors > baseline.InErrors {
inDelta = stats.InErrors - baseline.InErrors
}
if stats.OutErrors > baseline.OutErrors {
outDelta = stats.OutErrors - baseline.OutErrors
}
changed := false
if inDelta > 0 || outDelta > 0 {
if existing, ok := e.errors[key]; ok {
existing.InErrors = stats.InErrors
existing.OutErrors = stats.OutErrors
existing.InDelta += inDelta
existing.OutDelta += outDelta
existing.LastUpdated = now
} else {
e.nextID++
e.errors[key] = &Error{
ID: fmt.Sprintf("err-%d", e.nextID),
NodeID: node.ID,
NodeName: node.DisplayName(),
Port: portName,
Type: ErrorTypeNew,
InErrors: stats.InErrors,
OutErrors: stats.OutErrors,
InDelta: inDelta,
OutDelta: outDelta,
FirstSeen: now,
LastUpdated: now,
}
}
changed = true
}
e.baselines[key].InErrors = stats.InErrors
e.baselines[key].OutErrors = stats.OutErrors
return changed
e.t.NotifyUpdate()
}
func (e *ErrorTracker) ClearError(errorID string) {
found := e.clearErrorLocked(errorID)
if found {
e.t.NotifyUpdate()
}
}
func (e *ErrorTracker) clearErrorLocked(errorID string) bool {
e.mu.Lock()
defer e.mu.Unlock()
for key, err := range e.errors {
if err.ID == errorID {
if err.Type == ErrorTypeUnreachable {
e.suppressedUnreachable[key] = true
}
delete(e.errors, key)
return true
e.t.NotifyUpdate()
return
}
}
return false
}
func (e *ErrorTracker) ClearAllErrors() {
had := e.clearAllErrorsLocked()
if had {
e.t.NotifyUpdate()
}
}
func (e *ErrorTracker) clearAllErrorsLocked() bool {
e.mu.Lock()
defer e.mu.Unlock()
had := len(e.errors) > 0
for key, err := range e.errors {
if err.Type == ErrorTypeUnreachable {
e.suppressedUnreachable[key] = true
}
if len(e.errors) > 0 {
e.errors = map[string]*Error{}
e.t.NotifyUpdate()
}
e.errors = map[string]*Error{}
return had
}
func (e *ErrorTracker) GetErrors() []*Error {
@@ -254,80 +176,3 @@ func (e *ErrorTracker) GetErrors() []*Error {
})
return errors
}
func (e *ErrorTracker) GetUnreachableNodeSet() map[string]bool {
e.mu.RLock()
defer e.mu.RUnlock()
result := map[string]bool{}
for nodeID := range e.unreachableNodes {
result[nodeID] = true
}
return result
}
func (e *ErrorTracker) SetUnreachable(node *Node) bool {
changed, becameUnreachable := e.setUnreachableLocked(node)
if changed {
e.t.NotifyUpdate()
}
return becameUnreachable
}
func (e *ErrorTracker) setUnreachableLocked(node *Node) (changed bool, becameUnreachable bool) {
e.mu.Lock()
defer e.mu.Unlock()
key := "unreachable:" + node.ID
wasUnreachable := e.unreachableNodes[node.ID]
e.unreachableNodes[node.ID] = true
becameUnreachable = !wasUnreachable
if e.suppressedUnreachable[key] {
return becameUnreachable, becameUnreachable
}
if _, exists := e.errors[key]; exists {
return becameUnreachable, becameUnreachable
}
now := time.Now()
e.nextID++
e.errors[key] = &Error{
ID: fmt.Sprintf("err-%d", e.nextID),
NodeID: node.ID,
NodeName: node.DisplayName(),
Type: ErrorTypeUnreachable,
FirstSeen: now,
LastUpdated: now,
}
return true, becameUnreachable
}
func (e *ErrorTracker) ClearUnreachable(node *Node) bool {
changed, becameReachable := e.clearUnreachableLocked(node)
if changed {
e.t.NotifyUpdate()
}
return becameReachable
}
func (e *ErrorTracker) clearUnreachableLocked(node *Node) (changed bool, becameReachable bool) {
e.mu.Lock()
defer e.mu.Unlock()
key := "unreachable:" + node.ID
delete(e.suppressedUnreachable, key)
wasUnreachable := e.unreachableNodes[node.ID]
delete(e.unreachableNodes, node.ID)
becameReachable = wasUnreachable
if _, exists := e.errors[key]; exists {
delete(e.errors, key)
return true, becameReachable
}
return becameReachable, becameReachable
}

View File

@@ -226,11 +226,8 @@ func (t *Tendrils) handleAPIStatusStream(w http.ResponseWriter, r *http.Request)
}
func (t *Tendrils) getNodesLocked() []*Node {
unreachableNodes := t.errors.GetUnreachableNodeSet()
nodes := make([]*Node, 0, len(t.nodes.nodes))
for _, node := range t.nodes.nodes {
node.Unreachable = unreachableNodes[node.ID]
nodes = append(nodes, node)
}

View File

@@ -130,6 +130,7 @@ func (n *Nodes) createNode() *Node {
ID: newID("node"),
Interfaces: InterfaceMap{},
MACTable: map[string]string{},
errors: n.t.errors,
pollTrigger: make(chan struct{}, 1),
}
n.nodes = append(n.nodes, node)
@@ -456,6 +457,7 @@ func (n *Nodes) GetOrCreateByName(name string) *Node {
Names: NameSet{name: true},
Interfaces: InterfaceMap{},
MACTable: map[string]string{},
errors: n.t.errors,
pollTrigger: make(chan struct{}, 1),
}
n.nodes = append(n.nodes, node)

View File

@@ -169,7 +169,7 @@ func (t *Tendrils) pingNode(node *Node) {
if anyReachable {
t.ping.failures[nodeID] = 0
t.ping.mu.Unlock()
if t.errors.ClearUnreachable(node) {
if node.SetUnreachable(false) {
log.Printf("[ping] %s is now reachable", nodeName)
}
} else {
@@ -177,7 +177,7 @@ func (t *Tendrils) pingNode(node *Node) {
failures := t.ping.failures[nodeID]
t.ping.mu.Unlock()
if failures >= pingFailureThreshold {
if t.errors.SetUnreachable(node) {
if node.SetUnreachable(true) {
log.Printf("[ping] %s is now unreachable", nodeName)
}
}

View File

@@ -284,9 +284,7 @@ func (t *Tendrils) queryInterfaceStats(snmp *gosnmp.GoSNMP, node *Node, ifNames
stats.PoE = poe
}
iface.Stats = stats
t.errors.CheckPort(node, name, stats)
t.errors.CheckUtilization(node, name, stats)
node.SetInterfaceStats(name, stats)
}
}

View File

@@ -1723,7 +1723,7 @@
const typeEl = document.createElement('div');
typeEl.className = 'error-type';
typeEl.textContent = err.type === 'startup' ? 'Present at startup' : 'New errors detected';
typeEl.textContent = 'New errors detected';
item.appendChild(typeEl);
}

View File

@@ -393,11 +393,76 @@ type Node struct {
DanteTx []*DantePeer `json:"dante_tx,omitempty"`
DanteRx []*DantePeer `json:"dante_rx,omitempty"`
Unreachable bool `json:"unreachable,omitempty"`
errors *ErrorTracker
pollTrigger chan struct{}
cancelFunc context.CancelFunc
danteLastSeen time.Time
}
func (n *Node) SetUnreachable(unreachable bool) bool {
if n.Unreachable == unreachable {
return false
}
n.Unreachable = unreachable
if n.errors != nil {
if unreachable {
n.errors.AddUnreachable(n)
} else {
n.errors.RemoveUnreachable(n)
}
}
return true
}
func (n *Node) SetInterfaceStats(portName string, stats *InterfaceStats) {
iface := n.Interfaces[portName]
if iface == nil {
return
}
oldStats := iface.Stats
iface.Stats = stats
if n.errors == nil {
return
}
var inDelta, outDelta uint64
if oldStats != nil {
if stats.InErrors > oldStats.InErrors {
inDelta = stats.InErrors - oldStats.InErrors
}
if stats.OutErrors > oldStats.OutErrors {
outDelta = stats.OutErrors - oldStats.OutErrors
}
}
if inDelta > 0 || outDelta > 0 {
n.errors.AddPortError(n, portName, stats, inDelta, outDelta)
}
if stats.Speed > 0 {
maxBytesRate := stats.InBytesRate
if stats.OutBytesRate > maxBytesRate {
maxBytesRate = stats.OutBytesRate
}
speedBytes := float64(stats.Speed) / 8.0
utilization := (maxBytesRate / speedBytes) * 100.0
var oldUtilization float64
if oldStats != nil && oldStats.Speed > 0 {
oldMax := oldStats.InBytesRate
if oldStats.OutBytesRate > oldMax {
oldMax = oldStats.OutBytesRate
}
oldUtilization = (oldMax / (float64(oldStats.Speed) / 8.0)) * 100.0
}
if oldUtilization < 70.0 && utilization >= 70.0 {
n.errors.AddUtilizationError(n, portName, utilization)
}
}
}
func (n *Node) MACTableSize() int {
return len(n.MACTable)
}