Add flow status parsing and DOWN indicator for inactive dante flows

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ian Gulliver
2026-01-24 10:28:10 -08:00
parent 894e355f56
commit 168cdedbcb
2 changed files with 61 additions and 15 deletions

View File

@@ -126,6 +126,7 @@ type DanteFlow struct {
type DanteFlowSubscriber struct { type DanteFlowSubscriber struct {
Node *Node Node *Node
Channels []string Channels []string
ChannelStatus map[string]DanteFlowStatus
LastSeen time.Time LastSeen time.Time
} }
@@ -149,7 +150,7 @@ func containsString(slice []string, val string) bool {
return false return false
} }
func (d *DanteFlows) Update(source, subscriber *Node, channelInfo string) { func (d *DanteFlows) Update(source, subscriber *Node, channelInfo string, flowStatus DanteFlowStatus) {
d.mu.Lock() d.mu.Lock()
defer d.mu.Unlock() defer d.mu.Unlock()
@@ -166,6 +167,7 @@ func (d *DanteFlows) Update(source, subscriber *Node, channelInfo string) {
if sub == nil { if sub == nil {
sub = &DanteFlowSubscriber{ sub = &DanteFlowSubscriber{
Node: subscriber, Node: subscriber,
ChannelStatus: map[string]DanteFlowStatus{},
} }
flow.Subscribers[subscriber] = sub flow.Subscribers[subscriber] = sub
} }
@@ -174,6 +176,9 @@ func (d *DanteFlows) Update(source, subscriber *Node, channelInfo string) {
sub.Channels = append(sub.Channels, channelInfo) sub.Channels = append(sub.Channels, channelInfo)
sort.Strings(sub.Channels) sort.Strings(sub.Channels)
} }
if channelInfo != "" {
sub.ChannelStatus[channelInfo] = flowStatus
}
sub.LastSeen = time.Now() sub.LastSeen = time.Now()
} }
@@ -260,6 +265,7 @@ func (d *DanteFlows) LogAll() {
rxName string rxName string
rxCh string rxCh string
channelType string channelType string
down bool
} }
var allChannelFlows []channelFlow var allChannelFlows []channelFlow
var allNoChannelFlows []string var allNoChannelFlows []string
@@ -293,6 +299,7 @@ func (d *DanteFlows) LogAll() {
rxName: subName, rxName: subName,
rxCh: rxPart, rxCh: rxPart,
channelType: chType, channelType: chType,
down: sub.ChannelStatus[ch] == DanteFlowNoSource,
}) })
} else { } else {
allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s[%s]", sourceName, subName, ch)) allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s[%s]", sourceName, subName, ch))
@@ -317,10 +324,14 @@ func (d *DanteFlows) LogAll() {
sort.Strings(allNoChannelFlows) sort.Strings(allNoChannelFlows)
for _, cf := range allChannelFlows { for _, cf := range allChannelFlows {
suffix := ""
if cf.down {
suffix = " DOWN"
}
if cf.channelType != "" { if cf.channelType != "" {
log.Printf("[sigusr1] %s[%s] -> %s[%s] (%s)", cf.sourceName, cf.txCh, cf.rxName, cf.rxCh, cf.channelType) log.Printf("[sigusr1] %s[%s] -> %s[%s] (%s)%s", cf.sourceName, cf.txCh, cf.rxName, cf.rxCh, cf.channelType, suffix)
} else { } else {
log.Printf("[sigusr1] %s[%s] -> %s[%s]", cf.sourceName, cf.txCh, cf.rxName, cf.rxCh) log.Printf("[sigusr1] %s[%s] -> %s[%s]%s", cf.sourceName, cf.txCh, cf.rxName, cf.rxCh, suffix)
} }
} }
for _, flow := range allNoChannelFlows { for _, flow := range allNoChannelFlows {
@@ -407,11 +418,31 @@ func (t DanteChannelType) String() string {
} }
} }
type DanteFlowStatus uint8
const (
DanteFlowUnsubscribed DanteFlowStatus = 0x00
DanteFlowNoSource DanteFlowStatus = 0x01
DanteFlowActive DanteFlowStatus = 0x09
)
func (s DanteFlowStatus) String() string {
switch s {
case DanteFlowActive:
return "active"
case DanteFlowNoSource:
return "no-source"
default:
return ""
}
}
type DanteSubscription struct { type DanteSubscription struct {
RxChannel int RxChannel int
TxDeviceName string TxDeviceName string
TxChannelName string TxChannelName string
ChannelType DanteChannelType ChannelType DanteChannelType
FlowStatus DanteFlowStatus
} }
func buildDantePacket(packetType byte, cmd uint16, args []byte) []byte { func buildDantePacket(packetType byte, cmd uint16, args []byte) []byte {
@@ -714,12 +745,13 @@ func (t *Tendrils) queryDanteSubscriptions3400(conn *net.UDPConn, ip net.IP, rxC
} }
var channelType DanteChannelType var channelType DanteChannelType
var flowStatus DanteFlowStatus
var txChOffset, txDevOffset int var txChOffset, txDevOffset int
marker := binary.BigEndian.Uint16(resp[rawOffset : rawOffset+2]) marker := binary.BigEndian.Uint16(resp[rawOffset : rawOffset+2])
if marker == 0x141c { if marker == 0x141c {
if rawOffset+48 > len(resp) { if rawOffset+50 > len(resp) {
log.Printf("[ERROR] [dante] %s: 0x3400 record %d at 0x%04x: 0x141c record truncated (need %d, have %d)", ip, i, rawOffset, rawOffset+48, len(resp)) log.Printf("[ERROR] [dante] %s: 0x3400 record %d at 0x%04x: 0x141c record truncated (need %d, have %d)", ip, i, rawOffset, rawOffset+50, len(resp))
continue continue
} }
channelType = DanteChannelType(binary.BigEndian.Uint16(resp[rawOffset+14 : rawOffset+16])) channelType = DanteChannelType(binary.BigEndian.Uint16(resp[rawOffset+14 : rawOffset+16]))
@@ -728,14 +760,16 @@ func (t *Tendrils) queryDanteSubscriptions3400(conn *net.UDPConn, ip net.IP, rxC
} }
txChOffset = int(binary.BigEndian.Uint16(resp[rawOffset+44 : rawOffset+46])) txChOffset = int(binary.BigEndian.Uint16(resp[rawOffset+44 : rawOffset+46]))
txDevOffset = int(binary.BigEndian.Uint16(resp[rawOffset+46 : rawOffset+48])) txDevOffset = int(binary.BigEndian.Uint16(resp[rawOffset+46 : rawOffset+48]))
flowStatus = DanteFlowStatus(resp[rawOffset+49])
} else if marker == 0x141a { } else if marker == 0x141a {
if rawOffset+48 > len(resp) { if rawOffset+50 > len(resp) {
log.Printf("[ERROR] [dante] %s: 0x3400 record %d at 0x%04x: 0x141a record truncated", ip, i, rawOffset) log.Printf("[ERROR] [dante] %s: 0x3400 record %d at 0x%04x: 0x141a record truncated", ip, i, rawOffset)
continue continue
} }
channelType = DanteChannelVideo channelType = DanteChannelVideo
txChOffset = int(binary.BigEndian.Uint16(resp[rawOffset+44 : rawOffset+46])) txChOffset = int(binary.BigEndian.Uint16(resp[rawOffset+44 : rawOffset+46]))
txDevOffset = int(binary.BigEndian.Uint16(resp[rawOffset+46 : rawOffset+48])) txDevOffset = int(binary.BigEndian.Uint16(resp[rawOffset+46 : rawOffset+48]))
flowStatus = DanteFlowStatus(resp[rawOffset+49])
} else { } else {
log.Printf("[ERROR] [dante] %s: 0x3400 record %d at 0x%04x: unknown marker 0x%04x (bytes: %x)", ip, i, rawOffset, marker, resp[rawOffset:rawOffset+8]) log.Printf("[ERROR] [dante] %s: 0x3400 record %d at 0x%04x: unknown marker 0x%04x (bytes: %x)", ip, i, rawOffset, marker, resp[rawOffset:rawOffset+8])
continue continue
@@ -759,7 +793,7 @@ func (t *Tendrils) queryDanteSubscriptions3400(conn *net.UDPConn, ip net.IP, rxC
rxChannel := startChannel + i rxChannel := startChannel + i
if t.DebugDante { if t.DebugDante {
log.Printf("[dante] %s: 0x3400 sub: rx=%d txDev=%q txCh=%q type=%s", ip, rxChannel, txDeviceName, txChannelName, channelType) log.Printf("[dante] %s: 0x3400 sub: rx=%d txDev=%q txCh=%q type=%s status=%s", ip, rxChannel, txDeviceName, txChannelName, channelType, flowStatus)
} }
subscriptions = append(subscriptions, DanteSubscription{ subscriptions = append(subscriptions, DanteSubscription{
@@ -767,6 +801,7 @@ func (t *Tendrils) queryDanteSubscriptions3400(conn *net.UDPConn, ip net.IP, rxC
TxDeviceName: txDeviceName, TxDeviceName: txDeviceName,
TxChannelName: txChannelName, TxChannelName: txChannelName,
ChannelType: channelType, ChannelType: channelType,
FlowStatus: flowStatus,
}) })
} }
@@ -818,7 +853,7 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) {
} }
sourceNode := t.nodes.GetOrCreateByName(txDeviceName) sourceNode := t.nodes.GetOrCreateByName(txDeviceName)
subscriberNode := t.nodes.GetOrCreateByName(info.Name) subscriberNode := t.nodes.GetOrCreateByName(info.Name)
t.danteFlows.Update(sourceNode, subscriberNode, channelInfo) t.danteFlows.Update(sourceNode, subscriberNode, channelInfo, sub.FlowStatus)
needIGMPFallback = false needIGMPFallback = false
} }
} }
@@ -835,7 +870,7 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) {
} }
sourceNode := t.nodes.GetOrCreateByName(sourceName) sourceNode := t.nodes.GetOrCreateByName(sourceName)
subscriberNode := t.nodes.GetOrCreateByName(info.Name) subscriberNode := t.nodes.GetOrCreateByName(info.Name)
t.danteFlows.Update(sourceNode, subscriberNode, "") t.danteFlows.Update(sourceNode, subscriberNode, "", DanteFlowActive)
} }
} }
} }

