Put target loop inside target
This commit is contained in:
61
client.go
61
client.go
@@ -1,10 +1,7 @@
|
|||||||
package event
|
package event
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"compress/gzip"
|
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -36,7 +33,7 @@ func (c *Client) AddTarget(url string, headers map[string]string, writePeriodSec
|
|||||||
lastEvent: time.Now(),
|
lastEvent: time.Now(),
|
||||||
}
|
}
|
||||||
|
|
||||||
go c.flushLoop(target)
|
go target.flushLoop(c)
|
||||||
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
@@ -89,59 +86,3 @@ func (c *Client) writeEvent(ctx context.Context, ev *Event) {
|
|||||||
target.writeEvent(ev)
|
target.writeEvent(ev)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) flushLoop(target *Target) {
|
|
||||||
t := time.NewTicker(time.Duration(target.writePeriodSeconds * float64(time.Second)))
|
|
||||||
defer t.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-target.stop:
|
|
||||||
c.flush(target)
|
|
||||||
return
|
|
||||||
|
|
||||||
case <-t.C:
|
|
||||||
c.flush(target)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) flush(target *Target) {
|
|
||||||
c.mu.Lock()
|
|
||||||
events := target.events
|
|
||||||
target.events = nil
|
|
||||||
c.mu.Unlock()
|
|
||||||
|
|
||||||
if len(events) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
buf := &bytes.Buffer{}
|
|
||||||
g := gzip.NewWriter(buf)
|
|
||||||
enc := json.NewEncoder(g)
|
|
||||||
|
|
||||||
err := enc.Encode(events)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = g.Close()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := target.client.R().
|
|
||||||
SetHeader("Content-Type", "application/json").
|
|
||||||
SetHeader("Content-Encoding", "gzip").
|
|
||||||
SetBody(buf).
|
|
||||||
Post("")
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("failed write to event target: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.IsError() {
|
|
||||||
log.Printf("failed write to event target: %d %s: %s", resp.StatusCode(), resp.Status(), resp.String())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
60
target.go
60
target.go
@@ -1,6 +1,10 @@
|
|||||||
package event
|
package event
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"compress/gzip"
|
||||||
|
"encoding/json"
|
||||||
|
"log"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
@@ -68,3 +72,59 @@ func (target *Target) writeEvent(ev *Event) {
|
|||||||
ev2.SampleRate = int64(math.Max(math.Round(1.0/maxProb), 1.0))
|
ev2.SampleRate = int64(math.Max(math.Round(1.0/maxProb), 1.0))
|
||||||
target.events = append(target.events, &ev2)
|
target.events = append(target.events, &ev2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (target *Target) flushLoop(c *Client) {
|
||||||
|
t := time.NewTicker(time.Duration(target.writePeriodSeconds * float64(time.Second)))
|
||||||
|
defer t.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-target.stop:
|
||||||
|
target.flush(c)
|
||||||
|
return
|
||||||
|
|
||||||
|
case <-t.C:
|
||||||
|
target.flush(c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (target *Target) flush(c *Client) {
|
||||||
|
c.mu.Lock()
|
||||||
|
events := target.events
|
||||||
|
target.events = nil
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
if len(events) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
g := gzip.NewWriter(buf)
|
||||||
|
enc := json.NewEncoder(g)
|
||||||
|
|
||||||
|
err := enc.Encode(events)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = g.Close()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := target.client.R().
|
||||||
|
SetHeader("Content-Type", "application/json").
|
||||||
|
SetHeader("Content-Encoding", "gzip").
|
||||||
|
SetBody(buf).
|
||||||
|
Post("")
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("failed write to event target: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.IsError() {
|
||||||
|
log.Printf("failed write to event target: %d %s: %s", resp.StatusCode(), resp.Status(), resp.String())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user