From b966ad0feb2254ef82487cb4d421177cd6551e7e Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Wed, 28 Jan 2026 23:15:24 -0800 Subject: [PATCH] Refactor Dante fields to use proper types and group flows with lastSeen Co-Authored-By: Claude Opus 4.5 --- dante.go | 151 ++++++++++++++++++++++++++-------------------- nodes.go | 8 +-- static/index.html | 4 +- types.go | 65 ++++++++++++-------- 4 files changed, 129 insertions(+), 99 deletions(-) diff --git a/dante.go b/dante.go index 18a2132..260de40 100644 --- a/dante.go +++ b/dante.go @@ -87,7 +87,7 @@ func (t *Tendrils) handlePTPPacket(ifaceName string, srcIP net.IP, data []byte) t.nodes.SetDanteClockMaster(srcIP) } -func (n *Nodes) UpdateDanteTxChannels(name string, ip net.IP, channels string) { +func (n *Nodes) UpdateDanteTxChannels(name string, ip net.IP, channels int) { n.mu.Lock() defer n.mu.Unlock() @@ -104,7 +104,7 @@ func (n *Nodes) GetDanteTxDeviceInGroup(groupIP net.IP) *Node { group := ParseMulticastGroup(groupIP) for _, node := range n.nodes { - if node.DanteTxChannels != "" && node.MulticastGroups != nil { + if node.DanteTxChannels > 0 && node.MulticastGroups != nil { if _, exists := node.MulticastGroups[group]; exists { return node } @@ -188,16 +188,24 @@ func (n *Nodes) UpdateDanteFlow(source, subscriber *Node, channelInfo string, fl n.mu.Lock() defer n.mu.Unlock() - n.updateDanteTx(source, subscriber, channelInfo, flowStatus) - n.updateDanteRx(subscriber, source, channelInfo, flowStatus) - - source.danteLastSeen = time.Now() - subscriber.danteLastSeen = time.Now() + now := time.Now() + n.updateDanteTx(source, subscriber, channelInfo, flowStatus, now) + n.updateDanteRx(subscriber, source, channelInfo, flowStatus, now) } -func (n *Nodes) updateDanteTx(source, subscriber *Node, channelInfo string, flowStatus DanteFlowStatus) { +func (n *Nodes) ensureDanteFlows(node *Node) *DanteFlows { + if node.DanteFlows == nil { + node.DanteFlows = &DanteFlows{} + } + return node.DanteFlows +} + +func (n *Nodes) updateDanteTx(source, subscriber *Node, channelInfo string, flowStatus DanteFlowStatus, now time.Time) { + flows := n.ensureDanteFlows(source) + flows.lastSeen = now + var peer *DantePeer - for _, p := range source.DanteTx { + for _, p := range flows.Tx { if p.Node == subscriber { peer = p break @@ -208,7 +216,7 @@ func (n *Nodes) updateDanteTx(source, subscriber *Node, channelInfo string, flow Node: subscriber, Status: map[string]string{}, } - source.DanteTx = append(source.DanteTx, peer) + flows.Tx = append(flows.Tx, peer) } if channelInfo != "" && !containsString(peer.Channels, channelInfo) { @@ -219,14 +227,17 @@ func (n *Nodes) updateDanteTx(source, subscriber *Node, channelInfo string, flow peer.Status[channelInfo] = flowStatus.String() } - sort.Slice(source.DanteTx, func(i, j int) bool { - return sortorder.NaturalLess(source.DanteTx[i].Node.DisplayName(), source.DanteTx[j].Node.DisplayName()) + sort.Slice(flows.Tx, func(i, j int) bool { + return sortorder.NaturalLess(flows.Tx[i].Node.DisplayName(), flows.Tx[j].Node.DisplayName()) }) } -func (n *Nodes) updateDanteRx(subscriber, source *Node, channelInfo string, flowStatus DanteFlowStatus) { +func (n *Nodes) updateDanteRx(subscriber, source *Node, channelInfo string, flowStatus DanteFlowStatus, now time.Time) { + flows := n.ensureDanteFlows(subscriber) + flows.lastSeen = now + var peer *DantePeer - for _, p := range subscriber.DanteRx { + for _, p := range flows.Rx { if p.Node == source { peer = p break @@ -237,7 +248,7 @@ func (n *Nodes) updateDanteRx(subscriber, source *Node, channelInfo string, flow Node: source, Status: map[string]string{}, } - subscriber.DanteRx = append(subscriber.DanteRx, peer) + flows.Rx = append(flows.Rx, peer) } if channelInfo != "" && !containsString(peer.Channels, channelInfo) { @@ -248,78 +259,86 @@ func (n *Nodes) updateDanteRx(subscriber, source *Node, channelInfo string, flow peer.Status[channelInfo] = flowStatus.String() } - sort.Slice(subscriber.DanteRx, func(i, j int) bool { - return sortorder.NaturalLess(subscriber.DanteRx[i].Node.DisplayName(), subscriber.DanteRx[j].Node.DisplayName()) + sort.Slice(flows.Rx, func(i, j int) bool { + return sortorder.NaturalLess(flows.Rx[i].Node.DisplayName(), flows.Rx[j].Node.DisplayName()) }) } func (n *Nodes) expireDante() { - expireTime := time.Now().Add(-5 * time.Minute) for _, node := range n.nodes { - if !node.danteLastSeen.IsZero() && node.danteLastSeen.Before(expireTime) { - node.DanteTx = nil - node.DanteRx = nil - node.danteLastSeen = time.Time{} + if node.DanteFlows != nil && node.DanteFlows.Expire(5*time.Minute) { + node.DanteFlows = nil } } } func (n *Nodes) mergeDante(keep, merge *Node) { - for _, peer := range merge.DanteTx { - var existing *DantePeer - for _, p := range keep.DanteTx { - if p.Node == peer.Node { - existing = p - break - } - } - if existing == nil { - keep.DanteTx = append(keep.DanteTx, peer) - } else { - for _, ch := range peer.Channels { - if !containsString(existing.Channels, ch) { - existing.Channels = append(existing.Channels, ch) - } - } - for ch, status := range peer.Status { - existing.Status[ch] = status - } - } + if merge.DanteFlows == nil { + return } - for _, peer := range merge.DanteRx { - var existing *DantePeer - for _, p := range keep.DanteRx { - if p.Node == peer.Node { - existing = p - break - } - } - if existing == nil { - keep.DanteRx = append(keep.DanteRx, peer) - } else { - for _, ch := range peer.Channels { - if !containsString(existing.Channels, ch) { - existing.Channels = append(existing.Channels, ch) + if keep.DanteFlows == nil { + keep.DanteFlows = merge.DanteFlows + } else { + for _, peer := range merge.DanteFlows.Tx { + var existing *DantePeer + for _, p := range keep.DanteFlows.Tx { + if p.Node == peer.Node { + existing = p + break } } - for ch, status := range peer.Status { - existing.Status[ch] = status + if existing == nil { + keep.DanteFlows.Tx = append(keep.DanteFlows.Tx, peer) + } else { + for _, ch := range peer.Channels { + if !containsString(existing.Channels, ch) { + existing.Channels = append(existing.Channels, ch) + } + } + for ch, status := range peer.Status { + existing.Status[ch] = status + } } } - } - if merge.danteLastSeen.After(keep.danteLastSeen) { - keep.danteLastSeen = merge.danteLastSeen + for _, peer := range merge.DanteFlows.Rx { + var existing *DantePeer + for _, p := range keep.DanteFlows.Rx { + if p.Node == peer.Node { + existing = p + break + } + } + if existing == nil { + keep.DanteFlows.Rx = append(keep.DanteFlows.Rx, peer) + } else { + for _, ch := range peer.Channels { + if !containsString(existing.Channels, ch) { + existing.Channels = append(existing.Channels, ch) + } + } + for ch, status := range peer.Status { + existing.Status[ch] = status + } + } + } + + if merge.DanteFlows.lastSeen.After(keep.DanteFlows.lastSeen) { + keep.DanteFlows.lastSeen = merge.DanteFlows.lastSeen + } } for _, node := range n.nodes { - for _, peer := range node.DanteTx { + if node.DanteFlows == nil { + continue + } + for _, peer := range node.DanteFlows.Tx { if peer.Node == merge { peer.Node = keep } } - for _, peer := range node.DanteRx { + for _, peer := range node.DanteFlows.Rx { if peer.Node == merge { peer.Node = keep } @@ -340,7 +359,7 @@ func (n *Nodes) logDante() { var allNoChannelFlows []string for _, node := range n.nodes { - if len(node.DanteTx) == 0 { + if node.DanteFlows == nil || len(node.DanteFlows.Tx) == 0 { continue } sourceName := node.DisplayName() @@ -348,7 +367,7 @@ func (n *Nodes) logDante() { sourceName = "??" } - for _, peer := range node.DanteTx { + for _, peer := range node.DanteFlows.Tx { subName := peer.Node.DisplayName() if subName == "" { subName = "??" @@ -459,7 +478,7 @@ func (t *Tendrils) queryDanteDeviceWithPort(ip net.IP, port int) *DanteDeviceInf if info.TxChannelCount > 0 { t.queryDanteTxChannels(conn, ip, info.TxChannelCount) - t.nodes.UpdateDanteTxChannels(info.Name, ip, fmt.Sprintf("%d", info.TxChannelCount)) + t.nodes.UpdateDanteTxChannels(info.Name, ip, info.TxChannelCount) } return info diff --git a/nodes.go b/nodes.go index c974ee3..2b49eae 100644 --- a/nodes.go +++ b/nodes.go @@ -487,12 +487,8 @@ func (n *Nodes) SetDanteClockMaster(ip net.IP) { n.mu.Lock() defer n.mu.Unlock() - for _, node := range n.nodes { - node.IsDanteClockMaster = false - } - if node := n.ipIndex[ip.String()]; node != nil { - node.IsDanteClockMaster = true + node.DanteClockMasterSeen = time.Now() } } @@ -509,7 +505,7 @@ func (n *Nodes) logNode(node *Node) { if node.PoEBudget != nil { tags = append(tags, fmt.Sprintf("poe:%.0f/%.0fW", node.PoEBudget.Power, node.PoEBudget.MaxPower)) } - if node.IsDanteClockMaster { + if node.IsDanteClockMaster() { tags = append(tags, "dante-clock-master") } if len(tags) > 0 { diff --git a/static/index.html b/static/index.html index 06efbbd..f6a617d 100644 --- a/static/index.html +++ b/static/index.html @@ -1895,8 +1895,8 @@ nodes.forEach(node => { const nodeId = node.id; - const danteTx = node.dante_tx || []; - const danteRx = node.dante_rx || []; + const danteTx = node.dante_flows?.tx || []; + const danteRx = node.dante_flows?.rx || []; if (danteTx.length === 0 && danteRx.length === 0) return; diff --git a/types.go b/types.go index 4a864ae..7261a80 100644 --- a/types.go +++ b/types.go @@ -378,25 +378,40 @@ type PoEBudget struct { MaxPower float64 `json:"max_power"` } +type DanteFlows struct { + Tx []*DantePeer `json:"tx,omitempty"` + Rx []*DantePeer `json:"rx,omitempty"` + lastSeen time.Time +} + +func (f *DanteFlows) Expire(maxAge time.Duration) bool { + if f.lastSeen.IsZero() { + return false + } + return time.Since(f.lastSeen) > maxAge +} + type Node struct { - ID string `json:"id"` - Names NameSet `json:"names"` - Interfaces InterfaceMap `json:"interfaces"` - MACTable map[string]string `json:"-"` - PoEBudget *PoEBudget `json:"poe_budget,omitempty"` - IsDanteClockMaster bool `json:"is_dante_clock_master,omitempty"` - DanteTxChannels string `json:"dante_tx_channels,omitempty"` - MulticastGroups MulticastMembershipSet `json:"multicast_groups,omitempty"` - ArtNetInputs ArtNetUniverseSet `json:"artnet_inputs,omitempty"` - ArtNetOutputs ArtNetUniverseSet `json:"artnet_outputs,omitempty"` - SACNOutputs SACNUniverseSet `json:"sacn_outputs,omitempty"` - DanteTx []*DantePeer `json:"dante_tx,omitempty"` - DanteRx []*DantePeer `json:"dante_rx,omitempty"` - Unreachable bool `json:"unreachable,omitempty"` - errors *ErrorTracker - pollTrigger chan struct{} - cancelFunc context.CancelFunc - danteLastSeen time.Time + ID string `json:"id"` + Names NameSet `json:"names"` + Interfaces InterfaceMap `json:"interfaces"` + MACTable map[string]string `json:"-"` + PoEBudget *PoEBudget `json:"poe_budget,omitempty"` + DanteTxChannels int `json:"dante_tx_channels,omitempty"` + DanteClockMasterSeen time.Time `json:"-"` + DanteFlows *DanteFlows `json:"dante_flows,omitempty"` + MulticastGroups MulticastMembershipSet `json:"multicast_groups,omitempty"` + ArtNetInputs ArtNetUniverseSet `json:"artnet_inputs,omitempty"` + ArtNetOutputs ArtNetUniverseSet `json:"artnet_outputs,omitempty"` + SACNOutputs SACNUniverseSet `json:"sacn_outputs,omitempty"` + Unreachable bool `json:"unreachable,omitempty"` + errors *ErrorTracker + pollTrigger chan struct{} + cancelFunc context.CancelFunc +} + +func (n *Node) IsDanteClockMaster() bool { + return !n.DanteClockMasterSeen.IsZero() && time.Since(n.DanteClockMasterSeen) < 5*time.Minute } func (n *Node) SetUnreachable(unreachable bool) bool { @@ -507,13 +522,13 @@ func (n *Node) WithInterface(ifaceKey string) *Node { return n } return &Node{ - ID: n.ID, - Names: n.Names, - Interfaces: InterfaceMap{ifaceKey: iface}, - MACTable: n.MACTable, - PoEBudget: n.PoEBudget, - IsDanteClockMaster: n.IsDanteClockMaster, - DanteTxChannels: n.DanteTxChannels, + ID: n.ID, + Names: n.Names, + Interfaces: InterfaceMap{ifaceKey: iface}, + MACTable: n.MACTable, + PoEBudget: n.PoEBudget, + DanteClockMasterSeen: n.DanteClockMasterSeen, + DanteTxChannels: n.DanteTxChannels, } }