View File

@@ -88,12 +88,18 @@ record+0x0e: u16 channel_type # 0x000f=audio, 0x000e=video
... ...
record+0x2c: u16 tx_ch_offset # absolute offset to TX channel name string record+0x2c: u16 tx_ch_offset # absolute offset to TX channel name string
record+0x2e: u16 tx_dev_offset # absolute offset to TX device name string record+0x2e: u16 tx_dev_offset # absolute offset to TX device name string
record+0x31: u8 flow_status # 0x00=unsubscribed, 0x01=no active source, 0x09=active
``` ```
**Channel type (at record+0x0e):** **Channel type (at record+0x0e):**
- `0x000f` = audio channel - `0x000f` = audio channel
- `0x000e` = video channel - `0x000e` = video channel
**Flow status (at record+0x31):**
- `0x00` = unsubscribed
- `0x01` = subscribed but source not available (no active source)
- `0x09` = subscribed and receiving (active flow)
#### Format 2: 0x141a records (special channels: Video, Serial, USB) #### Format 2: 0x141a records (special channels: Video, Serial, USB)
``` ```
@@ -110,10 +116,15 @@ These are video channels (Dante AV "Video" aggregate channel).
Both 0x141c and 0x141a records use the same offsets for tx_ch and tx_dev (+44 and +46). Both 0x141c and 0x141a records use the same offsets for tx_ch and tx_dev (+44 and +46).
**Subscription status:** **Subscription presence:**
- Both offsets non-zero = subscribed - Both offsets non-zero = subscribed to a source
- Both offsets zero = unsubscribed - Both offsets zero = unsubscribed
**Flow status (record+0x31):**
- `0x00` = unsubscribed (no subscription configured)
- `0x01` = subscribed but not receiving (source device offline or unavailable)
- `0x09` = subscribed and actively receiving audio/video
### String Table ### String Table
Null-terminated strings referenced by absolute offset from packet start. Null-terminated strings referenced by absolute offset from packet start.