diff --git a/artnet.go b/artnet.go index a62885b..dca47a0 100644 --- a/artnet.go +++ b/artnet.go @@ -23,13 +23,10 @@ const ( ) type ArtNetNode struct { - IP net.IP - MAC net.HardwareAddr - ShortName string - LongName string - Inputs []int - Outputs []int - LastSeen time.Time + Node *Node + Inputs []int + Outputs []int + LastSeen time.Time } func (t *Tendrils) listenArtNet(ctx context.Context, iface net.Interface) { @@ -139,18 +136,6 @@ func (t *Tendrils) handleArtPollReply(ifaceName string, srcIP net.IP, data []byt log.Printf("[artnet] %s: %s %s short=%q long=%q numPorts=%d portTypes=%v in=%v out=%v", ifaceName, ip, mac, shortName, longName, numPorts, data[174:178], inputs, outputs) } - node := &ArtNetNode{ - IP: ip, - MAC: mac, - ShortName: shortName, - LongName: longName, - Inputs: inputs, - Outputs: outputs, - LastSeen: time.Now(), - } - - t.nodes.UpdateArtNet(node) - name := longName if name == "" { name = shortName @@ -158,6 +143,17 @@ func (t *Tendrils) handleArtPollReply(ifaceName string, srcIP net.IP, data []byt if name != "" { t.nodes.Update(nil, mac, []net.IP{ip}, "", name, "artnet") } + + node := t.nodes.GetByIP(ip) + if node == nil && mac != nil { + node = t.nodes.GetByMAC(mac) + } + if node == nil && name != "" { + node = t.nodes.GetOrCreateByName(name) + } + if node != nil { + t.nodes.UpdateArtNet(node, inputs, outputs) + } } func (t *Tendrils) runArtNetPoller(ctx context.Context, iface net.Interface, conn *net.UDPConn) { @@ -220,41 +216,39 @@ func (t *Tendrils) sendArtPoll(conn *net.UDPConn, broadcast net.IP, ifaceName st type ArtNetNodes struct { mu sync.RWMutex - nodes map[string]*ArtNetNode + nodes map[*Node]*ArtNetNode } func NewArtNetNodes() *ArtNetNodes { return &ArtNetNodes{ - nodes: map[string]*ArtNetNode{}, + nodes: map[*Node]*ArtNetNode{}, } } -func (a *ArtNetNodes) Update(node *ArtNetNode) { +func (a *ArtNetNodes) Update(node *Node, inputs, outputs []int) { a.mu.Lock() defer a.mu.Unlock() - key := node.IP.String() - existing, exists := a.nodes[key] + existing, exists := a.nodes[node] if exists { - for _, u := range node.Inputs { + for _, u := range inputs { if !containsInt(existing.Inputs, u) { existing.Inputs = append(existing.Inputs, u) } } - for _, u := range node.Outputs { + for _, u := range outputs { if !containsInt(existing.Outputs, u) { existing.Outputs = append(existing.Outputs, u) } } - existing.LastSeen = node.LastSeen - if node.ShortName != "" { - existing.ShortName = node.ShortName - } - if node.LongName != "" { - existing.LongName = node.LongName - } + existing.LastSeen = time.Now() } else { - a.nodes[key] = node + a.nodes[node] = &ArtNetNode{ + Node: node, + Inputs: inputs, + Outputs: outputs, + LastSeen: time.Now(), + } } } @@ -271,9 +265,9 @@ func (a *ArtNetNodes) Expire() { a.mu.Lock() defer a.mu.Unlock() expireTime := time.Now().Add(-60 * time.Second) - for key, node := range a.nodes { - if node.LastSeen.Before(expireTime) { - delete(a.nodes, key) + for nodePtr, artNode := range a.nodes { + if artNode.LastSeen.Before(expireTime) { + delete(a.nodes, nodePtr) } } } @@ -298,29 +292,26 @@ func (a *ArtNetNodes) LogAll() { return } - var nodes []*ArtNetNode - for _, node := range a.nodes { - nodes = append(nodes, node) + var artNodes []*ArtNetNode + for _, artNode := range a.nodes { + artNodes = append(artNodes, artNode) } - sort.Slice(nodes, func(i, j int) bool { - return sortorder.NaturalLess(nodes[i].LongName, nodes[j].LongName) + sort.Slice(artNodes, func(i, j int) bool { + return sortorder.NaturalLess(artNodes[i].Node.DisplayName(), artNodes[j].Node.DisplayName()) }) inputUniverses := map[int][]string{} outputUniverses := map[int][]string{} - for _, node := range nodes { - name := node.LongName + for _, artNode := range artNodes { + name := artNode.Node.DisplayName() if name == "" { - name = node.ShortName + name = "??" } - if name == "" { - name = node.IP.String() - } - for _, u := range node.Inputs { + for _, u := range artNode.Inputs { inputUniverses[u] = append(inputUniverses[u], name) } - for _, u := range node.Outputs { + for _, u := range artNode.Outputs { outputUniverses[u] = append(outputUniverses[u], name) } } @@ -361,6 +352,6 @@ func (a *ArtNetNodes) LogAll() { } } -func (n *Nodes) UpdateArtNet(artNode *ArtNetNode) { - n.t.artnet.Update(artNode) +func (n *Nodes) UpdateArtNet(node *Node, inputs, outputs []int) { + n.t.artnet.Update(node, inputs, outputs) } diff --git a/dante_control.go b/dante_control.go index a37fe00..d36f98d 100644 --- a/dante_control.go +++ b/dante_control.go @@ -21,46 +21,46 @@ const ( var danteSeqID uint32 type DanteFlow struct { - SourceName string - Subscribers map[string]*DanteFlowSubscriber + Source *Node + Subscribers map[*Node]*DanteFlowSubscriber } type DanteFlowSubscriber struct { - Name string - Channels []string - LastSeen time.Time + Node *Node + Channels []string + LastSeen time.Time } type DanteFlows struct { mu sync.RWMutex - flows map[string]*DanteFlow + flows map[*Node]*DanteFlow } func NewDanteFlows() *DanteFlows { return &DanteFlows{ - flows: map[string]*DanteFlow{}, + flows: map[*Node]*DanteFlow{}, } } -func (d *DanteFlows) Update(sourceName, subscriberName, channelInfo string) { +func (d *DanteFlows) Update(source, subscriber *Node, channelInfo string) { d.mu.Lock() defer d.mu.Unlock() - flow := d.flows[sourceName] + flow := d.flows[source] if flow == nil { flow = &DanteFlow{ - SourceName: sourceName, - Subscribers: map[string]*DanteFlowSubscriber{}, + Source: source, + Subscribers: map[*Node]*DanteFlowSubscriber{}, } - d.flows[sourceName] = flow + d.flows[source] = flow } - sub := flow.Subscribers[subscriberName] + sub := flow.Subscribers[subscriber] if sub == nil { sub = &DanteFlowSubscriber{ - Name: subscriberName, + Node: subscriber, } - flow.Subscribers[subscriberName] = sub + flow.Subscribers[subscriber] = sub } if channelInfo != "" { @@ -85,14 +85,14 @@ func (d *DanteFlows) Expire() { defer d.mu.Unlock() expireTime := time.Now().Add(-5 * time.Minute) - for sourceName, flow := range d.flows { - for subName, sub := range flow.Subscribers { + for source, flow := range d.flows { + for subNode, sub := range flow.Subscribers { if sub.LastSeen.Before(expireTime) { - delete(flow.Subscribers, subName) + delete(flow.Subscribers, subNode) } } if len(flow.Subscribers) == 0 { - delete(d.flows, sourceName) + delete(d.flows, source) } } } @@ -112,7 +112,7 @@ func (d *DanteFlows) LogAll() { flows = append(flows, flow) } sort.Slice(flows, func(i, j int) bool { - return sortorder.NaturalLess(flows[i].SourceName, flows[j].SourceName) + return sortorder.NaturalLess(flows[i].Source.DisplayName(), flows[j].Source.DisplayName()) }) type channelFlow struct { @@ -125,14 +125,18 @@ func (d *DanteFlows) LogAll() { var allNoChannelFlows []string for _, flow := range flows { - sourceName := flow.SourceName - if strings.HasPrefix(sourceName, "dante-av:") || strings.HasPrefix(sourceName, "dante-mcast:") { - sourceName = "?? (" + sourceName + ")" + sourceName := flow.Source.DisplayName() + if sourceName == "" { + sourceName = "??" } for _, sub := range flow.Subscribers { + subName := sub.Node.DisplayName() + if subName == "" { + subName = "??" + } if len(sub.Channels) == 0 { - allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s", sourceName, sub.Name)) + allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s", sourceName, subName)) } else { for _, ch := range sub.Channels { parts := strings.Split(ch, "->") @@ -140,11 +144,11 @@ func (d *DanteFlows) LogAll() { allChannelFlows = append(allChannelFlows, channelFlow{ sourceName: sourceName, txCh: parts[0], - rxName: sub.Name, + rxName: subName, rxCh: parts[1], }) } else { - allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s[%s]", sourceName, sub.Name, ch)) + allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s[%s]", sourceName, subName, ch)) } } } @@ -627,7 +631,9 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) { if sub.TxChannelName != "" { channelInfo = fmt.Sprintf("%s->%02d", sub.TxChannelName, sub.RxChannel) } - t.danteFlows.Update(txDeviceName, info.Name, channelInfo) + sourceNode := t.nodes.GetOrCreateByName(txDeviceName) + subscriberNode := t.nodes.GetOrCreateByName(info.Name) + t.danteFlows.Update(sourceNode, subscriberNode, channelInfo) needIGMPFallback = false } } @@ -642,7 +648,9 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) { if sourceName == "" { sourceName = (&MulticastGroup{IP: groupIP}).Name() } - t.danteFlows.Update(sourceName, info.Name, "") + sourceNode := t.nodes.GetOrCreateByName(sourceName) + subscriberNode := t.nodes.GetOrCreateByName(info.Name) + t.danteFlows.Update(sourceNode, subscriberNode, "") } } } diff --git a/nodes.go b/nodes.go index 551d5eb..67ddf6f 100644 --- a/nodes.go +++ b/nodes.go @@ -234,6 +234,7 @@ type Nodes struct { nodes map[int]*Node ipIndex map[string]int macIndex map[string]int + nameIndex map[string]int nodeCancel map[int]context.CancelFunc multicastGroups map[string]*MulticastGroupMembers // group IP string -> group with members nextID int @@ -248,6 +249,7 @@ func NewNodes(t *Tendrils) *Nodes { nodes: map[int]*Node{}, ipIndex: map[string]int{}, macIndex: map[string]int{}, + nameIndex: map[string]int{}, nodeCancel: map[int]context.CancelFunc{}, multicastGroups: map[string]*MulticastGroupMembers{}, nextID: 1, @@ -307,6 +309,20 @@ func (n *Nodes) Update(target *Node, mac net.HardwareAddr, ips []net.IP, ifaceNa } } + if nodeName != "" { + if id, exists := n.nameIndex[nodeName]; exists { + if nameNode, nodeExists := n.nodes[id]; nodeExists { + if targetID == -1 { + targetID = id + } else if id != targetID && len(nameNode.Interfaces) == 0 { + n.mergeNodes(targetID, id) + } + } else { + delete(n.nameIndex, nodeName) + } + } + } + var node *Node if targetID == -1 { targetID = n.nextID @@ -351,6 +367,7 @@ func (n *Nodes) Update(target *Node, mac net.HardwareAddr, ips []net.IP, ifaceNa } if !node.Names[nodeName] { node.Names[nodeName] = true + n.nameIndex[nodeName] = targetID added = append(added, "name="+nodeName) } } @@ -516,6 +533,7 @@ func (n *Nodes) mergeNodes(keepID, mergeID int) { keep.Names = map[string]bool{} } keep.Names[name] = true + n.nameIndex[name] = keepID } for _, iface := range merge.Interfaces { @@ -557,6 +575,45 @@ func (n *Nodes) GetByMAC(mac net.HardwareAddr) *Node { return nil } +func (n *Nodes) GetByName(name string) *Node { + n.mu.RLock() + defer n.mu.RUnlock() + + if id, exists := n.nameIndex[name]; exists { + return n.nodes[id] + } + return nil +} + +func (n *Nodes) GetOrCreateByName(name string) *Node { + n.mu.Lock() + defer n.mu.Unlock() + + if id, exists := n.nameIndex[name]; exists { + if node, nodeExists := n.nodes[id]; nodeExists { + return node + } + delete(n.nameIndex, name) + } + + targetID := n.nextID + n.nextID++ + node := &Node{ + Names: map[string]bool{name: true}, + Interfaces: map[string]*Interface{}, + MACTable: map[string]string{}, + pollTrigger: make(chan struct{}, 1), + } + n.nodes[targetID] = node + n.nameIndex[name] = targetID + n.startNodePoller(targetID, node) + + if n.t.LogEvents { + log.Printf("[add] %s [name=%s] (via name-lookup)", node, name) + } + + return node +} func (n *Nodes) UpdateMACTable(node *Node, peerMAC net.HardwareAddr, ifaceName string) { n.mu.Lock()