diff --git a/main.go b/main.go index 4403c0d..9ccf8bb 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,6 @@ import ( "os/signal" "strconv" "strings" - "sync" "syscall" "time" @@ -32,8 +31,6 @@ type App struct { artTargets map[uint16]*net.UDPAddr sacnTargets map[uint16][]*net.UDPAddr debug bool - statsMu sync.Mutex - stats map[config.Universe]uint64 } func main() { @@ -150,7 +147,6 @@ func main() { artTargets: artTargets, sacnTargets: sacnTargets, debug: *debug, - stats: map[config.Universe]uint64{}, } // Create ArtNet receiver if enabled @@ -245,9 +241,6 @@ func (a *App) HandleDMX(src *net.UDPAddr, pkt *artnet.DMXPacket) { src.IP, pkt.Universe, pkt.Sequence, pkt.Length) } u := config.Universe{Protocol: config.ProtocolArtNet, Number: uint16(pkt.Universe)} - a.statsMu.Lock() - a.stats[u]++ - a.statsMu.Unlock() a.sendOutputs(a.engine.Remap(u, pkt.Data)) } @@ -273,9 +266,6 @@ func (a *App) HandleSACN(universe uint16, data [512]byte) { log.Printf("[<-sacn] universe=%d", universe) } u := config.Universe{Protocol: config.ProtocolSACN, Number: universe} - a.statsMu.Lock() - a.stats[u]++ - a.statsMu.Unlock() a.sendOutputs(a.engine.Remap(u, data)) } @@ -334,19 +324,13 @@ func (a *App) handleConfig(w http.ResponseWriter, r *http.Request) { } func (a *App) printStats() { - a.statsMu.Lock() - stats := a.stats - a.stats = map[config.Universe]uint64{} - a.statsMu.Unlock() - - if len(stats) == 0 && len(a.cfg.Mappings) == 0 { + if len(a.cfg.Mappings) == 0 { return } - + counts := a.engine.SwapStats() log.Printf("[stats] mapping traffic (last 10s):") for _, m := range a.cfg.Mappings { - count := stats[m.From.Universe] - log.Printf("[stats] %s -> %s: %d packets", m.From, m.To, count) + log.Printf("[stats] %s -> %s: %d packets", m.From, m.To, counts[m.From.Universe]) } } diff --git a/remap/engine.go b/remap/engine.go index 48a2872..40735ec 100644 --- a/remap/engine.go +++ b/remap/engine.go @@ -2,6 +2,7 @@ package remap import ( "sync" + "sync/atomic" "github.com/gopatchy/artmap/config" ) @@ -12,19 +13,30 @@ type Output struct { Data [512]byte } +// sourceEntry holds mappings and stats for a source universe +type sourceEntry struct { + mappings []config.NormalizedMapping + counter atomic.Uint64 +} + // Engine handles DMX channel remapping type Engine struct { mappings []config.NormalizedMapping - bySource map[config.Universe][]config.NormalizedMapping + bySource map[config.Universe]*sourceEntry state map[config.Universe]*[512]byte stateMu sync.Mutex } // NewEngine creates a new remapping engine func NewEngine(mappings []config.NormalizedMapping) *Engine { - bySource := make(map[config.Universe][]config.NormalizedMapping) + bySource := map[config.Universe]*sourceEntry{} for _, m := range mappings { - bySource[m.From] = append(bySource[m.From], m) + entry := bySource[m.From] + if entry == nil { + entry = &sourceEntry{} + bySource[m.From] = entry + } + entry.mappings = append(entry.mappings, m) } state := make(map[config.Universe]*[512]byte) @@ -43,17 +55,18 @@ func NewEngine(mappings []config.NormalizedMapping) *Engine { // Remap applies mappings to incoming DMX data and returns outputs func (e *Engine) Remap(src config.Universe, srcData [512]byte) []Output { - mappings, ok := e.bySource[src] - if !ok { + entry := e.bySource[src] + if entry == nil { return nil } + entry.counter.Add(1) e.stateMu.Lock() defer e.stateMu.Unlock() affected := make(map[config.Universe]bool) - for _, m := range mappings { + for _, m := range entry.mappings { affected[m.To] = true outState := e.state[m.To] @@ -77,6 +90,15 @@ func (e *Engine) Remap(src config.Universe, srcData [512]byte) []Output { return result } +// SwapStats returns packet counts per source universe since last call and resets them +func (e *Engine) SwapStats() map[config.Universe]uint64 { + result := map[config.Universe]uint64{} + for u, entry := range e.bySource { + result[u] = entry.counter.Swap(0) + } + return result +} + // SourceArtNetUniverses returns source ArtNet universe numbers (for discovery) func (e *Engine) SourceArtNetUniverses() []uint16 { seen := make(map[uint16]bool)