From 11ddf35e811080a53c57b50cfe495b1b6e9879cd Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Sat, 17 Jun 2023 15:05:42 -0700 Subject: [PATCH] Integrate event into candidate --- candidate.go | 46 +++++++++++++++++++++++++++++----------------- lib_test.go | 6 +++--- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/candidate.go b/candidate.go index 4907e3b..cda1526 100644 --- a/candidate.go +++ b/candidate.go @@ -1,23 +1,26 @@ package elect import ( + "context" "crypto/hmac" "encoding/json" "flag" "fmt" "io" - "log" "net/http" "sync" "time" "github.com/dchest/uniuri" + "github.com/gopatchy/event" "github.com/samber/lo" ) type Candidate struct { C <-chan CandidateState + ec *event.Client + numVoters int signingKey []byte stop chan bool @@ -53,11 +56,12 @@ var StateName = map[CandidateState]string{ var electForceState = flag.String("elect-force-state", "", "'', 'leader', 'notleader'") -func NewCandidate(numVoters int, signingKey string) *Candidate { +func NewCandidate(ctx context.Context, ec *event.Client, numVoters int, signingKey string) *Candidate { change := make(chan CandidateState, 100) c := &Candidate{ C: change, + ec: ec, numVoters: numVoters, signingKey: []byte(signingKey), votes: map[string]*vote{}, @@ -72,11 +76,15 @@ func NewCandidate(numVoters int, signingKey string) *Candidate { } if c.forceState != StateUndefined { - c.log("state forced to %s", StateName[c.forceState]) + c.log(ctx, ec, + "event", "state forced", + "newState", StateName[c.forceState], + ) + c.state = c.forceState } - go c.loop() + go c.loop(ctx, ec) return c } @@ -189,10 +197,10 @@ func (c *Candidate) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - c.elect(v) + c.elect(r.Context(), c.ec, v) } -func (c *Candidate) elect(v *vote) { +func (c *Candidate) elect(ctx context.Context, ec *event.Client, v *vote) { c.mu.Lock() defer c.mu.Unlock() @@ -205,13 +213,14 @@ func (c *Candidate) elect(v *vote) { return } - c.log( - "transitioning %s -> %s (no=%d yes=%d max_no=0 min_yes=%d)", - StateName[c.state], - StateName[state], - no, - yes, - c.numVoters/2+1, + c.log(ctx, ec, + "event", "transitioning state", + "oldState", StateName[c.state], + "newState", StateName[state], + "noVotes", no, + "yesVotes", yes, + "maxNo", 0, + "minYes", c.numVoters/2+1, ) c.state = state @@ -275,7 +284,7 @@ func (c *Candidate) elect(v *vote) { state = StateLeader } -func (c *Candidate) loop() { +func (c *Candidate) loop(ctx context.Context, ec *event.Client) { t := time.NewTicker(1 * time.Second) defer t.Stop() defer close(c.done) @@ -286,13 +295,16 @@ func (c *Candidate) loop() { return case <-t.C: - c.elect(nil) + c.elect(ctx, ec, nil) } } } -func (c *Candidate) log(format string, args ...any) { - log.Printf("[candidate] "+format, args...) +func (c *Candidate) log(ctx context.Context, ec *event.Client, vals ...any) { + ec.Log(ctx, append([]any{ + "library", "elect", + "subsystem", "candidate", + }, vals...)...) } func getForceState() CandidateState { diff --git a/lib_test.go b/lib_test.go index cf63e2c..ae33dc7 100644 --- a/lib_test.go +++ b/lib_test.go @@ -33,9 +33,9 @@ type Waiter struct { chans []<-chan bool } -func NewTestServer(t *testing.T, numVoters int, signingKey string) *TestServer { +func NewTestServer(ctx context.Context, t *testing.T, ec *event.Client, numVoters int, signingKey string) *TestServer { ts := &TestServer{ - Candidate: elect.NewCandidate(numVoters, signingKey), + Candidate: elect.NewCandidate(ctx, ec, numVoters, signingKey), listener: lo.Must(net.ListenTCP("tcp", nil)), } @@ -72,7 +72,7 @@ func NewTestSystem(t *testing.T, numCandidates, numVoters int) *TestSystem { } for i := 0; i < numCandidates; i++ { - ts.servers = append(ts.servers, NewTestServer(t, numVoters, ts.signingKey)) + ts.servers = append(ts.servers, NewTestServer(ctx, t, ec, numVoters, ts.signingKey)) } for i := 0; i < numVoters; i++ {