fix goroutine leak and reduce code duplication

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ian Gulliver
2026-01-23 23:28:58 -08:00
parent 7e8ec697ae
commit f02b25ca29
8 changed files with 87 additions and 163 deletions

View File

@@ -45,7 +45,7 @@ func (t *Tendrils) parseARPTable() []arpEntry {
macStr = normalizeMACAddress(macStr)
mac, err := net.ParseMAC(macStr)
if err != nil {
log.Printf("[arp] failed to parse MAC %q for IP %s: %v", macStr, ipStr, err)
log.Printf("[ERROR] [arp] failed to parse MAC %q for IP %s: %v", macStr, ipStr, err)
continue
}

View File

@@ -30,18 +30,7 @@ type ArtNetNode struct {
}
func (t *Tendrils) listenArtNet(ctx context.Context, iface net.Interface) {
addrs, err := iface.Addrs()
if err != nil {
return
}
var srcIP net.IP
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil {
srcIP = ipnet.IP.To4()
break
}
}
srcIP, _ := getInterfaceIPv4(iface)
if srcIP == nil {
return
}
@@ -157,23 +146,7 @@ func (t *Tendrils) handleArtPollReply(ifaceName string, srcIP net.IP, data []byt
}
func (t *Tendrils) runArtNetPoller(ctx context.Context, iface net.Interface, conn *net.UDPConn) {
addrs, err := iface.Addrs()
if err != nil {
return
}
var broadcast net.IP
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil {
ip := ipnet.IP.To4()
mask := ipnet.Mask
broadcast = make(net.IP, 4)
for i := 0; i < 4; i++ {
broadcast[i] = ip[i] | ^mask[i]
}
break
}
}
_, broadcast := getInterfaceIPv4(iface)
if broadcast == nil {
return
}

19
bmd.go
View File

@@ -14,24 +14,7 @@ func (t *Tendrils) listenBMD(ctx context.Context, iface net.Interface) {
}
func (t *Tendrils) discoverATEMs(ctx context.Context, iface net.Interface) {
addrs, err := iface.Addrs()
if err != nil {
return
}
var srcIP net.IP
var broadcast net.IP
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil {
srcIP = ipnet.IP.To4()
mask := ipnet.Mask
broadcast = make(net.IP, 4)
for i := 0; i < 4; i++ {
broadcast[i] = srcIP[i] | ^mask[i]
}
break
}
}
srcIP, broadcast := getInterfaceIPv4(iface)
if srcIP == nil {
return
}

View File

@@ -140,6 +140,15 @@ func NewDanteFlows() *DanteFlows {
}
}
func containsString(slice []string, val string) bool {
for _, s := range slice {
if s == val {
return true
}
}
return false
}
func (d *DanteFlows) Update(source, subscriber *Node, channelInfo string) {
d.mu.Lock()
defer d.mu.Unlock()
@@ -161,18 +170,9 @@ func (d *DanteFlows) Update(source, subscriber *Node, channelInfo string) {
flow.Subscribers[subscriber] = sub
}
if channelInfo != "" {
hasChannel := false
for _, ch := range sub.Channels {
if ch == channelInfo {
hasChannel = true
break
}
}
if !hasChannel {
sub.Channels = append(sub.Channels, channelInfo)
sort.Strings(sub.Channels)
}
if channelInfo != "" && !containsString(sub.Channels, channelInfo) {
sub.Channels = append(sub.Channels, channelInfo)
sort.Strings(sub.Channels)
}
sub.LastSeen = time.Now()
@@ -188,14 +188,7 @@ func (d *DanteFlows) ReplaceNode(oldNode, newNode *Node) {
for subNode, sub := range flow.Subscribers {
if existingSub, hasSub := existingFlow.Subscribers[subNode]; hasSub {
for _, ch := range sub.Channels {
hasChannel := false
for _, existingCh := range existingSub.Channels {
if existingCh == ch {
hasChannel = true
break
}
}
if !hasChannel {
if !containsString(existingSub.Channels, ch) {
existingSub.Channels = append(existingSub.Channels, ch)
}
}
@@ -214,14 +207,7 @@ func (d *DanteFlows) ReplaceNode(oldNode, newNode *Node) {
delete(flow.Subscribers, oldNode)
if existingSub, hasNew := flow.Subscribers[newNode]; hasNew {
for _, ch := range sub.Channels {
hasChannel := false
for _, existingCh := range existingSub.Channels {
if existingCh == ch {
hasChannel = true
break
}
}
if !hasChannel {
if !containsString(existingSub.Channels, ch) {
existingSub.Channels = append(existingSub.Channels, ch)
}
}

13
igmp.go
View File

@@ -111,18 +111,7 @@ func (t *Tendrils) handleIGMPv3(ifaceName string, sourceIP net.IP, igmp *layers.
}
func (t *Tendrils) runIGMPQuerier(ctx context.Context, iface net.Interface) {
addrs, err := iface.Addrs()
if err != nil {
return
}
var srcIP net.IP
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil {
srcIP = ipnet.IP.To4()
break
}
}
srcIP, _ := getInterfaceIPv4(iface)
if srcIP == nil {
return
}

View File

@@ -83,18 +83,7 @@ func (s *InterfaceStats) String() string {
}
}
return "[" + fmt.Sprintf("%s", joinParts(parts)) + "]"
}
func joinParts(parts []string) string {
result := ""
for i, p := range parts {
if i > 0 {
result += " "
}
result += p
}
return result
return "[" + strings.Join(parts, " ") + "]"
}
type PoEBudget struct {
@@ -133,7 +122,7 @@ func (n *Node) String() string {
parts = append(parts, fmt.Sprintf("{%v}", ifaces))
return joinParts(parts)
return strings.Join(parts, " ")
}
func (n *Node) DisplayName() string {
@@ -541,8 +530,10 @@ func (n *Nodes) mergeNodes(keepID, mergeID int) {
for _, ip := range iface.IPs {
ips = append(ips, ip)
}
n.updateNodeInterface(keep, keepID, iface.MAC, ips, iface.Name)
n.macIndex[iface.MAC.String()] = keepID
if iface.MAC != nil {
n.updateNodeInterface(keep, keepID, iface.MAC, ips, iface.Name)
n.macIndex[iface.MAC.String()] = keepID
}
}
for peerMAC, ifaceName := range merge.MACTable {
@@ -563,6 +554,11 @@ func (n *Nodes) mergeNodes(keepID, mergeID int) {
}
}
if cancel, exists := n.nodeCancel[mergeID]; exists {
cancel()
delete(n.nodeCancel, mergeID)
}
delete(n.nodes, mergeID)
}
@@ -747,7 +743,7 @@ func (n *Nodes) logNode(node *Node) {
tags = append(tags, "dante-clock-master")
}
if len(tags) > 0 {
log.Printf("[node] %s [%s]", name, joinParts(tags))
log.Printf("[node] %s [%s]", name, strings.Join(tags, " "))
} else {
log.Printf("[node] %s", name)
}
@@ -911,7 +907,9 @@ func (n *Nodes) getDirectLinks() []*Link {
macToNode := map[string]*Node{}
for _, node := range n.nodes {
for _, iface := range node.Interfaces {
macToNode[iface.MAC.String()] = node
if iface.MAC != nil {
macToNode[iface.MAC.String()] = node
}
}
}
@@ -921,6 +919,9 @@ func (n *Nodes) getDirectLinks() []*Link {
for _, target := range n.nodes {
seenMACs := map[string]bool{}
for _, iface := range target.Interfaces {
if iface.MAC == nil {
continue
}
mac := iface.MAC.String()
if seenMACs[mac] {
continue

89
snmp.go
View File

@@ -51,6 +51,21 @@ func defaultSNMPConfig() *snmpConfig {
}
}
func snmpToInt(val interface{}) (int, bool) {
switch v := val.(type) {
case int:
return v, true
case uint:
return int(v), true
case int64:
return int(v), true
case uint64:
return int(v), true
default:
return 0, false
}
}
func (t *Tendrils) connectSNMP(ip net.IP) (*gosnmp.GoSNMP, error) {
cfg := defaultSNMPConfig()
@@ -89,11 +104,13 @@ func (t *Tendrils) querySNMPDevice(node *Node, ip net.IP) {
}
defer snmp.Conn.Close()
ifNames := t.getInterfaceNames(snmp)
t.querySysName(snmp, node)
t.queryInterfaceMACs(snmp, node)
t.queryInterfaceStats(snmp, node)
t.queryInterfaceMACs(snmp, node, ifNames)
t.queryInterfaceStats(snmp, node, ifNames)
t.queryPoEBudget(snmp, node)
t.queryBridgeMIB(snmp, node)
t.queryBridgeMIB(snmp, node, ifNames)
}
func (t *Tendrils) querySysName(snmp *gosnmp.GoSNMP, node *Node) {
@@ -121,7 +138,7 @@ func (t *Tendrils) querySysName(snmp *gosnmp.GoSNMP, node *Node) {
t.nodes.Update(node, nil, nil, "", sysName, "snmp-sysname")
}
func (t *Tendrils) queryInterfaceMACs(snmp *gosnmp.GoSNMP, node *Node) {
func (t *Tendrils) queryInterfaceMACs(snmp *gosnmp.GoSNMP, node *Node, ifNames map[int]string) {
oid := "1.3.6.1.2.1.2.2.1.6"
results, err := snmp.BulkWalkAll(oid)
@@ -129,8 +146,6 @@ func (t *Tendrils) queryInterfaceMACs(snmp *gosnmp.GoSNMP, node *Node) {
return
}
ifNames := t.getInterfaceNames(snmp)
for _, result := range results {
if result.Type != gosnmp.OctetString {
continue
@@ -161,9 +176,7 @@ func (t *Tendrils) queryInterfaceMACs(snmp *gosnmp.GoSNMP, node *Node) {
}
}
func (t *Tendrils) queryInterfaceStats(snmp *gosnmp.GoSNMP, node *Node) {
ifNames := t.getInterfaceNames(snmp)
func (t *Tendrils) queryInterfaceStats(snmp *gosnmp.GoSNMP, node *Node, ifNames map[int]string) {
ifOperStatus := t.getInterfaceTable(snmp, "1.3.6.1.2.1.2.2.1.8")
ifHighSpeed := t.getInterfaceTable(snmp, "1.3.6.1.2.1.31.1.1.1.15")
ifInErrors := t.getInterfaceTable(snmp, "1.3.6.1.2.1.2.2.1.14")
@@ -221,20 +234,10 @@ func (t *Tendrils) queryPoEBudget(snmp *gosnmp.GoSNMP, node *Node) {
var power, maxPower float64
for _, v := range result.Variables {
var val int
switch x := v.Value.(type) {
case int:
val = x
case uint:
val = int(x)
case int64:
val = int(x)
case uint64:
val = int(x)
default:
val, ok := snmpToInt(v.Value)
if !ok {
continue
}
if v.Name == "."+powerOID {
power = float64(val)
} else if v.Name == "."+maxPowerOID {
@@ -263,20 +266,10 @@ func (t *Tendrils) getInterfaceTable(snmp *gosnmp.GoSNMP, oid string) map[int]in
continue
}
var value int
switch v := result.Value.(type) {
case int:
value = v
case uint:
value = int(v)
case int64:
value = int(v)
case uint64:
value = int(v)
default:
value, ok := snmpToInt(result.Value)
if !ok {
continue
}
table[ifIndex] = value
}
return table
@@ -306,17 +299,8 @@ func (t *Tendrils) getPoEStats(snmp *gosnmp.GoSNMP, ifNames map[int]string) map[
continue
}
var status int
switch v := result.Value.(type) {
case int:
status = v
case uint:
status = int(v)
case int64:
status = int(v)
case uint64:
status = int(v)
default:
status, ok := snmpToInt(result.Value)
if !ok {
continue
}
@@ -367,26 +351,16 @@ func (t *Tendrils) getPoETable(snmp *gosnmp.GoSNMP, oid string) map[int]int {
continue
}
var value int
switch v := result.Value.(type) {
case int:
value = v
case uint:
value = int(v)
case int64:
value = int(v)
case uint64:
value = int(v)
default:
value, ok := snmpToInt(result.Value)
if !ok {
continue
}
table[portIndex] = value
}
return table
}
func (t *Tendrils) queryBridgeMIB(snmp *gosnmp.GoSNMP, node *Node) {
func (t *Tendrils) queryBridgeMIB(snmp *gosnmp.GoSNMP, node *Node, ifNames map[int]string) {
portOID := "1.3.6.1.2.1.17.7.1.2.2.1.2"
portResults, err := snmp.BulkWalkAll(portOID)
@@ -423,7 +397,6 @@ func (t *Tendrils) queryBridgeMIB(snmp *gosnmp.GoSNMP, node *Node) {
}
bridgePortToIfIndex := t.getBridgePortMapping(snmp)
ifNames := t.getInterfaceNames(snmp)
for _, entry := range macPorts {
mac := entry.mac

View File

@@ -10,6 +10,25 @@ import (
"time"
)
func getInterfaceIPv4(iface net.Interface) (srcIP, broadcast net.IP) {
addrs, err := iface.Addrs()
if err != nil {
return nil, nil
}
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil {
srcIP = ipnet.IP.To4()
mask := ipnet.Mask
broadcast = make(net.IP, 4)
for i := 0; i < 4; i++ {
broadcast[i] = srcIP[i] | ^mask[i]
}
return srcIP, broadcast
}
}
return nil, nil
}
type Tendrils struct {
activeInterfaces map[string]context.CancelFunc
nodes *Nodes