From aebd6f5e2c30dcf766f25e6bc8ffa166698e4d57 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Wed, 28 Jan 2026 22:48:55 -0800 Subject: [PATCH] Use MulticastGroup as map key and serialize under lock Co-Authored-By: Claude Opus 4.5 --- dante.go | 3 +-- http.go | 51 ++++++++++++++++++-------------------- multicast.go | 9 +++---- types.go | 69 +++++++++------------------------------------------- 4 files changed, 40 insertions(+), 92 deletions(-) diff --git a/dante.go b/dante.go index a1685f4..18a2132 100644 --- a/dante.go +++ b/dante.go @@ -103,10 +103,9 @@ func (n *Nodes) GetDanteTxDeviceInGroup(groupIP net.IP) *Node { defer n.mu.RUnlock() group := ParseMulticastGroup(groupIP) - groupKey := group.String() for _, node := range n.nodes { if node.DanteTxChannels != "" && node.MulticastGroups != nil { - if _, exists := node.MulticastGroups[groupKey]; exists { + if _, exists := node.MulticastGroups[group]; exists { return node } } diff --git a/http.go b/http.go index 5865e7b..f4e945e 100644 --- a/http.go +++ b/http.go @@ -115,14 +115,26 @@ func ensureCert() error { } func (t *Tendrils) handleAPIStatus(w http.ResponseWriter, r *http.Request) { - status := t.GetStatus() w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(status); err != nil { + data, err := t.GetStatusJSON() + if err != nil { log.Printf("[ERROR] failed to encode status: %v", err) + return } + w.Write(data) } -func (t *Tendrils) GetStatus() *StatusResponse { +func (t *Tendrils) GetStatusJSON() ([]byte, error) { + t.nodes.mu.Lock() + t.nodes.expireMulticastMemberships() + t.nodes.expireArtNet() + t.nodes.expireSACN() + t.nodes.expireDante() + t.nodes.mu.Unlock() + + t.nodes.mu.RLock() + defer t.nodes.mu.RUnlock() + var broadcastStats *BroadcastStatsResponse if t.broadcast != nil { stats := t.broadcast.GetStats() @@ -132,13 +144,13 @@ func (t *Tendrils) GetStatus() *StatusResponse { if config == nil { config = &Config{} } - return &StatusResponse{ + return json.Marshal(&StatusResponse{ Config: config, - Nodes: t.getNodes(), - Links: t.getLinks(), + Nodes: t.getNodesLocked(), + Links: t.getLinksLocked(), Errors: t.errors.GetErrors(), BroadcastStats: broadcastStats, - } + }) } func (t *Tendrils) handleClearError(w http.ResponseWriter, r *http.Request) { @@ -175,7 +187,7 @@ func (t *Tendrils) handleAPIStatusStream(w http.ResponseWriter, r *http.Request) defer t.unsubscribeSSE(subID) sendStatus := func() error { - data, err := json.Marshal(t.GetStatus()) + data, err := t.GetStatusJSON() if err != nil { log.Printf("[ERROR] failed to marshal status: %v", err) return err @@ -213,25 +225,13 @@ func (t *Tendrils) handleAPIStatusStream(w http.ResponseWriter, r *http.Request) } } -func (t *Tendrils) getNodes() []*Node { - t.nodes.mu.Lock() - t.nodes.expireMulticastMemberships() - t.nodes.expireArtNet() - t.nodes.expireSACN() - t.nodes.expireDante() - t.nodes.mu.Unlock() - - t.nodes.mu.RLock() - defer t.nodes.mu.RUnlock() - +func (t *Tendrils) getNodesLocked() []*Node { unreachableNodes := t.errors.GetUnreachableNodeSet() nodes := make([]*Node, 0, len(t.nodes.nodes)) for _, node := range t.nodes.nodes { - n := new(Node) - *n = *node - n.Unreachable = unreachableNodes[node.ID] - nodes = append(nodes, n) + node.Unreachable = unreachableNodes[node.ID] + nodes = append(nodes, node) } sort.Slice(nodes, func(i, j int) bool { @@ -248,10 +248,7 @@ func (t *Tendrils) getNodes() []*Node { } -func (t *Tendrils) getLinks() []*Link { - t.nodes.mu.RLock() - defer t.nodes.mu.RUnlock() - +func (t *Tendrils) getLinksLocked() []*Link { links := t.nodes.getDirectLinks() sort.Slice(links, func(i, j int) bool { if links[i].NodeA.DisplayName() != links[j].NodeA.DisplayName() { diff --git a/multicast.go b/multicast.go index 50d7feb..334f1a5 100644 --- a/multicast.go +++ b/multicast.go @@ -65,13 +65,12 @@ func (n *Nodes) GetMulticastGroupMembers(groupIP net.IP) []*Node { defer n.mu.RUnlock() group := ParseMulticastGroup(groupIP) - groupKey := group.String() var members []*Node for _, node := range n.nodes { if node.MulticastGroups == nil { continue } - if _, exists := node.MulticastGroups[groupKey]; exists { + if _, exists := node.MulticastGroups[group]; exists { members = append(members, node) } } @@ -93,9 +92,9 @@ func (n *Nodes) mergeMulticast(keep, merge *Node) { if keep.MulticastGroups == nil { keep.MulticastGroups = MulticastMembershipSet{} } - for key, membership := range merge.MulticastGroups { - if existing, ok := keep.MulticastGroups[key]; !ok || membership.LastSeen.After(existing.LastSeen) { - keep.MulticastGroups[key] = membership + for group, lastSeen := range merge.MulticastGroups { + if existing, ok := keep.MulticastGroups[group]; !ok || lastSeen.After(existing) { + keep.MulticastGroups[group] = lastSeen } } } diff --git a/types.go b/types.go index 0db1801..740c50c 100644 --- a/types.go +++ b/types.go @@ -218,30 +218,20 @@ func ParseMulticastGroup(ip net.IP) MulticastGroup { return MulticastGroup{RawIP: ip.String()} } -type MulticastMembership struct { - Group MulticastGroup - LastSeen time.Time -} - -type MulticastMembershipSet map[string]*MulticastMembership +type MulticastMembershipSet map[MulticastGroup]time.Time func (s MulticastMembershipSet) Add(group MulticastGroup) { - key := group.String() - if m, exists := s[key]; exists { - m.LastSeen = time.Now() - } else { - s[key] = &MulticastMembership{Group: group, LastSeen: time.Now()} - } + s[group] = time.Now() } func (s MulticastMembershipSet) Remove(group MulticastGroup) { - delete(s, group.String()) + delete(s, group) } func (s MulticastMembershipSet) Groups() []MulticastGroup { result := make([]MulticastGroup, 0, len(s)) - for _, m := range s { - result = append(result, m.Group) + for g := range s { + result = append(result, g) } sort.Slice(result, func(i, j int) bool { return result[i].String() < result[j].String() @@ -251,9 +241,9 @@ func (s MulticastMembershipSet) Groups() []MulticastGroup { func (s MulticastMembershipSet) SACNInputs() []SACNUniverse { var result []SACNUniverse - for _, m := range s { - if m.Group.IsSACN() { - result = append(result, m.Group.SACNUniverse) + for g := range s { + if g.IsSACN() { + result = append(result, g.SACNUniverse) } } sort.Slice(result, func(i, j int) bool { return result[i] < result[j] }) @@ -262,9 +252,9 @@ func (s MulticastMembershipSet) SACNInputs() []SACNUniverse { func (s MulticastMembershipSet) Expire(maxAge time.Duration) { expireTime := time.Now().Add(-maxAge) - for key, m := range s { - if m.LastSeen.Before(expireTime) { - delete(s, key) + for g, lastSeen := range s { + if lastSeen.Before(expireTime) { + delete(s, g) } } } @@ -419,43 +409,6 @@ func (n *Node) SACNInputs() []SACNUniverse { return n.MulticastGroups.SACNInputs() } -func (n *Node) MarshalJSON() ([]byte, error) { - type nodeJSON struct { - ID string `json:"id"` - Names NameSet `json:"names"` - Interfaces InterfaceMap `json:"interfaces"` - MACTableSize int `json:"mac_table_size,omitempty"` - PoEBudget *PoEBudget `json:"poe_budget,omitempty"` - IsDanteClockMaster bool `json:"is_dante_clock_master,omitempty"` - DanteTxChannels string `json:"dante_tx_channels,omitempty"` - MulticastGroups MulticastMembershipSet `json:"multicast_groups,omitempty"` - ArtNetInputs ArtNetUniverseSet `json:"artnet_inputs,omitempty"` - ArtNetOutputs ArtNetUniverseSet `json:"artnet_outputs,omitempty"` - SACNInputs []SACNUniverse `json:"sacn_inputs,omitempty"` - SACNOutputs SACNUniverseSet `json:"sacn_outputs,omitempty"` - DanteTx []*DantePeer `json:"dante_tx,omitempty"` - DanteRx []*DantePeer `json:"dante_rx,omitempty"` - Unreachable bool `json:"unreachable,omitempty"` - } - return json.Marshal(nodeJSON{ - ID: n.ID, - Names: n.Names, - Interfaces: n.Interfaces, - MACTableSize: n.MACTableSize(), - PoEBudget: n.PoEBudget, - IsDanteClockMaster: n.IsDanteClockMaster, - DanteTxChannels: n.DanteTxChannels, - MulticastGroups: n.MulticastGroups, - ArtNetInputs: n.ArtNetInputs, - ArtNetOutputs: n.ArtNetOutputs, - SACNInputs: n.SACNInputs(), - SACNOutputs: n.SACNOutputs, - DanteTx: n.DanteTx, - DanteRx: n.DanteRx, - Unreachable: n.Unreachable, - }) -} - type DantePeer struct { Node *Node `json:"node"` Channels []string `json:"channels,omitempty"`