update node references in flows and memberships when nodes are merged

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ian Gulliver
2026-01-23 22:24:18 -08:00
parent 528f58dca1
commit 111cb51fdb
3 changed files with 90 additions and 0 deletions

View File

@@ -80,6 +80,61 @@ func (d *DanteFlows) Update(source, subscriber *Node, channelInfo string) {
sub.LastSeen = time.Now()
}
func (d *DanteFlows) ReplaceNode(oldNode, newNode *Node) {
d.mu.Lock()
defer d.mu.Unlock()
if flow, exists := d.flows[oldNode]; exists {
delete(d.flows, oldNode)
if existingFlow, hasNew := d.flows[newNode]; hasNew {
for subNode, sub := range flow.Subscribers {
if existingSub, hasSub := existingFlow.Subscribers[subNode]; hasSub {
for _, ch := range sub.Channels {
hasChannel := false
for _, existingCh := range existingSub.Channels {
if existingCh == ch {
hasChannel = true
break
}
}
if !hasChannel {
existingSub.Channels = append(existingSub.Channels, ch)
}
}
} else {
existingFlow.Subscribers[subNode] = sub
}
}
} else {
flow.Source = newNode
d.flows[newNode] = flow
}
}
for _, flow := range d.flows {
if sub, exists := flow.Subscribers[oldNode]; exists {
delete(flow.Subscribers, oldNode)
if existingSub, hasNew := flow.Subscribers[newNode]; hasNew {
for _, ch := range sub.Channels {
hasChannel := false
for _, existingCh := range existingSub.Channels {
if existingCh == ch {
hasChannel = true
break
}
}
if !hasChannel {
existingSub.Channels = append(existingSub.Channels, ch)
}
}
} else {
sub.Node = newNode
flow.Subscribers[newNode] = sub
}
}
}
}
func (d *DanteFlows) Expire() {
d.mu.Lock()
defer d.mu.Unlock()