From 6ff8003f696120b7908e43a9defe29dd3a0f58e4 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Mon, 12 Jun 2023 20:03:17 -0700 Subject: [PATCH] Initial copy in --- .golangci.yaml | 8 --- client.go | 147 +++++++++++++++++++++++++++++++++++++++++++++++++ event.go | 36 ++++++++++++ go.mod | 4 ++ go.sum | 9 +++ hooks.go | 84 ++++++++++++++++++++++++++++ rateclass.go | 18 ++++++ target.go | 70 +++++++++++++++++++++++ 8 files changed, 368 insertions(+), 8 deletions(-) create mode 100644 client.go create mode 100644 go.sum create mode 100644 hooks.go create mode 100644 rateclass.go create mode 100644 target.go diff --git a/.golangci.yaml b/.golangci.yaml index acc1cc1..9aa3e2e 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -36,11 +36,3 @@ linters: - thelper - varcheck - varnamelen -linters-settings: - tagliatelle: - case: - use-field-name: true - rules: - json: goCamel - wsl: - allow-separated-leading-comment: true diff --git a/client.go b/client.go new file mode 100644 index 0000000..609cd8c --- /dev/null +++ b/client.go @@ -0,0 +1,147 @@ +package event + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/json" + "fmt" + "log" + "strings" + "sync" + "time" + + "github.com/go-resty/resty/v2" +) + +type Client struct { + targets []*Target + hooks []Hook + + mu sync.Mutex +} + +type Hook func(context.Context, *Event) + +func New() *Client { + return &Client{} +} + +func (c *Client) AddTarget(url string, headers map[string]string, writePeriodSeconds float64) *Target { + target := &Target{ + client: resty.New().SetBaseURL(url).SetHeaders(headers), + writePeriodSeconds: writePeriodSeconds, + windowSeconds: 100.0, + stop: make(chan bool), + lastEvent: time.Now(), + } + + go c.flushLoop(target) + + c.mu.Lock() + defer c.mu.Unlock() + + c.targets = append(c.targets, target) + + return target +} + +func (c *Client) AddHook(hook Hook) { + c.mu.Lock() + defer c.mu.Unlock() + + c.hooks = append(c.hooks, hook) +} + +func (c *Client) Log(ctx context.Context, vals ...any) { + ev := newEvent("log", vals...) + c.writeEvent(ctx, ev) + + parts := []string{} + + for i := 0; i < len(vals); i += 2 { + parts = append(parts, fmt.Sprintf("%s=%s", vals[i], vals[i+1])) + } + + log.Print(strings.Join(parts, " ")) +} + +func (c *Client) Close() { + c.mu.Lock() + defer c.mu.Unlock() + + for _, target := range c.targets { + close(target.stop) + } +} + +func (c *Client) writeEvent(ctx context.Context, ev *Event) { + ev.Set("durationMS", time.Since(ev.start).Milliseconds()) + + c.mu.Lock() + defer c.mu.Unlock() + + for _, hook := range c.hooks { + hook(ctx, ev) + } + + for _, target := range c.targets { + target.writeEvent(ev) + } +} + +func (c *Client) flushLoop(target *Target) { + t := time.NewTicker(time.Duration(target.writePeriodSeconds * float64(time.Second))) + defer t.Stop() + + for { + select { + case <-target.stop: + c.flush(target) + return + + case <-t.C: + c.flush(target) + } + } +} + +func (c *Client) flush(target *Target) { + c.mu.Lock() + events := target.events + target.events = nil + c.mu.Unlock() + + if len(events) == 0 { + return + } + + buf := &bytes.Buffer{} + g := gzip.NewWriter(buf) + enc := json.NewEncoder(g) + + err := enc.Encode(events) + if err != nil { + panic(err) + } + + err = g.Close() + if err != nil { + panic(err) + } + + resp, err := target.client.R(). + SetHeader("Content-Type", "application/json"). + SetHeader("Content-Encoding", "gzip"). + SetBody(buf). + Post("") + if err != nil { + log.Printf("failed write to event target: %s", err) + return + } + + if resp.IsError() { + log.Printf("failed write to event target: %d %s: %s", resp.StatusCode(), resp.Status(), resp.String()) + return + } +} diff --git a/event.go b/event.go index 0e4b82e..7dd24f6 100644 --- a/event.go +++ b/event.go @@ -1 +1,37 @@ package event + +import "time" + +type Event struct { + start time.Time + + Time string `json:"time"` + SampleRate int64 `json:"samplerate"` + Data map[string]any `json:"data"` +} + +func newEvent(eventType string, vals ...any) *Event { + now := time.Now() + + ev := &Event{ + start: now, + Time: now.Format(time.RFC3339Nano), + Data: map[string]any{ + "type": eventType, + }, + } + + ev.Set(vals...) + + return ev +} + +func (ev *Event) Set(vals ...any) { + if len(vals)%2 != 0 { + panic(vals) + } + + for i := 0; i < len(vals); i += 2 { + ev.Data[vals[i].(string)] = vals[i+1] + } +} diff --git a/go.mod b/go.mod index 3e03d3e..8bb400e 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,7 @@ module github.com/gopatchy/event go 1.20 + +require github.com/go-resty/resty/v2 v2.7.0 + +require golang.org/x/net v0.0.0-20211029224645-99673261e6eb // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a5c1a24 --- /dev/null +++ b/go.sum @@ -0,0 +1,9 @@ +github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY= +github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I= +golang.org/x/net v0.0.0-20211029224645-99673261e6eb h1:pirldcYWx7rx7kE5r+9WsOXPXK0+WH5+uZ7uPmJ44uM= +golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/hooks.go b/hooks.go new file mode 100644 index 0000000..828259e --- /dev/null +++ b/hooks.go @@ -0,0 +1,84 @@ +package event + +import ( + "context" + "runtime/debug" + "runtime/metrics" + "strings" + "syscall" + "time" + "unicode" +) + +func HookBuildInfo(_ context.Context, ev *Event) { + buildInfo, ok := debug.ReadBuildInfo() + if !ok { + panic("ReadBuildInfo() failed") + } + + ev.Set( + "goVersion", buildInfo.GoVersion, + "goPackagePath", buildInfo.Path, + "goMainModuleVersion", buildInfo.Main.Version, + ) +} + +func HookMetrics(_ context.Context, ev *Event) { + descs := metrics.All() + + samples := make([]metrics.Sample, len(descs)) + for i := range samples { + samples[i].Name = descs[i].Name + } + + metrics.Read(samples) + + for _, sample := range samples { + name := convertMetricName(sample.Name) + + switch sample.Value.Kind() { //nolint:exhaustive + case metrics.KindUint64: + ev.Set(name, sample.Value.Uint64()) + case metrics.KindFloat64: + ev.Set(name, sample.Value.Float64()) + } + } +} + +func HookRUsage(_ context.Context, ev *Event) { + rusage := &syscall.Rusage{} + + err := syscall.Getrusage(syscall.RUSAGE_SELF, rusage) + if err != nil { + panic(err) + } + + ev.Set( + "rUsageUTime", time.Duration(rusage.Utime.Nano()).Seconds(), + "rUsageSTime", time.Duration(rusage.Stime.Nano()).Seconds(), + ) +} + +func convertMetricName(in string) string { + upperNext := false + + in = strings.TrimLeft(in, "/") + + ret := strings.Builder{} + + for _, r := range in { + if !unicode.IsLetter(r) && !unicode.IsDigit(r) { + upperNext = true + continue + } + + if upperNext { + r = unicode.ToUpper(r) + upperNext = false + } + + ret.WriteRune(r) + } + + return ret.String() +} diff --git a/rateclass.go b/rateclass.go new file mode 100644 index 0000000..5e810cd --- /dev/null +++ b/rateclass.go @@ -0,0 +1,18 @@ +package event + +type rateClass struct { + grantRate float64 + criteria map[string]any + + eventRate float64 +} + +func (rc *rateClass) match(ev *Event) bool { + for k, v := range rc.criteria { + if ev.Data[k] != v { + return false + } + } + + return true +} diff --git a/target.go b/target.go new file mode 100644 index 0000000..f2c5da0 --- /dev/null +++ b/target.go @@ -0,0 +1,70 @@ +package event + +import ( + "math" + "math/rand" + "time" + + "github.com/go-resty/resty/v2" +) + +type Target struct { + client *resty.Client + writePeriodSeconds float64 + windowSeconds float64 + rateClasses []*rateClass + stop chan bool + lastEvent time.Time + events []*Event +} + +func (target *Target) AddRateClass(grantRate float64, vals ...any) { + if len(vals)%2 != 0 { + panic(vals) + } + + erc := &rateClass{ + grantRate: grantRate * target.windowSeconds, + criteria: map[string]any{}, + } + + for i := 0; i < len(vals); i += 2 { + erc.criteria[vals[i].(string)] = vals[i+1] + } + + target.rateClasses = append(target.rateClasses, erc) +} + +func (target *Target) writeEvent(ev *Event) { + now := time.Now() + secondsSinceLastEvent := now.Sub(target.lastEvent).Seconds() + target.lastEvent = now + + // Example: + // windowSeconds = 100 + // secondsSinceLastEvent = 25 + // eventRateMultiplier = 0.75 + eventRateMultiplier := (target.windowSeconds - secondsSinceLastEvent) / target.windowSeconds + + maxProb := 0.0 + + for _, erc := range target.rateClasses { + if !erc.match(ev) { + continue + } + + erc.eventRate++ + erc.eventRate *= eventRateMultiplier + + classProb := erc.grantRate / erc.eventRate + maxProb = math.Max(maxProb, classProb) + } + + if maxProb <= 0.0 || rand.Float64() > maxProb { //nolint:gosec + return + } + + ev2 := *ev + ev2.SampleRate = int64(math.Max(math.Round(1.0/maxProb), 1.0)) + target.events = append(target.events, &ev2) +}