diff --git a/go.mod b/go.mod index d99fdba..5dfa7b8 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.20 require ( github.com/dchest/uniuri v1.2.0 github.com/go-resty/resty/v2 v2.7.0 + github.com/gopatchy/event v0.0.0-20230617212132-152b30a10ad7 github.com/gopatchy/proxy v0.0.0-20230608062432-7adc14d65656 github.com/samber/lo v1.38.1 github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index eb8c0c9..c6ccf1a 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ github.com/dchest/uniuri v1.2.0 h1:koIcOUdrTIivZgSLhHQvKgqdWZq5d7KdMEWF1Ud6+5g= github.com/dchest/uniuri v1.2.0/go.mod h1:fSzm4SLHzNZvWLvWJew423PhAzkpNQYq+uNLq4kxhkY= github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY= github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I= +github.com/gopatchy/event v0.0.0-20230617212132-152b30a10ad7 h1:xSRLvyWogantNY6mybpX+mViJgJZaBmsI4fuyT94N+E= +github.com/gopatchy/event v0.0.0-20230617212132-152b30a10ad7/go.mod h1:SplGDUsTiPon8FasSqw3eogKiNgHuoV0kHu+jG2E2xo= github.com/gopatchy/proxy v0.0.0-20230608062432-7adc14d65656 h1:q1ENffcg6/E0gbraIY39aGZ9To+1vGFxk8ZNbbqD/jI= github.com/gopatchy/proxy v0.0.0-20230608062432-7adc14d65656/go.mod h1:k0bdf6Mh7yJIAeO1dogSND27OpXn6w+KS9TUPKUQAHI= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= diff --git a/lib_test.go b/lib_test.go index 448f288..cf63e2c 100644 --- a/lib_test.go +++ b/lib_test.go @@ -1,6 +1,7 @@ package elect_test import ( + "context" "net" "net/http" "testing" @@ -8,6 +9,7 @@ import ( "github.com/dchest/uniuri" "github.com/gopatchy/elect" + "github.com/gopatchy/event" "github.com/gopatchy/proxy" "github.com/samber/lo" "github.com/stretchr/testify/require" @@ -60,6 +62,11 @@ func (ts *TestServer) Addr() *net.TCPAddr { } func NewTestSystem(t *testing.T, numCandidates, numVoters int) *TestSystem { + ctx := context.Background() + + ec := event.New() + defer ec.Close() + ts := &TestSystem{ signingKey: uniuri.New(), } @@ -70,7 +77,7 @@ func NewTestSystem(t *testing.T, numCandidates, numVoters int) *TestSystem { for i := 0; i < numVoters; i++ { ts.proxies = append(ts.proxies, proxy.NewProxy(t, ts.Server(0).Addr())) - ts.voters = append(ts.voters, elect.NewVoter(ts.Proxy(i).HTTP(), ts.signingKey)) + ts.voters = append(ts.voters, elect.NewVoter(ctx, ec, ts.Proxy(i).HTTP(), ts.signingKey)) } return ts diff --git a/voter.go b/voter.go index bce7754..542d772 100644 --- a/voter.go +++ b/voter.go @@ -1,6 +1,7 @@ package elect import ( + "context" "crypto/hmac" "encoding/json" "log" @@ -8,6 +9,7 @@ import ( "github.com/dchest/uniuri" "github.com/go-resty/resty/v2" + "github.com/gopatchy/event" "github.com/samber/lo" ) @@ -38,7 +40,7 @@ type voteResponse struct { ResponseSent time.Time `json:"responseSent"` } -func NewVoter(url string, signingKey string) *Voter { +func NewVoter(ctx context.Context, ec *event.Client, url string, signingKey string) *Voter { v := &Voter{ client: resty.New(). SetCloseConnection(true). @@ -52,7 +54,7 @@ func NewVoter(url string, signingKey string) *Voter { period: maxVotePeriod, } - go v.loop() + go v.loop(ctx, ec) return v } @@ -67,17 +69,17 @@ func (v *Voter) Stop() { v.update = nil } -func (v *Voter) loop() { +func (v *Voter) loop(ctx context.Context, ec *event.Client) { defer close(v.done) for { - if !v.poll() { + if !v.poll(ctx, ec) { break } } } -func (v *Voter) poll() bool { +func (v *Voter) poll(ctx context.Context, ec *event.Client) bool { t := time.NewTimer(RandDurationN(v.period)) defer t.Stop() @@ -90,10 +92,10 @@ func (v *Voter) poll() bool { select { case <-t.C: - v.sendVote() + v.sendVote(ctx, ec) case <-t2.C: - v.sendVote() + v.sendVote(ctx, ec) case period, ok := <-v.update: if !ok { @@ -106,7 +108,7 @@ func (v *Voter) poll() bool { return true } -func (v *Voter) sendVote() { +func (v *Voter) sendVote(ctx context.Context, ec *event.Client) { v.vote.VoteSent = time.Now().UTC() js := lo.Must(json.Marshal(v.vote)) @@ -126,7 +128,10 @@ func (v *Voter) sendVote() { } if resp.IsError() { - v.log("response: %d", resp.StatusCode()) + v.log(ctx, ec, + "event", "error", + "response", resp.StatusCode(), + ) v.vote.NumPollsSinceChange = 0 @@ -135,12 +140,20 @@ func (v *Voter) sendVote() { sig := resp.Header().Get("Signature") if sig == "" { - v.log("missing Signature response header") + v.log(ctx, ec, + "event", "error", + "error", "missing Signature response header", + ) + return } if !hmac.Equal([]byte(sig), []byte(mac(resp.Body(), v.signingKey))) { - v.log("invalid Signature response header") + v.log(ctx, ec, + "event", "error", + "error", "invalid Signature response header", + ) + return } @@ -148,12 +161,19 @@ func (v *Voter) sendVote() { err = json.Unmarshal(resp.Body(), vr) if err != nil { - v.log("invalid response: %s", resp.String()) + v.log(ctx, ec, + "event", "error", + "error", err, + ) + return } if time.Since(vr.ResponseSent).Abs() > 15*time.Second { - v.log("excessive time difference (%.1f seconds); delay, replay, or clock skew", time.Since(vr.ResponseSent).Seconds()) + v.log(ctx, ec, + "event", "error", + "error", "excessive time difference", + ) } if vr.CandidateID == v.vote.LastSeenCandidateID { @@ -164,6 +184,9 @@ func (v *Voter) sendVote() { } } -func (v *Voter) log(format string, args ...any) { - log.Printf("[voter] "+format, args...) +func (v *Voter) log(ctx context.Context, ec *event.Client, vals ...any) { + ec.Log(ctx, append([]any{ + "library", "elect", + "subsystem", "voter", + }, vals...)...) }