diff --git a/artnet.go b/artnet.go index 824b37d..74bfe81 100644 --- a/artnet.go +++ b/artnet.go @@ -7,21 +7,12 @@ import ( "net" "sort" "strings" - "sync" "time" "github.com/fvbommel/sortorder" "github.com/gopatchy/artnet" ) -type ArtNetNode struct { - TypeID string `json:"typeid"` - Node *Node `json:"node"` - Inputs []int `json:"inputs,omitempty"` - Outputs []int `json:"outputs,omitempty"` - LastSeen time.Time `json:"last_seen"` -} - func (t *Tendrils) startArtNetListener(ctx context.Context) { conn, err := net.ListenUDP("udp4", &net.UDPAddr{Port: artnet.Port}) if err != nil { @@ -178,45 +169,6 @@ func (t *Tendrils) sendArtPoll(conn *net.UDPConn, broadcast net.IP, ifaceName st } } -type ArtNetNodes struct { - mu sync.RWMutex - nodes map[*Node]*ArtNetNode -} - -func NewArtNetNodes() *ArtNetNodes { - return &ArtNetNodes{ - nodes: map[*Node]*ArtNetNode{}, - } -} - -func (a *ArtNetNodes) Update(node *Node, inputs, outputs []int) { - a.mu.Lock() - defer a.mu.Unlock() - - existing, exists := a.nodes[node] - if exists { - for _, u := range inputs { - if !containsInt(existing.Inputs, u) { - existing.Inputs = append(existing.Inputs, u) - } - } - for _, u := range outputs { - if !containsInt(existing.Outputs, u) { - existing.Outputs = append(existing.Outputs, u) - } - } - existing.LastSeen = time.Now() - } else { - a.nodes[node] = &ArtNetNode{ - TypeID: newTypeID("artnetnode"), - Node: node, - Inputs: inputs, - Outputs: outputs, - LastSeen: time.Now(), - } - } -} - func containsInt(slice []int, val int) bool { for _, v := range slice { if v == val { @@ -226,85 +178,78 @@ func containsInt(slice []int, val int) bool { return false } -func (a *ArtNetNodes) ReplaceNode(oldNode, newNode *Node) { - a.mu.Lock() - defer a.mu.Unlock() +func (n *Nodes) UpdateArtNet(node *Node, inputs, outputs []int) { + n.mu.Lock() + defer n.mu.Unlock() - if artNode, exists := a.nodes[oldNode]; exists { - delete(a.nodes, oldNode) - if existing, hasNew := a.nodes[newNode]; hasNew { - for _, u := range artNode.Inputs { - if !containsInt(existing.Inputs, u) { - existing.Inputs = append(existing.Inputs, u) - } - } - for _, u := range artNode.Outputs { - if !containsInt(existing.Outputs, u) { - existing.Outputs = append(existing.Outputs, u) - } - } - } else { - artNode.Node = newNode - a.nodes[newNode] = artNode + for _, u := range inputs { + if !containsInt(node.ArtNetInputs, u) { + node.ArtNetInputs = append(node.ArtNetInputs, u) } } + for _, u := range outputs { + if !containsInt(node.ArtNetOutputs, u) { + node.ArtNetOutputs = append(node.ArtNetOutputs, u) + } + } + sort.Ints(node.ArtNetInputs) + sort.Ints(node.ArtNetOutputs) + node.artnetLastSeen = time.Now() } -func (a *ArtNetNodes) Expire() { - a.mu.Lock() - defer a.mu.Unlock() +func (n *Nodes) expireArtNet() { expireTime := time.Now().Add(-60 * time.Second) - for nodePtr, artNode := range a.nodes { - if artNode.LastSeen.Before(expireTime) { - delete(a.nodes, nodePtr) + for _, node := range n.nodes { + if !node.artnetLastSeen.IsZero() && node.artnetLastSeen.Before(expireTime) { + node.ArtNetInputs = nil + node.ArtNetOutputs = nil + node.artnetLastSeen = time.Time{} } } } -func (a *ArtNetNodes) GetAll() []*ArtNetNode { - a.mu.RLock() - defer a.mu.RUnlock() - result := make([]*ArtNetNode, 0, len(a.nodes)) - for _, node := range a.nodes { - result = append(result, node) +func (n *Nodes) mergeArtNet(keep, merge *Node) { + for _, u := range merge.ArtNetInputs { + if !containsInt(keep.ArtNetInputs, u) { + keep.ArtNetInputs = append(keep.ArtNetInputs, u) + } } - return result + for _, u := range merge.ArtNetOutputs { + if !containsInt(keep.ArtNetOutputs, u) { + keep.ArtNetOutputs = append(keep.ArtNetOutputs, u) + } + } + if merge.artnetLastSeen.After(keep.artnetLastSeen) { + keep.artnetLastSeen = merge.artnetLastSeen + } + sort.Ints(keep.ArtNetInputs) + sort.Ints(keep.ArtNetOutputs) } -func (a *ArtNetNodes) LogAll() { - a.Expire() - - a.mu.RLock() - defer a.mu.RUnlock() - - if len(a.nodes) == 0 { - return - } - - var artNodes []*ArtNetNode - for _, artNode := range a.nodes { - artNodes = append(artNodes, artNode) - } - sort.Slice(artNodes, func(i, j int) bool { - return sortorder.NaturalLess(artNodes[i].Node.DisplayName(), artNodes[j].Node.DisplayName()) - }) - +func (n *Nodes) logArtNet() { inputUniverses := map[int][]string{} outputUniverses := map[int][]string{} - for _, artNode := range artNodes { - name := artNode.Node.DisplayName() + for _, node := range n.nodes { + if len(node.ArtNetInputs) == 0 && len(node.ArtNetOutputs) == 0 { + continue + } + name := node.DisplayName() if name == "" { name = "??" } - for _, u := range artNode.Inputs { + for _, u := range node.ArtNetInputs { inputUniverses[u] = append(inputUniverses[u], name) } - for _, u := range artNode.Outputs { + for _, u := range node.ArtNetOutputs { outputUniverses[u] = append(outputUniverses[u], name) } } + if len(inputUniverses) == 0 && len(outputUniverses) == 0 { + return + } + var allUniverses []int seen := map[int]bool{} for u := range inputUniverses { @@ -340,7 +285,3 @@ func (a *ArtNetNodes) LogAll() { log.Printf("[sigusr1] artnet:%d (%d/%d/%d) %s", u, netVal, subnet, universe, strings.Join(parts, "; ")) } } - -func (n *Nodes) UpdateArtNet(node *Node, inputs, outputs []int) { - n.t.artnet.Update(node, inputs, outputs) -} diff --git a/dante.go b/dante.go index 144891c..e1e5556 100644 --- a/dante.go +++ b/dante.go @@ -9,7 +9,6 @@ import ( "net" "sort" "strings" - "sync" "sync/atomic" "time" @@ -103,15 +102,10 @@ func (n *Nodes) GetDanteTxDeviceInGroup(groupIP net.IP) *Node { n.mu.RLock() defer n.mu.RUnlock() - groupKey := groupIP.String() - gm := n.multicastGroups[groupKey] - if gm == nil { - return nil - } - - for _, membership := range gm.Members { - if membership.Node != nil && membership.Node.DanteTxChannels != "" { - return membership.Node + groupName := multicastGroupName(groupIP) + for _, node := range n.nodes { + if node.DanteTxChannels != "" && containsString(node.MulticastGroups, groupName) { + return node } } return nil @@ -119,43 +113,6 @@ func (n *Nodes) GetDanteTxDeviceInGroup(groupIP net.IP) *Node { var danteSeqID uint32 -type DanteSubscriberMap map[*Node]*DanteFlowSubscriber - -func (m DanteSubscriberMap) MarshalJSON() ([]byte, error) { - subs := make([]*DanteFlowSubscriber, 0, len(m)) - for _, sub := range m { - subs = append(subs, sub) - } - sort.Slice(subs, func(i, j int) bool { - return sortorder.NaturalLess(subs[i].Node.DisplayName(), subs[j].Node.DisplayName()) - }) - return json.Marshal(subs) -} - -type DanteFlow struct { - TypeID string `json:"typeid"` - Source *Node `json:"source"` - Subscribers DanteSubscriberMap `json:"subscribers"` -} - -type DanteFlowSubscriber struct { - Node *Node `json:"node"` - Channels []string `json:"channels,omitempty"` - ChannelStatus map[string]DanteFlowStatus `json:"channel_status,omitempty"` - LastSeen time.Time `json:"last_seen"` -} - -type DanteFlows struct { - mu sync.RWMutex - flows map[*Node]*DanteFlow -} - -func NewDanteFlows() *DanteFlows { - return &DanteFlows{ - flows: map[*Node]*DanteFlow{}, - } -} - func containsString(slice []string, val string) bool { for _, s := range slice { if s == val { @@ -165,116 +122,210 @@ func containsString(slice []string, val string) bool { return false } -func (d *DanteFlows) Update(source, subscriber *Node, channelInfo string, flowStatus DanteFlowStatus) { - d.mu.Lock() - defer d.mu.Unlock() +type DanteFlowStatus uint8 - flow := d.flows[source] - if flow == nil { - flow = &DanteFlow{ - TypeID: newTypeID("danteflow"), - Source: source, - Subscribers: DanteSubscriberMap{}, +const ( + DanteFlowUnsubscribed DanteFlowStatus = 0x00 + DanteFlowNoSource DanteFlowStatus = 0x01 + DanteFlowActive DanteFlowStatus = 0x09 +) + +func (s DanteFlowStatus) String() string { + switch s { + case DanteFlowActive: + return "active" + case DanteFlowNoSource: + return "no-source" + default: + return "" + } +} + +func (s DanteFlowStatus) MarshalJSON() ([]byte, error) { + return json.Marshal(s.String()) +} + +type DanteChannelType uint16 + +const ( + DanteChannelUnknown DanteChannelType = 0 + DanteChannelAudio DanteChannelType = 0x000f + DanteChannelAudio2 DanteChannelType = 0x0006 + DanteChannelVideo DanteChannelType = 0x000e +) + +func (t DanteChannelType) String() string { + switch t { + case DanteChannelAudio, DanteChannelAudio2: + return "audio" + case DanteChannelVideo: + return "video" + default: + return "" + } +} + +type DanteSubscription struct { + RxChannel int + TxDeviceName string + TxChannelName string + ChannelType DanteChannelType + FlowStatus DanteFlowStatus +} + +type DanteDeviceInfo struct { + IP net.IP + Name string + RxChannelCount int + TxChannelCount int + Subscriptions []DanteSubscription + HasMulticast bool +} + +func (n *Nodes) UpdateDanteFlow(source, subscriber *Node, channelInfo string, flowStatus DanteFlowStatus) { + n.mu.Lock() + defer n.mu.Unlock() + + n.updateDanteTx(source, subscriber, channelInfo, flowStatus) + n.updateDanteRx(subscriber, source, channelInfo, flowStatus) + + source.danteLastSeen = time.Now() + subscriber.danteLastSeen = time.Now() +} + +func (n *Nodes) updateDanteTx(source, subscriber *Node, channelInfo string, flowStatus DanteFlowStatus) { + var peer *DantePeer + for _, p := range source.DanteTx { + if p.Node == subscriber { + peer = p + break } - d.flows[source] = flow + } + if peer == nil { + peer = &DantePeer{ + Node: subscriber, + Status: map[string]string{}, + } + source.DanteTx = append(source.DanteTx, peer) } - sub := flow.Subscribers[subscriber] - if sub == nil { - sub = &DanteFlowSubscriber{ - Node: subscriber, - ChannelStatus: map[string]DanteFlowStatus{}, - } - flow.Subscribers[subscriber] = sub - } - - if channelInfo != "" && !containsString(sub.Channels, channelInfo) { - sub.Channels = append(sub.Channels, channelInfo) - sort.Strings(sub.Channels) + if channelInfo != "" && !containsString(peer.Channels, channelInfo) { + peer.Channels = append(peer.Channels, channelInfo) + sort.Strings(peer.Channels) } if channelInfo != "" { - sub.ChannelStatus[channelInfo] = flowStatus + peer.Status[channelInfo] = flowStatus.String() } - sub.LastSeen = time.Now() -} - -func (d *DanteFlows) ReplaceNode(oldNode, newNode *Node) { - d.mu.Lock() - defer d.mu.Unlock() - - if flow, exists := d.flows[oldNode]; exists { - delete(d.flows, oldNode) - if existingFlow, hasNew := d.flows[newNode]; hasNew { - for subNode, sub := range flow.Subscribers { - if existingSub, hasSub := existingFlow.Subscribers[subNode]; hasSub { - for _, ch := range sub.Channels { - if !containsString(existingSub.Channels, ch) { - existingSub.Channels = append(existingSub.Channels, ch) - } - } - } else { - existingFlow.Subscribers[subNode] = sub - } - } - } else { - flow.Source = newNode - d.flows[newNode] = flow - } - } - - for _, flow := range d.flows { - if sub, exists := flow.Subscribers[oldNode]; exists { - delete(flow.Subscribers, oldNode) - if existingSub, hasNew := flow.Subscribers[newNode]; hasNew { - for _, ch := range sub.Channels { - if !containsString(existingSub.Channels, ch) { - existingSub.Channels = append(existingSub.Channels, ch) - } - } - } else { - sub.Node = newNode - flow.Subscribers[newNode] = sub - } - } - } -} - -func (d *DanteFlows) Expire() { - d.mu.Lock() - defer d.mu.Unlock() - - expireTime := time.Now().Add(-5 * time.Minute) - for source, flow := range d.flows { - for subNode, sub := range flow.Subscribers { - if sub.LastSeen.Before(expireTime) { - delete(flow.Subscribers, subNode) - } - } - if len(flow.Subscribers) == 0 { - delete(d.flows, source) - } - } -} - -func (d *DanteFlows) LogAll() { - d.Expire() - - d.mu.RLock() - defer d.mu.RUnlock() - - if len(d.flows) == 0 { - return - } - - var flows []*DanteFlow - for _, flow := range d.flows { - flows = append(flows, flow) - } - sort.Slice(flows, func(i, j int) bool { - return sortorder.NaturalLess(flows[i].Source.DisplayName(), flows[j].Source.DisplayName()) + sort.Slice(source.DanteTx, func(i, j int) bool { + return sortorder.NaturalLess(source.DanteTx[i].Node.DisplayName(), source.DanteTx[j].Node.DisplayName()) }) +} +func (n *Nodes) updateDanteRx(subscriber, source *Node, channelInfo string, flowStatus DanteFlowStatus) { + var peer *DantePeer + for _, p := range subscriber.DanteRx { + if p.Node == source { + peer = p + break + } + } + if peer == nil { + peer = &DantePeer{ + Node: source, + Status: map[string]string{}, + } + subscriber.DanteRx = append(subscriber.DanteRx, peer) + } + + if channelInfo != "" && !containsString(peer.Channels, channelInfo) { + peer.Channels = append(peer.Channels, channelInfo) + sort.Strings(peer.Channels) + } + if channelInfo != "" { + peer.Status[channelInfo] = flowStatus.String() + } + + sort.Slice(subscriber.DanteRx, func(i, j int) bool { + return sortorder.NaturalLess(subscriber.DanteRx[i].Node.DisplayName(), subscriber.DanteRx[j].Node.DisplayName()) + }) +} + +func (n *Nodes) expireDante() { + expireTime := time.Now().Add(-5 * time.Minute) + for _, node := range n.nodes { + if !node.danteLastSeen.IsZero() && node.danteLastSeen.Before(expireTime) { + node.DanteTx = nil + node.DanteRx = nil + node.danteLastSeen = time.Time{} + } + } +} + +func (n *Nodes) mergeDante(keep, merge *Node) { + for _, peer := range merge.DanteTx { + var existing *DantePeer + for _, p := range keep.DanteTx { + if p.Node == peer.Node { + existing = p + break + } + } + if existing == nil { + keep.DanteTx = append(keep.DanteTx, peer) + } else { + for _, ch := range peer.Channels { + if !containsString(existing.Channels, ch) { + existing.Channels = append(existing.Channels, ch) + } + } + for ch, status := range peer.Status { + existing.Status[ch] = status + } + } + } + + for _, peer := range merge.DanteRx { + var existing *DantePeer + for _, p := range keep.DanteRx { + if p.Node == peer.Node { + existing = p + break + } + } + if existing == nil { + keep.DanteRx = append(keep.DanteRx, peer) + } else { + for _, ch := range peer.Channels { + if !containsString(existing.Channels, ch) { + existing.Channels = append(existing.Channels, ch) + } + } + for ch, status := range peer.Status { + existing.Status[ch] = status + } + } + } + + if merge.danteLastSeen.After(keep.danteLastSeen) { + keep.danteLastSeen = merge.danteLastSeen + } + + for _, node := range n.nodes { + for _, peer := range node.DanteTx { + if peer.Node == merge { + peer.Node = keep + } + } + for _, peer := range node.DanteRx { + if peer.Node == merge { + peer.Node = keep + } + } + } +} + +func (n *Nodes) logDante() { type channelFlow struct { sourceName string txCh string @@ -286,21 +337,24 @@ func (d *DanteFlows) LogAll() { var allChannelFlows []channelFlow var allNoChannelFlows []string - for _, flow := range flows { - sourceName := flow.Source.DisplayName() + for _, node := range n.nodes { + if len(node.DanteTx) == 0 { + continue + } + sourceName := node.DisplayName() if sourceName == "" { sourceName = "??" } - for _, sub := range flow.Subscribers { - subName := sub.Node.DisplayName() + for _, peer := range node.DanteTx { + subName := peer.Node.DisplayName() if subName == "" { subName = "??" } - if len(sub.Channels) == 0 { + if len(peer.Channels) == 0 { allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s", sourceName, subName)) } else { - for _, ch := range sub.Channels { + for _, ch := range peer.Channels { parts := strings.Split(ch, "->") if len(parts) == 2 { rxPart := parts[1] @@ -315,7 +369,7 @@ func (d *DanteFlows) LogAll() { rxName: subName, rxCh: rxPart, channelType: chType, - down: sub.ChannelStatus[ch] == DanteFlowNoSource, + down: peer.Status[ch] == "no-source", }) } else { allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s[%s]", sourceName, subName, ch)) @@ -326,6 +380,10 @@ func (d *DanteFlows) LogAll() { } totalFlows := len(allChannelFlows) + len(allNoChannelFlows) + if totalFlows == 0 { + return + } + log.Printf("[sigusr1] ================ %d dante flows ================", totalFlows) sort.Slice(allChannelFlows, func(i, j int) bool { @@ -405,66 +463,6 @@ func (t *Tendrils) queryDanteDeviceWithPort(ip net.IP, port int) *DanteDeviceInf return info } -type DanteDeviceInfo struct { - IP net.IP - Name string - RxChannelCount int - TxChannelCount int - Subscriptions []DanteSubscription - HasMulticast bool -} - -type DanteChannelType uint16 - -const ( - DanteChannelUnknown DanteChannelType = 0 - DanteChannelAudio DanteChannelType = 0x000f - DanteChannelAudio2 DanteChannelType = 0x0006 - DanteChannelVideo DanteChannelType = 0x000e -) - -func (t DanteChannelType) String() string { - switch t { - case DanteChannelAudio, DanteChannelAudio2: - return "audio" - case DanteChannelVideo: - return "video" - default: - return "" - } -} - -type DanteFlowStatus uint8 - -const ( - DanteFlowUnsubscribed DanteFlowStatus = 0x00 - DanteFlowNoSource DanteFlowStatus = 0x01 - DanteFlowActive DanteFlowStatus = 0x09 -) - -func (s DanteFlowStatus) String() string { - switch s { - case DanteFlowActive: - return "active" - case DanteFlowNoSource: - return "no-source" - default: - return "" - } -} - -func (s DanteFlowStatus) MarshalJSON() ([]byte, error) { - return json.Marshal(s.String()) -} - -type DanteSubscription struct { - RxChannel int - TxDeviceName string - TxChannelName string - ChannelType DanteChannelType - FlowStatus DanteFlowStatus -} - func buildDantePacket(packetType byte, cmd uint16, args []byte) []byte { seq := nextDanteSeq() totalLen := 10 + len(args) @@ -530,24 +528,19 @@ func (t *Tendrils) sendDanteCommand28(conn *net.UDPConn, ip net.IP, cmd uint16, } func (t *Tendrils) queryDanteDeviceName(conn *net.UDPConn, ip net.IP) string { - // 0x1003 returns device info - name position varies by device resp := t.sendDanteCommand(conn, ip, 0x1003, nil) if resp == nil || len(resp) < 40 { return "" } - // Find the first printable string that looks like a device name - // Look for patterns like "AJA-", "ULXD", etc starting from offset 40 for i := 40; i < len(resp)-4; i++ { if resp[i] >= 'A' && resp[i] <= 'Z' { - // Found uppercase letter, might be start of name end := i for end < len(resp) && resp[end] != 0 && resp[end] >= 0x20 && resp[end] < 0x7f { end++ } if end-i >= 4 && end-i < 40 { name := string(resp[i:end]) - // Skip "Audinate" which is the platform name if name != "Audinate DCM" && !strings.HasPrefix(name, "Audinate") { if t.DebugDante { log.Printf("[dante] %s: device name: %q", ip, name) @@ -873,7 +866,7 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) { } sourceNode := t.nodes.GetOrCreateByName(txDeviceName) subscriberNode := t.nodes.GetOrCreateByName(info.Name) - t.danteFlows.Update(sourceNode, subscriberNode, channelInfo, sub.FlowStatus) + t.nodes.UpdateDanteFlow(sourceNode, subscriberNode, channelInfo, sub.FlowStatus) needIGMPFallback = false } } @@ -893,7 +886,7 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) { sourceNode = t.nodes.GetOrCreateByName(multicastGroupName(groupIP)) } subscriberNode := t.nodes.GetOrCreateByName(info.Name) - t.danteFlows.Update(sourceNode, subscriberNode, "", DanteFlowActive) + t.nodes.UpdateDanteFlow(sourceNode, subscriberNode, "", DanteFlowActive) } } } diff --git a/http.go b/http.go index 4dbdbd8..a81d8f6 100644 --- a/http.go +++ b/http.go @@ -117,7 +117,9 @@ func ensureCert() error { func (t *Tendrils) handleAPIStatus(w http.ResponseWriter, r *http.Request) { status := t.GetStatus() w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(status) + if err := json.NewEncoder(w).Encode(status); err != nil { + log.Printf("[ERROR] failed to encode status: %v", err) + } } func (t *Tendrils) GetStatus() *StatusResponse { @@ -175,6 +177,7 @@ func (t *Tendrils) handleAPIStatusStream(w http.ResponseWriter, r *http.Request) sendStatus := func() error { data, err := json.Marshal(t.GetStatus()) if err != nil { + log.Printf("[ERROR] failed to marshal status: %v", err) return err } _, err = fmt.Fprintf(w, "event: status\ndata: %s\n\n", data) @@ -213,37 +216,22 @@ func (t *Tendrils) handleAPIStatusStream(w http.ResponseWriter, r *http.Request) func (t *Tendrils) getNodes() []*Node { t.nodes.mu.Lock() t.nodes.expireMulticastMemberships() + t.nodes.expireArtNet() + t.nodes.expireSACN() + t.nodes.expireDante() t.nodes.mu.Unlock() - t.artnet.Expire() - t.sacnSources.Expire() - t.danteFlows.Expire() - t.nodes.mu.RLock() defer t.nodes.mu.RUnlock() - multicastByNode := t.buildMulticastByNode() - artnetByNode := t.buildArtNetByNode() - sacnByNode := t.buildSACNByNode() - danteTxByNode, danteRxByNode := t.buildDanteByNode() unreachableNodes := t.errors.GetUnreachableNodeSet() nodes := make([]*Node, 0, len(t.nodes.nodes)) for _, node := range t.nodes.nodes { - nodeCopy := *node - nodeCopy.MulticastGroups = multicastByNode[node] - if artnet := artnetByNode[node]; artnet != nil { - nodeCopy.ArtNetInputs = artnet.Inputs - nodeCopy.ArtNetOutputs = artnet.Outputs - } - if sacn := sacnByNode[node]; sacn != nil { - nodeCopy.SACNInputs = sacn.Inputs - nodeCopy.SACNOutputs = sacn.Outputs - } - nodeCopy.DanteTx = danteTxByNode[node] - nodeCopy.DanteRx = danteRxByNode[node] - nodeCopy.Unreachable = unreachableNodes[node.TypeID] - nodes = append(nodes, &nodeCopy) + n := new(Node) + *n = *node + n.Unreachable = unreachableNodes[node.TypeID] + nodes = append(nodes, n) } sort.Slice(nodes, func(i, j int) bool { @@ -259,150 +247,6 @@ func (t *Tendrils) getNodes() []*Node { return nodes } -func (t *Tendrils) buildMulticastByNode() map[*Node][]string { - result := map[*Node][]string{} - for _, gm := range t.nodes.multicastGroups { - for _, membership := range gm.Members { - if membership.Node != nil { - result[membership.Node] = append(result[membership.Node], gm.Group.Name) - } - } - } - for node, groups := range result { - sort.Strings(groups) - result[node] = groups - } - return result -} - -type artnetNodeData struct { - Inputs []int - Outputs []int -} - -func (t *Tendrils) buildArtNetByNode() map[*Node]*artnetNodeData { - t.artnet.mu.RLock() - defer t.artnet.mu.RUnlock() - - result := map[*Node]*artnetNodeData{} - for _, an := range t.artnet.nodes { - inputs := make([]int, len(an.Inputs)) - for i, u := range an.Inputs { - inputs[i] = int(u) - } - outputs := make([]int, len(an.Outputs)) - for i, u := range an.Outputs { - outputs[i] = int(u) - } - sort.Ints(inputs) - sort.Ints(outputs) - result[an.Node] = &artnetNodeData{Inputs: inputs, Outputs: outputs} - } - return result -} - -type sacnNodeData struct { - Inputs []int - Outputs []int -} - -func (t *Tendrils) buildSACNByNode() map[*Node]*sacnNodeData { - result := map[*Node]*sacnNodeData{} - - for _, gm := range t.nodes.multicastGroups { - if len(gm.Group.Name) < 5 || gm.Group.Name[:5] != "sacn:" { - continue - } - var universe int - if _, err := fmt.Sscanf(gm.Group.Name, "sacn:%d", &universe); err != nil { - continue - } - for _, membership := range gm.Members { - if membership.Node == nil { - continue - } - data := result[membership.Node] - if data == nil { - data = &sacnNodeData{} - result[membership.Node] = data - } - if !containsInt(data.Inputs, universe) { - data.Inputs = append(data.Inputs, universe) - } - } - } - - t.sacnSources.mu.RLock() - for _, source := range t.sacnSources.sources { - if source.SrcIP == nil { - continue - } - node := t.nodes.getByIPLocked(source.SrcIP) - if node == nil { - continue - } - data := result[node] - if data == nil { - data = &sacnNodeData{} - result[node] = data - } - for _, u := range source.Universes { - if !containsInt(data.Outputs, u) { - data.Outputs = append(data.Outputs, u) - } - } - } - t.sacnSources.mu.RUnlock() - - for _, data := range result { - sort.Ints(data.Inputs) - sort.Ints(data.Outputs) - } - - return result -} - -func (t *Tendrils) buildDanteByNode() (map[*Node][]*DantePeer, map[*Node][]*DantePeer) { - t.danteFlows.mu.RLock() - defer t.danteFlows.mu.RUnlock() - - txByNode := map[*Node][]*DantePeer{} - rxByNode := map[*Node][]*DantePeer{} - - for source, flow := range t.danteFlows.flows { - for subNode, sub := range flow.Subscribers { - status := map[string]string{} - for ch, st := range sub.ChannelStatus { - status[ch] = st.String() - } - txByNode[source] = append(txByNode[source], &DantePeer{ - Node: subNode, - Channels: sub.Channels, - Status: status, - }) - rxByNode[subNode] = append(rxByNode[subNode], &DantePeer{ - Node: source, - Channels: sub.Channels, - Status: status, - }) - } - } - - for node, peers := range txByNode { - sort.Slice(peers, func(i, j int) bool { - return sortorder.NaturalLess(peers[i].Node.DisplayName(), peers[j].Node.DisplayName()) - }) - txByNode[node] = peers - } - for node, peers := range rxByNode { - sort.Slice(peers, func(i, j int) bool { - return sortorder.NaturalLess(peers[i].Node.DisplayName(), peers[j].Node.DisplayName()) - }) - rxByNode[node] = peers - } - - return txByNode, rxByNode -} func (t *Tendrils) getLinks() []*Link { t.nodes.mu.RLock() diff --git a/multicast.go b/multicast.go index d55d84a..635913e 100644 --- a/multicast.go +++ b/multicast.go @@ -1,13 +1,10 @@ package tendrils import ( - "encoding/json" "fmt" "net" "sort" "time" - - "github.com/fvbommel/sortorder" ) type MulticastGroup struct { @@ -15,39 +12,6 @@ type MulticastGroup struct { IP string `json:"ip"` } -type MulticastMembership struct { - SourceIP string `json:"source_ip"` - Node *Node `json:"node,omitempty"` - LastSeen time.Time `json:"last_seen"` -} - -type MulticastMembershipMap map[string]*MulticastMembership - -func (m MulticastMembershipMap) MarshalJSON() ([]byte, error) { - members := make([]*MulticastMembership, 0, len(m)) - for _, membership := range m { - members = append(members, membership) - } - sort.Slice(members, func(i, j int) bool { - nameI := members[i].SourceIP - if members[i].Node != nil && members[i].Node.DisplayName() != "" { - nameI = members[i].Node.DisplayName() - } - nameJ := members[j].SourceIP - if members[j].Node != nil && members[j].Node.DisplayName() != "" { - nameJ = members[j].Node.DisplayName() - } - return sortorder.NaturalLess(nameI, nameJ) - }) - return json.Marshal(members) -} - -type MulticastGroupMembers struct { - TypeID string `json:"typeid"` - Group *MulticastGroup `json:"group"` - Members MulticastMembershipMap `json:"members"` -} - func (g *MulticastGroup) IsDante() bool { ip := net.ParseIP(g.IP).To4() if ip == nil { @@ -116,62 +80,71 @@ func (n *Nodes) UpdateMulticastMembership(sourceIP, groupIP net.IP) { defer n.mu.Unlock() node := n.getNodeByIPLocked(sourceIP) - - groupKey := groupIP.String() - sourceKey := sourceIP.String() - - gm := n.multicastGroups[groupKey] - if gm == nil { - gm = &MulticastGroupMembers{ - TypeID: newTypeID("mcastgroup"), - Group: &MulticastGroup{ - Name: multicastGroupName(groupIP), - IP: groupKey, - }, - Members: MulticastMembershipMap{}, - } - n.multicastGroups[groupKey] = gm + if node == nil { + return } - membership := gm.Members[sourceKey] - if membership == nil { - membership = &MulticastMembership{ - SourceIP: sourceKey, - } - gm.Members[sourceKey] = membership + groupName := multicastGroupName(groupIP) + + if node.multicastLastSeen == nil { + node.multicastLastSeen = map[string]time.Time{} + } + node.multicastLastSeen[groupName] = time.Now() + + if !containsString(node.MulticastGroups, groupName) { + node.MulticastGroups = append(node.MulticastGroups, groupName) + sort.Strings(node.MulticastGroups) + } + + if len(groupName) > 5 && groupName[:5] == "sacn:" { + var universe int + if _, err := fmt.Sscanf(groupName, "sacn:%d", &universe); err == nil { + if !containsInt(node.SACNInputs, universe) { + node.SACNInputs = append(node.SACNInputs, universe) + sort.Ints(node.SACNInputs) + } + } } - membership.Node = node - membership.LastSeen = time.Now() } func (n *Nodes) RemoveMulticastMembership(sourceIP, groupIP net.IP) { n.mu.Lock() defer n.mu.Unlock() - groupKey := groupIP.String() - sourceKey := sourceIP.String() + node := n.getNodeByIPLocked(sourceIP) + if node == nil { + return + } - if gm := n.multicastGroups[groupKey]; gm != nil { - delete(gm.Members, sourceKey) - if len(gm.Members) == 0 { - delete(n.multicastGroups, groupKey) + groupName := multicastGroupName(groupIP) + delete(node.multicastLastSeen, groupName) + + var groups []string + for _, g := range node.MulticastGroups { + if g != groupName { + groups = append(groups, g) } } + node.MulticastGroups = groups } func (n *Nodes) GetDanteMulticastGroups(deviceIP net.IP) []net.IP { n.mu.RLock() defer n.mu.RUnlock() - deviceKey := deviceIP.String() - var groups []net.IP + node := n.getNodeByIPLocked(deviceIP) + if node == nil { + return nil + } - for _, gm := range n.multicastGroups { - if !gm.Group.IsDante() { - continue - } - if _, exists := gm.Members[deviceKey]; exists { - groups = append(groups, net.ParseIP(gm.Group.IP)) + var groups []net.IP + for _, groupName := range node.MulticastGroups { + g := &MulticastGroup{Name: groupName} + if g.IsDante() { + ip := net.ParseIP(groupName) + if ip != nil { + groups = append(groups, ip) + } } } return groups @@ -181,16 +154,11 @@ func (n *Nodes) GetMulticastGroupMembers(groupIP net.IP) []*Node { n.mu.RLock() defer n.mu.RUnlock() - groupKey := groupIP.String() - gm := n.multicastGroups[groupKey] - if gm == nil { - return nil - } - + groupName := multicastGroupName(groupIP) var members []*Node - for _, membership := range gm.Members { - if membership.Node != nil { - members = append(members, membership.Node) + for _, node := range n.nodes { + if containsString(node.MulticastGroups, groupName) { + members = append(members, node) } } return members @@ -198,14 +166,54 @@ func (n *Nodes) GetMulticastGroupMembers(groupIP net.IP) []*Node { func (n *Nodes) expireMulticastMemberships() { expireTime := time.Now().Add(-5 * time.Minute) - for groupKey, gm := range n.multicastGroups { - for sourceKey, membership := range gm.Members { - if membership.LastSeen.Before(expireTime) { - delete(gm.Members, sourceKey) + for _, node := range n.nodes { + if node.multicastLastSeen == nil { + continue + } + var keepGroups []string + var keepSACNInputs []int + for _, groupName := range node.MulticastGroups { + if lastSeen, ok := node.multicastLastSeen[groupName]; ok && !lastSeen.Before(expireTime) { + keepGroups = append(keepGroups, groupName) + if len(groupName) > 5 && groupName[:5] == "sacn:" { + var universe int + if _, err := fmt.Sscanf(groupName, "sacn:%d", &universe); err == nil { + keepSACNInputs = append(keepSACNInputs, universe) + } + } + } else { + delete(node.multicastLastSeen, groupName) } } - if len(gm.Members) == 0 { - delete(n.multicastGroups, groupKey) - } + node.MulticastGroups = keepGroups + sort.Ints(keepSACNInputs) + node.SACNInputs = keepSACNInputs } } + +func (n *Nodes) mergeMulticast(keep, merge *Node) { + if merge.multicastLastSeen == nil { + return + } + if keep.multicastLastSeen == nil { + keep.multicastLastSeen = map[string]time.Time{} + } + for groupName, lastSeen := range merge.multicastLastSeen { + if existing, ok := keep.multicastLastSeen[groupName]; !ok || lastSeen.After(existing) { + keep.multicastLastSeen[groupName] = lastSeen + } + if !containsString(keep.MulticastGroups, groupName) { + keep.MulticastGroups = append(keep.MulticastGroups, groupName) + } + if len(groupName) > 5 && groupName[:5] == "sacn:" { + var universe int + if _, err := fmt.Sscanf(groupName, "sacn:%d", &universe); err == nil { + if !containsInt(keep.SACNInputs, universe) { + keep.SACNInputs = append(keep.SACNInputs, universe) + } + } + } + } + sort.Strings(keep.MulticastGroups) + sort.Ints(keep.SACNInputs) +} diff --git a/nodes.go b/nodes.go index 295c72f..3f28bbb 100644 --- a/nodes.go +++ b/nodes.go @@ -14,32 +14,30 @@ import ( ) type Nodes struct { - mu sync.RWMutex - nodes map[int]*Node - ipIndex map[string]int - macIndex map[string]int - nameIndex map[string]int - nodeCancel map[int]context.CancelFunc - multicastGroups map[string]*MulticastGroupMembers - nextID int - t *Tendrils - ctx context.Context - cancelAll context.CancelFunc + mu sync.RWMutex + nodes map[int]*Node + ipIndex map[string]int + macIndex map[string]int + nameIndex map[string]int + nodeCancel map[int]context.CancelFunc + nextID int + t *Tendrils + ctx context.Context + cancelAll context.CancelFunc } func NewNodes(t *Tendrils) *Nodes { ctx, cancel := context.WithCancel(context.Background()) return &Nodes{ - nodes: map[int]*Node{}, - ipIndex: map[string]int{}, - macIndex: map[string]int{}, - nameIndex: map[string]int{}, - nodeCancel: map[int]context.CancelFunc{}, - multicastGroups: map[string]*MulticastGroupMembers{}, - nextID: 1, - t: t, - ctx: ctx, - cancelAll: cancel, + nodes: map[int]*Node{}, + ipIndex: map[string]int{}, + macIndex: map[string]int{}, + nameIndex: map[string]int{}, + nodeCancel: map[int]context.CancelFunc{}, + nextID: 1, + t: t, + ctx: ctx, + cancelAll: cancel, } } @@ -422,16 +420,10 @@ func (n *Nodes) mergeNodes(keepID, mergeID int) { keep.MACTable[peerMAC] = ifaceName } - n.t.danteFlows.ReplaceNode(merge, keep) - n.t.artnet.ReplaceNode(merge, keep) - - for _, gm := range n.multicastGroups { - for _, membership := range gm.Members { - if membership.Node == merge { - membership.Node = keep - } - } - } + n.mergeArtNet(keep, merge) + n.mergeSACN(keep, merge) + n.mergeMulticast(keep, merge) + n.mergeDante(keep, merge) if cancel, exists := n.nodeCancel[mergeID]; exists { cancel() @@ -626,37 +618,36 @@ func (n *Nodes) LogAll() { n.expireMulticastMemberships() - if len(n.multicastGroups) > 0 { - var groups []*MulticastGroupMembers - for _, gm := range n.multicastGroups { - groups = append(groups, gm) - } - sort.Slice(groups, func(i, j int) bool { - return sortorder.NaturalLess(groups[i].Group.Name, groups[j].Group.Name) - }) - - log.Printf("[sigusr1] ================ %d multicast groups ================", len(groups)) - for _, gm := range groups { - var memberNames []string - for sourceIP, membership := range gm.Members { - var name string - if membership.Node != nil { - name = membership.Node.DisplayName() - if name == "" { - name = sourceIP - } - } else { - name = sourceIP - } - memberNames = append(memberNames, name) + groupMembers := map[string][]string{} + for _, node := range n.nodes { + for _, groupName := range node.MulticastGroups { + name := node.DisplayName() + if name == "" { + name = "??" } - sort.Slice(memberNames, func(i, j int) bool { - return sortorder.NaturalLess(memberNames[i], memberNames[j]) - }) - log.Printf("[sigusr1] %s: %s", gm.Group.Name, strings.Join(memberNames, ", ")) + groupMembers[groupName] = append(groupMembers[groupName], name) } } - n.t.artnet.LogAll() - n.t.danteFlows.LogAll() + if len(groupMembers) > 0 { + var groupNames []string + for name := range groupMembers { + groupNames = append(groupNames, name) + } + sort.Slice(groupNames, func(i, j int) bool { + return sortorder.NaturalLess(groupNames[i], groupNames[j]) + }) + + log.Printf("[sigusr1] ================ %d multicast groups ================", len(groupNames)) + for _, groupName := range groupNames { + members := groupMembers[groupName] + sort.Slice(members, func(i, j int) bool { + return sortorder.NaturalLess(members[i], members[j]) + }) + log.Printf("[sigusr1] %s: %s", groupName, strings.Join(members, ", ")) + } + } + + n.logArtNet() + n.logDante() } diff --git a/sacn_discovery.go b/sacn_discovery.go index 1e932d4..23a53f2 100644 --- a/sacn_discovery.go +++ b/sacn_discovery.go @@ -4,70 +4,12 @@ import ( "context" "log" "net" - "sync" + "sort" "time" "github.com/gopatchy/sacn" ) -type SACNSource struct { - CID string - SourceName string - Universes []int - SrcIP net.IP - LastSeen time.Time -} - -type SACNSources struct { - mu sync.RWMutex - sources map[string]*SACNSource -} - -func NewSACNSources() *SACNSources { - return &SACNSources{ - sources: map[string]*SACNSource{}, - } -} - -func (s *SACNSources) Update(cid [16]byte, sourceName string, universes []uint16, srcIP net.IP) { - s.mu.Lock() - defer s.mu.Unlock() - - cidStr := sacn.FormatCID(cid) - intUniverses := make([]int, len(universes)) - for i, u := range universes { - intUniverses[i] = int(u) - } - - existing, exists := s.sources[cidStr] - if exists { - existing.SourceName = sourceName - existing.Universes = intUniverses - existing.SrcIP = srcIP - existing.LastSeen = time.Now() - } else { - s.sources[cidStr] = &SACNSource{ - CID: cidStr, - SourceName: sourceName, - Universes: intUniverses, - SrcIP: srcIP, - LastSeen: time.Now(), - } - } -} - -func (s *SACNSources) Expire() { - s.mu.Lock() - defer s.mu.Unlock() - - expireTime := time.Now().Add(-60 * time.Second) - for cid, source := range s.sources { - if source.LastSeen.Before(expireTime) { - delete(s.sources, cid) - } - } -} - func (t *Tendrils) startSACNDiscoveryListener(ctx context.Context, iface net.Interface) { receiver, err := sacn.NewReceiver("") if err != nil { @@ -104,6 +46,44 @@ func (t *Tendrils) handleSACNDiscoveryPacket(srcIP net.IP, pkt *sacn.DiscoveryPa t.nodes.Update(nil, nil, []net.IP{srcIP}, "", pkt.SourceName, "sacn") } - t.sacnSources.Update(pkt.CID, pkt.SourceName, pkt.Universes, srcIP) + node := t.nodes.GetByIP(srcIP) + if node != nil { + intUniverses := make([]int, len(pkt.Universes)) + for i, u := range pkt.Universes { + intUniverses[i] = int(u) + } + t.nodes.UpdateSACN(node, intUniverses) + } t.NotifyUpdate() } + +func (n *Nodes) UpdateSACN(node *Node, outputs []int) { + n.mu.Lock() + defer n.mu.Unlock() + + node.SACNOutputs = outputs + sort.Ints(node.SACNOutputs) + node.sacnLastSeen = time.Now() +} + +func (n *Nodes) expireSACN() { + expireTime := time.Now().Add(-60 * time.Second) + for _, node := range n.nodes { + if !node.sacnLastSeen.IsZero() && node.sacnLastSeen.Before(expireTime) { + node.SACNOutputs = nil + node.sacnLastSeen = time.Time{} + } + } +} + +func (n *Nodes) mergeSACN(keep, merge *Node) { + for _, u := range merge.SACNOutputs { + if !containsInt(keep.SACNOutputs, u) { + keep.SACNOutputs = append(keep.SACNOutputs, u) + } + } + if merge.sacnLastSeen.After(keep.sacnLastSeen) { + keep.sacnLastSeen = merge.sacnLastSeen + } + sort.Ints(keep.SACNOutputs) +} diff --git a/tendrils.go b/tendrils.go index 1afdfd8..69fbbbd 100644 --- a/tendrils.go +++ b/tendrils.go @@ -33,10 +33,7 @@ func getInterfaceIPv4(iface net.Interface) (srcIP, broadcast net.IP) { type Tendrils struct { activeInterfaces map[string]context.CancelFunc nodes *Nodes - artnet *ArtNetNodes artnetConn *net.UDPConn - sacnSources *SACNSources - danteFlows *DanteFlows errors *ErrorTracker ping *PingManager broadcast *BroadcastStats @@ -78,9 +75,6 @@ type Tendrils struct { func New() *Tendrils { t := &Tendrils{ activeInterfaces: map[string]context.CancelFunc{}, - artnet: NewArtNetNodes(), - sacnSources: NewSACNSources(), - danteFlows: NewDanteFlows(), ping: NewPingManager(), sseSubs: map[int]chan struct{}{}, } diff --git a/types.go b/types.go index 562a345..4401336 100644 --- a/types.go +++ b/types.go @@ -6,6 +6,7 @@ import ( "net" "sort" "strings" + "time" "github.com/fvbommel/sortorder" "go.jetify.com/typeid" @@ -149,6 +150,11 @@ type Node struct { DanteRx []*DantePeer `json:"dante_rx,omitempty"` Unreachable bool `json:"unreachable,omitempty"` pollTrigger chan struct{} + + multicastLastSeen map[string]time.Time + artnetLastSeen time.Time + sacnLastSeen time.Time + danteLastSeen time.Time } type DantePeer struct { @@ -157,6 +163,24 @@ type DantePeer struct { Status map[string]string `json:"status,omitempty"` } +func (p *DantePeer) MarshalJSON() ([]byte, error) { + type peerJSON struct { + Node *Node `json:"node"` + Channels []string `json:"channels,omitempty"` + Status map[string]string `json:"status,omitempty"` + } + nodeRef := &Node{ + TypeID: p.Node.TypeID, + Names: p.Node.Names, + Interfaces: p.Node.Interfaces, + } + return json.Marshal(peerJSON{ + Node: nodeRef, + Channels: p.Channels, + Status: p.Status, + }) +} + func (n *Node) WithInterface(ifaceKey string) *Node { if ifaceKey == "" { return n