From 111cb51fdb74ed33e182939b57df7bb52cfb49f8 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Fri, 23 Jan 2026 22:24:18 -0800 Subject: [PATCH] update node references in flows and memberships when nodes are merged Co-Authored-By: Claude Opus 4.5 --- artnet.go | 24 +++++++++++++++++++++ dante_control.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++++ nodes.go | 11 ++++++++++ 3 files changed, 90 insertions(+) diff --git a/artnet.go b/artnet.go index dca47a0..ed49d64 100644 --- a/artnet.go +++ b/artnet.go @@ -261,6 +261,30 @@ func containsInt(slice []int, val int) bool { return false } +func (a *ArtNetNodes) ReplaceNode(oldNode, newNode *Node) { + a.mu.Lock() + defer a.mu.Unlock() + + if artNode, exists := a.nodes[oldNode]; exists { + delete(a.nodes, oldNode) + if existing, hasNew := a.nodes[newNode]; hasNew { + for _, u := range artNode.Inputs { + if !containsInt(existing.Inputs, u) { + existing.Inputs = append(existing.Inputs, u) + } + } + for _, u := range artNode.Outputs { + if !containsInt(existing.Outputs, u) { + existing.Outputs = append(existing.Outputs, u) + } + } + } else { + artNode.Node = newNode + a.nodes[newNode] = artNode + } + } +} + func (a *ArtNetNodes) Expire() { a.mu.Lock() defer a.mu.Unlock() diff --git a/dante_control.go b/dante_control.go index d36f98d..24b762d 100644 --- a/dante_control.go +++ b/dante_control.go @@ -80,6 +80,61 @@ func (d *DanteFlows) Update(source, subscriber *Node, channelInfo string) { sub.LastSeen = time.Now() } +func (d *DanteFlows) ReplaceNode(oldNode, newNode *Node) { + d.mu.Lock() + defer d.mu.Unlock() + + if flow, exists := d.flows[oldNode]; exists { + delete(d.flows, oldNode) + if existingFlow, hasNew := d.flows[newNode]; hasNew { + for subNode, sub := range flow.Subscribers { + if existingSub, hasSub := existingFlow.Subscribers[subNode]; hasSub { + for _, ch := range sub.Channels { + hasChannel := false + for _, existingCh := range existingSub.Channels { + if existingCh == ch { + hasChannel = true + break + } + } + if !hasChannel { + existingSub.Channels = append(existingSub.Channels, ch) + } + } + } else { + existingFlow.Subscribers[subNode] = sub + } + } + } else { + flow.Source = newNode + d.flows[newNode] = flow + } + } + + for _, flow := range d.flows { + if sub, exists := flow.Subscribers[oldNode]; exists { + delete(flow.Subscribers, oldNode) + if existingSub, hasNew := flow.Subscribers[newNode]; hasNew { + for _, ch := range sub.Channels { + hasChannel := false + for _, existingCh := range existingSub.Channels { + if existingCh == ch { + hasChannel = true + break + } + } + if !hasChannel { + existingSub.Channels = append(existingSub.Channels, ch) + } + } + } else { + sub.Node = newNode + flow.Subscribers[newNode] = sub + } + } + } +} + func (d *DanteFlows) Expire() { d.mu.Lock() defer d.mu.Unlock() diff --git a/nodes.go b/nodes.go index 0980db8..51862bb 100644 --- a/nodes.go +++ b/nodes.go @@ -552,6 +552,17 @@ func (n *Nodes) mergeNodes(keepID, mergeID int) { keep.MACTable[peerMAC] = ifaceName } + n.t.danteFlows.ReplaceNode(merge, keep) + n.t.artnet.ReplaceNode(merge, keep) + + for _, gm := range n.multicastGroups { + for _, membership := range gm.Members { + if membership.Node == merge { + membership.Node = keep + } + } + } + delete(n.nodes, mergeID) }