Replace Dante channel strings with structured data

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ian Gulliver
2026-01-28 23:21:33 -08:00
parent b966ad0feb
commit 88763946a4
3 changed files with 122 additions and 122 deletions

160
dante.go
View File

@@ -3,7 +3,6 @@ package tendrils
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"log"
"net"
@@ -115,58 +114,6 @@ func (n *Nodes) GetDanteTxDeviceInGroup(groupIP net.IP) *Node {
var danteSeqID uint32
func containsString(slice []string, val string) bool {
for _, s := range slice {
if s == val {
return true
}
}
return false
}
type DanteFlowStatus uint8
const (
DanteFlowUnsubscribed DanteFlowStatus = 0x00
DanteFlowNoSource DanteFlowStatus = 0x01
DanteFlowActive DanteFlowStatus = 0x09
)
func (s DanteFlowStatus) String() string {
switch s {
case DanteFlowActive:
return "active"
case DanteFlowNoSource:
return "no-source"
default:
return ""
}
}
func (s DanteFlowStatus) MarshalJSON() ([]byte, error) {
return json.Marshal(s.String())
}
type DanteChannelType uint16
const (
DanteChannelUnknown DanteChannelType = 0
DanteChannelAudio DanteChannelType = 0x000f
DanteChannelAudio2 DanteChannelType = 0x0006
DanteChannelVideo DanteChannelType = 0x000e
)
func (t DanteChannelType) String() string {
switch t {
case DanteChannelAudio, DanteChannelAudio2:
return "audio"
case DanteChannelVideo:
return "video"
default:
return ""
}
}
type DanteSubscription struct {
RxChannel int
TxDeviceName string
@@ -184,13 +131,13 @@ type DanteDeviceInfo struct {
HasMulticast bool
}
func (n *Nodes) UpdateDanteFlow(source, subscriber *Node, channelInfo string, flowStatus DanteFlowStatus) {
func (n *Nodes) UpdateDanteFlow(source, subscriber *Node, channel *DanteChannel) {
n.mu.Lock()
defer n.mu.Unlock()
now := time.Now()
n.updateDanteTx(source, subscriber, channelInfo, flowStatus, now)
n.updateDanteRx(subscriber, source, channelInfo, flowStatus, now)
n.updateDanteTx(source, subscriber, channel, now)
n.updateDanteRx(subscriber, source, channel, now)
}
func (n *Nodes) ensureDanteFlows(node *Node) *DanteFlows {
@@ -200,7 +147,7 @@ func (n *Nodes) ensureDanteFlows(node *Node) *DanteFlows {
return node.DanteFlows
}
func (n *Nodes) updateDanteTx(source, subscriber *Node, channelInfo string, flowStatus DanteFlowStatus, now time.Time) {
func (n *Nodes) updateDanteTx(source, subscriber *Node, channel *DanteChannel, now time.Time) {
flows := n.ensureDanteFlows(source)
flows.lastSeen = now
@@ -213,18 +160,13 @@ func (n *Nodes) updateDanteTx(source, subscriber *Node, channelInfo string, flow
}
if peer == nil {
peer = &DantePeer{
Node: subscriber,
Status: map[string]string{},
Node: subscriber,
}
flows.Tx = append(flows.Tx, peer)
}
if channelInfo != "" && !containsString(peer.Channels, channelInfo) {
peer.Channels = append(peer.Channels, channelInfo)
sort.Strings(peer.Channels)
}
if channelInfo != "" {
peer.Status[channelInfo] = flowStatus.String()
if channel != nil {
peer.Channels = addOrUpdateChannel(peer.Channels, channel)
}
sort.Slice(flows.Tx, func(i, j int) bool {
@@ -232,7 +174,7 @@ func (n *Nodes) updateDanteTx(source, subscriber *Node, channelInfo string, flow
})
}
func (n *Nodes) updateDanteRx(subscriber, source *Node, channelInfo string, flowStatus DanteFlowStatus, now time.Time) {
func (n *Nodes) updateDanteRx(subscriber, source *Node, channel *DanteChannel, now time.Time) {
flows := n.ensureDanteFlows(subscriber)
flows.lastSeen = now
@@ -245,18 +187,13 @@ func (n *Nodes) updateDanteRx(subscriber, source *Node, channelInfo string, flow
}
if peer == nil {
peer = &DantePeer{
Node: source,
Status: map[string]string{},
Node: source,
}
flows.Rx = append(flows.Rx, peer)
}
if channelInfo != "" && !containsString(peer.Channels, channelInfo) {
peer.Channels = append(peer.Channels, channelInfo)
sort.Strings(peer.Channels)
}
if channelInfo != "" {
peer.Status[channelInfo] = flowStatus.String()
if channel != nil {
peer.Channels = addOrUpdateChannel(peer.Channels, channel)
}
sort.Slice(flows.Rx, func(i, j int) bool {
@@ -264,6 +201,24 @@ func (n *Nodes) updateDanteRx(subscriber, source *Node, channelInfo string, flow
})
}
func addOrUpdateChannel(channels []*DanteChannel, channel *DanteChannel) []*DanteChannel {
for _, ch := range channels {
if ch.TxChannel == channel.TxChannel && ch.RxChannel == channel.RxChannel {
ch.Type = channel.Type
ch.Status = channel.Status
return channels
}
}
channels = append(channels, channel)
sort.Slice(channels, func(i, j int) bool {
if channels[i].RxChannel != channels[j].RxChannel {
return channels[i].RxChannel < channels[j].RxChannel
}
return channels[i].TxChannel < channels[j].TxChannel
})
return channels
}
func (n *Nodes) expireDante() {
for _, node := range n.nodes {
if node.DanteFlows != nil && node.DanteFlows.Expire(5*time.Minute) {
@@ -292,12 +247,7 @@ func (n *Nodes) mergeDante(keep, merge *Node) {
keep.DanteFlows.Tx = append(keep.DanteFlows.Tx, peer)
} else {
for _, ch := range peer.Channels {
if !containsString(existing.Channels, ch) {
existing.Channels = append(existing.Channels, ch)
}
}
for ch, status := range peer.Status {
existing.Status[ch] = status
existing.Channels = addOrUpdateChannel(existing.Channels, ch)
}
}
}
@@ -314,12 +264,7 @@ func (n *Nodes) mergeDante(keep, merge *Node) {
keep.DanteFlows.Rx = append(keep.DanteFlows.Rx, peer)
} else {
for _, ch := range peer.Channels {
if !containsString(existing.Channels, ch) {
existing.Channels = append(existing.Channels, ch)
}
}
for ch, status := range peer.Status {
existing.Status[ch] = status
existing.Channels = addOrUpdateChannel(existing.Channels, ch)
}
}
}
@@ -376,25 +321,14 @@ func (n *Nodes) logDante() {
allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s", sourceName, subName))
} else {
for _, ch := range peer.Channels {
parts := strings.Split(ch, "->")
if len(parts) == 2 {
rxPart := parts[1]
chType := ""
if idx := strings.LastIndex(rxPart, ":"); idx != -1 {
chType = rxPart[idx+1:]
rxPart = rxPart[:idx]
}
allChannelFlows = append(allChannelFlows, channelFlow{
sourceName: sourceName,
txCh: parts[0],
rxName: subName,
rxCh: rxPart,
channelType: chType,
down: peer.Status[ch] == "no-source",
})
} else {
allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s[%s]", sourceName, subName, ch))
}
allChannelFlows = append(allChannelFlows, channelFlow{
sourceName: sourceName,
txCh: ch.TxChannel,
rxName: subName,
rxCh: fmt.Sprintf("%02d", ch.RxChannel),
channelType: ch.Type.String(),
down: ch.Status == DanteFlowNoSource,
})
}
}
}
@@ -876,18 +810,18 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) {
if txDeviceName == "." {
txDeviceName = info.Name
}
channelInfo := ""
var channel *DanteChannel
if sub.TxChannelName != "" {
typeStr := sub.ChannelType.String()
if typeStr != "" {
channelInfo = fmt.Sprintf("%s → %02d [%s]", sub.TxChannelName, sub.RxChannel, typeStr)
} else {
channelInfo = fmt.Sprintf("%s → %02d", sub.TxChannelName, sub.RxChannel)
channel = &DanteChannel{
TxChannel: sub.TxChannelName,
RxChannel: sub.RxChannel,
Type: sub.ChannelType,
Status: sub.FlowStatus,
}
}
sourceNode := t.nodes.GetOrCreateByName(txDeviceName)
subscriberNode := t.nodes.GetOrCreateByName(info.Name)
t.nodes.UpdateDanteFlow(sourceNode, subscriberNode, channelInfo, sub.FlowStatus)
t.nodes.UpdateDanteFlow(sourceNode, subscriberNode, channel)
needIGMPFallback = false
}
}
@@ -907,7 +841,7 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) {
sourceNode = t.nodes.GetOrCreateByName(ParseMulticastGroup(groupIP).String())
}
subscriberNode := t.nodes.GetOrCreateByName(info.Name)
t.nodes.UpdateDanteFlow(sourceNode, subscriberNode, "", DanteFlowActive)
t.nodes.UpdateDanteFlow(sourceNode, subscriberNode, nil)
}
}
}