diff --git a/dante.go b/dante.go index afa423c..136e02c 100644 --- a/dante.go +++ b/dante.go @@ -124,9 +124,10 @@ type DanteFlow struct { } type DanteFlowSubscriber struct { - Node *Node - Channels []string - LastSeen time.Time + Node *Node + Channels []string + ChannelStatus map[string]DanteFlowStatus + LastSeen time.Time } type DanteFlows struct { @@ -149,7 +150,7 @@ func containsString(slice []string, val string) bool { return false } -func (d *DanteFlows) Update(source, subscriber *Node, channelInfo string) { +func (d *DanteFlows) Update(source, subscriber *Node, channelInfo string, flowStatus DanteFlowStatus) { d.mu.Lock() defer d.mu.Unlock() @@ -165,7 +166,8 @@ func (d *DanteFlows) Update(source, subscriber *Node, channelInfo string) { sub := flow.Subscribers[subscriber] if sub == nil { sub = &DanteFlowSubscriber{ - Node: subscriber, + Node: subscriber, + ChannelStatus: map[string]DanteFlowStatus{}, } flow.Subscribers[subscriber] = sub } @@ -174,6 +176,9 @@ func (d *DanteFlows) Update(source, subscriber *Node, channelInfo string) { sub.Channels = append(sub.Channels, channelInfo) sort.Strings(sub.Channels) } + if channelInfo != "" { + sub.ChannelStatus[channelInfo] = flowStatus + } sub.LastSeen = time.Now() } @@ -260,6 +265,7 @@ func (d *DanteFlows) LogAll() { rxName string rxCh string channelType string + down bool } var allChannelFlows []channelFlow var allNoChannelFlows []string @@ -293,6 +299,7 @@ func (d *DanteFlows) LogAll() { rxName: subName, rxCh: rxPart, channelType: chType, + down: sub.ChannelStatus[ch] == DanteFlowNoSource, }) } else { allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s[%s]", sourceName, subName, ch)) @@ -317,10 +324,14 @@ func (d *DanteFlows) LogAll() { sort.Strings(allNoChannelFlows) for _, cf := range allChannelFlows { + suffix := "" + if cf.down { + suffix = " DOWN" + } if cf.channelType != "" { - log.Printf("[sigusr1] %s[%s] -> %s[%s] (%s)", cf.sourceName, cf.txCh, cf.rxName, cf.rxCh, cf.channelType) + log.Printf("[sigusr1] %s[%s] -> %s[%s] (%s)%s", cf.sourceName, cf.txCh, cf.rxName, cf.rxCh, cf.channelType, suffix) } else { - log.Printf("[sigusr1] %s[%s] -> %s[%s]", cf.sourceName, cf.txCh, cf.rxName, cf.rxCh) + log.Printf("[sigusr1] %s[%s] -> %s[%s]%s", cf.sourceName, cf.txCh, cf.rxName, cf.rxCh, suffix) } } for _, flow := range allNoChannelFlows { @@ -407,11 +418,31 @@ func (t DanteChannelType) String() string { } } +type DanteFlowStatus uint8 + +const ( + DanteFlowUnsubscribed DanteFlowStatus = 0x00 + DanteFlowNoSource DanteFlowStatus = 0x01 + DanteFlowActive DanteFlowStatus = 0x09 +) + +func (s DanteFlowStatus) String() string { + switch s { + case DanteFlowActive: + return "active" + case DanteFlowNoSource: + return "no-source" + default: + return "" + } +} + type DanteSubscription struct { RxChannel int TxDeviceName string TxChannelName string ChannelType DanteChannelType + FlowStatus DanteFlowStatus } func buildDantePacket(packetType byte, cmd uint16, args []byte) []byte { @@ -714,12 +745,13 @@ func (t *Tendrils) queryDanteSubscriptions3400(conn *net.UDPConn, ip net.IP, rxC } var channelType DanteChannelType + var flowStatus DanteFlowStatus var txChOffset, txDevOffset int marker := binary.BigEndian.Uint16(resp[rawOffset : rawOffset+2]) if marker == 0x141c { - if rawOffset+48 > len(resp) { - log.Printf("[ERROR] [dante] %s: 0x3400 record %d at 0x%04x: 0x141c record truncated (need %d, have %d)", ip, i, rawOffset, rawOffset+48, len(resp)) + if rawOffset+50 > len(resp) { + log.Printf("[ERROR] [dante] %s: 0x3400 record %d at 0x%04x: 0x141c record truncated (need %d, have %d)", ip, i, rawOffset, rawOffset+50, len(resp)) continue } channelType = DanteChannelType(binary.BigEndian.Uint16(resp[rawOffset+14 : rawOffset+16])) @@ -728,14 +760,16 @@ func (t *Tendrils) queryDanteSubscriptions3400(conn *net.UDPConn, ip net.IP, rxC } txChOffset = int(binary.BigEndian.Uint16(resp[rawOffset+44 : rawOffset+46])) txDevOffset = int(binary.BigEndian.Uint16(resp[rawOffset+46 : rawOffset+48])) + flowStatus = DanteFlowStatus(resp[rawOffset+49]) } else if marker == 0x141a { - if rawOffset+48 > len(resp) { + if rawOffset+50 > len(resp) { log.Printf("[ERROR] [dante] %s: 0x3400 record %d at 0x%04x: 0x141a record truncated", ip, i, rawOffset) continue } channelType = DanteChannelVideo txChOffset = int(binary.BigEndian.Uint16(resp[rawOffset+44 : rawOffset+46])) txDevOffset = int(binary.BigEndian.Uint16(resp[rawOffset+46 : rawOffset+48])) + flowStatus = DanteFlowStatus(resp[rawOffset+49]) } else { log.Printf("[ERROR] [dante] %s: 0x3400 record %d at 0x%04x: unknown marker 0x%04x (bytes: %x)", ip, i, rawOffset, marker, resp[rawOffset:rawOffset+8]) continue @@ -759,7 +793,7 @@ func (t *Tendrils) queryDanteSubscriptions3400(conn *net.UDPConn, ip net.IP, rxC rxChannel := startChannel + i if t.DebugDante { - log.Printf("[dante] %s: 0x3400 sub: rx=%d txDev=%q txCh=%q type=%s", ip, rxChannel, txDeviceName, txChannelName, channelType) + log.Printf("[dante] %s: 0x3400 sub: rx=%d txDev=%q txCh=%q type=%s status=%s", ip, rxChannel, txDeviceName, txChannelName, channelType, flowStatus) } subscriptions = append(subscriptions, DanteSubscription{ @@ -767,6 +801,7 @@ func (t *Tendrils) queryDanteSubscriptions3400(conn *net.UDPConn, ip net.IP, rxC TxDeviceName: txDeviceName, TxChannelName: txChannelName, ChannelType: channelType, + FlowStatus: flowStatus, }) } @@ -818,7 +853,7 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) { } sourceNode := t.nodes.GetOrCreateByName(txDeviceName) subscriberNode := t.nodes.GetOrCreateByName(info.Name) - t.danteFlows.Update(sourceNode, subscriberNode, channelInfo) + t.danteFlows.Update(sourceNode, subscriberNode, channelInfo, sub.FlowStatus) needIGMPFallback = false } } @@ -835,7 +870,7 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) { } sourceNode := t.nodes.GetOrCreateByName(sourceName) subscriberNode := t.nodes.GetOrCreateByName(info.Name) - t.danteFlows.Update(sourceNode, subscriberNode, "") + t.danteFlows.Update(sourceNode, subscriberNode, "", DanteFlowActive) } } } diff --git a/notes/dantepacket.md b/notes/dantepacket.md index 66f1e9e..c72540e 100644 --- a/notes/dantepacket.md +++ b/notes/dantepacket.md @@ -88,12 +88,18 @@ record+0x0e: u16 channel_type # 0x000f=audio, 0x000e=video ... record+0x2c: u16 tx_ch_offset # absolute offset to TX channel name string record+0x2e: u16 tx_dev_offset # absolute offset to TX device name string +record+0x31: u8 flow_status # 0x00=unsubscribed, 0x01=no active source, 0x09=active ``` **Channel type (at record+0x0e):** - `0x000f` = audio channel - `0x000e` = video channel +**Flow status (at record+0x31):** +- `0x00` = unsubscribed +- `0x01` = subscribed but source not available (no active source) +- `0x09` = subscribed and receiving (active flow) + #### Format 2: 0x141a records (special channels: Video, Serial, USB) ``` @@ -110,10 +116,15 @@ These are video channels (Dante AV "Video" aggregate channel). Both 0x141c and 0x141a records use the same offsets for tx_ch and tx_dev (+44 and +46). -**Subscription status:** -- Both offsets non-zero = subscribed +**Subscription presence:** +- Both offsets non-zero = subscribed to a source - Both offsets zero = unsubscribed +**Flow status (record+0x31):** +- `0x00` = unsubscribed (no subscription configured) +- `0x01` = subscribed but not receiving (source device offline or unavailable) +- `0x09` = subscribed and actively receiving audio/video + ### String Table Null-terminated strings referenced by absolute offset from packet start.