From b44b1f4c1fe923003665f8ae7c3a3e104b68bce4 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Mon, 12 Jun 2023 20:07:04 -0700 Subject: [PATCH] Put target loop inside target --- client.go | 61 +------------------------------------------------------ target.go | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 60 deletions(-) diff --git a/client.go b/client.go index 609cd8c..9cf6fd3 100644 --- a/client.go +++ b/client.go @@ -1,10 +1,7 @@ package event import ( - "bytes" - "compress/gzip" "context" - "encoding/json" "fmt" "log" "strings" @@ -36,7 +33,7 @@ func (c *Client) AddTarget(url string, headers map[string]string, writePeriodSec lastEvent: time.Now(), } - go c.flushLoop(target) + go target.flushLoop(c) c.mu.Lock() defer c.mu.Unlock() @@ -89,59 +86,3 @@ func (c *Client) writeEvent(ctx context.Context, ev *Event) { target.writeEvent(ev) } } - -func (c *Client) flushLoop(target *Target) { - t := time.NewTicker(time.Duration(target.writePeriodSeconds * float64(time.Second))) - defer t.Stop() - - for { - select { - case <-target.stop: - c.flush(target) - return - - case <-t.C: - c.flush(target) - } - } -} - -func (c *Client) flush(target *Target) { - c.mu.Lock() - events := target.events - target.events = nil - c.mu.Unlock() - - if len(events) == 0 { - return - } - - buf := &bytes.Buffer{} - g := gzip.NewWriter(buf) - enc := json.NewEncoder(g) - - err := enc.Encode(events) - if err != nil { - panic(err) - } - - err = g.Close() - if err != nil { - panic(err) - } - - resp, err := target.client.R(). - SetHeader("Content-Type", "application/json"). - SetHeader("Content-Encoding", "gzip"). - SetBody(buf). - Post("") - if err != nil { - log.Printf("failed write to event target: %s", err) - return - } - - if resp.IsError() { - log.Printf("failed write to event target: %d %s: %s", resp.StatusCode(), resp.Status(), resp.String()) - return - } -} diff --git a/target.go b/target.go index f2c5da0..bf5916d 100644 --- a/target.go +++ b/target.go @@ -1,6 +1,10 @@ package event import ( + "bytes" + "compress/gzip" + "encoding/json" + "log" "math" "math/rand" "time" @@ -68,3 +72,59 @@ func (target *Target) writeEvent(ev *Event) { ev2.SampleRate = int64(math.Max(math.Round(1.0/maxProb), 1.0)) target.events = append(target.events, &ev2) } + +func (target *Target) flushLoop(c *Client) { + t := time.NewTicker(time.Duration(target.writePeriodSeconds * float64(time.Second))) + defer t.Stop() + + for { + select { + case <-target.stop: + target.flush(c) + return + + case <-t.C: + target.flush(c) + } + } +} + +func (target *Target) flush(c *Client) { + c.mu.Lock() + events := target.events + target.events = nil + c.mu.Unlock() + + if len(events) == 0 { + return + } + + buf := &bytes.Buffer{} + g := gzip.NewWriter(buf) + enc := json.NewEncoder(g) + + err := enc.Encode(events) + if err != nil { + panic(err) + } + + err = g.Close() + if err != nil { + panic(err) + } + + resp, err := target.client.R(). + SetHeader("Content-Type", "application/json"). + SetHeader("Content-Encoding", "gzip"). + SetBody(buf). + Post("") + if err != nil { + log.Printf("failed write to event target: %s", err) + return + } + + if resp.IsError() { + log.Printf("failed write to event target: %d %s: %s", resp.StatusCode(), resp.Status(), resp.String()) + return + } +}