diff --git a/arp.go b/arp.go index 9c705c7..103c3b7 100644 --- a/arp.go +++ b/arp.go @@ -126,3 +126,13 @@ func normalizeMACAddress(mac string) string { } return strings.Join(parts, ":") } + +func (t *Tendrils) requestARP(ip net.IP) { + if t.DisableARP { + return + } + conn, err := net.DialTimeout("udp4", ip.String()+":1", 100*time.Millisecond) + if err == nil { + conn.Close() + } +} diff --git a/cmd/tendrils/main.go b/cmd/tendrils/main.go index 8c2c1d9..57e596d 100644 --- a/cmd/tendrils/main.go +++ b/cmd/tendrils/main.go @@ -14,6 +14,7 @@ func main() { noIGMP := flag.Bool("no-igmp", false, "disable IGMP querier") noMDNS := flag.Bool("no-mdns", false, "disable mDNS discovery") noArtNet := flag.Bool("no-artnet", false, "disable Art-Net discovery") + noDante := flag.Bool("no-dante", false, "disable Dante discovery") logEvents := flag.Bool("log-events", false, "log node events") logNodes := flag.Bool("log-nodes", false, "log full node details on changes") debugARP := flag.Bool("debug-arp", false, "debug ARP discovery") @@ -22,6 +23,7 @@ func main() { debugIGMP := flag.Bool("debug-igmp", false, "debug IGMP querier") debugMDNS := flag.Bool("debug-mdns", false, "debug mDNS discovery") debugArtNet := flag.Bool("debug-artnet", false, "debug Art-Net discovery") + debugDante := flag.Bool("debug-dante", false, "debug Dante discovery") flag.Parse() t := tendrils.New() @@ -32,6 +34,7 @@ func main() { t.DisableIGMP = *noIGMP t.DisableMDNS = *noMDNS t.DisableArtNet = *noArtNet + t.DisableDante = *noDante t.LogEvents = *logEvents t.LogNodes = *logNodes t.DebugARP = *debugARP @@ -40,5 +43,6 @@ func main() { t.DebugIGMP = *debugIGMP t.DebugMDNS = *debugMDNS t.DebugArtNet = *debugArtNet + t.DebugDante = *debugDante t.Run() } diff --git a/dante.go b/dante.go new file mode 100644 index 0000000..5e45665 --- /dev/null +++ b/dante.go @@ -0,0 +1,152 @@ +package tendrils + +import ( + "context" + "log" + "net" + "time" + + "github.com/miekg/dns" +) + +func (t *Tendrils) listenDante(ctx context.Context, iface net.Interface) { + go t.queryDanteMDNS(ctx, iface) + go t.listenPTP(ctx, iface) +} + +func (t *Tendrils) queryDanteMDNS(ctx context.Context, iface net.Interface) { + addrs, err := iface.Addrs() + if err != nil { + return + } + + var srcIP net.IP + for _, addr := range addrs { + if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil { + srcIP = ipnet.IP.To4() + break + } + } + if srcIP == nil { + return + } + + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + t.sendDanteMDNSQuery(iface.Name, srcIP) + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + t.sendDanteMDNSQuery(iface.Name, srcIP) + } + } +} + +func (t *Tendrils) sendDanteMDNSQuery(ifaceName string, srcIP net.IP) { + conn, err := net.DialUDP("udp4", &net.UDPAddr{IP: srcIP}, &net.UDPAddr{IP: net.IPv4(224, 0, 0, 251), Port: 5353}) + if err != nil { + return + } + defer conn.Close() + + services := []string{ + "_netaudio-arc._udp.local.", + "_netaudio-cmc._udp.local.", + "_netaudio-dbc._udp.local.", + "_netaudio-chan._udp.local.", + "_dantevideo._udp.local.", + "_dantevid-flow._udp.local.", + } + + for _, service := range services { + msg := new(dns.Msg) + msg.SetQuestion(service, dns.TypePTR) + msg.RecursionDesired = false + + data, err := msg.Pack() + if err != nil { + continue + } + + conn.Write(data) + } + + if t.DebugDante { + log.Printf("[dante] %s: sent mdns queries for netaudio services", ifaceName) + } +} + +func (t *Tendrils) listenPTP(ctx context.Context, iface net.Interface) { + addr, err := net.ResolveUDPAddr("udp4", "224.0.1.129:319") + if err != nil { + return + } + + conn, err := net.ListenMulticastUDP("udp4", &iface, addr) + if err != nil { + if t.DebugDante { + log.Printf("[dante] %s: failed to listen ptp: %v", iface.Name, err) + } + return + } + defer conn.Close() + + buf := make([]byte, 1500) + for { + select { + case <-ctx.Done(): + return + default: + } + + conn.SetReadDeadline(time.Now().Add(1 * time.Second)) + n, src, err := conn.ReadFromUDP(buf) + if err != nil { + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + continue + } + continue + } + + t.handlePTPPacket(iface.Name, src.IP, buf[:n]) + } +} + +func (t *Tendrils) handlePTPPacket(ifaceName string, srcIP net.IP, data []byte) { + if len(data) < 34 { + return + } + + messageType := data[0] & 0x0f + + if messageType != 0x0b { + return + } + + if len(data) < 64 { + return + } + + clockClass := data[48] + clockAccuracy := data[49] + priority1 := data[47] + priority2 := data[51] + + if t.DebugDante { + log.Printf("[dante] %s: ptp announce from %s class=%d accuracy=%d p1=%d p2=%d", + ifaceName, srcIP, clockClass, clockAccuracy, priority1, priority2) + } + + t.nodes.SetDanteClockMaster(srcIP) +} + +func (n *Nodes) UpdateDante(name string, ip net.IP) { + if n.t.DebugDante { + log.Printf("[dante] mdns response: %s -> %s", name, ip) + } + n.Update(nil, nil, []net.IP{ip}, "", name, "dante") +} diff --git a/mdns.go b/mdns.go index 4135964..da3a0e7 100644 --- a/mdns.go +++ b/mdns.go @@ -14,6 +14,27 @@ const ( mdnsAddr = "224.0.0.251:5353" ) +func extractDanteName(s string) string { + var name string + for _, prefix := range []string{"._netaudio-", "._dante"} { + if idx := strings.Index(s, prefix); idx > 0 { + name = s[:idx] + break + } + } + if name == "" { + return "" + } + if at := strings.LastIndex(name, "@"); at >= 0 { + name = name[at+1:] + } + return name +} + +func isDanteService(s string) bool { + return strings.Contains(s, "_netaudio-") || strings.Contains(s, "._dante") +} + func (t *Tendrils) listenMDNS(ctx context.Context, iface net.Interface) { addr, err := net.ResolveUDPAddr("udp4", mdnsAddr) if err != nil { @@ -66,26 +87,67 @@ func (t *Tendrils) processMDNSResponse(ifaceName string, srcIP net.IP, msg *dns. var hostname string allRecords := append(msg.Answer, msg.Extra...) + + if t.DebugMDNS { + for _, rr := range allRecords { + log.Printf("[mdns] %s: record %s", ifaceName, rr.String()) + } + } + + aRecords := map[string]net.IP{} + srvTargets := map[string]string{} + danteNames := map[string]bool{} + for _, rr := range allRecords { switch r := rr.(type) { case *dns.A: - name := strings.TrimSuffix(r.Hdr.Name, ".local.") - name = strings.TrimSuffix(name, ".") - if name != "" && r.A.Equal(srcIP) { - hostname = name + name := strings.TrimSuffix(r.Hdr.Name, ".") + aRecords[name] = r.A + localName := strings.TrimSuffix(name, ".local") + if localName != "" && r.A.Equal(srcIP) { + hostname = localName } case *dns.AAAA: continue case *dns.PTR: - name := strings.TrimSuffix(r.Ptr, ".local.") - name = strings.TrimSuffix(name, ".") - if hostname == "" && name != "" && !strings.HasPrefix(name, "_") { - hostname = name + if isDanteService(r.Hdr.Name) { + name := extractDanteName(r.Ptr) + if name != "" { + danteNames[name] = true + } + } else { + name := strings.TrimSuffix(r.Ptr, ".local.") + name = strings.TrimSuffix(name, ".") + if hostname == "" && name != "" && !strings.HasPrefix(name, "_") { + hostname = name + } + } + case *dns.SRV: + if isDanteService(r.Hdr.Name) { + name := extractDanteName(r.Hdr.Name) + target := strings.TrimSuffix(r.Target, ".") + if name != "" && target != "" { + srvTargets[name] = target + } } } } - if hostname != "" { + for name := range danteNames { + var ip net.IP + if target, ok := srvTargets[name]; ok { + ip = aRecords[target] + } + if ip == nil { + ip = aRecords[name+".local"] + } + if ip == nil { + ip = srcIP + } + t.nodes.UpdateDante(name, ip) + } + + if len(danteNames) == 0 && hostname != "" { if t.DebugMDNS { log.Printf("[mdns] %s: %s -> %s", ifaceName, srcIP, hostname) } diff --git a/nodes.go b/nodes.go index c9cffa7..2e61e4e 100644 --- a/nodes.go +++ b/nodes.go @@ -102,11 +102,12 @@ type PoEBudget struct { } type Node struct { - Name string - Interfaces map[string]*Interface - MACTable map[string]string // peer MAC -> local interface name - PoEBudget *PoEBudget - pollTrigger chan struct{} + Name string + Interfaces map[string]*Interface + MACTable map[string]string // peer MAC -> local interface name + PoEBudget *PoEBudget + IsDanteClockMaster bool + pollTrigger chan struct{} } func (n *Node) String() string { @@ -148,9 +149,28 @@ func (g *MulticastGroup) Name() string { return fmt.Sprintf("sacn:%d", universe) } + if ip[0] == 239 && ip[1] >= 69 && ip[1] <= 71 { + flowID := (int(ip[1]-69) << 16) | (int(ip[2]) << 8) | int(ip[3]) + return fmt.Sprintf("dante-mcast:%d", flowID) + } + return g.IP.String() } +func (g *MulticastGroup) IsDante() bool { + ip := g.IP.To4() + if ip == nil { + return false + } + if ip[0] == 239 && ip[1] == 255 { + return false + } + if ip[0] == 239 && ip[1] >= 69 && ip[1] <= 71 { + return true + } + return false +} + type MulticastMembership struct { Node *Node LastSeen time.Time @@ -197,7 +217,7 @@ func (n *Nodes) Update(target *Node, mac net.HardwareAddr, ips []net.IP, ifaceNa n.mu.Lock() defer n.mu.Unlock() - if mac == nil && target == nil { + if mac == nil && target == nil && len(ips) == 0 { return } @@ -228,6 +248,17 @@ func (n *Nodes) Update(target *Node, mac net.HardwareAddr, ips []net.IP, ifaceNa } } + if targetID == -1 { + for _, ip := range ips { + if id, exists := n.ipIndex[ip.String()]; exists { + if _, nodeExists := n.nodes[id]; nodeExists { + targetID = id + break + } + } + } + } + var node *Node if targetID == -1 { targetID = n.nextID @@ -247,6 +278,15 @@ func (n *Nodes) Update(target *Node, mac net.HardwareAddr, ips []net.IP, ifaceNa var added []string if mac != nil { added = n.updateNodeInterface(node, targetID, mac, ips, ifaceName) + } else { + for _, ip := range ips { + ipKey := ip.String() + if _, exists := n.ipIndex[ipKey]; !exists { + n.ipIndex[ipKey] = targetID + added = append(added, "ip="+ipKey) + go n.t.requestARP(ip) + } + } } if nodeName != "" && node.Name == "" { @@ -463,6 +503,33 @@ func (n *Nodes) UpdateMACTable(node *Node, peerMAC net.HardwareAddr, ifaceName s node.MACTable[peerMAC.String()] = ifaceName } +func (n *Nodes) SetDanteClockMaster(ip net.IP) { + n.mu.RLock() + currentMaster := "" + for _, node := range n.nodes { + if node.IsDanteClockMaster { + currentMaster = ip.String() + break + } + } + n.mu.RUnlock() + + if currentMaster != ip.String() { + n.Update(nil, nil, []net.IP{ip}, "", "", "ptp") + } + + n.mu.Lock() + defer n.mu.Unlock() + + for _, node := range n.nodes { + node.IsDanteClockMaster = false + } + + if id, exists := n.ipIndex[ip.String()]; exists { + n.nodes[id].IsDanteClockMaster = true + } +} + func (n *Nodes) UpdateMulticastMembership(sourceIP, groupIP net.IP) { n.mu.Lock() defer n.mu.Unlock() @@ -514,8 +581,15 @@ func (n *Nodes) logNode(node *Node) { if name == "" { name = "??" } + var tags []string if node.PoEBudget != nil { - log.Printf("[node] %s [poe:%.0f/%.0fW]", name, node.PoEBudget.Power, node.PoEBudget.MaxPower) + tags = append(tags, fmt.Sprintf("poe:%.0f/%.0fW", node.PoEBudget.Power, node.PoEBudget.MaxPower)) + } + if node.IsDanteClockMaster { + tags = append(tags, "dante-clock-master") + } + if len(tags) > 0 { + log.Printf("[node] %s [%s]", name, joinParts(tags)) } else { log.Printf("[node] %s", name) } diff --git a/tendrils.go b/tendrils.go index bb71e3d..817c0f2 100644 --- a/tendrils.go +++ b/tendrils.go @@ -22,6 +22,7 @@ type Tendrils struct { DisableIGMP bool DisableMDNS bool DisableArtNet bool + DisableDante bool LogEvents bool LogNodes bool DebugARP bool @@ -30,6 +31,7 @@ type Tendrils struct { DebugIGMP bool DebugMDNS bool DebugArtNet bool + DebugDante bool } func New() *Tendrils { @@ -195,4 +197,7 @@ func (t *Tendrils) startInterface(ctx context.Context, iface net.Interface) { if !t.DisableArtNet { go t.listenArtNet(ctx, iface) } + if !t.DisableDante { + go t.listenDante(ctx, iface) + } }