Add broadcast packet tracking with rate monitoring

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ian Gulliver
2026-01-25 19:40:39 -08:00
parent b2ec349c51
commit bbd938b924
7 changed files with 438 additions and 105 deletions

View File

@@ -4,12 +4,152 @@ import (
"context"
"log"
"net"
"sync"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/pcap"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
)
// BroadcastSample is a single observation stored by BroadcastStats: the
// packet and byte counts handed to one Record call, stamped with the time of
// that call.
type BroadcastSample struct {
	Time           time.Time // when the sample was recorded
	Packets, Bytes uint64    // counts supplied for this sample
}
// BroadcastStats keeps lifetime totals and a sliding window of recent
// broadcast traffic samples, and asks its owning Tendrils to publish an update
// when new traffic is recorded (rate limited by notifyMinRate).
type BroadcastStats struct {
	mu sync.RWMutex // guards the mutable fields below

	samples                  []BroadcastSample // recent samples, appended in arrival order
	totalPackets, totalBytes uint64            // running totals over every Record call
	windowSize               time.Duration     // age beyond which samples are discarded
	lastNotify               time.Time         // time of the most recent NotifyUpdate from Record
	notifyMinRate            time.Duration     // minimum spacing between NotifyUpdate calls
	t                        *Tendrils         // owner that receives NotifyUpdate
}
// BroadcastStatsResponse is the JSON-serialisable snapshot produced by
// BroadcastStats.GetStats.
type BroadcastStatsResponse struct {
// TotalPackets is the running packet total across all recorded samples.
TotalPackets uint64 `json:"total_packets"`
// TotalBytes is the running byte total across all recorded samples.
TotalBytes uint64 `json:"total_bytes"`
// PacketsPerS is the packet count inside the sample window divided by WindowSecs.
PacketsPerS float64 `json:"packets_per_s"`
// BytesPerS is the byte count inside the sample window divided by WindowSecs.
BytesPerS float64 `json:"bytes_per_s"`
// WindowSecs is the time span, in seconds, the rates were computed over;
// GetStats never reports less than 1.
WindowSecs float64 `json:"window_secs"`
}
// NewBroadcastStats returns an empty BroadcastStats bound to t. Samples are
// kept for 60 seconds and update notifications are spaced at least one second
// apart.
func NewBroadcastStats(t *Tendrils) *BroadcastStats {
	stats := new(BroadcastStats)
	stats.t = t
	stats.samples = []BroadcastSample{}
	stats.windowSize = time.Minute
	stats.notifyMinRate = time.Second
	return stats
}
// Record adds one sample of broadcast traffic (packets and bytes observed
// now), updates the running totals, drops samples older than the window, and
// notifies the owning Tendrils at most once per notifyMinRate.
//
// NotifyUpdate is called only after b.mu has been released, the same way
// ErrorTracker.SetUnreachable notifies after setUnreachableLocked returns, so
// code triggered by the notification can call GetStats without contending
// with (or deadlocking on) the lock held by this goroutine.
func (b *BroadcastStats) Record(packets, bytes uint64) {
	if b.recordLocked(packets, bytes) {
		b.t.NotifyUpdate()
	}
}

