diff --git a/dante.go b/dante.go index 260de40..98981ef 100644 --- a/dante.go +++ b/dante.go @@ -3,7 +3,6 @@ package tendrils import ( "context" "encoding/binary" - "encoding/json" "fmt" "log" "net" @@ -115,58 +114,6 @@ func (n *Nodes) GetDanteTxDeviceInGroup(groupIP net.IP) *Node { var danteSeqID uint32 -func containsString(slice []string, val string) bool { - for _, s := range slice { - if s == val { - return true - } - } - return false -} - -type DanteFlowStatus uint8 - -const ( - DanteFlowUnsubscribed DanteFlowStatus = 0x00 - DanteFlowNoSource DanteFlowStatus = 0x01 - DanteFlowActive DanteFlowStatus = 0x09 -) - -func (s DanteFlowStatus) String() string { - switch s { - case DanteFlowActive: - return "active" - case DanteFlowNoSource: - return "no-source" - default: - return "" - } -} - -func (s DanteFlowStatus) MarshalJSON() ([]byte, error) { - return json.Marshal(s.String()) -} - -type DanteChannelType uint16 - -const ( - DanteChannelUnknown DanteChannelType = 0 - DanteChannelAudio DanteChannelType = 0x000f - DanteChannelAudio2 DanteChannelType = 0x0006 - DanteChannelVideo DanteChannelType = 0x000e -) - -func (t DanteChannelType) String() string { - switch t { - case DanteChannelAudio, DanteChannelAudio2: - return "audio" - case DanteChannelVideo: - return "video" - default: - return "" - } -} - type DanteSubscription struct { RxChannel int TxDeviceName string @@ -184,13 +131,13 @@ type DanteDeviceInfo struct { HasMulticast bool } -func (n *Nodes) UpdateDanteFlow(source, subscriber *Node, channelInfo string, flowStatus DanteFlowStatus) { +func (n *Nodes) UpdateDanteFlow(source, subscriber *Node, channel *DanteChannel) { n.mu.Lock() defer n.mu.Unlock() now := time.Now() - n.updateDanteTx(source, subscriber, channelInfo, flowStatus, now) - n.updateDanteRx(subscriber, source, channelInfo, flowStatus, now) + n.updateDanteTx(source, subscriber, channel, now) + n.updateDanteRx(subscriber, source, channel, now) } func (n *Nodes) ensureDanteFlows(node *Node) *DanteFlows { @@ -200,7 +147,7 @@ func (n *Nodes) ensureDanteFlows(node *Node) *DanteFlows { return node.DanteFlows } -func (n *Nodes) updateDanteTx(source, subscriber *Node, channelInfo string, flowStatus DanteFlowStatus, now time.Time) { +func (n *Nodes) updateDanteTx(source, subscriber *Node, channel *DanteChannel, now time.Time) { flows := n.ensureDanteFlows(source) flows.lastSeen = now @@ -213,18 +160,13 @@ func (n *Nodes) updateDanteTx(source, subscriber *Node, channelInfo string, flow } if peer == nil { peer = &DantePeer{ - Node: subscriber, - Status: map[string]string{}, + Node: subscriber, } flows.Tx = append(flows.Tx, peer) } - if channelInfo != "" && !containsString(peer.Channels, channelInfo) { - peer.Channels = append(peer.Channels, channelInfo) - sort.Strings(peer.Channels) - } - if channelInfo != "" { - peer.Status[channelInfo] = flowStatus.String() + if channel != nil { + peer.Channels = addOrUpdateChannel(peer.Channels, channel) } sort.Slice(flows.Tx, func(i, j int) bool { @@ -232,7 +174,7 @@ func (n *Nodes) updateDanteTx(source, subscriber *Node, channelInfo string, flow }) } -func (n *Nodes) updateDanteRx(subscriber, source *Node, channelInfo string, flowStatus DanteFlowStatus, now time.Time) { +func (n *Nodes) updateDanteRx(subscriber, source *Node, channel *DanteChannel, now time.Time) { flows := n.ensureDanteFlows(subscriber) flows.lastSeen = now @@ -245,18 +187,13 @@ func (n *Nodes) updateDanteRx(subscriber, source *Node, channelInfo string, flow } if peer == nil { peer = &DantePeer{ - Node: source, - Status: map[string]string{}, + Node: source, } flows.Rx = append(flows.Rx, peer) } - if channelInfo != "" && !containsString(peer.Channels, channelInfo) { - peer.Channels = append(peer.Channels, channelInfo) - sort.Strings(peer.Channels) - } - if channelInfo != "" { - peer.Status[channelInfo] = flowStatus.String() + if channel != nil { + peer.Channels = addOrUpdateChannel(peer.Channels, channel) } sort.Slice(flows.Rx, func(i, j int) bool { @@ -264,6 +201,24 @@ func (n *Nodes) updateDanteRx(subscriber, source *Node, channelInfo string, flow }) } +func addOrUpdateChannel(channels []*DanteChannel, channel *DanteChannel) []*DanteChannel { + for _, ch := range channels { + if ch.TxChannel == channel.TxChannel && ch.RxChannel == channel.RxChannel { + ch.Type = channel.Type + ch.Status = channel.Status + return channels + } + } + channels = append(channels, channel) + sort.Slice(channels, func(i, j int) bool { + if channels[i].RxChannel != channels[j].RxChannel { + return channels[i].RxChannel < channels[j].RxChannel + } + return channels[i].TxChannel < channels[j].TxChannel + }) + return channels +} + func (n *Nodes) expireDante() { for _, node := range n.nodes { if node.DanteFlows != nil && node.DanteFlows.Expire(5*time.Minute) { @@ -292,12 +247,7 @@ func (n *Nodes) mergeDante(keep, merge *Node) { keep.DanteFlows.Tx = append(keep.DanteFlows.Tx, peer) } else { for _, ch := range peer.Channels { - if !containsString(existing.Channels, ch) { - existing.Channels = append(existing.Channels, ch) - } - } - for ch, status := range peer.Status { - existing.Status[ch] = status + existing.Channels = addOrUpdateChannel(existing.Channels, ch) } } } @@ -314,12 +264,7 @@ func (n *Nodes) mergeDante(keep, merge *Node) { keep.DanteFlows.Rx = append(keep.DanteFlows.Rx, peer) } else { for _, ch := range peer.Channels { - if !containsString(existing.Channels, ch) { - existing.Channels = append(existing.Channels, ch) - } - } - for ch, status := range peer.Status { - existing.Status[ch] = status + existing.Channels = addOrUpdateChannel(existing.Channels, ch) } } } @@ -376,25 +321,14 @@ func (n *Nodes) logDante() { allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s", sourceName, subName)) } else { for _, ch := range peer.Channels { - parts := strings.Split(ch, "->") - if len(parts) == 2 { - rxPart := parts[1] - chType := "" - if idx := strings.LastIndex(rxPart, ":"); idx != -1 { - chType = rxPart[idx+1:] - rxPart = rxPart[:idx] - } - allChannelFlows = append(allChannelFlows, channelFlow{ - sourceName: sourceName, - txCh: parts[0], - rxName: subName, - rxCh: rxPart, - channelType: chType, - down: peer.Status[ch] == "no-source", - }) - } else { - allNoChannelFlows = append(allNoChannelFlows, fmt.Sprintf("%s -> %s[%s]", sourceName, subName, ch)) - } + allChannelFlows = append(allChannelFlows, channelFlow{ + sourceName: sourceName, + txCh: ch.TxChannel, + rxName: subName, + rxCh: fmt.Sprintf("%02d", ch.RxChannel), + channelType: ch.Type.String(), + down: ch.Status == DanteFlowNoSource, + }) } } } @@ -876,18 +810,18 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) { if txDeviceName == "." { txDeviceName = info.Name } - channelInfo := "" + var channel *DanteChannel if sub.TxChannelName != "" { - typeStr := sub.ChannelType.String() - if typeStr != "" { - channelInfo = fmt.Sprintf("%s → %02d [%s]", sub.TxChannelName, sub.RxChannel, typeStr) - } else { - channelInfo = fmt.Sprintf("%s → %02d", sub.TxChannelName, sub.RxChannel) + channel = &DanteChannel{ + TxChannel: sub.TxChannelName, + RxChannel: sub.RxChannel, + Type: sub.ChannelType, + Status: sub.FlowStatus, } } sourceNode := t.nodes.GetOrCreateByName(txDeviceName) subscriberNode := t.nodes.GetOrCreateByName(info.Name) - t.nodes.UpdateDanteFlow(sourceNode, subscriberNode, channelInfo, sub.FlowStatus) + t.nodes.UpdateDanteFlow(sourceNode, subscriberNode, channel) needIGMPFallback = false } } @@ -907,7 +841,7 @@ func (t *Tendrils) probeDanteDeviceWithPort(ip net.IP, port int) { sourceNode = t.nodes.GetOrCreateByName(ParseMulticastGroup(groupIP).String()) } subscriberNode := t.nodes.GetOrCreateByName(info.Name) - t.nodes.UpdateDanteFlow(sourceNode, subscriberNode, "", DanteFlowActive) + t.nodes.UpdateDanteFlow(sourceNode, subscriberNode, nil) } } } diff --git a/static/index.html b/static/index.html index f6a617d..f1c14ca 100644 --- a/static/index.html +++ b/static/index.html @@ -1893,6 +1893,13 @@ const danteNodes = new Map(); + const formatDanteChannel = (ch) => { + let str = ch.tx_channel + ' → ' + String(ch.rx_channel).padStart(2, '0'); + if (ch.type) str += ' [' + ch.type + ']'; + if (ch.status === 'no-source') str += ' ⚠'; + return str; + }; + nodes.forEach(node => { const nodeId = node.id; const danteTx = node.dante_flows?.tx || []; @@ -1902,14 +1909,14 @@ const txTo = danteTx.map(peer => { const peerName = getShortLabel(peer.node); - const channels = peer.channels || []; + const channels = (peer.channels || []).map(formatDanteChannel); const channelSummary = channels.length > 0 ? '\n ' + channels.join('\n ') : ''; return peerName + channelSummary; }); const rxFrom = danteRx.map(peer => { const peerName = getShortLabel(peer.node); - const channels = peer.channels || []; + const channels = (peer.channels || []).map(formatDanteChannel); const channelSummary = channels.length > 0 ? '\n ' + channels.join('\n ') : ''; return peerName + channelSummary; }); diff --git a/types.go b/types.go index 7261a80..43efb11 100644 --- a/types.go +++ b/types.go @@ -489,17 +489,77 @@ func (n *Node) SACNInputs() []SACNUniverse { return n.MulticastGroups.SACNInputs() } +type DanteFlowStatus uint8 + +const ( + DanteFlowUnsubscribed DanteFlowStatus = 0x00 + DanteFlowNoSource DanteFlowStatus = 0x01 + DanteFlowActive DanteFlowStatus = 0x09 +) + +func (s DanteFlowStatus) String() string { + switch s { + case DanteFlowActive: + return "active" + case DanteFlowNoSource: + return "no-source" + default: + return "" + } +} + +func (s DanteFlowStatus) MarshalJSON() ([]byte, error) { + str := s.String() + if str == "" { + return []byte("null"), nil + } + return json.Marshal(str) +} + +type DanteChannelType uint16 + +const ( + DanteChannelUnknown DanteChannelType = 0 + DanteChannelAudio DanteChannelType = 0x000f + DanteChannelAudio2 DanteChannelType = 0x0006 + DanteChannelVideo DanteChannelType = 0x000e +) + +func (t DanteChannelType) String() string { + switch t { + case DanteChannelAudio, DanteChannelAudio2: + return "audio" + case DanteChannelVideo: + return "video" + default: + return "" + } +} + +func (t DanteChannelType) MarshalJSON() ([]byte, error) { + str := t.String() + if str == "" { + return []byte("null"), nil + } + return json.Marshal(str) +} + +type DanteChannel struct { + TxChannel string `json:"tx_channel"` + RxChannel int `json:"rx_channel"` + Type DanteChannelType `json:"type,omitempty"` + Status DanteFlowStatus `json:"status,omitempty"` +} + type DantePeer struct { - Node *Node `json:"node"` - Channels []string `json:"channels,omitempty"` - Status map[string]string `json:"status,omitempty"` + Node *Node `json:"node"` + Channels []*DanteChannel `json:"channels,omitempty"` } func (p *DantePeer) MarshalJSON() ([]byte, error) { type peerJSON struct { - Node *Node `json:"node"` - Channels []string `json:"channels,omitempty"` - Status map[string]string `json:"status,omitempty"` + Node *Node `json:"node"` + Channels []*DanteChannel `json:"channels,omitempty"` } nodeRef := &Node{ ID: p.Node.ID, @@ -509,7 +569,6 @@ func (p *DantePeer) MarshalJSON() ([]byte, error) { return json.Marshal(peerJSON{ Node: nodeRef, Channels: p.Channels, - Status: p.Status, }) }