2023-06-12 20:03:17 -07:00
|
|
|
package event
|
|
|
|
|
|
|
|
|
|
import (
|
2023-06-12 20:07:04 -07:00
|
|
|
"bytes"
|
|
|
|
|
"compress/gzip"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"log"
|
2023-06-12 20:03:17 -07:00
|
|
|
"math"
|
|
|
|
|
"math/rand"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/go-resty/resty/v2"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type Target struct {
|
|
|
|
|
client *resty.Client
|
|
|
|
|
writePeriodSeconds float64
|
|
|
|
|
windowSeconds float64
|
|
|
|
|
rateClasses []*rateClass
|
|
|
|
|
stop chan bool
|
2023-06-17 14:21:32 -07:00
|
|
|
done chan bool
|
2023-06-12 20:03:17 -07:00
|
|
|
lastEvent time.Time
|
|
|
|
|
events []*Event
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (target *Target) AddRateClass(grantRate float64, vals ...any) {
|
|
|
|
|
if len(vals)%2 != 0 {
|
|
|
|
|
panic(vals)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
erc := &rateClass{
|
|
|
|
|
grantRate: grantRate * target.windowSeconds,
|
|
|
|
|
criteria: map[string]any{},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for i := 0; i < len(vals); i += 2 {
|
|
|
|
|
erc.criteria[vals[i].(string)] = vals[i+1]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
target.rateClasses = append(target.rateClasses, erc)
|
|
|
|
|
}
|
|
|
|
|
|
2023-06-17 14:21:32 -07:00
|
|
|
func (target *Target) close() {
|
|
|
|
|
close(target.stop)
|
|
|
|
|
<-target.done
|
|
|
|
|
}
|
|
|
|
|
|
2023-06-12 20:03:17 -07:00
|
|
|
func (target *Target) writeEvent(ev *Event) {
|
|
|
|
|
now := time.Now()
|
|
|
|
|
secondsSinceLastEvent := now.Sub(target.lastEvent).Seconds()
|
|
|
|
|
target.lastEvent = now
|
|
|
|
|
|
|
|
|
|
// Example:
|
|
|
|
|
// windowSeconds = 100
|
|
|
|
|
// secondsSinceLastEvent = 25
|
|
|
|
|
// eventRateMultiplier = 0.75
|
|
|
|
|
eventRateMultiplier := (target.windowSeconds - secondsSinceLastEvent) / target.windowSeconds
|
|
|
|
|
|
|
|
|
|
maxProb := 0.0
|
|
|
|
|
|
|
|
|
|
for _, erc := range target.rateClasses {
|
|
|
|
|
if !erc.match(ev) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
erc.eventRate++
|
|
|
|
|
erc.eventRate *= eventRateMultiplier
|
|
|
|
|
|
|
|
|
|
classProb := erc.grantRate / erc.eventRate
|
|
|
|
|
maxProb = math.Max(maxProb, classProb)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if maxProb <= 0.0 || rand.Float64() > maxProb { //nolint:gosec
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ev2 := *ev
|
|
|
|
|
ev2.SampleRate = int64(math.Max(math.Round(1.0/maxProb), 1.0))
|
|
|
|
|
target.events = append(target.events, &ev2)
|
|
|
|
|
}
|
2023-06-12 20:07:04 -07:00
|
|
|
|
|
|
|
|
func (target *Target) flushLoop(c *Client) {
|
2023-06-17 14:21:32 -07:00
|
|
|
defer close(target.done)
|
|
|
|
|
|
2023-06-12 20:07:04 -07:00
|
|
|
t := time.NewTicker(time.Duration(target.writePeriodSeconds * float64(time.Second)))
|
|
|
|
|
defer t.Stop()
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-target.stop:
|
|
|
|
|
target.flush(c)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
case <-t.C:
|
|
|
|
|
target.flush(c)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (target *Target) flush(c *Client) {
|
|
|
|
|
c.mu.Lock()
|
|
|
|
|
events := target.events
|
|
|
|
|
target.events = nil
|
|
|
|
|
c.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
if len(events) == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
buf := &bytes.Buffer{}
|
|
|
|
|
g := gzip.NewWriter(buf)
|
|
|
|
|
enc := json.NewEncoder(g)
|
|
|
|
|
|
|
|
|
|
err := enc.Encode(events)
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
err = g.Close()
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
resp, err := target.client.R().
|
|
|
|
|
SetHeader("Content-Type", "application/json").
|
|
|
|
|
SetHeader("Content-Encoding", "gzip").
|
|
|
|
|
SetBody(buf).
|
|
|
|
|
Post("")
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Printf("failed write to event target: %s", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if resp.IsError() {
|
|
|
|
|
log.Printf("failed write to event target: %d %s: %s", resp.StatusCode(), resp.Status(), resp.String())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|