From 47b48337b378e7c945479274ce9ac8bfed1abed8 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Fri, 23 Jan 2026 11:30:40 -0800 Subject: [PATCH] add dante subscription discovery via arc protocol Co-Authored-By: Claude Opus 4.5 --- dante.go | 8 +- dante_control.go | 417 +++++++++++++++++++++++++++++++++++++++++++++++ mdns.go | 7 +- nodes.go | 33 +++- snmp.go | 6 + tendrils.go | 2 + 6 files changed, 467 insertions(+), 6 deletions(-) create mode 100644 dante_control.go diff --git a/dante.go b/dante.go index 19a4585..7bd1339 100644 --- a/dante.go +++ b/dante.go @@ -75,9 +75,13 @@ func (t *Tendrils) handlePTPPacket(ifaceName string, srcIP net.IP, data []byte) t.nodes.SetDanteClockMaster(srcIP) } -func (n *Nodes) UpdateDante(name string, ip net.IP) { +func (n *Nodes) UpdateDante(name string, ip net.IP, arcPort int) { if n.t.DebugDante { - log.Printf("[dante] mdns response: %s -> %s", name, ip) + log.Printf("[dante] mdns response: %s -> %s (arc port %d)", name, ip, arcPort) } n.Update(nil, nil, []net.IP{ip}, "", name, "dante") + + if arcPort > 0 { + go n.t.probeDanteDeviceWithPort(ip, arcPort) + } } diff --git a/dante_control.go b/dante_control.go new file mode 100644 index 0000000..5fa3ae9 --- /dev/null +++ b/dante_control.go @@ -0,0 +1,417 @@ +package tendrils + +import ( + "encoding/binary" + "fmt" + "log" + "net" + "sort" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/fvbommel/sortorder" +) + +const ( + danteControlPort = 4440 +) + +var danteSeqID uint32 + +type DanteFlow struct { + SourceName string + Subscribers map[string]*DanteFlowSubscriber +} + +type DanteFlowSubscriber struct { + Name string + Channels []int + LastSeen time.Time +} + +type DanteFlows struct { + mu sync.RWMutex + flows map[string]*DanteFlow +} + +func NewDanteFlows() *DanteFlows { + return &DanteFlows{ + flows: map[string]*DanteFlow{}, + } +} + +func (d *DanteFlows) Update(sourceName, subscriberName string, channel int) { + d.mu.Lock() + defer d.mu.Unlock() + + flow := d.flows[sourceName] + if flow == nil { + flow = &DanteFlow{ + SourceName: sourceName, + Subscribers: map[string]*DanteFlowSubscriber{}, + } + d.flows[sourceName] = flow + } + + sub := flow.Subscribers[subscriberName] + if sub == nil { + sub = &DanteFlowSubscriber{ + Name: subscriberName, + } + flow.Subscribers[subscriberName] = sub + } + + if channel > 0 { + hasChannel := false + for _, ch := range sub.Channels { + if ch == channel { + hasChannel = true + break + } + } + if !hasChannel { + sub.Channels = append(sub.Channels, channel) + sort.Ints(sub.Channels) + } + } + + sub.LastSeen = time.Now() +} + +func (d *DanteFlows) Expire() { + d.mu.Lock() + defer d.mu.Unlock() + + expireTime := time.Now().Add(-5 * time.Minute) + for sourceName, flow := range d.flows { + for subName, sub := range flow.Subscribers { + if sub.LastSeen.Before(expireTime) { + delete(flow.Subscribers, subName) + } + } + if len(flow.Subscribers) == 0 { + delete(d.flows, sourceName) + } + } +} + +func (d *DanteFlows) LogAll() { + d.Expire() + + d.mu.RLock() + defer d.mu.RUnlock() + + if len(d.flows) == 0 { + return + } + + var flows []*DanteFlow + for _, flow := range d.flows { + flows = append(flows, flow) + } + sort.Slice(flows, func(i, j int) bool { + return sortorder.NaturalLess(flows[i].SourceName, flows[j].SourceName) + }) + + log.Printf("[sigusr1] ================ %d dante flows ================", len(flows)) + for _, flow := range flows { + var subNames []string + for _, sub := range flow.Subscribers { + name := sub.Name + if len(sub.Channels) > 0 { + var chStrs []string + for _, ch := range sub.Channels { + chStrs = append(chStrs, fmt.Sprintf("%d", ch)) + } + name = fmt.Sprintf("%s[%s]", name, strings.Join(chStrs, ",")) + } + subNames = append(subNames, name) + } + sort.Slice(subNames, func(i, j int) bool { + return sortorder.NaturalLess(subNames[i], subNames[j]) + }) + + log.Printf("[sigusr1] %s -> %s", flow.SourceName, strings.Join(subNames, ", ")) + } +} + +func nextDanteSeq() uint16 { + return uint16(atomic.AddUint32(&danteSeqID, 1)) +} + +func (t *Tendrils) queryDanteDevice(ip net.IP) *DanteDeviceInfo { + return t.queryDanteDeviceWithPort(ip, danteControlPort) +} + +func (t *Tendrils) queryDanteDeviceWithPort(ip net.IP, port int) *DanteDeviceInfo { + conn, err := net.DialUDP("udp4", nil, &net.UDPAddr{IP: ip, Port: port}) + if err != nil { + if t.DebugDante { + log.Printf("[dante] %s:%d: dial failed: %v", ip, port, err) + } + return nil + } + defer conn.Close() + + info := &DanteDeviceInfo{IP: ip} + + if rxCount, txCount := t.queryDanteChannelCount(conn, ip); rxCount > 0 || txCount > 0 { + info.RxChannelCount = rxCount + info.TxChannelCount = txCount + } + + if name := t.queryDanteDeviceName(conn, ip); name != "" { + info.Name = name + } + + if info.RxChannelCount > 0 || info.TxChannelCount > 0 { + info.Subscriptions, info.HasMulticast = t.queryDanteSubscriptions(conn, ip, info.RxChannelCount, info.TxChannelCount) + } + + return info +} + +type DanteDeviceInfo struct { + IP net.IP + Name string + RxChannelCount int + TxChannelCount int + Subscriptions []DanteSubscription + HasMulticast bool +} + +type DanteSubscription struct { + RxChannel int + TxDeviceName string + TxChannelName string +} + +func buildDantePacket(cmd uint16, args []byte) []byte { + seq := nextDanteSeq() + argLen := len(args) + totalLen := 10 + argLen + + pkt := make([]byte, totalLen) + pkt[0] = 0x27 + pkt[1] = byte(seq & 0xff) + binary.BigEndian.PutUint16(pkt[2:4], uint16(totalLen)) + binary.BigEndian.PutUint16(pkt[4:6], 0x1300|seq) + binary.BigEndian.PutUint16(pkt[6:8], cmd) + pkt[8] = 0x00 + pkt[9] = 0x00 + if argLen > 0 { + copy(pkt[10:], args) + } + + return pkt +} + +func (t *Tendrils) sendDanteCommand(conn *net.UDPConn, ip net.IP, cmd uint16, args []byte) []byte { + pkt := buildDantePacket(cmd, args) + + conn.SetWriteDeadline(time.Now().Add(500 * time.Millisecond)) + _, err := conn.Write(pkt) + if err != nil { + if t.DebugDante { + log.Printf("[dante] %s: write failed: %v", ip, err) + } + return nil + } + + conn.SetReadDeadline(time.Now().Add(1 * time.Second)) + buf := make([]byte, 4096) + n, err := conn.Read(buf) + if err != nil { + return nil + } + + if t.DebugDante { + log.Printf("[dante] %s: cmd 0x%04x response (%d bytes): %x", ip, cmd, n, buf[:n]) + } + + return buf[:n] +} + +func (t *Tendrils) queryDanteDeviceName(conn *net.UDPConn, ip net.IP) string { + // 0x1003 returns device info - name position varies by device + resp := t.sendDanteCommand(conn, ip, 0x1003, nil) + if resp == nil || len(resp) < 40 { + return "" + } + + // Find the first printable string that looks like a device name + // Look for patterns like "AJA-", "ULXD", etc starting from offset 40 + for i := 40; i < len(resp)-4; i++ { + if resp[i] >= 'A' && resp[i] <= 'Z' { + // Found uppercase letter, might be start of name + end := i + for end < len(resp) && resp[end] != 0 && resp[end] >= 0x20 && resp[end] < 0x7f { + end++ + } + if end-i >= 4 && end-i < 40 { + name := string(resp[i:end]) + // Skip "Audinate" which is the platform name + if name != "Audinate DCM" && !strings.HasPrefix(name, "Audinate") { + if t.DebugDante { + log.Printf("[dante] %s: device name: %q", ip, name) + } + return name + } + } + } + } + + return "" +} + +func (t *Tendrils) queryDanteChannelCount(conn *net.UDPConn, ip net.IP) (int, int) { + resp := t.sendDanteCommand(conn, ip, 0x1000, nil) + if resp == nil || len(resp) < 16 { + return 0, 0 + } + + txCount := int(binary.BigEndian.Uint16(resp[12:14])) + rxCount := int(binary.BigEndian.Uint16(resp[14:16])) + + return rxCount, txCount +} + +func (t *Tendrils) queryDanteSubscriptions(conn *net.UDPConn, ip net.IP, rxCount, txCount int) ([]DanteSubscription, bool) { + if rxCount == 0 { + return nil, false + } + + var subscriptions []DanteSubscription + hasMulticast := false + + pagesNeeded := (rxCount + 15) / 16 + for page := 0; page < pagesNeeded; page++ { + pageNum := byte(page + 1) + args := []byte{0x00, 0x01, 0x00, pageNum, 0x00, 0x00} + + resp := t.sendDanteCommand(conn, ip, 0x3000, args) + if resp == nil || len(resp) < 14 { + continue + } + + status := binary.BigEndian.Uint16(resp[8:10]) + if status != 0x0001 { + if t.DebugDante { + log.Printf("[dante] %s: 0x3000 status=0x%04x", ip, status) + } + continue + } + + subCount := int(resp[10]) + + recordType := binary.BigEndian.Uint16(resp[14:16]) + isMulticast := recordType == 0x000e + hasMulticast = hasMulticast || isMulticast + + if isMulticast { + recordOffset := 12 + for idx := 0; idx < subCount; idx++ { + if recordOffset+20 > len(resp) { + break + } + + rxChannelNum := int(binary.BigEndian.Uint16(resp[recordOffset : recordOffset+2])) + txDeviceOffset := int(binary.BigEndian.Uint16(resp[recordOffset+4 : recordOffset+6])) + txDeviceName := extractNullTerminatedString(resp, txDeviceOffset) + + + if txDeviceName != "" { + subscriptions = append(subscriptions, DanteSubscription{ + RxChannel: rxChannelNum, + TxDeviceName: txDeviceName, + }) + } + + recordOffset += 20 + } + } else { + recordOffset := 14 + for idx := 0; idx < subCount; idx++ { + if recordOffset+10 > len(resp) { + break + } + + rxChannelNum := int(binary.BigEndian.Uint16(resp[recordOffset : recordOffset+2])) + txChannelOffset := int(binary.BigEndian.Uint16(resp[recordOffset+4 : recordOffset+6])) + txDeviceOffset := int(binary.BigEndian.Uint16(resp[recordOffset+6 : recordOffset+8])) + + txChannelName := extractNullTerminatedString(resp, txChannelOffset) + txDeviceName := extractNullTerminatedString(resp, txDeviceOffset) + + + if txDeviceName != "" { + subscriptions = append(subscriptions, DanteSubscription{ + RxChannel: rxChannelNum, + TxDeviceName: txDeviceName, + TxChannelName: txChannelName, + }) + } + + recordOffset += 10 + } + } + } + + return subscriptions, hasMulticast +} + +func extractNullTerminatedString(data []byte, offset int) string { + if offset <= 0 || offset >= len(data) { + return "" + } + end := offset + for end < len(data) && data[end] != 0 { + end++ + } + if end > offset { + return string(data[offset:end]) + } + return "" +} + +func (t *Tendrils) probeDanteDevice(ip net.IP) { + t.probeDanteDeviceWithPort(ip, danteControlPort) +} + +func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) { + info := t.queryDanteDeviceWithPort(ip, port) + if info == nil { + return + } + + if info.RxChannelCount > 0 || info.TxChannelCount > 0 { + if t.DebugDante { + log.Printf("[dante] %s:%d: name=%q rx=%d tx=%d subs=%d", + ip, port, info.Name, info.RxChannelCount, info.TxChannelCount, len(info.Subscriptions)) + } + + if info.Name != "" { + t.nodes.Update(nil, nil, []net.IP{ip}, "", info.Name, "dante-control") + } + + for _, sub := range info.Subscriptions { + if t.DebugDante { + log.Printf("[dante] %s: subscription rx=%d -> %s@%s", + ip, sub.RxChannel, sub.TxChannelName, sub.TxDeviceName) + } + if sub.TxDeviceName != "" && info.Name != "" { + t.danteFlows.Update(sub.TxDeviceName, info.Name, sub.RxChannel) + } + } + + if info.HasMulticast && info.Name != "" { + groups := t.nodes.GetDanteMulticastGroups(ip) + for _, groupIP := range groups { + groupName := (&MulticastGroup{IP: groupIP}).Name() + t.danteFlows.Update(groupName, info.Name, 0) + } + } + } +} diff --git a/mdns.go b/mdns.go index 179275a..4b6d319 100644 --- a/mdns.go +++ b/mdns.go @@ -107,6 +107,7 @@ func (t *Tendrils) processMDNSResponse(ifaceName string, srcIP net.IP, msg *dns. aRecords := map[string]net.IP{} srvTargets := map[string]string{} danteNames := map[string]bool{} + danteARCPorts := map[string]uint16{} skaarhojNames := map[string]bool{} for _, rr := range allRecords { @@ -138,6 +139,9 @@ func (t *Tendrils) processMDNSResponse(ifaceName string, srcIP net.IP, msg *dns. if target != "" { srvTargets[name] = target } + if strings.Contains(r.Hdr.Name, "_netaudio-arc.") && r.Port > 0 { + danteARCPorts[name] = r.Port + } } } if isSkaarhojService(r.Hdr.Name) { @@ -163,7 +167,8 @@ func (t *Tendrils) processMDNSResponse(ifaceName string, srcIP net.IP, msg *dns. if ip == nil { ip = srcIP } - t.nodes.UpdateDante(name, ip) + arcPort := int(danteARCPorts[name]) + t.nodes.UpdateDante(name, ip, arcPort) } for name := range skaarhojNames { diff --git a/nodes.go b/nodes.go index 812e1e2..c54d9f9 100644 --- a/nodes.go +++ b/nodes.go @@ -183,6 +183,12 @@ func (g *MulticastGroup) Name() string { return fmt.Sprintf("dante-mcast:%d", flowID) } + // Dante AV multicast (239.253.x.x) + if ip[0] == 239 && ip[1] == 253 { + flowID := (int(ip[2]) << 8) | int(ip[3]) + return fmt.Sprintf("dante-av:%d", flowID) + } + return g.IP.String() } @@ -191,12 +197,12 @@ func (g *MulticastGroup) IsDante() bool { if ip == nil { return false } - if ip[0] == 239 && ip[1] == 255 { - return false - } if ip[0] == 239 && ip[1] >= 69 && ip[1] <= 71 { return true } + if ip[0] == 239 && ip[1] == 253 { + return true + } return false } @@ -530,6 +536,7 @@ func (n *Nodes) GetByMAC(mac net.HardwareAddr) *Node { return nil } + func (n *Nodes) UpdateMACTable(node *Node, peerMAC net.HardwareAddr, ifaceName string) { n.mu.Lock() defer n.mu.Unlock() @@ -613,6 +620,25 @@ func (n *Nodes) getNodeByIPLocked(ip net.IP) *Node { return nil } +func (n *Nodes) GetDanteMulticastGroups(deviceIP net.IP) []net.IP { + n.mu.RLock() + defer n.mu.RUnlock() + + deviceKey := deviceIP.String() + var groups []net.IP + + for _, gm := range n.multicastGroups { + if !gm.Group.IsDante() { + continue + } + if _, exists := gm.Members[deviceKey]; exists { + groups = append(groups, gm.Group.IP) + } + } + return groups + return nil +} + func (n *Nodes) logNode(node *Node) { name := node.Name if name == "" { @@ -730,6 +756,7 @@ func (n *Nodes) LogAll() { } n.t.artnet.LogAll() + n.t.danteFlows.LogAll() } func (n *Nodes) expireMulticastMemberships() { diff --git a/snmp.go b/snmp.go index 66dd99b..30821b0 100644 --- a/snmp.go +++ b/snmp.go @@ -103,6 +103,12 @@ func (t *Tendrils) pollNode(node *Node) { t.probeBMDDevice(ip) } } + + if !t.DisableDante { + for _, ip := range ips { + t.probeDanteDevice(ip) + } + } } func (t *Tendrils) querySNMPDevice(node *Node, ip net.IP) { diff --git a/tendrils.go b/tendrils.go index 47c6c28..b4a1ec4 100644 --- a/tendrils.go +++ b/tendrils.go @@ -14,6 +14,7 @@ type Tendrils struct { activeInterfaces map[string]context.CancelFunc nodes *Nodes artnet *ArtNetNodes + danteFlows *DanteFlows Interface string DisableARP bool @@ -42,6 +43,7 @@ func New() *Tendrils { t := &Tendrils{ activeInterfaces: map[string]context.CancelFunc{}, artnet: NewArtNetNodes(), + danteFlows: NewDanteFlows(), } t.nodes = NewNodes(t) return t