Add fixed-rate 40Hz sender with per-universe buffering

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Ian Gulliver
2026-01-31 07:31:12 -08:00
parent 13b10bebc7
commit 80a307da94
3 changed files with 79 additions and 36 deletions

25
main.go
View File

@@ -32,6 +32,7 @@ type App struct {
senders *senders.UniverseSenders senders *senders.UniverseSenders
artTargets map[uint16]*net.UDPAddr artTargets map[uint16]*net.UDPAddr
sacnTargets map[uint16][]*net.UDPAddr sacnTargets map[uint16][]*net.UDPAddr
senderHz int
debug bool debug bool
} }
@@ -41,6 +42,7 @@ func main() {
artnetBroadcast := flag.String("artnet-broadcast", "auto", "artnet broadcast addresses (comma-separated, or 'auto')") artnetBroadcast := flag.String("artnet-broadcast", "auto", "artnet broadcast addresses (comma-separated, or 'auto')")
sacnInterface := flag.String("sacn-interface", "", "network interface for sACN multicast") sacnInterface := flag.String("sacn-interface", "", "network interface for sACN multicast")
apiListen := flag.String("api-listen", ":8080", "HTTP API listen address (empty to disable)") apiListen := flag.String("api-listen", ":8080", "HTTP API listen address (empty to disable)")
senderHz := flag.Int("sender-hz", 40, "fixed sender rate in Hz (0 = send immediately on input)")
debug := flag.Bool("debug", false, "log incoming/outgoing dmx packets") debug := flag.Bool("debug", false, "log incoming/outgoing dmx packets")
flag.Parse() flag.Parse()
@@ -149,6 +151,7 @@ func main() {
senders: senders.New(), senders: senders.New(),
artTargets: artTargets, artTargets: artTargets,
sacnTargets: sacnTargets, sacnTargets: sacnTargets,
senderHz: *senderHz,
debug: *debug, debug: *debug,
} }
@@ -228,6 +231,18 @@ func main() {
} }
}() }()
// Start fixed-rate sender
if *senderHz > 0 {
log.Printf("[sender] starting at %dHz", *senderHz)
go func() {
ticker := time.NewTicker(time.Second / time.Duration(*senderHz))
defer ticker.Stop()
for range ticker.C {
app.sendOutputs(app.engine.GetDirtyOutputs())
}
}()
}
// Wait for interrupt // Wait for interrupt
sigChan := make(chan os.Signal, 1) sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
@@ -251,7 +266,10 @@ func (a *App) HandleDMX(src *net.UDPAddr, pkt *artnet.DMXPacket) {
} }
u := config.Universe{Protocol: config.ProtocolArtNet, Number: uint16(pkt.Universe)} u := config.Universe{Protocol: config.ProtocolArtNet, Number: uint16(pkt.Universe)}
a.senders.Record(u, src.IP) a.senders.Record(u, src.IP)
a.sendOutputs(a.engine.Remap(u, pkt.Data)) a.engine.Remap(u, pkt.Data)
if a.senderHz == 0 {
a.sendOutputs(a.engine.GetDirtyOutputs())
}
} }
// HandlePoll implements artnet.PacketHandler // HandlePoll implements artnet.PacketHandler
@@ -277,7 +295,10 @@ func (a *App) HandleSACN(src *net.UDPAddr, pkt *sacn.DataPacket) {
} }
u := config.Universe{Protocol: config.ProtocolSACN, Number: pkt.Universe} u := config.Universe{Protocol: config.ProtocolSACN, Number: pkt.Universe}
a.senders.Record(u, src.IP) a.senders.Record(u, src.IP)
a.sendOutputs(a.engine.Remap(u, pkt.Data)) a.engine.Remap(u, pkt.Data)
if a.senderHz == 0 {
a.sendOutputs(a.engine.GetDirtyOutputs())
}
} }
func (a *App) sendOutputs(outputs []remap.Output) { func (a *App) sendOutputs(outputs []remap.Output) {

View File

@@ -19,12 +19,18 @@ type sourceEntry struct {
counter atomic.Uint64 counter atomic.Uint64
} }
// universeBuffer holds per-output-universe state with its own lock
type universeBuffer struct {
mu sync.Mutex
data [512]byte
dirty bool
}
// Engine handles DMX channel remapping // Engine handles DMX channel remapping
type Engine struct { type Engine struct {
mappings []config.NormalizedMapping mappings []config.NormalizedMapping
bySource map[config.Universe]*sourceEntry bySource map[config.Universe]*sourceEntry
state map[config.Universe]*[512]byte outputs map[config.Universe]*universeBuffer
stateMu sync.Mutex
} }
// NewEngine creates a new remapping engine // NewEngine creates a new remapping engine
@@ -39,57 +45,70 @@ func NewEngine(mappings []config.NormalizedMapping) *Engine {
entry.mappings = append(entry.mappings, m) entry.mappings = append(entry.mappings, m)
} }
state := make(map[config.Universe]*[512]byte) outputs := map[config.Universe]*universeBuffer{}
for _, m := range mappings { for _, m := range mappings {
if _, ok := state[m.To]; !ok { if _, ok := outputs[m.To]; !ok {
state[m.To] = &[512]byte{} outputs[m.To] = &universeBuffer{}
} }
} }
return &Engine{ return &Engine{
mappings: mappings, mappings: mappings,
bySource: bySource, bySource: bySource,
state: state, outputs: outputs,
} }
} }
// Remap applies mappings to incoming DMX data and returns outputs // Remap applies mappings to incoming DMX data and marks affected outputs dirty
func (e *Engine) Remap(src config.Universe, srcData [512]byte) []Output { func (e *Engine) Remap(src config.Universe, srcData [512]byte) {
entry := e.bySource[src] entry := e.bySource[src]
if entry == nil { if entry == nil {
return nil return
} }
entry.counter.Add(1) entry.counter.Add(1)
e.stateMu.Lock()
defer e.stateMu.Unlock()
affected := make(map[config.Universe]bool)
for _, m := range entry.mappings { for _, m := range entry.mappings {
affected[m.To] = true e.applyMapping(m, srcData)
outState := e.state[m.To] }
}
for i := 0; i < m.Count; i++ { func (e *Engine) applyMapping(m config.NormalizedMapping, srcData [512]byte) {
srcChan := m.FromChan + i buf := e.outputs[m.To]
dstChan := m.ToChan + i buf.mu.Lock()
if srcChan < 512 && dstChan < 512 { defer buf.mu.Unlock()
outState[dstChan] = srcData[srcChan]
} for i := 0; i < m.Count; i++ {
srcChan := m.FromChan + i
dstChan := m.ToChan + i
if srcChan < 512 && dstChan < 512 {
buf.data[dstChan] = srcData[srcChan]
} }
} }
buf.dirty = true
}
result := make([]Output, 0, len(affected)) // GetDirtyOutputs returns outputs that have been modified since last call
for u := range affected { func (e *Engine) GetDirtyOutputs() []Output {
result = append(result, Output{ var result []Output
Universe: u, for u, buf := range e.outputs {
Data: *e.state[u], if out, ok := e.getDirtyOutput(u, buf); ok {
}) result = append(result, out)
}
} }
return result return result
} }
func (e *Engine) getDirtyOutput(u config.Universe, buf *universeBuffer) (Output, bool) {
buf.mu.Lock()
defer buf.mu.Unlock()
if !buf.dirty {
return Output{}, false
}
buf.dirty = false
return Output{Universe: u, Data: buf.data}, true
}
// SwapStats returns packet counts per source universe since last call and resets them // SwapStats returns packet counts per source universe since last call and resets them
func (e *Engine) SwapStats() map[config.Universe]uint64 { func (e *Engine) SwapStats() map[config.Universe]uint64 {
result := map[config.Universe]uint64{} result := map[config.Universe]uint64{}

View File

@@ -45,7 +45,8 @@ func FuzzRemap(f *testing.F) {
var srcData [512]byte var srcData [512]byte
copy(srcData[:], inputData[:512]) copy(srcData[:], inputData[:512])
outputs := engine.Remap(srcU, srcData) engine.Remap(srcU, srcData)
outputs := engine.GetDirtyOutputs()
if len(outputs) != 1 { if len(outputs) != 1 {
t.Fatalf("expected 1 output, got %d", len(outputs)) t.Fatalf("expected 1 output, got %d", len(outputs))
@@ -84,7 +85,8 @@ func FuzzRemapMultipleMappings(f *testing.F) {
var srcData [512]byte var srcData [512]byte
copy(srcData[:], inputData[:512]) copy(srcData[:], inputData[:512])
outputs := engine.Remap(srcU, srcData) engine.Remap(srcU, srcData)
outputs := engine.GetDirtyOutputs()
if len(outputs) != 2 { if len(outputs) != 2 {
t.Fatalf("expected 2 outputs, got %d", len(outputs)) t.Fatalf("expected 2 outputs, got %d", len(outputs))
@@ -129,9 +131,10 @@ func FuzzRemapUnmatchedUniverse(f *testing.F) {
var srcData [512]byte var srcData [512]byte
copy(srcData[:], inputData[:512]) copy(srcData[:], inputData[:512])
outputs := engine.Remap(otherU, srcData) engine.Remap(otherU, srcData)
if outputs != nil { outputs := engine.GetDirtyOutputs()
t.Fatalf("expected nil output for unmatched universe, got %d outputs", len(outputs)) if len(outputs) != 0 {
t.Fatalf("expected 0 outputs for unmatched universe, got %d outputs", len(outputs))
} }
}) })
} }