Files
tendrils/dante_control.go

502 lines
12 KiB
Go

package tendrils
import (
"encoding/binary"
"fmt"
"log"
"net"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/fvbommel/sortorder"
)
const (
danteControlPort = 4440
)
var danteSeqID uint32
type DanteFlow struct {
SourceName string
Subscribers map[string]*DanteFlowSubscriber
}
type DanteFlowSubscriber struct {
Name string
Channels []string
LastSeen time.Time
}
type DanteFlows struct {
mu sync.RWMutex
flows map[string]*DanteFlow
}
func NewDanteFlows() *DanteFlows {
return &DanteFlows{
flows: map[string]*DanteFlow{},
}
}
func (d *DanteFlows) Update(sourceName, subscriberName, channelInfo string) {
d.mu.Lock()
defer d.mu.Unlock()
flow := d.flows[sourceName]
if flow == nil {
flow = &DanteFlow{
SourceName: sourceName,
Subscribers: map[string]*DanteFlowSubscriber{},
}
d.flows[sourceName] = flow
}
sub := flow.Subscribers[subscriberName]
if sub == nil {
sub = &DanteFlowSubscriber{
Name: subscriberName,
}
flow.Subscribers[subscriberName] = sub
}
if channelInfo != "" {
hasChannel := false
for _, ch := range sub.Channels {
if ch == channelInfo {
hasChannel = true
break
}
}
if !hasChannel {
sub.Channels = append(sub.Channels, channelInfo)
sort.Strings(sub.Channels)
}
}
sub.LastSeen = time.Now()
}
func (d *DanteFlows) Expire() {
d.mu.Lock()
defer d.mu.Unlock()
expireTime := time.Now().Add(-5 * time.Minute)
for sourceName, flow := range d.flows {
for subName, sub := range flow.Subscribers {
if sub.LastSeen.Before(expireTime) {
delete(flow.Subscribers, subName)
}
}
if len(flow.Subscribers) == 0 {
delete(d.flows, sourceName)
}
}
}
func (d *DanteFlows) LogAll() {
d.Expire()
d.mu.RLock()
defer d.mu.RUnlock()
if len(d.flows) == 0 {
return
}
var flows []*DanteFlow
for _, flow := range d.flows {
flows = append(flows, flow)
}
sort.Slice(flows, func(i, j int) bool {
return sortorder.NaturalLess(flows[i].SourceName, flows[j].SourceName)
})
log.Printf("[sigusr1] ================ %d dante flows ================", len(flows))
for _, flow := range flows {
type channelFlow struct {
txCh string
rxName string
rxCh string
}
var channelFlows []channelFlow
var noChannelSubs []string
for _, sub := range flow.Subscribers {
if len(sub.Channels) == 0 {
noChannelSubs = append(noChannelSubs, sub.Name)
} else {
for _, ch := range sub.Channels {
parts := strings.Split(ch, "->")
if len(parts) == 2 {
channelFlows = append(channelFlows, channelFlow{
txCh: parts[0],
rxName: sub.Name,
rxCh: parts[1],
})
} else {
noChannelSubs = append(noChannelSubs, fmt.Sprintf("%s[%s]", sub.Name, ch))
}
}
}
}
sort.Slice(channelFlows, func(i, j int) bool {
if channelFlows[i].txCh != channelFlows[j].txCh {
return sortorder.NaturalLess(channelFlows[i].txCh, channelFlows[j].txCh)
}
return sortorder.NaturalLess(channelFlows[i].rxName, channelFlows[j].rxName)
})
sort.Slice(noChannelSubs, func(i, j int) bool {
return sortorder.NaturalLess(noChannelSubs[i], noChannelSubs[j])
})
sourceName := flow.SourceName
if strings.HasPrefix(sourceName, "dante-av:") || strings.HasPrefix(sourceName, "dante-mcast:") {
sourceName = "?? (" + sourceName + ")"
}
for _, cf := range channelFlows {
log.Printf("[sigusr1] %s[%s] -> %s[%s]", sourceName, cf.txCh, cf.rxName, cf.rxCh)
}
if len(noChannelSubs) > 0 {
log.Printf("[sigusr1] %s -> %s", sourceName, strings.Join(noChannelSubs, ", "))
}
}
}
func nextDanteSeq() uint16 {
return uint16(atomic.AddUint32(&danteSeqID, 1))
}
func (t *Tendrils) queryDanteDevice(ip net.IP) *DanteDeviceInfo {
return t.queryDanteDeviceWithPort(ip, danteControlPort)
}
func (t *Tendrils) queryDanteDeviceWithPort(ip net.IP, port int) *DanteDeviceInfo {
conn, err := net.DialUDP("udp4", nil, &net.UDPAddr{IP: ip, Port: port})
if err != nil {
if t.DebugDante {
log.Printf("[dante] %s:%d: dial failed: %v", ip, port, err)
}
return nil
}
defer conn.Close()
info := &DanteDeviceInfo{IP: ip}
if rxCount, txCount := t.queryDanteChannelCount(conn, ip); rxCount > 0 || txCount > 0 {
info.RxChannelCount = rxCount
info.TxChannelCount = txCount
}
if name := t.queryDanteDeviceName(conn, ip); name != "" {
info.Name = name
}
if info.RxChannelCount > 0 || info.TxChannelCount > 0 {
info.Subscriptions, info.HasMulticast = t.queryDanteSubscriptions(conn, ip, info.RxChannelCount, info.TxChannelCount)
}
if info.TxChannelCount > 0 {
t.queryDanteTxChannels(conn, ip, info.TxChannelCount)
t.nodes.UpdateDanteTxChannels(info.Name, ip, fmt.Sprintf("%d", info.TxChannelCount))
}
return info
}
type DanteDeviceInfo struct {
IP net.IP
Name string
RxChannelCount int
TxChannelCount int
Subscriptions []DanteSubscription
HasMulticast bool
}
type DanteSubscription struct {
RxChannel int
TxDeviceName string
TxChannelName string
}
func buildDantePacket(cmd uint16, args []byte) []byte {
seq := nextDanteSeq()
argLen := len(args)
totalLen := 10 + argLen
pkt := make([]byte, totalLen)
pkt[0] = 0x27
pkt[1] = byte(seq & 0xff)
binary.BigEndian.PutUint16(pkt[2:4], uint16(totalLen))
binary.BigEndian.PutUint16(pkt[4:6], 0x1300|seq)
binary.BigEndian.PutUint16(pkt[6:8], cmd)
pkt[8] = 0x00
pkt[9] = 0x00
if argLen > 0 {
copy(pkt[10:], args)
}
return pkt
}
func (t *Tendrils) sendDanteCommand(conn *net.UDPConn, ip net.IP, cmd uint16, args []byte) []byte {
pkt := buildDantePacket(cmd, args)
conn.SetWriteDeadline(time.Now().Add(500 * time.Millisecond))
_, err := conn.Write(pkt)
if err != nil {
if t.DebugDante {
log.Printf("[dante] %s: write failed: %v", ip, err)
}
return nil
}
conn.SetReadDeadline(time.Now().Add(1 * time.Second))
buf := make([]byte, 4096)
n, err := conn.Read(buf)
if err != nil {
return nil
}
if t.DebugDante {
log.Printf("[dante] %s: cmd 0x%04x response (%d bytes): %x", ip, cmd, n, buf[:n])
}
return buf[:n]
}
func (t *Tendrils) queryDanteDeviceName(conn *net.UDPConn, ip net.IP) string {
// 0x1003 returns device info - name position varies by device
resp := t.sendDanteCommand(conn, ip, 0x1003, nil)
if resp == nil || len(resp) < 40 {
return ""
}
// Find the first printable string that looks like a device name
// Look for patterns like "AJA-", "ULXD", etc starting from offset 40
for i := 40; i < len(resp)-4; i++ {
if resp[i] >= 'A' && resp[i] <= 'Z' {
// Found uppercase letter, might be start of name
end := i
for end < len(resp) && resp[end] != 0 && resp[end] >= 0x20 && resp[end] < 0x7f {
end++
}
if end-i >= 4 && end-i < 40 {
name := string(resp[i:end])
// Skip "Audinate" which is the platform name
if name != "Audinate DCM" && !strings.HasPrefix(name, "Audinate") {
if t.DebugDante {
log.Printf("[dante] %s: device name: %q", ip, name)
}
return name
}
}
}
}
return ""
}
func (t *Tendrils) queryDanteChannelCount(conn *net.UDPConn, ip net.IP) (int, int) {
resp := t.sendDanteCommand(conn, ip, 0x1000, nil)
if resp == nil || len(resp) < 16 {
return 0, 0
}
txCount := int(binary.BigEndian.Uint16(resp[12:14]))
rxCount := int(binary.BigEndian.Uint16(resp[14:16]))
return rxCount, txCount
}
func (t *Tendrils) queryDanteTxChannels(conn *net.UDPConn, ip net.IP, txCount int) {
if txCount == 0 {
return
}
pagesNeeded := (txCount + 15) / 16
for page := 0; page < pagesNeeded; page++ {
pageNum := byte(page + 1)
args := []byte{0x00, 0x01, 0x00, pageNum, 0x00, 0x00}
resp := t.sendDanteCommand(conn, ip, 0x2000, args)
if t.DebugDante {
if resp == nil {
log.Printf("[dante] %s: tx channels 0x2000 page %d: no response", ip, page)
} else {
log.Printf("[dante] %s: tx channels 0x2000 page %d (%d bytes): %x", ip, page, len(resp), resp)
}
}
}
}
func (t *Tendrils) queryDanteSubscriptions(conn *net.UDPConn, ip net.IP, rxCount, txCount int) ([]DanteSubscription, bool) {
if rxCount == 0 {
return nil, false
}
var subscriptions []DanteSubscription
hasMulticast := false
pagesNeeded := (rxCount + 15) / 16
for page := 0; page < pagesNeeded; page++ {
pageNum := byte(page + 1)
args := []byte{0x00, 0x01, 0x00, pageNum, 0x00, 0x00}
resp := t.sendDanteCommand(conn, ip, 0x3000, args)
if resp == nil || len(resp) < 14 {
continue
}
status := binary.BigEndian.Uint16(resp[8:10])
if status != 0x0001 {
if t.DebugDante {
log.Printf("[dante] %s: 0x3000 status=0x%04x", ip, status)
}
continue
}
subCount := int(resp[10])
recordType := binary.BigEndian.Uint16(resp[14:16])
isMulticast := recordType == 0x000e
hasMulticast = hasMulticast || isMulticast
if isMulticast {
if t.DebugDante {
stringTableStart := 12 + subCount*20
if stringTableStart < len(resp) {
log.Printf("[dante] %s: multicast string table at offset %d: %x", ip, stringTableStart, resp[stringTableStart:])
}
}
recordOffset := 12
for idx := 0; idx < subCount; idx++ {
if recordOffset+20 > len(resp) {
break
}
if t.DebugDante {
log.Printf("[dante] %s: multicast record %d at offset %d: %x", ip, idx, recordOffset, resp[recordOffset:recordOffset+20])
}
rxChannelNum := int(binary.BigEndian.Uint16(resp[recordOffset : recordOffset+2]))
txDeviceOffset := int(binary.BigEndian.Uint16(resp[recordOffset+4 : recordOffset+6]))
txChannelOffset := int(binary.BigEndian.Uint16(resp[recordOffset+10 : recordOffset+12]))
txDeviceName := extractNullTerminatedString(resp, txDeviceOffset)
txChannelName := extractNullTerminatedString(resp, txChannelOffset)
if t.DebugDante {
log.Printf("[dante] %s: multicast record %d: rx=%d txDevOffset=%d txDev=%q txChOffset=%d txCh=%q", ip, idx, rxChannelNum, txDeviceOffset, txDeviceName, txChannelOffset, txChannelName)
}
subscriptions = append(subscriptions, DanteSubscription{
RxChannel: rxChannelNum,
TxDeviceName: txDeviceName,
TxChannelName: txChannelName,
})
recordOffset += 20
}
} else {
recordOffset := 14
for idx := 0; idx < subCount; idx++ {
if recordOffset+10 > len(resp) {
break
}
rxChannelNum := idx + 1
txChannelOffset := int(binary.BigEndian.Uint16(resp[recordOffset+4 : recordOffset+6]))
txDeviceOffset := int(binary.BigEndian.Uint16(resp[recordOffset+6 : recordOffset+8]))
txChannelName := extractNullTerminatedString(resp, txChannelOffset)
txDeviceName := extractNullTerminatedString(resp, txDeviceOffset)
if txDeviceName != "" {
subscriptions = append(subscriptions, DanteSubscription{
RxChannel: rxChannelNum,
TxDeviceName: txDeviceName,
TxChannelName: txChannelName,
})
}
recordOffset += 10
}
}
}
return subscriptions, hasMulticast
}
func extractNullTerminatedString(data []byte, offset int) string {
if offset <= 0 || offset >= len(data) {
return ""
}
end := offset
for end < len(data) && data[end] != 0 {
end++
}
if end > offset {
return string(data[offset:end])
}
return ""
}
func (t *Tendrils) probeDanteDevice(ip net.IP) {
t.probeDanteDeviceWithPort(ip, danteControlPort)
}
func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) {
info := t.queryDanteDeviceWithPort(ip, port)
if info == nil {
return
}
if info.RxChannelCount > 0 || info.TxChannelCount > 0 {
if t.DebugDante {
log.Printf("[dante] %s:%d: name=%q rx=%d tx=%d subs=%d",
ip, port, info.Name, info.RxChannelCount, info.TxChannelCount, len(info.Subscriptions))
}
if info.Name != "" {
t.nodes.Update(nil, nil, []net.IP{ip}, "", info.Name, "dante-control")
}
needIGMPFallback := info.HasMulticast && info.Name != ""
for _, sub := range info.Subscriptions {
if t.DebugDante {
log.Printf("[dante] %s: subscription rx=%d -> %s@%s",
ip, sub.RxChannel, sub.TxChannelName, sub.TxDeviceName)
}
if sub.TxDeviceName != "" && info.Name != "" {
channelInfo := ""
if sub.TxChannelName != "" {
channelInfo = fmt.Sprintf("%s->%d", sub.TxChannelName, sub.RxChannel)
}
t.danteFlows.Update(sub.TxDeviceName, info.Name, channelInfo)
needIGMPFallback = false
}
}
if needIGMPFallback {
groups := t.nodes.GetDanteMulticastGroups(ip)
for _, groupIP := range groups {
sourceName := t.nodes.GetDanteTxDeviceInGroup(groupIP)
if t.DebugDante {
log.Printf("[dante] %s: multicast group %s -> tx device %q", ip, groupIP, sourceName)
}
if sourceName == "" {
sourceName = (&MulticastGroup{IP: groupIP}).Name()
}
t.danteFlows.Update(sourceName, info.Name, "")
}
}
}
}