Integrate event into candidate

This commit is contained in:
Ian Gulliver
2023-06-17 15:05:42 -07:00
parent 6dfcf57109
commit 11ddf35e81
2 changed files with 32 additions and 20 deletions

View File

@@ -1,23 +1,26 @@
package elect package elect
import ( import (
"context"
"crypto/hmac" "crypto/hmac"
"encoding/json" "encoding/json"
"flag" "flag"
"fmt" "fmt"
"io" "io"
"log"
"net/http" "net/http"
"sync" "sync"
"time" "time"
"github.com/dchest/uniuri" "github.com/dchest/uniuri"
"github.com/gopatchy/event"
"github.com/samber/lo" "github.com/samber/lo"
) )
type Candidate struct { type Candidate struct {
C <-chan CandidateState C <-chan CandidateState
ec *event.Client
numVoters int numVoters int
signingKey []byte signingKey []byte
stop chan bool stop chan bool
@@ -53,11 +56,12 @@ var StateName = map[CandidateState]string{
var electForceState = flag.String("elect-force-state", "", "'', 'leader', 'notleader'") var electForceState = flag.String("elect-force-state", "", "'', 'leader', 'notleader'")
func NewCandidate(numVoters int, signingKey string) *Candidate { func NewCandidate(ctx context.Context, ec *event.Client, numVoters int, signingKey string) *Candidate {
change := make(chan CandidateState, 100) change := make(chan CandidateState, 100)
c := &Candidate{ c := &Candidate{
C: change, C: change,
ec: ec,
numVoters: numVoters, numVoters: numVoters,
signingKey: []byte(signingKey), signingKey: []byte(signingKey),
votes: map[string]*vote{}, votes: map[string]*vote{},
@@ -72,11 +76,15 @@ func NewCandidate(numVoters int, signingKey string) *Candidate {
} }
if c.forceState != StateUndefined { if c.forceState != StateUndefined {
c.log("state forced to %s", StateName[c.forceState]) c.log(ctx, ec,
"event", "state forced",
"newState", StateName[c.forceState],
)
c.state = c.forceState c.state = c.forceState
} }
go c.loop() go c.loop(ctx, ec)
return c return c
} }
@@ -189,10 +197,10 @@ func (c *Candidate) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
c.elect(v) c.elect(r.Context(), c.ec, v)
} }
func (c *Candidate) elect(v *vote) { func (c *Candidate) elect(ctx context.Context, ec *event.Client, v *vote) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
@@ -205,13 +213,14 @@ func (c *Candidate) elect(v *vote) {
return return
} }
c.log( c.log(ctx, ec,
"transitioning %s -> %s (no=%d yes=%d max_no=0 min_yes=%d)", "event", "transitioning state",
StateName[c.state], "oldState", StateName[c.state],
StateName[state], "newState", StateName[state],
no, "noVotes", no,
yes, "yesVotes", yes,
c.numVoters/2+1, "maxNo", 0,
"minYes", c.numVoters/2+1,
) )
c.state = state c.state = state
@@ -275,7 +284,7 @@ func (c *Candidate) elect(v *vote) {
state = StateLeader state = StateLeader
} }
func (c *Candidate) loop() { func (c *Candidate) loop(ctx context.Context, ec *event.Client) {
t := time.NewTicker(1 * time.Second) t := time.NewTicker(1 * time.Second)
defer t.Stop() defer t.Stop()
defer close(c.done) defer close(c.done)
@@ -286,13 +295,16 @@ func (c *Candidate) loop() {
return return
case <-t.C: case <-t.C:
c.elect(nil) c.elect(ctx, ec, nil)
} }
} }
} }
func (c *Candidate) log(format string, args ...any) { func (c *Candidate) log(ctx context.Context, ec *event.Client, vals ...any) {
log.Printf("[candidate] "+format, args...) ec.Log(ctx, append([]any{
"library", "elect",
"subsystem", "candidate",
}, vals...)...)
} }
func getForceState() CandidateState { func getForceState() CandidateState {

View File

@@ -33,9 +33,9 @@ type Waiter struct {
chans []<-chan bool chans []<-chan bool
} }
func NewTestServer(t *testing.T, numVoters int, signingKey string) *TestServer { func NewTestServer(ctx context.Context, t *testing.T, ec *event.Client, numVoters int, signingKey string) *TestServer {
ts := &TestServer{ ts := &TestServer{
Candidate: elect.NewCandidate(numVoters, signingKey), Candidate: elect.NewCandidate(ctx, ec, numVoters, signingKey),
listener: lo.Must(net.ListenTCP("tcp", nil)), listener: lo.Must(net.ListenTCP("tcp", nil)),
} }
@@ -72,7 +72,7 @@ func NewTestSystem(t *testing.T, numCandidates, numVoters int) *TestSystem {
} }
for i := 0; i < numCandidates; i++ { for i := 0; i < numCandidates; i++ {
ts.servers = append(ts.servers, NewTestServer(t, numVoters, ts.signingKey)) ts.servers = append(ts.servers, NewTestServer(ctx, t, ec, numVoters, ts.signingKey))
} }
for i := 0; i < numVoters; i++ { for i := 0; i < numVoters; i++ {