Files
event/target.go
2023-06-17 14:21:32 -07:00

139 lines
2.6 KiB
Go

package event
import (
"bytes"
"compress/gzip"
"encoding/json"
"log"
"math"
"math/rand"
"time"
"github.com/go-resty/resty/v2"
)
type Target struct {
client *resty.Client
writePeriodSeconds float64
windowSeconds float64
rateClasses []*rateClass
stop chan bool
done chan bool
lastEvent time.Time
events []*Event
}
func (target *Target) AddRateClass(grantRate float64, vals ...any) {
if len(vals)%2 != 0 {
panic(vals)
}
erc := &rateClass{
grantRate: grantRate * target.windowSeconds,
criteria: map[string]any{},
}
for i := 0; i < len(vals); i += 2 {
erc.criteria[vals[i].(string)] = vals[i+1]
}
target.rateClasses = append(target.rateClasses, erc)
}
func (target *Target) close() {
close(target.stop)
<-target.done
}
func (target *Target) writeEvent(ev *Event) {
now := time.Now()
secondsSinceLastEvent := now.Sub(target.lastEvent).Seconds()
target.lastEvent = now
// Example:
// windowSeconds = 100
// secondsSinceLastEvent = 25
// eventRateMultiplier = 0.75
eventRateMultiplier := (target.windowSeconds - secondsSinceLastEvent) / target.windowSeconds
maxProb := 0.0
for _, erc := range target.rateClasses {
if !erc.match(ev) {
continue
}
erc.eventRate++
erc.eventRate *= eventRateMultiplier
classProb := erc.grantRate / erc.eventRate
maxProb = math.Max(maxProb, classProb)
}
if maxProb <= 0.0 || rand.Float64() > maxProb { //nolint:gosec
return
}
ev2 := *ev
ev2.SampleRate = int64(math.Max(math.Round(1.0/maxProb), 1.0))
target.events = append(target.events, &ev2)
}
func (target *Target) flushLoop(c *Client) {
defer close(target.done)
t := time.NewTicker(time.Duration(target.writePeriodSeconds * float64(time.Second)))
defer t.Stop()
for {
select {
case <-target.stop:
target.flush(c)
return
case <-t.C:
target.flush(c)
}
}
}
func (target *Target) flush(c *Client) {
c.mu.Lock()
events := target.events
target.events = nil
c.mu.Unlock()
if len(events) == 0 {
return
}
buf := &bytes.Buffer{}
g := gzip.NewWriter(buf)
enc := json.NewEncoder(g)
err := enc.Encode(events)
if err != nil {
panic(err)
}
err = g.Close()
if err != nil {
panic(err)
}
resp, err := target.client.R().
SetHeader("Content-Type", "application/json").
SetHeader("Content-Encoding", "gzip").
SetBody(buf).
Post("")
if err != nil {
log.Printf("failed write to event target: %s", err)
return
}
if resp.IsError() {
log.Printf("failed write to event target: %d %s: %s", resp.StatusCode(), resp.Status(), resp.String())
return
}
}