Refactor Dante fields to use proper types and group flows with lastSeen

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ian Gulliver
2026-01-28 23:15:24 -08:00
parent 99083ecde5
commit b966ad0feb
4 changed files with 129 additions and 99 deletions

151
dante.go
View File

@@ -87,7 +87,7 @@ func (t *Tendrils) handlePTPPacket(ifaceName string, srcIP net.IP, data []byte)
t.nodes.SetDanteClockMaster(srcIP)
}
func (n *Nodes) UpdateDanteTxChannels(name string, ip net.IP, channels string) {
func (n *Nodes) UpdateDanteTxChannels(name string, ip net.IP, channels int) {
n.mu.Lock()
defer n.mu.Unlock()
@@ -104,7 +104,7 @@ func (n *Nodes) GetDanteTxDeviceInGroup(groupIP net.IP) *Node {
group := ParseMulticastGroup(groupIP)
for _, node := range n.nodes {
if node.DanteTxChannels != "" && node.MulticastGroups != nil {
if node.DanteTxChannels > 0 && node.MulticastGroups != nil {
if _, exists := node.MulticastGroups[group]; exists {
return node
}
@@ -188,16 +188,24 @@ func (n *Nodes) UpdateDanteFlow(source, subscriber *Node, channelInfo string, fl
n.mu.Lock()
defer n.mu.Unlock()
n.updateDanteTx(source, subscriber, channelInfo, flowStatus)
n.updateDanteRx(subscriber, source, channelInfo, flowStatus)
source.danteLastSeen = time.Now()
subscriber.danteLastSeen = time.Now()
now := time.Now()
n.updateDanteTx(source, subscriber, channelInfo, flowStatus, now)
n.updateDanteRx(subscriber, source, channelInfo, flowStatus, now)
}
func (n *Nodes) updateDanteTx(source, subscriber *Node, channelInfo string, flowStatus DanteFlowStatus) {
func (n *Nodes) ensureDanteFlows(node *Node) *DanteFlows {
if node.DanteFlows == nil {
node.DanteFlows = &DanteFlows{}
}
return node.DanteFlows
}
func (n *Nodes) updateDanteTx(source, subscriber *Node, channelInfo string, flowStatus DanteFlowStatus, now time.Time) {
flows := n.ensureDanteFlows(source)
flows.lastSeen = now
var peer *DantePeer
for _, p := range source.DanteTx {
for _, p := range flows.Tx {
if p.Node == subscriber {
peer = p
break
@@ -208,7 +216,7 @@ func (n *Nodes) updateDanteTx(source, subscriber *Node, channelInfo string, flow
Node: subscriber,
Status: map[string]string{},
}
source.DanteTx = append(source.DanteTx, peer)
flows.Tx = append(flows.Tx, peer)
}
if channelInfo != "" && !containsString(peer.Channels, channelInfo) {
@@ -219,14 +227,17 @@ func (n *Nodes) updateDanteTx(source, subscriber *Node, channelInfo string, flow
peer.Status[channelInfo] = flowStatus.String()
}
sort.Slice(source.DanteTx, func(i, j int) bool {
return sortorder.NaturalLess(source.DanteTx[i].Node.DisplayName(), source.DanteTx[j].Node.DisplayName())
sort.Slice(flows.Tx, func(i, j int) bool {
return sortorder.NaturalLess(flows.Tx[i].Node.DisplayName(), flows.Tx[j].Node.DisplayName())
})
}
func (n *Nodes) updateDanteRx(subscriber, source *Node, channelInfo string, flowStatus DanteFlowStatus) {
func (n *Nodes) updateDanteRx(subscriber, source *Node, channelInfo string, flowStatus DanteFlowStatus, now time.Time) {
flows := n.ensureDanteFlows(subscriber)
flows.lastSeen = now
var peer *DantePeer
for _, p := range subscriber.DanteRx {
for _, p := range flows.Rx {
if p.Node == source {
peer = p
break
@@ -237,7 +248,7 @@ func (n *Nodes) updateDanteRx(subscriber, source *Node, channelInfo string, flow
Node: source,
Status: map[string]string{},
}
subscriber.DanteRx = append(subscriber.DanteRx, peer)
flows.Rx = append(flows.Rx, peer)
}
if channelInfo != "" && !containsString(peer.Channels, channelInfo) {
@@ -248,78 +259,86 @@ func (n *Nodes) updateDanteRx(subscriber, source *Node, channelInfo string, flow
peer.Status[channelInfo] = flowStatus.String()
}
sort.Slice(subscriber.DanteRx, func(i, j int) bool {
return sortorder.NaturalLess(subscriber.DanteRx[i].Node.DisplayName(), subscriber.DanteRx[j].Node.DisplayName())
sort.Slice(flows.Rx, func(i, j int) bool {
return sortorder.NaturalLess(flows.Rx[i].Node.DisplayName(), flows.Rx[j].Node.DisplayName())
})
}
func (n *Nodes) expireDante() {
expireTime := time.Now().Add(-5 * time.Minute)
for _, node := range n.nodes {
if !node.danteLastSeen.IsZero() && node.danteLastSeen.Before(expireTime) {
node.DanteTx = nil
node.DanteRx = nil
node.danteLastSeen = time.Time{}
if node.DanteFlows != nil && node.DanteFlows.Expire(5*time.Minute) {
node.DanteFlows = nil
}
}
}
func (n *Nodes) mergeDante(keep, merge *Node) {
for _, peer := range merge.DanteTx {
var existing *DantePeer
for _, p := range keep.DanteTx {
if p.Node == peer.Node {
existing = p
break
}
}
if existing == nil {
keep.DanteTx = append(keep.DanteTx, peer)
} else {
for _, ch := range peer.Channels {
if !containsString(existing.Channels, ch) {
existing.Channels = append(existing.Channels, ch)
}
}
for ch, status := range peer.Status {
existing.Status[ch] = status
}
}
if merge.DanteFlows == nil {
return
}
for _, peer := range merge.DanteRx {
var existing *DantePeer
for _, p := range keep.DanteRx {
if p.Node == peer.Node {
existing = p
break
}
}
if existing == nil {
keep.DanteRx = append(keep.DanteRx, peer)
} else {
for _, ch := range peer.Channels {
if !containsString(existing.Channels, ch) {
existing.Channels = append(existing.Channels, ch)
if keep.DanteFlows == nil {
keep.DanteFlows = merge.DanteFlows
} else {
for _, peer := range merge.DanteFlows.Tx {
var existing *DantePeer
for _, p := range keep.DanteFlows.Tx {
if p.Node == peer.Node {
existing = p
break
}
}
for ch, status := range peer.Status {
existing.Status[ch] = status
if existing == nil {
keep.DanteFlows.Tx = append(keep.DanteFlows.Tx, peer)
} else {
for _, ch := range peer.Channels {
if !containsString(existing.Channels, ch) {
existing.Channels = append(existing.Channels, ch)
}
}
for ch, status := range peer.Status {
existing.Status[ch] = status
}
}
}
}
if merge.danteLastSeen.After(keep.danteLastSeen) {
keep.danteLastSeen = merge.danteLastSeen
for _, peer := range merge.DanteFlows.Rx {
var existing *DantePeer
for _, p := range keep.DanteFlows.Rx {
if p.Node == peer.Node {
existing = p
break
}
}
if existing == nil {
keep.DanteFlows.Rx = append(keep.DanteFlows.Rx, peer)
} else {
for _, ch := range peer.Channels {
if !containsString(existing.Channels, ch) {
existing.Channels = append(existing.Channels, ch)
}
}
for ch, status := range peer.Status {
existing.Status[ch] = status
}
}
}
if merge.DanteFlows.lastSeen.After(keep.DanteFlows.lastSeen) {
keep.DanteFlows.lastSeen = merge.DanteFlows.lastSeen
}
}
for _, node := range n.nodes {
for _, peer := range node.DanteTx {
if node.DanteFlows == nil {
continue
}
for _, peer := range node.DanteFlows.Tx {
if peer.Node == merge {
peer.Node = keep
}
}
for _, peer := range node.DanteRx {
for _, peer := range node.DanteFlows.Rx {
if peer.Node == merge {
peer.Node = keep
}
@@ -340,7 +359,7 @@ func (n *Nodes) logDante() {
var allNoChannelFlows []string
for _, node := range n.nodes {
if len(node.DanteTx) == 0 {
if node.DanteFlows == nil || len(node.DanteFlows.Tx) == 0 {
continue
}
sourceName := node.DisplayName()
@@ -348,7 +367,7 @@ func (n *Nodes) logDante() {
sourceName = "??"
}
for _, peer := range node.DanteTx {
for _, peer := range node.DanteFlows.Tx {
subName := peer.Node.DisplayName()
if subName == "" {
subName = "??"
@@ -459,7 +478,7 @@ func (t *Tendrils) queryDanteDeviceWithPort(ip net.IP, port int) *DanteDeviceInf
if info.TxChannelCount > 0 {
t.queryDanteTxChannels(conn, ip, info.TxChannelCount)
t.nodes.UpdateDanteTxChannels(info.Name, ip, fmt.Sprintf("%d", info.TxChannelCount))
t.nodes.UpdateDanteTxChannels(info.Name, ip, info.TxChannelCount)
}
return info

View File

@@ -487,12 +487,8 @@ func (n *Nodes) SetDanteClockMaster(ip net.IP) {
n.mu.Lock()
defer n.mu.Unlock()
for _, node := range n.nodes {
node.IsDanteClockMaster = false
}
if node := n.ipIndex[ip.String()]; node != nil {
node.IsDanteClockMaster = true
node.DanteClockMasterSeen = time.Now()
}
}
@@ -509,7 +505,7 @@ func (n *Nodes) logNode(node *Node) {
if node.PoEBudget != nil {
tags = append(tags, fmt.Sprintf("poe:%.0f/%.0fW", node.PoEBudget.Power, node.PoEBudget.MaxPower))
}
if node.IsDanteClockMaster {
if node.IsDanteClockMaster() {
tags = append(tags, "dante-clock-master")
}
if len(tags) > 0 {

View File

@@ -1895,8 +1895,8 @@
nodes.forEach(node => {
const nodeId = node.id;
const danteTx = node.dante_tx || [];
const danteRx = node.dante_rx || [];
const danteTx = node.dante_flows?.tx || [];
const danteRx = node.dante_flows?.rx || [];
if (danteTx.length === 0 && danteRx.length === 0) return;

View File

@@ -378,25 +378,40 @@ type PoEBudget struct {
MaxPower float64 `json:"max_power"`
}
type DanteFlows struct {
Tx []*DantePeer `json:"tx,omitempty"`
Rx []*DantePeer `json:"rx,omitempty"`
lastSeen time.Time
}
func (f *DanteFlows) Expire(maxAge time.Duration) bool {
if f.lastSeen.IsZero() {
return false
}
return time.Since(f.lastSeen) > maxAge
}
type Node struct {
ID string `json:"id"`
Names NameSet `json:"names"`
Interfaces InterfaceMap `json:"interfaces"`
MACTable map[string]string `json:"-"`
PoEBudget *PoEBudget `json:"poe_budget,omitempty"`
IsDanteClockMaster bool `json:"is_dante_clock_master,omitempty"`
DanteTxChannels string `json:"dante_tx_channels,omitempty"`
MulticastGroups MulticastMembershipSet `json:"multicast_groups,omitempty"`
ArtNetInputs ArtNetUniverseSet `json:"artnet_inputs,omitempty"`
ArtNetOutputs ArtNetUniverseSet `json:"artnet_outputs,omitempty"`
SACNOutputs SACNUniverseSet `json:"sacn_outputs,omitempty"`
DanteTx []*DantePeer `json:"dante_tx,omitempty"`
DanteRx []*DantePeer `json:"dante_rx,omitempty"`
Unreachable bool `json:"unreachable,omitempty"`
errors *ErrorTracker
pollTrigger chan struct{}
cancelFunc context.CancelFunc
danteLastSeen time.Time
ID string `json:"id"`
Names NameSet `json:"names"`
Interfaces InterfaceMap `json:"interfaces"`
MACTable map[string]string `json:"-"`
PoEBudget *PoEBudget `json:"poe_budget,omitempty"`
DanteTxChannels int `json:"dante_tx_channels,omitempty"`
DanteClockMasterSeen time.Time `json:"-"`
DanteFlows *DanteFlows `json:"dante_flows,omitempty"`
MulticastGroups MulticastMembershipSet `json:"multicast_groups,omitempty"`
ArtNetInputs ArtNetUniverseSet `json:"artnet_inputs,omitempty"`
ArtNetOutputs ArtNetUniverseSet `json:"artnet_outputs,omitempty"`
SACNOutputs SACNUniverseSet `json:"sacn_outputs,omitempty"`
Unreachable bool `json:"unreachable,omitempty"`
errors *ErrorTracker
pollTrigger chan struct{}
cancelFunc context.CancelFunc
}
func (n *Node) IsDanteClockMaster() bool {
return !n.DanteClockMasterSeen.IsZero() && time.Since(n.DanteClockMasterSeen) < 5*time.Minute
}
func (n *Node) SetUnreachable(unreachable bool) bool {
@@ -507,13 +522,13 @@ func (n *Node) WithInterface(ifaceKey string) *Node {
return n
}
return &Node{
ID: n.ID,
Names: n.Names,
Interfaces: InterfaceMap{ifaceKey: iface},
MACTable: n.MACTable,
PoEBudget: n.PoEBudget,
IsDanteClockMaster: n.IsDanteClockMaster,
DanteTxChannels: n.DanteTxChannels,
ID: n.ID,
Names: n.Names,
Interfaces: InterfaceMap{ifaceKey: iface},
MACTable: n.MACTable,
PoEBudget: n.PoEBudget,
DanteClockMasterSeen: n.DanteClockMasterSeen,
DanteTxChannels: n.DanteTxChannels,
}
}