From 9c7bd671bae14239f6aaf6df7eb1188da8122c98 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Fri, 23 Jan 2026 11:53:30 -0800 Subject: [PATCH] add multicast flow transmitter discovery via mdns _netaudio-chan service Co-Authored-By: Claude Opus 4.5 --- dante.go | 29 ++++++++++++++ dante_control.go | 98 ++++++++++++++++++++++++++++++++++++------------ mdns.go | 41 ++++++++++++++++++++ nodes.go | 21 ++++++++++- 4 files changed, 165 insertions(+), 24 deletions(-) diff --git a/dante.go b/dante.go index 7bd1339..75e41ba 100644 --- a/dante.go +++ b/dante.go @@ -85,3 +85,32 @@ func (n *Nodes) UpdateDante(name string, ip net.IP, arcPort int) { go n.t.probeDanteDeviceWithPort(ip, arcPort) } } + +func (n *Nodes) UpdateDanteTxChannels(name string, ip net.IP, channels string) { + n.mu.Lock() + defer n.mu.Unlock() + + node := n.getNodeByIPLocked(ip) + if node == nil { + return + } + node.DanteTxChannels = channels +} + +func (n *Nodes) GetDanteTxDeviceInGroup(groupIP net.IP) string { + n.mu.RLock() + defer n.mu.RUnlock() + + groupKey := groupIP.String() + gm := n.multicastGroups[groupKey] + if gm == nil { + return "" + } + + for _, membership := range gm.Members { + if membership.Node != nil && membership.Node.DanteTxChannels != "" { + return membership.Node.Name + } + } + return "" +} diff --git a/dante_control.go b/dante_control.go index 5fa3ae9..6f9009a 100644 --- a/dante_control.go +++ b/dante_control.go @@ -26,9 +26,9 @@ type DanteFlow struct { } type DanteFlowSubscriber struct { - Name string - Channels []int - LastSeen time.Time + Name string + Channels []string + LastSeen time.Time } type DanteFlows struct { @@ -42,7 +42,7 @@ func NewDanteFlows() *DanteFlows { } } -func (d *DanteFlows) Update(sourceName, subscriberName string, channel int) { +func (d *DanteFlows) Update(sourceName, subscriberName, channelInfo string) { d.mu.Lock() defer d.mu.Unlock() @@ -63,17 +63,17 @@ func (d *DanteFlows) Update(sourceName, subscriberName string, channel int) { flow.Subscribers[subscriberName] = sub } - if channel > 0 { + if channelInfo != "" { hasChannel := false for _, ch := range sub.Channels { - if ch == channel { + if ch == channelInfo { hasChannel = true - break + break } } if !hasChannel { - sub.Channels = append(sub.Channels, channel) - sort.Ints(sub.Channels) + sub.Channels = append(sub.Channels, channelInfo) + sort.Strings(sub.Channels) } } @@ -121,11 +121,7 @@ func (d *DanteFlows) LogAll() { for _, sub := range flow.Subscribers { name := sub.Name if len(sub.Channels) > 0 { - var chStrs []string - for _, ch := range sub.Channels { - chStrs = append(chStrs, fmt.Sprintf("%d", ch)) - } - name = fmt.Sprintf("%s[%s]", name, strings.Join(chStrs, ",")) + name = fmt.Sprintf("%s[%s]", name, strings.Join(sub.Channels, ",")) } subNames = append(subNames, name) } @@ -170,6 +166,10 @@ func (t *Tendrils) queryDanteDeviceWithPort(ip net.IP, port int) *DanteDeviceInf info.Subscriptions, info.HasMulticast = t.queryDanteSubscriptions(conn, ip, info.RxChannelCount, info.TxChannelCount) } + if info.TxChannelCount > 0 { + t.queryDanteTxChannels(conn, ip, info.TxChannelCount) + } + return info } @@ -278,6 +278,27 @@ func (t *Tendrils) queryDanteChannelCount(conn *net.UDPConn, ip net.IP) (int, in return rxCount, txCount } +func (t *Tendrils) queryDanteTxChannels(conn *net.UDPConn, ip net.IP, txCount int) { + if txCount == 0 { + return + } + + pagesNeeded := (txCount + 15) / 16 + for page := 0; page < pagesNeeded; page++ { + pageNum := byte(page + 1) + args := []byte{0x00, 0x01, 0x00, pageNum, 0x00, 0x00} + + resp := t.sendDanteCommand(conn, ip, 0x2000, args) + if t.DebugDante { + if resp == nil { + log.Printf("[dante] %s: tx channels 0x2000 page %d: no response", ip, page) + } else { + log.Printf("[dante] %s: tx channels 0x2000 page %d (%d bytes): %x", ip, page, len(resp), resp) + } + } + } +} + func (t *Tendrils) queryDanteSubscriptions(conn *net.UDPConn, ip net.IP, rxCount, txCount int) ([]DanteSubscription, bool) { if rxCount == 0 { return nil, false @@ -311,24 +332,38 @@ func (t *Tendrils) queryDanteSubscriptions(conn *net.UDPConn, ip net.IP, rxCount hasMulticast = hasMulticast || isMulticast if isMulticast { + if t.DebugDante { + stringTableStart := 12 + subCount*20 + if stringTableStart < len(resp) { + log.Printf("[dante] %s: multicast string table at offset %d: %x", ip, stringTableStart, resp[stringTableStart:]) + } + } recordOffset := 12 for idx := 0; idx < subCount; idx++ { if recordOffset+20 > len(resp) { break } + if t.DebugDante { + log.Printf("[dante] %s: multicast record %d at offset %d: %x", ip, idx, recordOffset, resp[recordOffset:recordOffset+20]) + } + rxChannelNum := int(binary.BigEndian.Uint16(resp[recordOffset : recordOffset+2])) txDeviceOffset := int(binary.BigEndian.Uint16(resp[recordOffset+4 : recordOffset+6])) + txChannelOffset := int(binary.BigEndian.Uint16(resp[recordOffset+10 : recordOffset+12])) txDeviceName := extractNullTerminatedString(resp, txDeviceOffset) + txChannelName := extractNullTerminatedString(resp, txChannelOffset) - - if txDeviceName != "" { - subscriptions = append(subscriptions, DanteSubscription{ - RxChannel: rxChannelNum, - TxDeviceName: txDeviceName, - }) + if t.DebugDante { + log.Printf("[dante] %s: multicast record %d: rx=%d txDevOffset=%d txDev=%q txChOffset=%d txCh=%q", ip, idx, rxChannelNum, txDeviceOffset, txDeviceName, txChannelOffset, txChannelName) } + subscriptions = append(subscriptions, DanteSubscription{ + RxChannel: rxChannelNum, + TxDeviceName: txDeviceName, + TxChannelName: txChannelName, + }) + recordOffset += 20 } } else { @@ -396,21 +431,38 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) { t.nodes.Update(nil, nil, []net.IP{ip}, "", info.Name, "dante-control") } + var multicastChannels []string for _, sub := range info.Subscriptions { if t.DebugDante { log.Printf("[dante] %s: subscription rx=%d -> %s@%s", ip, sub.RxChannel, sub.TxChannelName, sub.TxDeviceName) } if sub.TxDeviceName != "" && info.Name != "" { - t.danteFlows.Update(sub.TxDeviceName, info.Name, sub.RxChannel) + channelInfo := "" + if sub.TxChannelName != "" { + channelInfo = fmt.Sprintf("%s->%d", sub.TxChannelName, sub.RxChannel) + } + t.danteFlows.Update(sub.TxDeviceName, info.Name, channelInfo) + } else if sub.TxChannelName != "" { + multicastChannels = append(multicastChannels, sub.TxChannelName) } } if info.HasMulticast && info.Name != "" { groups := t.nodes.GetDanteMulticastGroups(ip) for _, groupIP := range groups { - groupName := (&MulticastGroup{IP: groupIP}).Name() - t.danteFlows.Update(groupName, info.Name, 0) + sourceName := t.nodes.GetDanteTxDeviceInGroup(groupIP) + if t.DebugDante { + log.Printf("[dante] %s: multicast group %s -> tx device %q", ip, groupIP, sourceName) + } + if sourceName == "" { + sourceName = (&MulticastGroup{IP: groupIP}).Name() + } + channelInfo := "" + if len(multicastChannels) > 0 { + channelInfo = strings.Join(multicastChannels, ",") + } + t.danteFlows.Update(sourceName, info.Name, channelInfo) } } } diff --git a/mdns.go b/mdns.go index 4b6d319..e8391df 100644 --- a/mdns.go +++ b/mdns.go @@ -31,6 +31,19 @@ func extractDanteName(s string) string { return name } +func extractDanteChanService(s string) (channel, device string) { + idx := strings.Index(s, "._netaudio-chan.") + if idx <= 0 { + return "", "" + } + name := s[:idx] + at := strings.Index(name, "@") + if at <= 0 { + return "", "" + } + return name[:at], name[at+1:] +} + func isDanteService(s string) bool { return strings.Contains(s, "_netaudio-") || strings.Contains(s, "._dante") } @@ -108,6 +121,7 @@ func (t *Tendrils) processMDNSResponse(ifaceName string, srcIP net.IP, msg *dns. srvTargets := map[string]string{} danteNames := map[string]bool{} danteARCPorts := map[string]uint16{} + danteTxChannels := map[string]string{} // device name -> channel names skaarhojNames := map[string]bool{} for _, rr := range allRecords { @@ -143,6 +157,16 @@ func (t *Tendrils) processMDNSResponse(ifaceName string, srcIP net.IP, msg *dns. danteARCPorts[name] = r.Port } } + if strings.Contains(r.Hdr.Name, "_netaudio-chan.") { + channel, device := extractDanteChanService(r.Hdr.Name) + if channel != "" && device != "" { + if existing, ok := danteTxChannels[device]; ok { + danteTxChannels[device] = existing + "," + channel + } else { + danteTxChannels[device] = channel + } + } + } } if isSkaarhojService(r.Hdr.Name) { name := extractSkaarhojName(r.Hdr.Name) @@ -171,6 +195,23 @@ func (t *Tendrils) processMDNSResponse(ifaceName string, srcIP net.IP, msg *dns. t.nodes.UpdateDante(name, ip, arcPort) } + for device, channels := range danteTxChannels { + var ip net.IP + if target, ok := srvTargets[device]; ok { + ip = aRecords[target] + } + if ip == nil { + ip = aRecords[device+".local"] + } + if ip == nil { + ip = srcIP + } + if t.DebugMDNS { + log.Printf("[mdns] %s: dante tx channels %s@%s (%s)", ifaceName, channels, device, ip) + } + t.nodes.UpdateDanteTxChannels(device, ip, channels) + } + for name := range skaarhojNames { var ip net.IP if target, ok := srvTargets[name]; ok { diff --git a/nodes.go b/nodes.go index c54d9f9..3bb9364 100644 --- a/nodes.go +++ b/nodes.go @@ -108,6 +108,7 @@ type Node struct { MACTable map[string]string // peer MAC -> local interface name PoEBudget *PoEBudget IsDanteClockMaster bool + DanteTxChannels string pollTrigger chan struct{} } @@ -636,7 +637,25 @@ func (n *Nodes) GetDanteMulticastGroups(deviceIP net.IP) []net.IP { } } return groups - return nil +} + +func (n *Nodes) GetMulticastGroupMembers(groupIP net.IP) []*Node { + n.mu.RLock() + defer n.mu.RUnlock() + + groupKey := groupIP.String() + gm := n.multicastGroups[groupKey] + if gm == nil { + return nil + } + + var members []*Node + for _, membership := range gm.Members { + if membership.Node != nil { + members = append(members, membership.Node) + } + } + return members } func (n *Nodes) logNode(node *Node) {