// recordLocked performs the state update for Record while holding b.mu and
// reports whether enough time has passed since the last notification for the
// caller to send another one.
func (b *BroadcastStats) recordLocked(packets, bytes uint64) (notify bool) {
	b.mu.Lock()
	defer b.mu.Unlock()

	// Taken under the lock so samples are appended in timestamp order even
	// when several goroutines record concurrently.
	now := time.Now()

	b.samples = append(b.samples, BroadcastSample{
		Time:    now,
		Packets: packets,
		Bytes:   bytes,
	})
	b.totalPackets += packets
	b.totalBytes += bytes

	// Count the expired prefix, then reslice once.
	cutoff := now.Add(-b.windowSize)
	expired := 0
	for expired < len(b.samples) && b.samples[expired].Time.Before(cutoff) {
		expired++
	}
	b.samples = b.samples[expired:]

	if now.Sub(b.lastNotify) < b.notifyMinRate {
		return false
	}
	b.lastNotify = now
	return true
}
// GetStats returns a snapshot of the lifetime totals together with the packet
// and byte rates over the current sample window. The rate denominator is the
// time elapsed since the oldest in-window sample, clamped to at least one
// second so a lone fresh sample cannot yield an inflated rate.
func (b *BroadcastStats) GetStats() BroadcastStatsResponse {
	b.mu.RLock()
	defer b.mu.RUnlock()

	now := time.Now()
	windowStart := now.Add(-b.windowSize)

	var (
		pkts, octets uint64
		earliest     time.Time
	)
	for i := range b.samples {
		sample := &b.samples[i]
		if !sample.Time.After(windowStart) {
			continue // outside the window
		}
		if earliest.IsZero() || sample.Time.Before(earliest) {
			earliest = sample.Time
		}
		pkts += sample.Packets
		octets += sample.Bytes
	}

	// Default to the one-second floor; widen only when the data spans more.
	span := 1.0
	if !earliest.IsZero() {
		if elapsed := now.Sub(earliest).Seconds(); elapsed >= 1 {
			span = elapsed
		}
	}

	return BroadcastStatsResponse{
		TotalPackets: b.totalPackets,
		TotalBytes:   b.totalBytes,
		PacketsPerS:  float64(pkts) / span,
		BytesPerS:    float64(octets) / span,
		WindowSecs:   span,
	}
}
// listenBroadcast opens a promiscuous live capture on iface restricted to
// Ethernet broadcast frames and passes every captured packet to
// handleBroadcastPacket. It returns when ctx is cancelled, when the packet
// channel is closed, or immediately (after logging) if the capture cannot be
// opened or filtered.
func (t *Tendrils) listenBroadcast(ctx context.Context, iface net.Interface) {
	const (
		snapLen     = 65536
		readTimeout = 5 * time.Second
		filter      = "ether broadcast"
	)

	handle, err := pcap.OpenLive(iface.Name, snapLen, true, readTimeout)
	if err != nil {
		log.Printf("[ERROR] broadcast: failed to open interface %s: %v", iface.Name, err)
		return
	}
	defer handle.Close()

	err = handle.SetBPFFilter(filter)
	if err != nil {
		log.Printf("[ERROR] broadcast: failed to set BPF filter on %s: %v", iface.Name, err)
		return
	}

	incoming := gopacket.NewPacketSource(handle, handle.LinkType()).Packets()
	done := ctx.Done()
	for {
		select {
		case <-done:
			return
		case pkt, open := <-incoming:
			if !open {
				return
			}
			t.handleBroadcastPacket(pkt)
		}
	}
}
// handleBroadcastPacket records one captured broadcast packet, using the
// length of its captured data as the byte count, and logs it when
// DebugBroadcast is enabled. It does nothing if no BroadcastStats is attached.
func (t *Tendrils) handleBroadcastPacket(packet gopacket.Packet) {
	stats := t.broadcast
	if stats == nil {
		return
	}

	size := uint64(len(packet.Data()))
	stats.Record(1, size)

	if !t.DebugBroadcast {
		return
	}
	log.Printf("[broadcast] packet: %d bytes", size)
}
func (t *Tendrils) pingBroadcast(ctx context.Context, iface net.Interface) {
_, broadcast := getInterfaceIPv4(iface)
if broadcast == nil {

View File

@@ -31,6 +31,7 @@ func main() {
debugBMD := flag.Bool("debug-bmd", false, "debug Blackmagic discovery")
debugShure := flag.Bool("debug-shure", false, "debug Shure discovery")
debugYamaha := flag.Bool("debug-yamaha", false, "debug Yamaha discovery")
debugBroadcast := flag.Bool("debug-broadcast", false, "debug broadcast traffic monitoring")
flag.Parse()
t := tendrils.New()
@@ -58,5 +59,6 @@ func main() {
t.DebugBMD = *debugBMD
t.DebugShure = *debugShure
t.DebugYamaha = *debugYamaha
t.DebugBroadcast = *debugBroadcast
t.Run()
}

View File

@@ -206,14 +206,15 @@ func (e *ErrorTracker) GetUnreachableNodes() []string {
return nodes
}
func (e *ErrorTracker) SetUnreachable(node *Node, ip string) {
changed := e.setUnreachableLocked(node, ip)
func (e *ErrorTracker) SetUnreachable(node *Node, ip string) bool {
changed, becameUnreachable := e.setUnreachableLocked(node, ip)
if changed {
e.t.NotifyUpdate()
}
return becameUnreachable
}
func (e *ErrorTracker) setUnreachableLocked(node *Node, ip string) bool {
func (e *ErrorTracker) setUnreachableLocked(node *Node, ip string) (changed bool, becameUnreachable bool) {
e.mu.Lock()
defer e.mu.Unlock()
@@ -221,13 +222,14 @@ func (e *ErrorTracker) setUnreachableLocked(node *Node, ip string) bool {
wasUnreachable := e.unreachableNodes[node.TypeID]
e.unreachableNodes[node.TypeID] = true
becameUnreachable = !wasUnreachable
if e.suppressedUnreachable[key] {
return !wasUnreachable
return becameUnreachable, becameUnreachable
}
if _, exists := e.errors[key]; exists {
return !wasUnreachable
return becameUnreachable, becameUnreachable
}
now := time.Now()
@@ -241,17 +243,18 @@ func (e *ErrorTracker) setUnreachableLocked(node *Node, ip string) bool {
FirstSeen: now,
LastUpdated: now,
}
return true
return true, becameUnreachable
}
func (e *ErrorTracker) ClearUnreachable(node *Node, ip string) {
changed := e.clearUnreachableLocked(node, ip)
func (e *ErrorTracker) ClearUnreachable(node *Node, ip string) bool {
changed, becameReachable := e.clearUnreachableLocked(node, ip)
if changed {
e.t.NotifyUpdate()
}
return becameReachable
}
func (e *ErrorTracker) clearUnreachableLocked(node *Node, ip string) bool {
func (e *ErrorTracker) clearUnreachableLocked(node *Node, ip string) (changed bool, becameReachable bool) {
e.mu.Lock()
defer e.mu.Unlock()
@@ -261,10 +264,11 @@ func (e *ErrorTracker) clearUnreachableLocked(node *Node, ip string) bool {
wasUnreachable := e.unreachableNodes[node.TypeID]
delete(e.unreachableNodes, node.TypeID)
becameReachable = wasUnreachable
if _, exists := e.errors[key]; exists {
delete(e.errors, key)
return true
return true, becameReachable
}
return wasUnreachable
return becameReachable, becameReachable
}

View File

@@ -32,6 +32,7 @@ type StatusResponse struct {
DanteFlows []*DanteFlow `json:"dante_flows"`
PortErrors []*PortError `json:"port_errors"`
UnreachableNodes []string `json:"unreachable_nodes"`
BroadcastStats *BroadcastStatsResponse `json:"broadcast_stats,omitempty"`
}
func (t *Tendrils) startHTTPServer() {
@@ -132,6 +133,11 @@ func (t *Tendrils) handleAPIConfig(w http.ResponseWriter, r *http.Request) {
}
func (t *Tendrils) GetStatus() *StatusResponse {
var broadcastStats *BroadcastStatsResponse
if t.broadcast != nil {
stats := t.broadcast.GetStats()
broadcastStats = &stats
}
return &StatusResponse{
Nodes: t.getNodes(),
Links: t.getLinks(),
@@ -140,6 +146,7 @@ func (t *Tendrils) GetStatus() *StatusResponse {
DanteFlows: t.getDanteFlows(),
PortErrors: t.errors.GetErrors(),
UnreachableNodes: t.errors.GetUnreachableNodes(),
BroadcastStats: broadcastStats,
}
}

188
ping.go
View File

@@ -1,16 +1,143 @@
package tendrils
import (
"log"
"net"
"sync"
"time"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
)
// pendingPing tracks one in-flight echo request issued by PingManager.Ping.
type pendingPing struct {
// ip is the target address string; readLoop only accepts a reply whose
// source address formats to exactly this value.
ip string
// response receives true when a matching echo reply arrives. Ping creates
// it with a buffer of one so readLoop never blocks on the send.
response chan bool
}
// PingManager sends ICMP echo requests over one shared packet connection and
// matches echo replies back to the callers waiting on them by echo ID.
type PingManager struct {
	mu      sync.Mutex              // guards pending and nextID
	conn    *icmp.PacketConn        // shared ICMP socket; nil when it could not be opened
	pending map[uint16]*pendingPing // in-flight requests keyed by echo ID

	// nextID is the most recently issued echo ID. minID is the threshold
	// readLoop uses when deciding whether to log an unmatched reply.
	nextID, minID uint16
}
// NewPingManager creates a PingManager and tries to open the shared ICMP
// socket. When the socket cannot be opened the manager is still returned, but
// with a nil connection: every Ping call then reports false. The failure is
// logged so that condition is not mistaken for hosts being down.
func NewPingManager() *PingManager {
	pm := &PingManager{
		pending: map[uint16]*pendingPing{},
		nextID:  1000,
		minID:   1000,
	}

	conn, err := icmp.ListenPacket("ip4:icmp", "0.0.0.0")
	if err != nil {
		log.Printf("[ERROR] ping: failed to open ICMP socket, pings will report unreachable: %v", err)
		return pm
	}

	pm.conn = conn
	go pm.readLoop()
	return pm
}
// readLoop reads ICMP messages from the shared connection until a read fails.
// Each echo reply is matched by ID against the pending table: a reply from the
// expected address wakes the waiting Ping call, and a reply whose ID is no
// longer pending (but is at or above minID) is logged as late. Anything that
// is not a well-formed echo reply is ignored.
func (pm *PingManager) readLoop() {
	const protocolICMP = 1 // protocol number passed to icmp.ParseMessage

	packet := make([]byte, 1500)
	for {
		size, from, readErr := pm.conn.ReadFrom(packet)
		if readErr != nil {
			return
		}

		parsed, parseErr := icmp.ParseMessage(protocolICMP, packet[:size])
		if parseErr != nil || parsed.Type != ipv4.ICMPTypeEchoReply {
			continue
		}
		reply, isEcho := parsed.Body.(*icmp.Echo)
		if !isEcho {
			continue
		}
		src, isIP := from.(*net.IPAddr)
		if !isIP {
			continue
		}

		replyID := uint16(reply.ID)

		pm.mu.Lock()
		waiter, found := pm.pending[replyID]
		switch {
		case found && waiter.ip == src.IP.String():
			// Non-blocking send: the channel already holds a reply if it is full.
			select {
			case waiter.response <- true:
			default:
				log.Printf("[ping] late response from %s (channel full)", src.IP)
			}
		case !found && replyID >= pm.minID:
			log.Printf("[ping] late response from %s (id %d expired)", src.IP, reply.ID)
		}
		pm.mu.Unlock()
	}
}
// Ping sends one ICMP echo request to ipStr and reports whether a matching
// echo reply arrives within timeout. It returns false when the manager has no
// socket, when ipStr is not a valid IP address, or when the request cannot be
// built or sent.
func (pm *PingManager) Ping(ipStr string, timeout time.Duration) bool {
	if pm.conn == nil {
		return false
	}

	// Reject unparseable input up front instead of writing to a nil address.
	ip := net.ParseIP(ipStr)
	if ip == nil {
		return false
	}

	p := &pendingPing{
		// Stored in canonical form because readLoop compares it against
		// IP.String() of the reply's source address.
		ip:       ip.String(),
		response: make(chan bool, 1),
	}

	pm.mu.Lock()
	pm.nextID++
	if pm.nextID < pm.minID {
		// The uint16 counter overflowed; restart at minID so issued IDs stay
		// in the range readLoop treats as ours.
		pm.nextID = pm.minID
	}
	id := pm.nextID
	pm.pending[id] = p
	pm.mu.Unlock()

	defer func() {
		pm.mu.Lock()
		delete(pm.pending, id)
		pm.mu.Unlock()
	}()

	msg := icmp.Message{
		Type: ipv4.ICMPTypeEcho,
		Code: 0,
		Body: &icmp.Echo{
			ID:   int(id),
			Seq:  1,
			Data: []byte("tendrils"),
		},
	}
	msgBytes, err := msg.Marshal(nil)
	if err != nil {
		return false
	}

	if _, err := pm.conn.WriteTo(msgBytes, &net.IPAddr{IP: ip}); err != nil {
		return false
	}

	// An explicit timer is released as soon as the reply arrives rather than
	// lingering for the full timeout.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-p.response:
		return true
	case <-timer.C:
		return false
	}
}
func (t *Tendrils) pingNode(node *Node) {
t.nodes.mu.RLock()
var ips []string
nodeName := node.DisplayName()
for _, iface := range node.Interfaces {
for ipStr := range iface.IPs {
ip := net.ParseIP(ipStr)
@@ -26,63 +153,14 @@ func (t *Tendrils) pingNode(node *Node) {
}
for _, ipStr := range ips {
reachable := t.pingIP(ipStr)
reachable := t.ping.Ping(ipStr, 2*time.Second)
if reachable {
t.errors.ClearUnreachable(node, ipStr)
if t.errors.ClearUnreachable(node, ipStr) {
log.Printf("[ping] %s (%s) is now reachable", nodeName, ipStr)
}
} else {
t.errors.SetUnreachable(node, ipStr)
}
}
}
func (t *Tendrils) pingIP(ipStr string) bool {
conn, err := icmp.ListenPacket("ip4:icmp", "0.0.0.0")
if err != nil {
return false
}
defer conn.Close()
conn.SetDeadline(time.Now().Add(500 * time.Millisecond))
ip := net.ParseIP(ipStr)
seq := uint16(time.Now().UnixNano() & 0xFFFF)
msg := icmp.Message{
Type: ipv4.ICMPTypeEcho,
Code: 0,
Body: &icmp.Echo{
ID: int(seq),
Seq: 1,
Data: []byte("tendrils"),
},
}
msgBytes, err := msg.Marshal(nil)
if err != nil {
return false
}
_, err = conn.WriteTo(msgBytes, &net.IPAddr{IP: ip})
if err != nil {
return false
}
buf := make([]byte, 1500)
for {
n, peer, err := conn.ReadFrom(buf)
if err != nil {
return false
}
parsed, err := icmp.ParseMessage(1, buf[:n])
if err != nil {
continue
}
if parsed.Type == ipv4.ICMPTypeEchoReply {
if ipAddr, ok := peer.(*net.IPAddr); ok {
if ipAddr.IP.String() == ipStr {
return true
}
if t.errors.SetUnreachable(node, ipStr) {
log.Printf("[ping] %s (%s) is now unreachable", nodeName, ipStr)
}
}
}

View File

@@ -310,12 +310,14 @@
.node.has-error {
box-shadow: 0 0 0 3px #f66;
animation: error-pulse 2s infinite;
}
@keyframes error-pulse {
0%, 100% { box-shadow: 0 0 0 3px #f66; }
50% { box-shadow: 0 0 0 3px #f00; }
.node.unreachable {
box-shadow: 0 0 0 3px #f90;
}
.node.has-error.unreachable {
box-shadow: 0 0 0 3px #f66, 0 0 0 6px #f90;
}
#error-panel {
@@ -427,6 +429,48 @@
.node.scroll-highlight {
outline: 3px solid white;
}
#broadcast-stats {
position: fixed;
bottom: 10px;
left: 10px;
z-index: 1000;
padding: 8px 12px;
background: #222;
border-radius: 6px;
border: 1px solid #444;
font-size: 11px;
}
#broadcast-stats.warning {
border-color: #f90;
background: #332a1a;
}
#broadcast-stats.critical {
border-color: #f44;
background: #331a1a;
}
#broadcast-stats .label {
color: #888;
margin-right: 4px;
}
#broadcast-stats .value {
color: #eee;
font-weight: bold;
}
#broadcast-stats .rate-row {
display: flex;
gap: 12px;
}
#broadcast-stats .rate-item {
display: flex;
align-items: center;
}
</style>
</head>
<body>
@@ -434,6 +478,17 @@
<div class="dot"></div>
<span class="text">Connecting...</span>
</div>
<div id="broadcast-stats">
<div class="rate-row">
<div class="rate-item">
<span class="label">Broadcast:</span>
<span class="value" id="broadcast-pps">0 pps</span>
</div>
<div class="rate-item">
<span class="value" id="broadcast-bps">0 B/s</span>
</div>
</div>
</div>
<div id="mode-selector">
<button id="mode-network" class="active">Network</button>
<button id="mode-dante">Dante</button>
@@ -450,6 +505,42 @@
<div id="container"></div>
<script>
/**
 * Format a byte rate for display using 1024-based units.
 * Rates under 1 KiB/s are shown as whole bytes; larger rates use one decimal.
 * @param {number} bytes - rate in bytes per second
 * @returns {string} e.g. "512 B/s", "1.5 KB/s", "2.0 MB/s", "1.0 GB/s"
 */
function formatBytes(bytes) {
    const KIB = 1024;
    const MIB = KIB * 1024;
    const GIB = MIB * 1024;

    if (bytes < KIB) {
        return bytes.toFixed(0) + ' B/s';
    }
    if (bytes < MIB) {
        return (bytes / KIB).toFixed(1) + ' KB/s';
    }
    if (bytes < GIB) {
        return (bytes / MIB).toFixed(1) + ' MB/s';
    }
    return (bytes / GIB).toFixed(1) + ' GB/s';
}
/**
 * Format a packet rate for display.
 * Rates under 1000 are shown as whole packets; larger rates are scaled to
 * thousands (K) or millions (M) with one decimal.
 * @param {number} pps - rate in packets per second
 * @returns {string} e.g. "42 pps", "1.5K pps", "2.0M pps"
 */
function formatPackets(pps) {
    const THOUSAND = 1000;
    const MILLION = 1000000;

    if (pps < THOUSAND) {
        return pps.toFixed(0) + ' pps';
    }
    if (pps < MILLION) {
        return (pps / THOUSAND).toFixed(1) + 'K pps';
    }
    return (pps / MILLION).toFixed(1) + 'M pps';
}
/**
 * Refresh the broadcast statistics panel.
 * With no stats the panel shows zero rates and all of its classes are cleared.
 * Otherwise the rates are rendered and the panel is marked "critical" above
 * 1000 packets/s, "warning" above 100 packets/s, and unmarked below that.
 * @param {{packets_per_s: number, bytes_per_s: number}|null|undefined} stats
 */
function updateBroadcastStats(stats) {
    const panel = document.getElementById('broadcast-stats');
    const ppsEl = document.getElementById('broadcast-pps');
    const bpsEl = document.getElementById('broadcast-bps');

    if (!stats) {
        ppsEl.textContent = '0 pps';
        bpsEl.textContent = '0 B/s';
        panel.className = '';
        return;
    }

    const pps = stats.packets_per_s;
    ppsEl.textContent = formatPackets(pps);
    bpsEl.textContent = formatBytes(stats.bytes_per_s);

    // The two severity classes are mutually exclusive; critical wins.
    const isCritical = pps > 1000;
    const isWarning = !isCritical && pps > 100;
    panel.classList.toggle('critical', isCritical);
    panel.classList.toggle('warning', isWarning);
}
function getLabel(node) {
if (node.names && node.names.length > 0) return node.names.join('\n');
if (node.interfaces && node.interfaces.length > 0) {
@@ -573,11 +664,12 @@
return null;
}
function createNodeElement(node, switchConnection, nodeLocation, uplinkInfo, danteInfo, hasError) {
function createNodeElement(node, switchConnection, nodeLocation, uplinkInfo, danteInfo, hasError, isUnreachable) {
const div = document.createElement('div');
div.className = 'node' + (isSwitch(node) ? ' switch' : '');
div.dataset.typeid = node.typeid;
if (hasError) div.classList.add('has-error');
if (isUnreachable) div.classList.add('unreachable');
if (danteInfo) {
if (danteInfo.isTx) div.classList.add('dante-tx');
@@ -636,12 +728,12 @@
return div;
}
function renderLocation(loc, assignedNodes, isTopLevel, switchConnections, switchUplinks, danteNodes, errorNodeIds) {
function renderLocation(loc, assignedNodes, isTopLevel, switchConnections, switchUplinks, danteNodes, errorNodeIds, unreachableNodeIds) {
const nodes = assignedNodes.get(loc) || [];
const hasNodes = nodes.length > 0;
const childElements = loc.children
.map(child => renderLocation(child, assignedNodes, false, switchConnections, switchUplinks, danteNodes, errorNodeIds))
.map(child => renderLocation(child, assignedNodes, false, switchConnections, switchUplinks, danteNodes, errorNodeIds, unreachableNodeIds))
.filter(el => el !== null);
if (!hasNodes && childElements.length === 0) {
@@ -670,7 +762,8 @@
const uplink = switchUplinks.get(node.typeid);
const danteInfo = danteNodes.get(node.typeid);
const hasError = errorNodeIds.has(node.typeid);
switchRow.appendChild(createNodeElement(node, null, loc, uplink, danteInfo, hasError));
const isUnreachable = unreachableNodeIds.has(node.typeid);
switchRow.appendChild(createNodeElement(node, null, loc, uplink, danteInfo, hasError, isUnreachable));
});
container.appendChild(switchRow);
}
@@ -682,7 +775,8 @@
const conn = switchConnections.get(node.typeid);
const danteInfo = danteNodes.get(node.typeid);
const hasError = errorNodeIds.has(node.typeid);
nodeRow.appendChild(createNodeElement(node, conn, loc, null, danteInfo, hasError));
const isUnreachable = unreachableNodeIds.has(node.typeid);
nodeRow.appendChild(createNodeElement(node, conn, loc, null, danteInfo, hasError, isUnreachable));
});
container.appendChild(nodeRow);
}
@@ -822,9 +916,8 @@
const links = data.links || [];
portErrors = data.port_errors || [];
const unreachableNodes = new Set(data.unreachable_nodes || []);
const errorNodeIds = new Set(portErrors.map(e => e.node_typeid));
unreachableNodes.forEach(id => errorNodeIds.add(id));
const unreachableNodeIds = new Set(data.unreachable_nodes || []);
const errorNodeIds = new Set(portErrors.filter(e => e.error_type !== 'unreachable').map(e => e.node_typeid));
const locationTree = buildLocationTree(config.locations || [], null);
@@ -1011,7 +1104,7 @@
container.innerHTML = '';
locationTree.forEach(loc => {
const el = renderLocation(loc, assignedNodes, true, switchConnections, switchUplinks, danteNodes, errorNodeIds);
const el = renderLocation(loc, assignedNodes, true, switchConnections, switchUplinks, danteNodes, errorNodeIds, unreachableNodeIds);
if (el) container.appendChild(el);
});
@@ -1034,7 +1127,8 @@
const uplink = switchUplinks.get(node.typeid);
const danteInfo = danteNodes.get(node.typeid);
const hasError = errorNodeIds.has(node.typeid);
switchRow.appendChild(createNodeElement(node, null, null, uplink, danteInfo, hasError));
const isUnreachable = unreachableNodeIds.has(node.typeid);
switchRow.appendChild(createNodeElement(node, null, null, uplink, danteInfo, hasError, isUnreachable));
});
unassignedLoc.appendChild(switchRow);
}
@@ -1046,7 +1140,8 @@
const conn = switchConnections.get(node.typeid);
const danteInfo = danteNodes.get(node.typeid);
const hasError = errorNodeIds.has(node.typeid);
nodeRow.appendChild(createNodeElement(node, conn, null, null, danteInfo, hasError));
const isUnreachable = unreachableNodeIds.has(node.typeid);
nodeRow.appendChild(createNodeElement(node, conn, null, null, danteInfo, hasError, isUnreachable));
});
unassignedLoc.appendChild(nodeRow);
}
@@ -1055,6 +1150,7 @@
}
updateErrorPanel();
updateBroadcastStats(data.broadcast_stats);
}
connectSSE();

View File

@@ -36,36 +36,39 @@ type Tendrils struct {
artnet *ArtNetNodes
danteFlows *DanteFlows
errors *ErrorTracker
ping *PingManager
broadcast *BroadcastStats
config *Config
sseSubsMu sync.RWMutex
sseSubsNext int
sseSubs map[int]chan struct{}
Interface string
ConfigFile string
DisableARP bool
DisableLLDP bool
DisableSNMP bool
DisableIGMP bool
DisableMDNS bool
DisableArtNet bool
DisableDante bool
DisableBMD bool
DisableShure bool
DisableYamaha bool
LogEvents bool
LogNodes bool
DebugARP bool
DebugLLDP bool
DebugSNMP bool
DebugIGMP bool
DebugMDNS bool
DebugArtNet bool
DebugDante bool
DebugBMD bool
DebugShure bool
DebugYamaha bool
Interface string
ConfigFile string
DisableARP bool
DisableLLDP bool
DisableSNMP bool
DisableIGMP bool
DisableMDNS bool
DisableArtNet bool
DisableDante bool
DisableBMD bool
DisableShure bool
DisableYamaha bool
LogEvents bool
LogNodes bool
DebugARP bool
DebugLLDP bool
DebugSNMP bool
DebugIGMP bool
DebugMDNS bool
DebugArtNet bool
DebugDante bool
DebugBMD bool
DebugShure bool
DebugYamaha bool
DebugBroadcast bool
}
func New() *Tendrils {
@@ -73,10 +76,12 @@ func New() *Tendrils {
activeInterfaces: map[string]context.CancelFunc{},
artnet: NewArtNetNodes(),
danteFlows: NewDanteFlows(),
ping: NewPingManager(),
sseSubs: map[int]chan struct{}{},
}
t.nodes = NewNodes(t)
t.errors = NewErrorTracker(t)
t.broadcast = NewBroadcastStats(t)
return t
}
@@ -277,6 +282,7 @@ func (t *Tendrils) updateInterfaces(interfaces []net.Interface) {
func (t *Tendrils) startInterface(ctx context.Context, iface net.Interface) {
go t.pingBroadcast(ctx, iface)
go t.listenBroadcast(ctx, iface)
if !t.DisableLLDP {
go t.listenLLDP(ctx, iface)