Remove separate tracking structures and store protocol data directly on nodes

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ian Gulliver
2026-01-28 22:15:54 -08:00
parent 41000bd4a0
commit fc5b36cd1c
8 changed files with 484 additions and 709 deletions

435
dante.go
View File

@@ -9,7 +9,6 @@ import (
"net"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
@@ -103,15 +102,10 @@ func (n *Nodes) GetDanteTxDeviceInGroup(groupIP net.IP) *Node {
n.mu.RLock()
defer n.mu.RUnlock()
groupKey := groupIP.String()
gm := n.multicastGroups[groupKey]
if gm == nil {
return nil
}
for _, membership := range gm.Members {
if membership.Node != nil && membership.Node.DanteTxChannels != "" {
return membership.Node
groupName := multicastGroupName(groupIP)
for _, node := range n.nodes {
if node.DanteTxChannels != "" && containsString(node.MulticastGroups, groupName) {
return node
}
}
return nil
@@ -119,43 +113,6 @@ func (n *Nodes) GetDanteTxDeviceInGroup(groupIP net.IP) *Node {
var danteSeqID uint32
type DanteSubscriberMap map[*Node]*DanteFlowSubscriber
func (m DanteSubscriberMap) MarshalJSON() ([]byte, error) {
subs := make([]*DanteFlowSubscriber, 0, len(m))
for _, sub := range m {
subs = append(subs, sub)
}
sort.Slice(subs, func(i, j int) bool {
return sortorder.NaturalLess(subs[i].Node.DisplayName(), subs[j].Node.DisplayName())
})
return json.Marshal(subs)
}
type DanteFlow struct {
TypeID string `json:"typeid"`
Source *Node `json:"source"`
Subscribers DanteSubscriberMap `json:"subscribers"`
}
type DanteFlowSubscriber struct {
Node *Node `json:"node"`
Channels []string `json:"channels,omitempty"`
ChannelStatus map[string]DanteFlowStatus `json:"channel_status,omitempty"`
LastSeen time.Time `json:"last_seen"`
}
type DanteFlows struct {
mu sync.RWMutex
flows map[*Node]*DanteFlow
}
func NewDanteFlows() *DanteFlows {
return &DanteFlows{
flows: map[*Node]*DanteFlow{},
}
}
func containsString(slice []string, val string) bool {
for _, s := range slice {
if s == val {
@@ -165,116 +122,210 @@ func containsString(slice []string, val string) bool {
return false
}
func (d *DanteFlows) Update(source, subscriber *Node, channelInfo string, flowStatus DanteFlowStatus) {
d.mu.Lock()
defer d.mu.Unlock()
type DanteFlowStatus uint8
flow := d.flows[source]
if flow == nil {
flow = &DanteFlow{
TypeID: newTypeID("danteflow"),
Source: source,
Subscribers: DanteSubscriberMap{},
const (
DanteFlowUnsubscribed DanteFlowStatus = 0x00
DanteFlowNoSource DanteFlowStatus = 0x01
DanteFlowActive DanteFlowStatus = 0x09
)
func (s DanteFlowStatus) String() string {
switch s {
case DanteFlowActive:
return "active"
case DanteFlowNoSource:
return "no-source"
default:
return ""
}
}
func (s DanteFlowStatus) MarshalJSON() ([]byte, error) {
return json.Marshal(s.String())
}
type DanteChannelType uint16
const (
DanteChannelUnknown DanteChannelType = 0
DanteChannelAudio DanteChannelType = 0x000f
DanteChannelAudio2 DanteChannelType = 0x0006
DanteChannelVideo DanteChannelType = 0x000e
)
func (t DanteChannelType) String() string {
switch t {
case DanteChannelAudio, DanteChannelAudio2:
return "audio"
case DanteChannelVideo:
return "video"
default:
return ""
}
}
type DanteSubscription struct {
RxChannel int
TxDeviceName string
TxChannelName string
ChannelType DanteChannelType
FlowStatus DanteFlowStatus
}
type DanteDeviceInfo struct {
IP net.IP
Name string
RxChannelCount int
TxChannelCount int
Subscriptions []DanteSubscription
HasMulticast bool
}
func (n *Nodes) UpdateDanteFlow(source, subscriber *Node, channelInfo string, flowStatus DanteFlowStatus) {
n.mu.Lock()
defer n.mu.Unlock()
n.updateDanteTx(source, subscriber, channelInfo, flowStatus)
n.updateDanteRx(subscriber, source, channelInfo, flowStatus)
source.danteLastSeen = time.Now()
subscriber.danteLastSeen = time.Now()
}
func (n *Nodes) updateDanteTx(source, subscriber *Node, channelInfo string, flowStatus DanteFlowStatus) {
var peer *DantePeer
for _, p := range source.DanteTx {
if p.Node == subscriber {
peer = p
break
}
d.flows[source] = flow
}
if peer == nil {
peer = &DantePeer{
Node: subscriber,
Status: map[string]string{},
}
source.DanteTx = append(source.DanteTx, peer)
}
sub := flow.Subscribers[subscriber]
if sub == nil {
sub = &DanteFlowSubscriber{
Node: subscriber,
ChannelStatus: map[string]DanteFlowStatus{},
}
flow.Subscribers[subscriber] = sub
}
if channelInfo != "" && !containsString(sub.Channels, channelInfo) {
sub.Channels = append(sub.Channels, channelInfo)
sort.Strings(sub.Channels)
if channelInfo != "" && !containsString(peer.Channels, channelInfo) {
peer.Channels = append(peer.Channels, channelInfo)
sort.Strings(peer.Channels)
}
if channelInfo != "" {
sub.ChannelStatus[channelInfo] = flowStatus
peer.Status[channelInfo] = flowStatus.String()
}
sub.LastSeen = time.Now()
}
func (d *DanteFlows) ReplaceNode(oldNode, newNode *Node) {
d.mu.Lock()
defer d.mu.Unlock()
if flow, exists := d.flows[oldNode]; exists {
delete(d.flows, oldNode)
if existingFlow, hasNew := d.flows[newNode]; hasNew {
for subNode, sub := range flow.Subscribers {
if existingSub, hasSub := existingFlow.Subscribers[subNode]; hasSub {
for _, ch := range sub.Channels {
if !containsString(existingSub.Channels, ch) {
existingSub.Channels = append(existingSub.Channels, ch)
}
}
} else {
existingFlow.Subscribers[subNode] = sub
}
}
} else {
flow.Source = newNode
d.flows[newNode] = flow
}
}
for _, flow := range d.flows {
if sub, exists := flow.Subscribers[oldNode]; exists {
delete(flow.Subscribers, oldNode)
if existingSub, hasNew := flow.Subscribers[newNode]; hasNew {
for _, ch := range sub.Channels {
if !containsString(existingSub.Channels, ch) {
existingSub.Channels = append(existingSub.Channels, ch)
}
}
} else {
sub.Node = newNode
flow.Subscribers[newNode] = sub
}
}
}
}
func (d *DanteFlows) Expire() {
d.mu.Lock()
defer d.mu.Unlock()
expireTime := time.Now().Add(-5 * time.Minute)
for source, flow := range d.flows {
for subNode, sub := range flow.Subscribers {
if sub.LastSeen.Before(expireTime) {
delete(flow.Subscribers, subNode)
}
}
if len(flow.Subscribers) == 0 {
delete(d.flows, source)
}
}
}
func (d *DanteFlows) LogAll() {
d.Expire()
d.mu.RLock()
defer d.mu.RUnlock()
if len(d.flows) == 0 {
return
}
var flows []*DanteFlow
for _, flow := range d.flows {
flows = append(flows, flow)
}
sort.Slice(flows, func(i, j int) bool {
return sortorder.NaturalLess(flows[i].Source.DisplayName(), flows[j].Source.DisplayName())
sort.Slice(source.DanteTx, func(i, j int) bool {
return sortorder.NaturalLess(source.DanteTx[i].Node.DisplayName(), source.DanteTx[j].Node.DisplayName())
})
}
func (n *Nodes) updateDanteRx(subscriber, source *Node, channelInfo string, flowStatus DanteFlowStatus) {
var peer *DantePeer
for _, p := range subscriber.DanteRx {
if p.Node == source {
peer = p
break
}
}
if peer == nil {
peer = &DantePeer{
Node: source,
Status: map[string]string{},
}
subscriber.DanteRx = append(subscriber.DanteRx, peer)
}
if channelInfo != "" && !containsString(peer.Channels, channelInfo) {
peer.Channels = append(peer.Channels, channelInfo)
sort.Strings(peer.Channels)
}
if channelInfo != "" {
peer.Status[channelInfo] = flowStatus.String()
}
sort.Slice(subscriber.DanteRx, func(i, j int) bool {
return sortorder.NaturalLess(subscriber.DanteRx[i].Node.DisplayName(), subscriber.DanteRx[j].Node.DisplayName())
})
}
func (n *Nodes) expireDante() {
expireTime := time.Now().Add(-5 * time.Minute)
for _, node := range n.nodes {
if !node.danteLastSeen.IsZero() && node.danteLastSeen.Before(expireTime) {
node.DanteTx = nil
node.DanteRx = nil
node.danteLastSeen = time.Time{}
}
}
}
func (n *Nodes) mergeDante(keep, merge *Node) {
for _, peer := range merge.DanteTx {
var existing *DantePeer
for _, p := range keep.DanteTx {
if p.Node == peer.Node {
existing = p
break
}
}
if existing == nil {
keep.DanteTx = append(keep.DanteTx, peer)
} else {
for _, ch := range peer.Channels {
if !containsString(existing.Channels, ch) {
existing.Channels = append(existing.Channels, ch)
}
}
for ch, status := range peer.Status {
existing.Status[ch] = status
}
}
}
for _, peer := range merge.DanteRx {
var existing *DantePeer
for _, p := range keep.DanteRx {
if p.Node == peer.Node {
existing = p
break
}
}
if existing == nil {
keep.DanteRx = append(keep.DanteRx, peer)
} else {
for _, ch := range peer.Channels {
if !containsString(existing.Channels, ch) {
existing.Channels = append(existing.Channels, ch)
}
}
for ch, status := range peer.Status {
existing.Status[ch] = status
}
}
}
if merge.danteLastSeen.After(keep.danteLastSeen) {
keep.danteLastSeen = merge.danteLastSeen
}
for _, node := range n.nodes {
for _, peer := range node.DanteTx {
if peer.Node == merge {
peer.Node = keep
}
}
for _, peer := range node.DanteRx {
if peer.Node == merge {
peer.Node = keep
}
}
}
}
func (n *Nodes) logDante() {
type channelFlow struct {
sourceName string
txCh string
@@ -286,21 +337,24 @@ func (d *DanteFlows) LogAll() {
var allChannelFlows []channelFlow
var allNoChannelFlows []string
for _, flow := range flows {
sourceName := flow.Source.DisplayName()
for _, node := range n.nodes {
if len(node.DanteTx) == 0 {
continue
}
sourceName := node.DisplayName()
if sourceName == "" {
sourceName = "??"
}
for _, sub := range flow.Subscribers {
subName := sub.Node.DisplayName()
for _, peer := range node.DanteTx {
subName := peer.Node.DisplayName()
if subName == "" {
subName = "??"
}
if len(sub.Channels) == 0 {
if len(peer.Channels) == 0 {
allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s", sourceName, subName))
} else {
for _, ch := range sub.Channels {
for _, ch := range peer.Channels {
parts := strings.Split(ch, "->")
if len(parts) == 2 {
rxPart := parts[1]
@@ -315,7 +369,7 @@ func (d *DanteFlows) LogAll() {
rxName: subName,
rxCh: rxPart,
channelType: chType,
down: sub.ChannelStatus[ch] == DanteFlowNoSource,
down: peer.Status[ch] == "no-source",
})
} else {
allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s[%s]", sourceName, subName, ch))
@@ -326,6 +380,10 @@ func (d *DanteFlows) LogAll() {
}
totalFlows := len(allChannelFlows) + len(allNoChannelFlows)
if totalFlows == 0 {
return
}
log.Printf("[sigusr1] ================ %d dante flows ================", totalFlows)
sort.Slice(allChannelFlows, func(i, j int) bool {
@@ -405,66 +463,6 @@ func (t *Tendrils) queryDanteDeviceWithPort(ip net.IP, port int) *DanteDeviceInf
return info
}
type DanteDeviceInfo struct {
IP net.IP
Name string
RxChannelCount int
TxChannelCount int
Subscriptions []DanteSubscription
HasMulticast bool
}
type DanteChannelType uint16
const (
DanteChannelUnknown DanteChannelType = 0
DanteChannelAudio DanteChannelType = 0x000f
DanteChannelAudio2 DanteChannelType = 0x0006
DanteChannelVideo DanteChannelType = 0x000e
)
func (t DanteChannelType) String() string {
switch t {
case DanteChannelAudio, DanteChannelAudio2:
return "audio"
case DanteChannelVideo:
return "video"
default:
return ""
}
}
type DanteFlowStatus uint8
const (
DanteFlowUnsubscribed DanteFlowStatus = 0x00
DanteFlowNoSource DanteFlowStatus = 0x01
DanteFlowActive DanteFlowStatus = 0x09
)
func (s DanteFlowStatus) String() string {
switch s {
case DanteFlowActive:
return "active"
case DanteFlowNoSource:
return "no-source"
default:
return ""
}
}
func (s DanteFlowStatus) MarshalJSON() ([]byte, error) {
return json.Marshal(s.String())
}
type DanteSubscription struct {
RxChannel int
TxDeviceName string
TxChannelName string
ChannelType DanteChannelType
FlowStatus DanteFlowStatus
}
func buildDantePacket(packetType byte, cmd uint16, args []byte) []byte {
seq := nextDanteSeq()
totalLen := 10 + len(args)
@@ -530,24 +528,19 @@ func (t *Tendrils) sendDanteCommand28(conn *net.UDPConn, ip net.IP, cmd uint16,
}
func (t *Tendrils) queryDanteDeviceName(conn *net.UDPConn, ip net.IP) string {
// 0x1003 returns device info - name position varies by device
resp := t.sendDanteCommand(conn, ip, 0x1003, nil)
if resp == nil || len(resp) < 40 {
return ""
}
// Find the first printable string that looks like a device name
// Look for patterns like "AJA-", "ULXD", etc starting from offset 40
for i := 40; i < len(resp)-4; i++ {
if resp[i] >= 'A' && resp[i] <= 'Z' {
// Found uppercase letter, might be start of name
end := i
for end < len(resp) && resp[end] != 0 && resp[end] >= 0x20 && resp[end] < 0x7f {
end++
}
if end-i >= 4 && end-i < 40 {
name := string(resp[i:end])
// Skip "Audinate" which is the platform name
if name != "Audinate DCM" && !strings.HasPrefix(name, "Audinate") {
if t.DebugDante {
log.Printf("[dante] %s: device name: %q", ip, name)
@@ -873,7 +866,7 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) {
}
sourceNode := t.nodes.GetOrCreateByName(txDeviceName)
subscriberNode := t.nodes.GetOrCreateByName(info.Name)
t.danteFlows.Update(sourceNode, subscriberNode, channelInfo, sub.FlowStatus)
t.nodes.UpdateDanteFlow(sourceNode, subscriberNode, channelInfo, sub.FlowStatus)
needIGMPFallback = false
}
}
@@ -893,7 +886,7 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) {
sourceNode = t.nodes.GetOrCreateByName(multicastGroupName(groupIP))
}
subscriberNode := t.nodes.GetOrCreateByName(info.Name)
t.danteFlows.Update(sourceNode, subscriberNode, "", DanteFlowActive)
t.nodes.UpdateDanteFlow(sourceNode, subscriberNode, "", DanteFlowActive)
}
}
}