Add broadcast packet bucketing by protocol/port

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ian Gulliver
2026-01-25 20:16:53 -08:00
parent c701d26f0e
commit 5cd5db1e4a
2 changed files with 122 additions and 8 deletions

View File

@@ -2,12 +2,15 @@ package tendrils
import (
"context"
"fmt"
"log"
"net"
"sort"
"sync"
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
@@ -15,6 +18,7 @@ import (
type BroadcastSample struct {
Time time.Time
Bucket string
Packets uint64
Bytes uint64
}
@@ -30,12 +34,19 @@ type BroadcastStats struct {
t *Tendrils
}
type BroadcastBucket struct {
Name string `json:"name"`
PacketsPerS float64 `json:"packets_per_s"`
BytesPerS float64 `json:"bytes_per_s"`
}
type BroadcastStatsResponse struct {
TotalPackets uint64 `json:"total_packets"`
TotalBytes uint64 `json:"total_bytes"`
PacketsPerS float64 `json:"packets_per_s"`
BytesPerS float64 `json:"bytes_per_s"`
WindowSecs float64 `json:"window_secs"`
TotalPackets uint64 `json:"total_packets"`
TotalBytes uint64 `json:"total_bytes"`
PacketsPerS float64 `json:"packets_per_s"`
BytesPerS float64 `json:"bytes_per_s"`
WindowSecs float64 `json:"window_secs"`
Buckets []*BroadcastBucket `json:"buckets,omitempty"`
}
func NewBroadcastStats(t *Tendrils) *BroadcastStats {
@@ -47,13 +58,14 @@ func NewBroadcastStats(t *Tendrils) *BroadcastStats {
}
}
func (b *BroadcastStats) Record(packets, bytes uint64) {
func (b *BroadcastStats) Record(bucket string, packets, bytes uint64) {
b.mu.Lock()
defer b.mu.Unlock()
now := time.Now()
b.samples = append(b.samples, BroadcastSample{
Time: now,
Bucket: bucket,
Packets: packets,
Bytes: bytes,
})
@@ -80,6 +92,8 @@ func (b *BroadcastStats) GetStats() BroadcastStatsResponse {
var windowPackets, windowBytes uint64
var oldestTime time.Time
bucketPackets := map[string]uint64{}
bucketBytes := map[string]uint64{}
for _, s := range b.samples {
if s.Time.After(cutoff) {
@@ -88,6 +102,8 @@ func (b *BroadcastStats) GetStats() BroadcastStatsResponse {
}
windowPackets += s.Packets
windowBytes += s.Bytes
bucketPackets[s.Bucket] += s.Packets
bucketBytes[s.Bucket] += s.Bytes
}
}
@@ -99,12 +115,25 @@ func (b *BroadcastStats) GetStats() BroadcastStatsResponse {
windowSecs = 1
}
var buckets []*BroadcastBucket
for name := range bucketPackets {
buckets = append(buckets, &BroadcastBucket{
Name: name,
PacketsPerS: float64(bucketPackets[name]) / windowSecs,
BytesPerS: float64(bucketBytes[name]) / windowSecs,
})
}
sort.Slice(buckets, func(i, j int) bool {
return buckets[i].PacketsPerS > buckets[j].PacketsPerS
})
return BroadcastStatsResponse{
TotalPackets: b.totalPackets,
TotalBytes: b.totalBytes,
PacketsPerS: float64(windowPackets) / windowSecs,
BytesPerS: float64(windowBytes) / windowSecs,
WindowSecs: windowSecs,
Buckets: buckets,
}
}
@@ -143,13 +172,58 @@ func (t *Tendrils) handleBroadcastPacket(packet gopacket.Packet) {
}
packetLen := uint64(len(packet.Data()))
t.broadcast.Record(1, packetLen)
bucket := classifyBroadcastPacket(packet)
t.broadcast.Record(bucket, 1, packetLen)
if t.DebugBroadcast {
log.Printf("[broadcast] packet: %d bytes", packetLen)
log.Printf("[broadcast] %s: %d bytes", bucket, packetLen)
}
}
func classifyBroadcastPacket(packet gopacket.Packet) string {
if arpLayer := packet.Layer(layers.LayerTypeARP); arpLayer != nil {
return "ARP"
}
if udpLayer := packet.Layer(layers.LayerTypeUDP); udpLayer != nil {
udp := udpLayer.(*layers.UDP)
port := uint16(udp.DstPort)
switch port {
case 67, 68:
return "DHCP"
case 137:
return "NetBIOS-NS"
case 138:
return "NetBIOS-DG"
case 5353:
return "mDNS"
case 1900:
return "SSDP"
case 3702:
return "WS-Discovery"
default:
return fmt.Sprintf("UDP/%d", port)
}
}
if tcpLayer := packet.Layer(layers.LayerTypeTCP); tcpLayer != nil {
tcp := tcpLayer.(*layers.TCP)
return fmt.Sprintf("TCP/%d", tcp.DstPort)
}
if icmpLayer := packet.Layer(layers.LayerTypeICMPv4); icmpLayer != nil {
return "ICMP"
}
ethLayer := packet.Layer(layers.LayerTypeEthernet)
if ethLayer != nil {
eth := ethLayer.(*layers.Ethernet)
return fmt.Sprintf("0x%04x", uint16(eth.EthernetType))
}
return "other"
}
func (t *Tendrils) pingBroadcast(ctx context.Context, iface net.Interface) {
_, broadcast := getInterfaceIPv4(iface)
if broadcast == nil {

View File

@@ -503,6 +503,32 @@
display: flex;
align-items: center;
}
#broadcast-stats .buckets {
display: none;
margin-top: 8px;
padding-top: 8px;
border-top: 1px solid #444;
}
#broadcast-stats:hover .buckets {
display: block;
}
#broadcast-stats .bucket {
display: flex;
justify-content: space-between;
gap: 12px;
padding: 2px 0;
}
#broadcast-stats .bucket-name {
color: #aaa;
}
#broadcast-stats .bucket-rate {
color: #eee;
}
</style>
</head>
<body>
@@ -520,6 +546,7 @@
<span class="value" id="broadcast-bps">0 B/s</span>
</div>
</div>
<div class="buckets" id="broadcast-buckets"></div>
</div>
<div id="mode-selector">
<button id="mode-network" class="active">Network</button>
@@ -554,10 +581,12 @@
const panel = document.getElementById('broadcast-stats');
const ppsEl = document.getElementById('broadcast-pps');
const bpsEl = document.getElementById('broadcast-bps');
const bucketsEl = document.getElementById('broadcast-buckets');
if (!stats) {
ppsEl.textContent = '0 pps';
bpsEl.textContent = '0 B/s';
bucketsEl.innerHTML = '';
panel.className = '';
return;
}
@@ -571,6 +600,17 @@
} else if (stats.packets_per_s > 100) {
panel.classList.add('warning');
}
bucketsEl.innerHTML = '';
if (stats.buckets && stats.buckets.length > 0) {
stats.buckets.filter(b => b.packets_per_s >= 0.5).forEach(bucket => {
const div = document.createElement('div');
div.className = 'bucket';
div.innerHTML = '<span class="bucket-name">' + bucket.name + '</span>' +
'<span class="bucket-rate">' + formatPackets(bucket.packets_per_s) + '</span>';
bucketsEl.appendChild(div);
});
}
}
function getLabel(node) {