add multicast flow transmitter discovery via mdns _netaudio-chan service

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ian Gulliver
2026-01-23 11:53:30 -08:00
parent 47b48337b3
commit 9c7bd671ba
4 changed files with 165 additions and 24 deletions

View File

@@ -85,3 +85,32 @@ func (n *Nodes) UpdateDante(name string, ip net.IP, arcPort int) {
go n.t.probeDanteDeviceWithPort(ip, arcPort) go n.t.probeDanteDeviceWithPort(ip, arcPort)
} }
} }
func (n *Nodes) UpdateDanteTxChannels(name string, ip net.IP, channels string) {
n.mu.Lock()
defer n.mu.Unlock()
node := n.getNodeByIPLocked(ip)
if node == nil {
return
}
node.DanteTxChannels = channels
}
func (n *Nodes) GetDanteTxDeviceInGroup(groupIP net.IP) string {
n.mu.RLock()
defer n.mu.RUnlock()
groupKey := groupIP.String()
gm := n.multicastGroups[groupKey]
if gm == nil {
return ""
}
for _, membership := range gm.Members {
if membership.Node != nil && membership.Node.DanteTxChannels != "" {
return membership.Node.Name
}
}
return ""
}

View File

@@ -27,7 +27,7 @@ type DanteFlow struct {
type DanteFlowSubscriber struct { type DanteFlowSubscriber struct {
Name string Name string
Channels []int Channels []string
LastSeen time.Time LastSeen time.Time
} }
@@ -42,7 +42,7 @@ func NewDanteFlows() *DanteFlows {
} }
} }
func (d *DanteFlows) Update(sourceName, subscriberName string, channel int) { func (d *DanteFlows) Update(sourceName, subscriberName, channelInfo string) {
d.mu.Lock() d.mu.Lock()
defer d.mu.Unlock() defer d.mu.Unlock()
@@ -63,17 +63,17 @@ func (d *DanteFlows) Update(sourceName, subscriberName string, channel int) {
flow.Subscribers[subscriberName] = sub flow.Subscribers[subscriberName] = sub
} }
if channel > 0 { if channelInfo != "" {
hasChannel := false hasChannel := false
for _, ch := range sub.Channels { for _, ch := range sub.Channels {
if ch == channel { if ch == channelInfo {
hasChannel = true hasChannel = true
break break
} }
} }
if !hasChannel { if !hasChannel {
sub.Channels = append(sub.Channels, channel) sub.Channels = append(sub.Channels, channelInfo)
sort.Ints(sub.Channels) sort.Strings(sub.Channels)
} }
} }
@@ -121,11 +121,7 @@ func (d *DanteFlows) LogAll() {
for _, sub := range flow.Subscribers { for _, sub := range flow.Subscribers {
name := sub.Name name := sub.Name
if len(sub.Channels) > 0 { if len(sub.Channels) > 0 {
var chStrs []string name = fmt.Sprintf("%s[%s]", name, strings.Join(sub.Channels, ","))
for _, ch := range sub.Channels {
chStrs = append(chStrs, fmt.Sprintf("%d", ch))
}
name = fmt.Sprintf("%s[%s]", name, strings.Join(chStrs, ","))
} }
subNames = append(subNames, name) subNames = append(subNames, name)
} }
@@ -170,6 +166,10 @@ func (t *Tendrils) queryDanteDeviceWithPort(ip net.IP, port int) *DanteDeviceInf
info.Subscriptions, info.HasMulticast = t.queryDanteSubscriptions(conn, ip, info.RxChannelCount, info.TxChannelCount) info.Subscriptions, info.HasMulticast = t.queryDanteSubscriptions(conn, ip, info.RxChannelCount, info.TxChannelCount)
} }
if info.TxChannelCount > 0 {
t.queryDanteTxChannels(conn, ip, info.TxChannelCount)
}
return info return info
} }
@@ -278,6 +278,27 @@ func (t *Tendrils) queryDanteChannelCount(conn *net.UDPConn, ip net.IP) (int, in
return rxCount, txCount return rxCount, txCount
} }
func (t *Tendrils) queryDanteTxChannels(conn *net.UDPConn, ip net.IP, txCount int) {
if txCount == 0 {
return
}
pagesNeeded := (txCount + 15) / 16
for page := 0; page < pagesNeeded; page++ {
pageNum := byte(page + 1)
args := []byte{0x00, 0x01, 0x00, pageNum, 0x00, 0x00}
resp := t.sendDanteCommand(conn, ip, 0x2000, args)
if t.DebugDante {
if resp == nil {
log.Printf("[dante] %s: tx channels 0x2000 page %d: no response", ip, page)
} else {
log.Printf("[dante] %s: tx channels 0x2000 page %d (%d bytes): %x", ip, page, len(resp), resp)
}
}
}
}
func (t *Tendrils) queryDanteSubscriptions(conn *net.UDPConn, ip net.IP, rxCount, txCount int) ([]DanteSubscription, bool) { func (t *Tendrils) queryDanteSubscriptions(conn *net.UDPConn, ip net.IP, rxCount, txCount int) ([]DanteSubscription, bool) {
if rxCount == 0 { if rxCount == 0 {
return nil, false return nil, false
@@ -311,23 +332,37 @@ func (t *Tendrils) queryDanteSubscriptions(conn *net.UDPConn, ip net.IP, rxCount
hasMulticast = hasMulticast || isMulticast hasMulticast = hasMulticast || isMulticast
if isMulticast { if isMulticast {
if t.DebugDante {
stringTableStart := 12 + subCount*20
if stringTableStart < len(resp) {
log.Printf("[dante] %s: multicast string table at offset %d: %x", ip, stringTableStart, resp[stringTableStart:])
}
}
recordOffset := 12 recordOffset := 12
for idx := 0; idx < subCount; idx++ { for idx := 0; idx < subCount; idx++ {
if recordOffset+20 > len(resp) { if recordOffset+20 > len(resp) {
break break
} }
if t.DebugDante {
log.Printf("[dante] %s: multicast record %d at offset %d: %x", ip, idx, recordOffset, resp[recordOffset:recordOffset+20])
}
rxChannelNum := int(binary.BigEndian.Uint16(resp[recordOffset : recordOffset+2])) rxChannelNum := int(binary.BigEndian.Uint16(resp[recordOffset : recordOffset+2]))
txDeviceOffset := int(binary.BigEndian.Uint16(resp[recordOffset+4 : recordOffset+6])) txDeviceOffset := int(binary.BigEndian.Uint16(resp[recordOffset+4 : recordOffset+6]))
txChannelOffset := int(binary.BigEndian.Uint16(resp[recordOffset+10 : recordOffset+12]))
txDeviceName := extractNullTerminatedString(resp, txDeviceOffset) txDeviceName := extractNullTerminatedString(resp, txDeviceOffset)
txChannelName := extractNullTerminatedString(resp, txChannelOffset)
if t.DebugDante {
log.Printf("[dante] %s: multicast record %d: rx=%d txDevOffset=%d txDev=%q txChOffset=%d txCh=%q", ip, idx, rxChannelNum, txDeviceOffset, txDeviceName, txChannelOffset, txChannelName)
}
if txDeviceName != "" {
subscriptions = append(subscriptions, DanteSubscription{ subscriptions = append(subscriptions, DanteSubscription{
RxChannel: rxChannelNum, RxChannel: rxChannelNum,
TxDeviceName: txDeviceName, TxDeviceName: txDeviceName,
TxChannelName: txChannelName,
}) })
}
recordOffset += 20 recordOffset += 20
} }
@@ -396,21 +431,38 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) {
t.nodes.Update(nil, nil, []net.IP{ip}, "", info.Name, "dante-control") t.nodes.Update(nil, nil, []net.IP{ip}, "", info.Name, "dante-control")
} }
var multicastChannels []string
for _, sub := range info.Subscriptions { for _, sub := range info.Subscriptions {
if t.DebugDante { if t.DebugDante {
log.Printf("[dante] %s: subscription rx=%d -> %s@%s", log.Printf("[dante] %s: subscription rx=%d -> %s@%s",
ip, sub.RxChannel, sub.TxChannelName, sub.TxDeviceName) ip, sub.RxChannel, sub.TxChannelName, sub.TxDeviceName)
} }
if sub.TxDeviceName != "" && info.Name != "" { if sub.TxDeviceName != "" && info.Name != "" {
t.danteFlows.Update(sub.TxDeviceName, info.Name, sub.RxChannel) channelInfo := ""
if sub.TxChannelName != "" {
channelInfo = fmt.Sprintf("%s->%d", sub.TxChannelName, sub.RxChannel)
}
t.danteFlows.Update(sub.TxDeviceName, info.Name, channelInfo)
} else if sub.TxChannelName != "" {
multicastChannels = append(multicastChannels, sub.TxChannelName)
} }
} }
if info.HasMulticast && info.Name != "" { if info.HasMulticast && info.Name != "" {
groups := t.nodes.GetDanteMulticastGroups(ip) groups := t.nodes.GetDanteMulticastGroups(ip)
for _, groupIP := range groups { for _, groupIP := range groups {
groupName := (&MulticastGroup{IP: groupIP}).Name() sourceName := t.nodes.GetDanteTxDeviceInGroup(groupIP)
t.danteFlows.Update(groupName, info.Name, 0) if t.DebugDante {
log.Printf("[dante] %s: multicast group %s -> tx device %q", ip, groupIP, sourceName)
}
if sourceName == "" {
sourceName = (&MulticastGroup{IP: groupIP}).Name()
}
channelInfo := ""
if len(multicastChannels) > 0 {
channelInfo = strings.Join(multicastChannels, ",")
}
t.danteFlows.Update(sourceName, info.Name, channelInfo)
} }
} }
} }

41
mdns.go
View File

@@ -31,6 +31,19 @@ func extractDanteName(s string) string {
return name return name
} }
func extractDanteChanService(s string) (channel, device string) {
idx := strings.Index(s, "._netaudio-chan.")
if idx <= 0 {
return "", ""
}
name := s[:idx]
at := strings.Index(name, "@")
if at <= 0 {
return "", ""
}
return name[:at], name[at+1:]
}
func isDanteService(s string) bool { func isDanteService(s string) bool {
return strings.Contains(s, "_netaudio-") || strings.Contains(s, "._dante") return strings.Contains(s, "_netaudio-") || strings.Contains(s, "._dante")
} }
@@ -108,6 +121,7 @@ func (t *Tendrils) processMDNSResponse(ifaceName string, srcIP net.IP, msg *dns.
srvTargets := map[string]string{} srvTargets := map[string]string{}
danteNames := map[string]bool{} danteNames := map[string]bool{}
danteARCPorts := map[string]uint16{} danteARCPorts := map[string]uint16{}
danteTxChannels := map[string]string{} // device name -> channel names
skaarhojNames := map[string]bool{} skaarhojNames := map[string]bool{}
for _, rr := range allRecords { for _, rr := range allRecords {
@@ -143,6 +157,16 @@ func (t *Tendrils) processMDNSResponse(ifaceName string, srcIP net.IP, msg *dns.
danteARCPorts[name] = r.Port danteARCPorts[name] = r.Port
} }
} }
if strings.Contains(r.Hdr.Name, "_netaudio-chan.") {
channel, device := extractDanteChanService(r.Hdr.Name)
if channel != "" && device != "" {
if existing, ok := danteTxChannels[device]; ok {
danteTxChannels[device] = existing + "," + channel
} else {
danteTxChannels[device] = channel
}
}
}
} }
if isSkaarhojService(r.Hdr.Name) { if isSkaarhojService(r.Hdr.Name) {
name := extractSkaarhojName(r.Hdr.Name) name := extractSkaarhojName(r.Hdr.Name)
@@ -171,6 +195,23 @@ func (t *Tendrils) processMDNSResponse(ifaceName string, srcIP net.IP, msg *dns.
t.nodes.UpdateDante(name, ip, arcPort) t.nodes.UpdateDante(name, ip, arcPort)
} }
for device, channels := range danteTxChannels {
var ip net.IP
if target, ok := srvTargets[device]; ok {
ip = aRecords[target]
}
if ip == nil {
ip = aRecords[device+".local"]
}
if ip == nil {
ip = srcIP
}
if t.DebugMDNS {
log.Printf("[mdns] %s: dante tx channels %s@%s (%s)", ifaceName, channels, device, ip)
}
t.nodes.UpdateDanteTxChannels(device, ip, channels)
}
for name := range skaarhojNames { for name := range skaarhojNames {
var ip net.IP var ip net.IP
if target, ok := srvTargets[name]; ok { if target, ok := srvTargets[name]; ok {

View File

@@ -108,6 +108,7 @@ type Node struct {
MACTable map[string]string // peer MAC -> local interface name MACTable map[string]string // peer MAC -> local interface name
PoEBudget *PoEBudget PoEBudget *PoEBudget
IsDanteClockMaster bool IsDanteClockMaster bool
DanteTxChannels string
pollTrigger chan struct{} pollTrigger chan struct{}
} }
@@ -636,9 +637,27 @@ func (n *Nodes) GetDanteMulticastGroups(deviceIP net.IP) []net.IP {
} }
} }
return groups return groups
}
func (n *Nodes) GetMulticastGroupMembers(groupIP net.IP) []*Node {
n.mu.RLock()
defer n.mu.RUnlock()
groupKey := groupIP.String()
gm := n.multicastGroups[groupKey]
if gm == nil {
return nil return nil
} }
var members []*Node
for _, membership := range gm.Members {
if membership.Node != nil {
members = append(members, membership.Node)
}
}
return members
}
func (n *Nodes) logNode(node *Node) { func (n *Nodes) logNode(node *Node) {
name := node.Name name := node.Name
if name == "" { if name == "" {