diff --git a/broadcast.go b/broadcast.go index 3d9e368..b2dba34 100644 --- a/broadcast.go +++ b/broadcast.go @@ -4,12 +4,152 @@ import ( "context" "log" "net" + "sync" "time" + "github.com/google/gopacket" + "github.com/google/gopacket/pcap" "golang.org/x/net/icmp" "golang.org/x/net/ipv4" ) +type BroadcastSample struct { + Time time.Time + Packets uint64 + Bytes uint64 +} + +type BroadcastStats struct { + mu sync.RWMutex + samples []BroadcastSample + totalPackets uint64 + totalBytes uint64 + windowSize time.Duration + lastNotify time.Time + notifyMinRate time.Duration + t *Tendrils +} + +type BroadcastStatsResponse struct { + TotalPackets uint64 `json:"total_packets"` + TotalBytes uint64 `json:"total_bytes"` + PacketsPerS float64 `json:"packets_per_s"` + BytesPerS float64 `json:"bytes_per_s"` + WindowSecs float64 `json:"window_secs"` +} + +func NewBroadcastStats(t *Tendrils) *BroadcastStats { + return &BroadcastStats{ + samples: []BroadcastSample{}, + windowSize: 60 * time.Second, + notifyMinRate: 1 * time.Second, + t: t, + } +} + +func (b *BroadcastStats) Record(packets, bytes uint64) { + b.mu.Lock() + defer b.mu.Unlock() + + now := time.Now() + b.samples = append(b.samples, BroadcastSample{ + Time: now, + Packets: packets, + Bytes: bytes, + }) + b.totalPackets += packets + b.totalBytes += bytes + + cutoff := now.Add(-b.windowSize) + for len(b.samples) > 0 && b.samples[0].Time.Before(cutoff) { + b.samples = b.samples[1:] + } + + if now.Sub(b.lastNotify) >= b.notifyMinRate { + b.lastNotify = now + b.t.NotifyUpdate() + } +} + +func (b *BroadcastStats) GetStats() BroadcastStatsResponse { + b.mu.RLock() + defer b.mu.RUnlock() + + now := time.Now() + cutoff := now.Add(-b.windowSize) + + var windowPackets, windowBytes uint64 + var oldestTime time.Time + + for _, s := range b.samples { + if s.Time.After(cutoff) { + if oldestTime.IsZero() || s.Time.Before(oldestTime) { + oldestTime = s.Time + } + windowPackets += s.Packets + windowBytes += s.Bytes + } + } + + var windowSecs float64 + if !oldestTime.IsZero() { + windowSecs = now.Sub(oldestTime).Seconds() + } + if windowSecs < 1 { + windowSecs = 1 + } + + return BroadcastStatsResponse{ + TotalPackets: b.totalPackets, + TotalBytes: b.totalBytes, + PacketsPerS: float64(windowPackets) / windowSecs, + BytesPerS: float64(windowBytes) / windowSecs, + WindowSecs: windowSecs, + } +} + +func (t *Tendrils) listenBroadcast(ctx context.Context, iface net.Interface) { + handle, err := pcap.OpenLive(iface.Name, 65536, true, 5*time.Second) + if err != nil { + log.Printf("[ERROR] broadcast: failed to open interface %s: %v", iface.Name, err) + return + } + defer handle.Close() + + if err := handle.SetBPFFilter("ether broadcast"); err != nil { + log.Printf("[ERROR] broadcast: failed to set BPF filter on %s: %v", iface.Name, err) + return + } + + packetSource := gopacket.NewPacketSource(handle, handle.LinkType()) + packets := packetSource.Packets() + + for { + select { + case <-ctx.Done(): + return + case packet, ok := <-packets: + if !ok { + return + } + t.handleBroadcastPacket(packet) + } + } +} + +func (t *Tendrils) handleBroadcastPacket(packet gopacket.Packet) { + if t.broadcast == nil { + return + } + + packetLen := uint64(len(packet.Data())) + t.broadcast.Record(1, packetLen) + + if t.DebugBroadcast { + log.Printf("[broadcast] packet: %d bytes", packetLen) + } +} + func (t *Tendrils) pingBroadcast(ctx context.Context, iface net.Interface) { _, broadcast := getInterfaceIPv4(iface) if broadcast == nil { diff --git a/cmd/tendrils/main.go b/cmd/tendrils/main.go index 66fe878..2d78b93 100644 --- a/cmd/tendrils/main.go +++ b/cmd/tendrils/main.go @@ -31,6 +31,7 @@ func main() { debugBMD := flag.Bool("debug-bmd", false, "debug Blackmagic discovery") debugShure := flag.Bool("debug-shure", false, "debug Shure discovery") debugYamaha := flag.Bool("debug-yamaha", false, "debug Yamaha discovery") + debugBroadcast := flag.Bool("debug-broadcast", false, "debug broadcast traffic monitoring") flag.Parse() t := tendrils.New() @@ -58,5 +59,6 @@ func main() { t.DebugBMD = *debugBMD t.DebugShure = *debugShure t.DebugYamaha = *debugYamaha + t.DebugBroadcast = *debugBroadcast t.Run() } diff --git a/errors.go b/errors.go index 93ad4a1..f89235e 100644 --- a/errors.go +++ b/errors.go @@ -206,14 +206,15 @@ func (e *ErrorTracker) GetUnreachableNodes() []string { return nodes } -func (e *ErrorTracker) SetUnreachable(node *Node, ip string) { - changed := e.setUnreachableLocked(node, ip) +func (e *ErrorTracker) SetUnreachable(node *Node, ip string) bool { + changed, becameUnreachable := e.setUnreachableLocked(node, ip) if changed { e.t.NotifyUpdate() } + return becameUnreachable } -func (e *ErrorTracker) setUnreachableLocked(node *Node, ip string) bool { +func (e *ErrorTracker) setUnreachableLocked(node *Node, ip string) (changed bool, becameUnreachable bool) { e.mu.Lock() defer e.mu.Unlock() @@ -221,13 +222,14 @@ func (e *ErrorTracker) setUnreachableLocked(node *Node, ip string) bool { wasUnreachable := e.unreachableNodes[node.TypeID] e.unreachableNodes[node.TypeID] = true + becameUnreachable = !wasUnreachable if e.suppressedUnreachable[key] { - return !wasUnreachable + return becameUnreachable, becameUnreachable } if _, exists := e.errors[key]; exists { - return !wasUnreachable + return becameUnreachable, becameUnreachable } now := time.Now() @@ -241,17 +243,18 @@ func (e *ErrorTracker) setUnreachableLocked(node *Node, ip string) bool { FirstSeen: now, LastUpdated: now, } - return true + return true, becameUnreachable } -func (e *ErrorTracker) ClearUnreachable(node *Node, ip string) { - changed := e.clearUnreachableLocked(node, ip) +func (e *ErrorTracker) ClearUnreachable(node *Node, ip string) bool { + changed, becameReachable := e.clearUnreachableLocked(node, ip) if changed { e.t.NotifyUpdate() } + return becameReachable } -func (e *ErrorTracker) clearUnreachableLocked(node *Node, ip string) bool { +func (e *ErrorTracker) clearUnreachableLocked(node *Node, ip string) (changed bool, becameReachable bool) { e.mu.Lock() defer e.mu.Unlock() @@ -261,10 +264,11 @@ func (e *ErrorTracker) clearUnreachableLocked(node *Node, ip string) bool { wasUnreachable := e.unreachableNodes[node.TypeID] delete(e.unreachableNodes, node.TypeID) + becameReachable = wasUnreachable if _, exists := e.errors[key]; exists { delete(e.errors, key) - return true + return true, becameReachable } - return wasUnreachable + return becameReachable, becameReachable } diff --git a/http.go b/http.go index ca2944c..1103899 100644 --- a/http.go +++ b/http.go @@ -32,6 +32,7 @@ type StatusResponse struct { DanteFlows []*DanteFlow `json:"dante_flows"` PortErrors []*PortError `json:"port_errors"` UnreachableNodes []string `json:"unreachable_nodes"` + BroadcastStats *BroadcastStatsResponse `json:"broadcast_stats,omitempty"` } func (t *Tendrils) startHTTPServer() { @@ -132,6 +133,11 @@ func (t *Tendrils) handleAPIConfig(w http.ResponseWriter, r *http.Request) { } func (t *Tendrils) GetStatus() *StatusResponse { + var broadcastStats *BroadcastStatsResponse + if t.broadcast != nil { + stats := t.broadcast.GetStats() + broadcastStats = &stats + } return &StatusResponse{ Nodes: t.getNodes(), Links: t.getLinks(), @@ -140,6 +146,7 @@ func (t *Tendrils) GetStatus() *StatusResponse { DanteFlows: t.getDanteFlows(), PortErrors: t.errors.GetErrors(), UnreachableNodes: t.errors.GetUnreachableNodes(), + BroadcastStats: broadcastStats, } } diff --git a/ping.go b/ping.go index 2d0944c..a5a5921 100644 --- a/ping.go +++ b/ping.go @@ -1,16 +1,143 @@ package tendrils import ( + "log" "net" + "sync" "time" "golang.org/x/net/icmp" "golang.org/x/net/ipv4" ) +type pendingPing struct { + ip string + response chan bool +} + +type PingManager struct { + mu sync.Mutex + conn *icmp.PacketConn + pending map[uint16]*pendingPing + nextID uint16 + minID uint16 +} + +func NewPingManager() *PingManager { + pm := &PingManager{ + pending: map[uint16]*pendingPing{}, + nextID: 1000, + minID: 1000, + } + + conn, err := icmp.ListenPacket("ip4:icmp", "0.0.0.0") + if err != nil { + return pm + } + pm.conn = conn + + go pm.readLoop() + + return pm +} + +func (pm *PingManager) readLoop() { + buf := make([]byte, 1500) + for { + n, peer, err := pm.conn.ReadFrom(buf) + if err != nil { + return + } + + msg, err := icmp.ParseMessage(1, buf[:n]) + if err != nil { + continue + } + + if msg.Type != ipv4.ICMPTypeEchoReply { + continue + } + + echo, ok := msg.Body.(*icmp.Echo) + if !ok { + continue + } + + ipAddr, ok := peer.(*net.IPAddr) + if !ok { + continue + } + + pm.mu.Lock() + id := uint16(echo.ID) + if p, exists := pm.pending[id]; exists { + if p.ip == ipAddr.IP.String() { + select { + case p.response <- true: + default: + log.Printf("[ping] late response from %s (channel full)", ipAddr.IP) + } + } + } else if id >= pm.minID { + log.Printf("[ping] late response from %s (id %d expired)", ipAddr.IP, echo.ID) + } + pm.mu.Unlock() + } +} + +func (pm *PingManager) Ping(ipStr string, timeout time.Duration) bool { + if pm.conn == nil { + return false + } + + pm.mu.Lock() + pm.nextID++ + id := pm.nextID + p := &pendingPing{ + ip: ipStr, + response: make(chan bool, 1), + } + pm.pending[id] = p + pm.mu.Unlock() + + defer func() { + pm.mu.Lock() + delete(pm.pending, id) + pm.mu.Unlock() + }() + + msg := icmp.Message{ + Type: ipv4.ICMPTypeEcho, + Code: 0, + Body: &icmp.Echo{ + ID: int(id), + Seq: 1, + Data: []byte("tendrils"), + }, + } + msgBytes, err := msg.Marshal(nil) + if err != nil { + return false + } + + ip := net.ParseIP(ipStr) + _, err = pm.conn.WriteTo(msgBytes, &net.IPAddr{IP: ip}) + if err != nil { + return false + } + + select { + case <-p.response: + return true + case <-time.After(timeout): + return false + } +} + func (t *Tendrils) pingNode(node *Node) { t.nodes.mu.RLock() var ips []string + nodeName := node.DisplayName() for _, iface := range node.Interfaces { for ipStr := range iface.IPs { ip := net.ParseIP(ipStr) @@ -26,63 +153,14 @@ func (t *Tendrils) pingNode(node *Node) { } for _, ipStr := range ips { - reachable := t.pingIP(ipStr) + reachable := t.ping.Ping(ipStr, 2*time.Second) if reachable { - t.errors.ClearUnreachable(node, ipStr) + if t.errors.ClearUnreachable(node, ipStr) { + log.Printf("[ping] %s (%s) is now reachable", nodeName, ipStr) + } } else { - t.errors.SetUnreachable(node, ipStr) - } - } -} - -func (t *Tendrils) pingIP(ipStr string) bool { - conn, err := icmp.ListenPacket("ip4:icmp", "0.0.0.0") - if err != nil { - return false - } - defer conn.Close() - - conn.SetDeadline(time.Now().Add(500 * time.Millisecond)) - - ip := net.ParseIP(ipStr) - seq := uint16(time.Now().UnixNano() & 0xFFFF) - - msg := icmp.Message{ - Type: ipv4.ICMPTypeEcho, - Code: 0, - Body: &icmp.Echo{ - ID: int(seq), - Seq: 1, - Data: []byte("tendrils"), - }, - } - msgBytes, err := msg.Marshal(nil) - if err != nil { - return false - } - - _, err = conn.WriteTo(msgBytes, &net.IPAddr{IP: ip}) - if err != nil { - return false - } - - buf := make([]byte, 1500) - for { - n, peer, err := conn.ReadFrom(buf) - if err != nil { - return false - } - - parsed, err := icmp.ParseMessage(1, buf[:n]) - if err != nil { - continue - } - - if parsed.Type == ipv4.ICMPTypeEchoReply { - if ipAddr, ok := peer.(*net.IPAddr); ok { - if ipAddr.IP.String() == ipStr { - return true - } + if t.errors.SetUnreachable(node, ipStr) { + log.Printf("[ping] %s (%s) is now unreachable", nodeName, ipStr) } } } diff --git a/static/index.html b/static/index.html index 959548b..f547686 100644 --- a/static/index.html +++ b/static/index.html @@ -310,12 +310,14 @@ .node.has-error { box-shadow: 0 0 0 3px #f66; - animation: error-pulse 2s infinite; } - @keyframes error-pulse { - 0%, 100% { box-shadow: 0 0 0 3px #f66; } - 50% { box-shadow: 0 0 0 3px #f00; } + .node.unreachable { + box-shadow: 0 0 0 3px #f90; + } + + .node.has-error.unreachable { + box-shadow: 0 0 0 3px #f66, 0 0 0 6px #f90; } #error-panel { @@ -427,6 +429,48 @@ .node.scroll-highlight { outline: 3px solid white; } + + #broadcast-stats { + position: fixed; + bottom: 10px; + left: 10px; + z-index: 1000; + padding: 8px 12px; + background: #222; + border-radius: 6px; + border: 1px solid #444; + font-size: 11px; + } + + #broadcast-stats.warning { + border-color: #f90; + background: #332a1a; + } + + #broadcast-stats.critical { + border-color: #f44; + background: #331a1a; + } + + #broadcast-stats .label { + color: #888; + margin-right: 4px; + } + + #broadcast-stats .value { + color: #eee; + font-weight: bold; + } + + #broadcast-stats .rate-row { + display: flex; + gap: 12px; + } + + #broadcast-stats .rate-item { + display: flex; + align-items: center; + }
@@ -434,6 +478,17 @@ Connecting... +