diff --git a/main.go b/main.go index 8348a95..ef22314 100644 --- a/main.go +++ b/main.go @@ -32,6 +32,7 @@ type App struct { senders *senders.UniverseSenders artTargets map[uint16]*net.UDPAddr sacnTargets map[uint16][]*net.UDPAddr + senderHz int debug bool } @@ -41,6 +42,7 @@ func main() { artnetBroadcast := flag.String("artnet-broadcast", "auto", "artnet broadcast addresses (comma-separated, or 'auto')") sacnInterface := flag.String("sacn-interface", "", "network interface for sACN multicast") apiListen := flag.String("api-listen", ":8080", "HTTP API listen address (empty to disable)") + senderHz := flag.Int("sender-hz", 40, "fixed sender rate in Hz (0 = send immediately on input)") debug := flag.Bool("debug", false, "log incoming/outgoing dmx packets") flag.Parse() @@ -149,6 +151,7 @@ func main() { senders: senders.New(), artTargets: artTargets, sacnTargets: sacnTargets, + senderHz: *senderHz, debug: *debug, } @@ -228,6 +231,18 @@ func main() { } }() + // Start fixed-rate sender + if *senderHz > 0 { + log.Printf("[sender] starting at %dHz", *senderHz) + go func() { + ticker := time.NewTicker(time.Second / time.Duration(*senderHz)) + defer ticker.Stop() + for range ticker.C { + app.sendOutputs(app.engine.GetDirtyOutputs()) + } + }() + } + // Wait for interrupt sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) @@ -251,7 +266,10 @@ func (a *App) HandleDMX(src *net.UDPAddr, pkt *artnet.DMXPacket) { } u := config.Universe{Protocol: config.ProtocolArtNet, Number: uint16(pkt.Universe)} a.senders.Record(u, src.IP) - a.sendOutputs(a.engine.Remap(u, pkt.Data)) + a.engine.Remap(u, pkt.Data) + if a.senderHz == 0 { + a.sendOutputs(a.engine.GetDirtyOutputs()) + } } // HandlePoll implements artnet.PacketHandler @@ -277,7 +295,10 @@ func (a *App) HandleSACN(src *net.UDPAddr, pkt *sacn.DataPacket) { } u := config.Universe{Protocol: config.ProtocolSACN, Number: pkt.Universe} a.senders.Record(u, src.IP) - a.sendOutputs(a.engine.Remap(u, pkt.Data)) + a.engine.Remap(u, pkt.Data) + if a.senderHz == 0 { + a.sendOutputs(a.engine.GetDirtyOutputs()) + } } func (a *App) sendOutputs(outputs []remap.Output) { diff --git a/remap/engine.go b/remap/engine.go index 40735ec..1969329 100644 --- a/remap/engine.go +++ b/remap/engine.go @@ -19,12 +19,18 @@ type sourceEntry struct { counter atomic.Uint64 } +// universeBuffer holds per-output-universe state with its own lock +type universeBuffer struct { + mu sync.Mutex + data [512]byte + dirty bool +} + // Engine handles DMX channel remapping type Engine struct { mappings []config.NormalizedMapping bySource map[config.Universe]*sourceEntry - state map[config.Universe]*[512]byte - stateMu sync.Mutex + outputs map[config.Universe]*universeBuffer } // NewEngine creates a new remapping engine @@ -39,57 +45,70 @@ func NewEngine(mappings []config.NormalizedMapping) *Engine { entry.mappings = append(entry.mappings, m) } - state := make(map[config.Universe]*[512]byte) + outputs := map[config.Universe]*universeBuffer{} for _, m := range mappings { - if _, ok := state[m.To]; !ok { - state[m.To] = &[512]byte{} + if _, ok := outputs[m.To]; !ok { + outputs[m.To] = &universeBuffer{} } } return &Engine{ mappings: mappings, bySource: bySource, - state: state, + outputs: outputs, } } -// Remap applies mappings to incoming DMX data and returns outputs -func (e *Engine) Remap(src config.Universe, srcData [512]byte) []Output { +// Remap applies mappings to incoming DMX data and marks affected outputs dirty +func (e *Engine) Remap(src config.Universe, srcData [512]byte) { entry := e.bySource[src] if entry == nil { - return nil + return } entry.counter.Add(1) - e.stateMu.Lock() - defer e.stateMu.Unlock() - - affected := make(map[config.Universe]bool) - for _, m := range entry.mappings { - affected[m.To] = true - outState := e.state[m.To] + e.applyMapping(m, srcData) + } +} - for i := 0; i < m.Count; i++ { - srcChan := m.FromChan + i - dstChan := m.ToChan + i - if srcChan < 512 && dstChan < 512 { - outState[dstChan] = srcData[srcChan] - } +func (e *Engine) applyMapping(m config.NormalizedMapping, srcData [512]byte) { + buf := e.outputs[m.To] + buf.mu.Lock() + defer buf.mu.Unlock() + + for i := 0; i < m.Count; i++ { + srcChan := m.FromChan + i + dstChan := m.ToChan + i + if srcChan < 512 && dstChan < 512 { + buf.data[dstChan] = srcData[srcChan] } } + buf.dirty = true +} - result := make([]Output, 0, len(affected)) - for u := range affected { - result = append(result, Output{ - Universe: u, - Data: *e.state[u], - }) +// GetDirtyOutputs returns outputs that have been modified since last call +func (e *Engine) GetDirtyOutputs() []Output { + var result []Output + for u, buf := range e.outputs { + if out, ok := e.getDirtyOutput(u, buf); ok { + result = append(result, out) + } } - return result } +func (e *Engine) getDirtyOutput(u config.Universe, buf *universeBuffer) (Output, bool) { + buf.mu.Lock() + defer buf.mu.Unlock() + + if !buf.dirty { + return Output{}, false + } + buf.dirty = false + return Output{Universe: u, Data: buf.data}, true +} + // SwapStats returns packet counts per source universe since last call and resets them func (e *Engine) SwapStats() map[config.Universe]uint64 { result := map[config.Universe]uint64{} diff --git a/remap/fuzz_test.go b/remap/fuzz_test.go index 51aaec3..a0a7b36 100644 --- a/remap/fuzz_test.go +++ b/remap/fuzz_test.go @@ -45,7 +45,8 @@ func FuzzRemap(f *testing.F) { var srcData [512]byte copy(srcData[:], inputData[:512]) - outputs := engine.Remap(srcU, srcData) + engine.Remap(srcU, srcData) + outputs := engine.GetDirtyOutputs() if len(outputs) != 1 { t.Fatalf("expected 1 output, got %d", len(outputs)) @@ -84,7 +85,8 @@ func FuzzRemapMultipleMappings(f *testing.F) { var srcData [512]byte copy(srcData[:], inputData[:512]) - outputs := engine.Remap(srcU, srcData) + engine.Remap(srcU, srcData) + outputs := engine.GetDirtyOutputs() if len(outputs) != 2 { t.Fatalf("expected 2 outputs, got %d", len(outputs)) @@ -129,9 +131,10 @@ func FuzzRemapUnmatchedUniverse(f *testing.F) { var srcData [512]byte copy(srcData[:], inputData[:512]) - outputs := engine.Remap(otherU, srcData) - if outputs != nil { - t.Fatalf("expected nil output for unmatched universe, got %d outputs", len(outputs)) + engine.Remap(otherU, srcData) + outputs := engine.GetDirtyOutputs() + if len(outputs) != 0 { + t.Fatalf("expected 0 outputs for unmatched universe, got %d outputs", len(outputs)) } }) }