Use MulticastGroup as map key and serialize under lock

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ian Gulliver
2026-01-28 22:48:55 -08:00
parent 042ccab74e
commit aebd6f5e2c
4 changed files with 40 additions and 92 deletions

View File

@@ -103,10 +103,9 @@ func (n *Nodes) GetDanteTxDeviceInGroup(groupIP net.IP) *Node {
defer n.mu.RUnlock() defer n.mu.RUnlock()
group := ParseMulticastGroup(groupIP) group := ParseMulticastGroup(groupIP)
groupKey := group.String()
for _, node := range n.nodes { for _, node := range n.nodes {
if node.DanteTxChannels != "" && node.MulticastGroups != nil { if node.DanteTxChannels != "" && node.MulticastGroups != nil {
if _, exists := node.MulticastGroups[groupKey]; exists { if _, exists := node.MulticastGroups[group]; exists {
return node return node
} }
} }

51
http.go
View File

@@ -115,14 +115,26 @@ func ensureCert() error {
} }
func (t *Tendrils) handleAPIStatus(w http.ResponseWriter, r *http.Request) { func (t *Tendrils) handleAPIStatus(w http.ResponseWriter, r *http.Request) {
status := t.GetStatus()
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(status); err != nil { data, err := t.GetStatusJSON()
if err != nil {
log.Printf("[ERROR] failed to encode status: %v", err) log.Printf("[ERROR] failed to encode status: %v", err)
return
} }
w.Write(data)
} }
func (t *Tendrils) GetStatus() *StatusResponse { func (t *Tendrils) GetStatusJSON() ([]byte, error) {
t.nodes.mu.Lock()
t.nodes.expireMulticastMemberships()
t.nodes.expireArtNet()
t.nodes.expireSACN()
t.nodes.expireDante()
t.nodes.mu.Unlock()
t.nodes.mu.RLock()
defer t.nodes.mu.RUnlock()
var broadcastStats *BroadcastStatsResponse var broadcastStats *BroadcastStatsResponse
if t.broadcast != nil { if t.broadcast != nil {
stats := t.broadcast.GetStats() stats := t.broadcast.GetStats()
@@ -132,13 +144,13 @@ func (t *Tendrils) GetStatus() *StatusResponse {
if config == nil { if config == nil {
config = &Config{} config = &Config{}
} }
return &StatusResponse{ return json.Marshal(&StatusResponse{
Config: config, Config: config,
Nodes: t.getNodes(), Nodes: t.getNodesLocked(),
Links: t.getLinks(), Links: t.getLinksLocked(),
Errors: t.errors.GetErrors(), Errors: t.errors.GetErrors(),
BroadcastStats: broadcastStats, BroadcastStats: broadcastStats,
} })
} }
func (t *Tendrils) handleClearError(w http.ResponseWriter, r *http.Request) { func (t *Tendrils) handleClearError(w http.ResponseWriter, r *http.Request) {
@@ -175,7 +187,7 @@ func (t *Tendrils) handleAPIStatusStream(w http.ResponseWriter, r *http.Request)
defer t.unsubscribeSSE(subID) defer t.unsubscribeSSE(subID)
sendStatus := func() error { sendStatus := func() error {
data, err := json.Marshal(t.GetStatus()) data, err := t.GetStatusJSON()
if err != nil { if err != nil {
log.Printf("[ERROR] failed to marshal status: %v", err) log.Printf("[ERROR] failed to marshal status: %v", err)
return err return err
@@ -213,25 +225,13 @@ func (t *Tendrils) handleAPIStatusStream(w http.ResponseWriter, r *http.Request)
} }
} }
func (t *Tendrils) getNodes() []*Node { func (t *Tendrils) getNodesLocked() []*Node {
t.nodes.mu.Lock()
t.nodes.expireMulticastMemberships()
t.nodes.expireArtNet()
t.nodes.expireSACN()
t.nodes.expireDante()
t.nodes.mu.Unlock()
t.nodes.mu.RLock()
defer t.nodes.mu.RUnlock()
unreachableNodes := t.errors.GetUnreachableNodeSet() unreachableNodes := t.errors.GetUnreachableNodeSet()
nodes := make([]*Node, 0, len(t.nodes.nodes)) nodes := make([]*Node, 0, len(t.nodes.nodes))
for _, node := range t.nodes.nodes { for _, node := range t.nodes.nodes {
n := new(Node) node.Unreachable = unreachableNodes[node.ID]
*n = *node nodes = append(nodes, node)
n.Unreachable = unreachableNodes[node.ID]
nodes = append(nodes, n)
} }
sort.Slice(nodes, func(i, j int) bool { sort.Slice(nodes, func(i, j int) bool {
@@ -248,10 +248,7 @@ func (t *Tendrils) getNodes() []*Node {
} }
func (t *Tendrils) getLinks() []*Link { func (t *Tendrils) getLinksLocked() []*Link {
t.nodes.mu.RLock()
defer t.nodes.mu.RUnlock()
links := t.nodes.getDirectLinks() links := t.nodes.getDirectLinks()
sort.Slice(links, func(i, j int) bool { sort.Slice(links, func(i, j int) bool {
if links[i].NodeA.DisplayName() != links[j].NodeA.DisplayName() { if links[i].NodeA.DisplayName() != links[j].NodeA.DisplayName() {

View File

@@ -65,13 +65,12 @@ func (n *Nodes) GetMulticastGroupMembers(groupIP net.IP) []*Node {
defer n.mu.RUnlock() defer n.mu.RUnlock()
group := ParseMulticastGroup(groupIP) group := ParseMulticastGroup(groupIP)
groupKey := group.String()
var members []*Node var members []*Node
for _, node := range n.nodes { for _, node := range n.nodes {
if node.MulticastGroups == nil { if node.MulticastGroups == nil {
continue continue
} }
if _, exists := node.MulticastGroups[groupKey]; exists { if _, exists := node.MulticastGroups[group]; exists {
members = append(members, node) members = append(members, node)
} }
} }
@@ -93,9 +92,9 @@ func (n *Nodes) mergeMulticast(keep, merge *Node) {
if keep.MulticastGroups == nil { if keep.MulticastGroups == nil {
keep.MulticastGroups = MulticastMembershipSet{} keep.MulticastGroups = MulticastMembershipSet{}
} }
for key, membership := range merge.MulticastGroups { for group, lastSeen := range merge.MulticastGroups {
if existing, ok := keep.MulticastGroups[key]; !ok || membership.LastSeen.After(existing.LastSeen) { if existing, ok := keep.MulticastGroups[group]; !ok || lastSeen.After(existing) {
keep.MulticastGroups[key] = membership keep.MulticastGroups[group] = lastSeen
} }
} }
} }

View File

@@ -218,30 +218,20 @@ func ParseMulticastGroup(ip net.IP) MulticastGroup {
return MulticastGroup{RawIP: ip.String()} return MulticastGroup{RawIP: ip.String()}
} }
type MulticastMembership struct { type MulticastMembershipSet map[MulticastGroup]time.Time
Group MulticastGroup
LastSeen time.Time
}
type MulticastMembershipSet map[string]*MulticastMembership
func (s MulticastMembershipSet) Add(group MulticastGroup) { func (s MulticastMembershipSet) Add(group MulticastGroup) {
key := group.String() s[group] = time.Now()
if m, exists := s[key]; exists {
m.LastSeen = time.Now()
} else {
s[key] = &MulticastMembership{Group: group, LastSeen: time.Now()}
}
} }
func (s MulticastMembershipSet) Remove(group MulticastGroup) { func (s MulticastMembershipSet) Remove(group MulticastGroup) {
delete(s, group.String()) delete(s, group)
} }
func (s MulticastMembershipSet) Groups() []MulticastGroup { func (s MulticastMembershipSet) Groups() []MulticastGroup {
result := make([]MulticastGroup, 0, len(s)) result := make([]MulticastGroup, 0, len(s))
for _, m := range s { for g := range s {
result = append(result, m.Group) result = append(result, g)
} }
sort.Slice(result, func(i, j int) bool { sort.Slice(result, func(i, j int) bool {
return result[i].String() < result[j].String() return result[i].String() < result[j].String()
@@ -251,9 +241,9 @@ func (s MulticastMembershipSet) Groups() []MulticastGroup {
func (s MulticastMembershipSet) SACNInputs() []SACNUniverse { func (s MulticastMembershipSet) SACNInputs() []SACNUniverse {
var result []SACNUniverse var result []SACNUniverse
for _, m := range s { for g := range s {
if m.Group.IsSACN() { if g.IsSACN() {
result = append(result, m.Group.SACNUniverse) result = append(result, g.SACNUniverse)
} }
} }
sort.Slice(result, func(i, j int) bool { return result[i] < result[j] }) sort.Slice(result, func(i, j int) bool { return result[i] < result[j] })
@@ -262,9 +252,9 @@ func (s MulticastMembershipSet) SACNInputs() []SACNUniverse {
func (s MulticastMembershipSet) Expire(maxAge time.Duration) { func (s MulticastMembershipSet) Expire(maxAge time.Duration) {
expireTime := time.Now().Add(-maxAge) expireTime := time.Now().Add(-maxAge)
for key, m := range s { for g, lastSeen := range s {
if m.LastSeen.Before(expireTime) { if lastSeen.Before(expireTime) {
delete(s, key) delete(s, g)
} }
} }
} }
@@ -419,43 +409,6 @@ func (n *Node) SACNInputs() []SACNUniverse {
return n.MulticastGroups.SACNInputs() return n.MulticastGroups.SACNInputs()
} }
func (n *Node) MarshalJSON() ([]byte, error) {
type nodeJSON struct {
ID string `json:"id"`
Names NameSet `json:"names"`
Interfaces InterfaceMap `json:"interfaces"`
MACTableSize int `json:"mac_table_size,omitempty"`
PoEBudget *PoEBudget `json:"poe_budget,omitempty"`
IsDanteClockMaster bool `json:"is_dante_clock_master,omitempty"`
DanteTxChannels string `json:"dante_tx_channels,omitempty"`
MulticastGroups MulticastMembershipSet `json:"multicast_groups,omitempty"`
ArtNetInputs ArtNetUniverseSet `json:"artnet_inputs,omitempty"`
ArtNetOutputs ArtNetUniverseSet `json:"artnet_outputs,omitempty"`
SACNInputs []SACNUniverse `json:"sacn_inputs,omitempty"`
SACNOutputs SACNUniverseSet `json:"sacn_outputs,omitempty"`
DanteTx []*DantePeer `json:"dante_tx,omitempty"`
DanteRx []*DantePeer `json:"dante_rx,omitempty"`
Unreachable bool `json:"unreachable,omitempty"`
}
return json.Marshal(nodeJSON{
ID: n.ID,
Names: n.Names,
Interfaces: n.Interfaces,
MACTableSize: n.MACTableSize(),
PoEBudget: n.PoEBudget,
IsDanteClockMaster: n.IsDanteClockMaster,
DanteTxChannels: n.DanteTxChannels,
MulticastGroups: n.MulticastGroups,
ArtNetInputs: n.ArtNetInputs,
ArtNetOutputs: n.ArtNetOutputs,
SACNInputs: n.SACNInputs(),
SACNOutputs: n.SACNOutputs,
DanteTx: n.DanteTx,
DanteRx: n.DanteRx,
Unreachable: n.Unreachable,
})
}
type DantePeer struct { type DantePeer struct {
Node *Node `json:"node"` Node *Node `json:"node"`
Channels []string `json:"channels,omitempty"` Channels []string `json:"channels,omitempty"`