refactor dante flows and artnet to use node references instead of names
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
89
artnet.go
89
artnet.go
@@ -23,10 +23,7 @@ const (
|
||||
)
|
||||
|
||||
type ArtNetNode struct {
|
||||
IP net.IP
|
||||
MAC net.HardwareAddr
|
||||
ShortName string
|
||||
LongName string
|
||||
Node *Node
|
||||
Inputs []int
|
||||
Outputs []int
|
||||
LastSeen time.Time
|
||||
@@ -139,18 +136,6 @@ func (t *Tendrils) handleArtPollReply(ifaceName string, srcIP net.IP, data []byt
|
||||
log.Printf("[artnet] %s: %s %s short=%q long=%q numPorts=%d portTypes=%v in=%v out=%v", ifaceName, ip, mac, shortName, longName, numPorts, data[174:178], inputs, outputs)
|
||||
}
|
||||
|
||||
node := &ArtNetNode{
|
||||
IP: ip,
|
||||
MAC: mac,
|
||||
ShortName: shortName,
|
||||
LongName: longName,
|
||||
Inputs: inputs,
|
||||
Outputs: outputs,
|
||||
LastSeen: time.Now(),
|
||||
}
|
||||
|
||||
t.nodes.UpdateArtNet(node)
|
||||
|
||||
name := longName
|
||||
if name == "" {
|
||||
name = shortName
|
||||
@@ -158,6 +143,17 @@ func (t *Tendrils) handleArtPollReply(ifaceName string, srcIP net.IP, data []byt
|
||||
if name != "" {
|
||||
t.nodes.Update(nil, mac, []net.IP{ip}, "", name, "artnet")
|
||||
}
|
||||
|
||||
node := t.nodes.GetByIP(ip)
|
||||
if node == nil && mac != nil {
|
||||
node = t.nodes.GetByMAC(mac)
|
||||
}
|
||||
if node == nil && name != "" {
|
||||
node = t.nodes.GetOrCreateByName(name)
|
||||
}
|
||||
if node != nil {
|
||||
t.nodes.UpdateArtNet(node, inputs, outputs)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Tendrils) runArtNetPoller(ctx context.Context, iface net.Interface, conn *net.UDPConn) {
|
||||
@@ -220,41 +216,39 @@ func (t *Tendrils) sendArtPoll(conn *net.UDPConn, broadcast net.IP, ifaceName st
|
||||
|
||||
type ArtNetNodes struct {
|
||||
mu sync.RWMutex
|
||||
nodes map[string]*ArtNetNode
|
||||
nodes map[*Node]*ArtNetNode
|
||||
}
|
||||
|
||||
func NewArtNetNodes() *ArtNetNodes {
|
||||
return &ArtNetNodes{
|
||||
nodes: map[string]*ArtNetNode{},
|
||||
nodes: map[*Node]*ArtNetNode{},
|
||||
}
|
||||
}
|
||||
|
||||
func (a *ArtNetNodes) Update(node *ArtNetNode) {
|
||||
func (a *ArtNetNodes) Update(node *Node, inputs, outputs []int) {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
key := node.IP.String()
|
||||
|
||||
existing, exists := a.nodes[key]
|
||||
existing, exists := a.nodes[node]
|
||||
if exists {
|
||||
for _, u := range node.Inputs {
|
||||
for _, u := range inputs {
|
||||
if !containsInt(existing.Inputs, u) {
|
||||
existing.Inputs = append(existing.Inputs, u)
|
||||
}
|
||||
}
|
||||
for _, u := range node.Outputs {
|
||||
for _, u := range outputs {
|
||||
if !containsInt(existing.Outputs, u) {
|
||||
existing.Outputs = append(existing.Outputs, u)
|
||||
}
|
||||
}
|
||||
existing.LastSeen = node.LastSeen
|
||||
if node.ShortName != "" {
|
||||
existing.ShortName = node.ShortName
|
||||
}
|
||||
if node.LongName != "" {
|
||||
existing.LongName = node.LongName
|
||||
}
|
||||
existing.LastSeen = time.Now()
|
||||
} else {
|
||||
a.nodes[key] = node
|
||||
a.nodes[node] = &ArtNetNode{
|
||||
Node: node,
|
||||
Inputs: inputs,
|
||||
Outputs: outputs,
|
||||
LastSeen: time.Now(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -271,9 +265,9 @@ func (a *ArtNetNodes) Expire() {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
expireTime := time.Now().Add(-60 * time.Second)
|
||||
for key, node := range a.nodes {
|
||||
if node.LastSeen.Before(expireTime) {
|
||||
delete(a.nodes, key)
|
||||
for nodePtr, artNode := range a.nodes {
|
||||
if artNode.LastSeen.Before(expireTime) {
|
||||
delete(a.nodes, nodePtr)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -298,29 +292,26 @@ func (a *ArtNetNodes) LogAll() {
|
||||
return
|
||||
}
|
||||
|
||||
var nodes []*ArtNetNode
|
||||
for _, node := range a.nodes {
|
||||
nodes = append(nodes, node)
|
||||
var artNodes []*ArtNetNode
|
||||
for _, artNode := range a.nodes {
|
||||
artNodes = append(artNodes, artNode)
|
||||
}
|
||||
sort.Slice(nodes, func(i, j int) bool {
|
||||
return sortorder.NaturalLess(nodes[i].LongName, nodes[j].LongName)
|
||||
sort.Slice(artNodes, func(i, j int) bool {
|
||||
return sortorder.NaturalLess(artNodes[i].Node.DisplayName(), artNodes[j].Node.DisplayName())
|
||||
})
|
||||
|
||||
inputUniverses := map[int][]string{}
|
||||
outputUniverses := map[int][]string{}
|
||||
|
||||
for _, node := range nodes {
|
||||
name := node.LongName
|
||||
for _, artNode := range artNodes {
|
||||
name := artNode.Node.DisplayName()
|
||||
if name == "" {
|
||||
name = node.ShortName
|
||||
name = "??"
|
||||
}
|
||||
if name == "" {
|
||||
name = node.IP.String()
|
||||
}
|
||||
for _, u := range node.Inputs {
|
||||
for _, u := range artNode.Inputs {
|
||||
inputUniverses[u] = append(inputUniverses[u], name)
|
||||
}
|
||||
for _, u := range node.Outputs {
|
||||
for _, u := range artNode.Outputs {
|
||||
outputUniverses[u] = append(outputUniverses[u], name)
|
||||
}
|
||||
}
|
||||
@@ -361,6 +352,6 @@ func (a *ArtNetNodes) LogAll() {
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Nodes) UpdateArtNet(artNode *ArtNetNode) {
|
||||
n.t.artnet.Update(artNode)
|
||||
func (n *Nodes) UpdateArtNet(node *Node, inputs, outputs []int) {
|
||||
n.t.artnet.Update(node, inputs, outputs)
|
||||
}
|
||||
|
||||
@@ -21,46 +21,46 @@ const (
|
||||
var danteSeqID uint32
|
||||
|
||||
type DanteFlow struct {
|
||||
SourceName string
|
||||
Subscribers map[string]*DanteFlowSubscriber
|
||||
Source *Node
|
||||
Subscribers map[*Node]*DanteFlowSubscriber
|
||||
}
|
||||
|
||||
type DanteFlowSubscriber struct {
|
||||
Name string
|
||||
Node *Node
|
||||
Channels []string
|
||||
LastSeen time.Time
|
||||
}
|
||||
|
||||
type DanteFlows struct {
|
||||
mu sync.RWMutex
|
||||
flows map[string]*DanteFlow
|
||||
flows map[*Node]*DanteFlow
|
||||
}
|
||||
|
||||
func NewDanteFlows() *DanteFlows {
|
||||
return &DanteFlows{
|
||||
flows: map[string]*DanteFlow{},
|
||||
flows: map[*Node]*DanteFlow{},
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DanteFlows) Update(sourceName, subscriberName, channelInfo string) {
|
||||
func (d *DanteFlows) Update(source, subscriber *Node, channelInfo string) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
flow := d.flows[sourceName]
|
||||
flow := d.flows[source]
|
||||
if flow == nil {
|
||||
flow = &DanteFlow{
|
||||
SourceName: sourceName,
|
||||
Subscribers: map[string]*DanteFlowSubscriber{},
|
||||
Source: source,
|
||||
Subscribers: map[*Node]*DanteFlowSubscriber{},
|
||||
}
|
||||
d.flows[sourceName] = flow
|
||||
d.flows[source] = flow
|
||||
}
|
||||
|
||||
sub := flow.Subscribers[subscriberName]
|
||||
sub := flow.Subscribers[subscriber]
|
||||
if sub == nil {
|
||||
sub = &DanteFlowSubscriber{
|
||||
Name: subscriberName,
|
||||
Node: subscriber,
|
||||
}
|
||||
flow.Subscribers[subscriberName] = sub
|
||||
flow.Subscribers[subscriber] = sub
|
||||
}
|
||||
|
||||
if channelInfo != "" {
|
||||
@@ -85,14 +85,14 @@ func (d *DanteFlows) Expire() {
|
||||
defer d.mu.Unlock()
|
||||
|
||||
expireTime := time.Now().Add(-5 * time.Minute)
|
||||
for sourceName, flow := range d.flows {
|
||||
for subName, sub := range flow.Subscribers {
|
||||
for source, flow := range d.flows {
|
||||
for subNode, sub := range flow.Subscribers {
|
||||
if sub.LastSeen.Before(expireTime) {
|
||||
delete(flow.Subscribers, subName)
|
||||
delete(flow.Subscribers, subNode)
|
||||
}
|
||||
}
|
||||
if len(flow.Subscribers) == 0 {
|
||||
delete(d.flows, sourceName)
|
||||
delete(d.flows, source)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -112,7 +112,7 @@ func (d *DanteFlows) LogAll() {
|
||||
flows = append(flows, flow)
|
||||
}
|
||||
sort.Slice(flows, func(i, j int) bool {
|
||||
return sortorder.NaturalLess(flows[i].SourceName, flows[j].SourceName)
|
||||
return sortorder.NaturalLess(flows[i].Source.DisplayName(), flows[j].Source.DisplayName())
|
||||
})
|
||||
|
||||
type channelFlow struct {
|
||||
@@ -125,14 +125,18 @@ func (d *DanteFlows) LogAll() {
|
||||
var allNoChannelFlows []string
|
||||
|
||||
for _, flow := range flows {
|
||||
sourceName := flow.SourceName
|
||||
if strings.HasPrefix(sourceName, "dante-av:") || strings.HasPrefix(sourceName, "dante-mcast:") {
|
||||
sourceName = "?? (" + sourceName + ")"
|
||||
sourceName := flow.Source.DisplayName()
|
||||
if sourceName == "" {
|
||||
sourceName = "??"
|
||||
}
|
||||
|
||||
for _, sub := range flow.Subscribers {
|
||||
subName := sub.Node.DisplayName()
|
||||
if subName == "" {
|
||||
subName = "??"
|
||||
}
|
||||
if len(sub.Channels) == 0 {
|
||||
allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s", sourceName, sub.Name))
|
||||
allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s", sourceName, subName))
|
||||
} else {
|
||||
for _, ch := range sub.Channels {
|
||||
parts := strings.Split(ch, "->")
|
||||
@@ -140,11 +144,11 @@ func (d *DanteFlows) LogAll() {
|
||||
allChannelFlows = append(allChannelFlows, channelFlow{
|
||||
sourceName: sourceName,
|
||||
txCh: parts[0],
|
||||
rxName: sub.Name,
|
||||
rxName: subName,
|
||||
rxCh: parts[1],
|
||||
})
|
||||
} else {
|
||||
allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s[%s]", sourceName, sub.Name, ch))
|
||||
allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s[%s]", sourceName, subName, ch))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -627,7 +631,9 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) {
|
||||
if sub.TxChannelName != "" {
|
||||
channelInfo = fmt.Sprintf("%s->%02d", sub.TxChannelName, sub.RxChannel)
|
||||
}
|
||||
t.danteFlows.Update(txDeviceName, info.Name, channelInfo)
|
||||
sourceNode := t.nodes.GetOrCreateByName(txDeviceName)
|
||||
subscriberNode := t.nodes.GetOrCreateByName(info.Name)
|
||||
t.danteFlows.Update(sourceNode, subscriberNode, channelInfo)
|
||||
needIGMPFallback = false
|
||||
}
|
||||
}
|
||||
@@ -642,7 +648,9 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) {
|
||||
if sourceName == "" {
|
||||
sourceName = (&MulticastGroup{IP: groupIP}).Name()
|
||||
}
|
||||
t.danteFlows.Update(sourceName, info.Name, "")
|
||||
sourceNode := t.nodes.GetOrCreateByName(sourceName)
|
||||
subscriberNode := t.nodes.GetOrCreateByName(info.Name)
|
||||
t.danteFlows.Update(sourceNode, subscriberNode, "")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
57
nodes.go
57
nodes.go
@@ -234,6 +234,7 @@ type Nodes struct {
|
||||
nodes map[int]*Node
|
||||
ipIndex map[string]int
|
||||
macIndex map[string]int
|
||||
nameIndex map[string]int
|
||||
nodeCancel map[int]context.CancelFunc
|
||||
multicastGroups map[string]*MulticastGroupMembers // group IP string -> group with members
|
||||
nextID int
|
||||
@@ -248,6 +249,7 @@ func NewNodes(t *Tendrils) *Nodes {
|
||||
nodes: map[int]*Node{},
|
||||
ipIndex: map[string]int{},
|
||||
macIndex: map[string]int{},
|
||||
nameIndex: map[string]int{},
|
||||
nodeCancel: map[int]context.CancelFunc{},
|
||||
multicastGroups: map[string]*MulticastGroupMembers{},
|
||||
nextID: 1,
|
||||
@@ -307,6 +309,20 @@ func (n *Nodes) Update(target *Node, mac net.HardwareAddr, ips []net.IP, ifaceNa
|
||||
}
|
||||
}
|
||||
|
||||
if nodeName != "" {
|
||||
if id, exists := n.nameIndex[nodeName]; exists {
|
||||
if nameNode, nodeExists := n.nodes[id]; nodeExists {
|
||||
if targetID == -1 {
|
||||
targetID = id
|
||||
} else if id != targetID && len(nameNode.Interfaces) == 0 {
|
||||
n.mergeNodes(targetID, id)
|
||||
}
|
||||
} else {
|
||||
delete(n.nameIndex, nodeName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var node *Node
|
||||
if targetID == -1 {
|
||||
targetID = n.nextID
|
||||
@@ -351,6 +367,7 @@ func (n *Nodes) Update(target *Node, mac net.HardwareAddr, ips []net.IP, ifaceNa
|
||||
}
|
||||
if !node.Names[nodeName] {
|
||||
node.Names[nodeName] = true
|
||||
n.nameIndex[nodeName] = targetID
|
||||
added = append(added, "name="+nodeName)
|
||||
}
|
||||
}
|
||||
@@ -516,6 +533,7 @@ func (n *Nodes) mergeNodes(keepID, mergeID int) {
|
||||
keep.Names = map[string]bool{}
|
||||
}
|
||||
keep.Names[name] = true
|
||||
n.nameIndex[name] = keepID
|
||||
}
|
||||
|
||||
for _, iface := range merge.Interfaces {
|
||||
@@ -557,6 +575,45 @@ func (n *Nodes) GetByMAC(mac net.HardwareAddr) *Node {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Nodes) GetByName(name string) *Node {
|
||||
n.mu.RLock()
|
||||
defer n.mu.RUnlock()
|
||||
|
||||
if id, exists := n.nameIndex[name]; exists {
|
||||
return n.nodes[id]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Nodes) GetOrCreateByName(name string) *Node {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
|
||||
if id, exists := n.nameIndex[name]; exists {
|
||||
if node, nodeExists := n.nodes[id]; nodeExists {
|
||||
return node
|
||||
}
|
||||
delete(n.nameIndex, name)
|
||||
}
|
||||
|
||||
targetID := n.nextID
|
||||
n.nextID++
|
||||
node := &Node{
|
||||
Names: map[string]bool{name: true},
|
||||
Interfaces: map[string]*Interface{},
|
||||
MACTable: map[string]string{},
|
||||
pollTrigger: make(chan struct{}, 1),
|
||||
}
|
||||
n.nodes[targetID] = node
|
||||
n.nameIndex[name] = targetID
|
||||
n.startNodePoller(targetID, node)
|
||||
|
||||
if n.t.LogEvents {
|
||||
log.Printf("[add] %s [name=%s] (via name-lookup)", node, name)
|
||||
}
|
||||
|
||||
return node
|
||||
}
|
||||
|
||||
func (n *Nodes) UpdateMACTable(node *Node, peerMAC net.HardwareAddr, ifaceName string) {
|
||||
n.mu.Lock()
|
||||
|
||||
Reference in New Issue
Block a user