Files
elect/candidate.go
Ian Gulliver 842c62dae8 TODO
2023-06-18 22:42:10 -07:00

352 lines
6.1 KiB
Go

package elect
import (
"context"
"crypto/hmac"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"sync"
"time"
"github.com/dchest/uniuri"
"github.com/gopatchy/event"
"github.com/samber/lo"
)
// TODO: Add event hook to export isLeader

// Candidate is one contender in a leader election. It accepts signed votes
// over HTTP (see ServeHTTP), tallies them, and reports whether it currently
// holds leadership through State, IsLeader and the C channel.
type Candidate struct {
// C is the receive-only end of c; every state transition is offered on it.
C <-chan CandidateState
// ec is the event client handed to elect when a vote arrives over HTTP.
ec *event.Client
// numVoters is the size of the electorate; numVoters/2+1 qualifying votes
// are needed to win.
numVoters int
// signingKey is the key passed to mac to verify requests and sign responses.
signingKey []byte
// stop is closed by Stop to ask the background loop to exit.
stop chan bool
// done is closed by the background loop once it has exited.
done chan bool
// resp is the response template; its CandidateID is generated once in
// NewCandidate and a copy is timestamped for every reply.
resp voteResponse
// c is the send side of C.
c chan<- CandidateState
// votes holds the most recent vote from each voter, keyed by VoterID.
votes map[string]*vote
// state is the current election state.
state CandidateState
// forceState, when not StateUndefined, overrides the election outcome.
forceState CandidateState
// firstYes is when the current run of winning tallies began; it is the zero
// time while the candidate is not winning.
firstYes time.Time
// mu is held by elect while it updates votes, firstYes and state, and is
// read-locked by State.
mu sync.RWMutex
}
// CandidateState describes whether a Candidate currently holds leadership.
type CandidateState int
const (
// StateUndefined is the zero value; getForceState returns it to mean that no
// state is being forced.
StateUndefined CandidateState = iota
// StateLeader means the candidate currently holds leadership.
StateLeader
// StateNotLeader means the candidate does not hold leadership; NewCandidate
// starts every Candidate in this state unless a state is forced.
StateNotLeader
// maxVotePeriod is not referenced in the code visible here; presumably the
// longest interval between votes sent by a voter — TODO confirm.
maxVotePeriod = 5 * time.Second
// voteTimeout is how long a received vote stays in the tally before elect
// discards it as stale.
voteTimeout = 10 * time.Second
// leadershipWait is how long the candidate must keep winning the tally
// before elect promotes it to leader.
leadershipWait = 15 * time.Second
// maxFastVotePeriod is not referenced in the code visible here; presumably a
// shorter vote interval used by the voter in some situations — TODO confirm.
maxFastVotePeriod = 100 * time.Millisecond
)
// StateName maps a CandidateState to the name used for it in log output.
// StateUndefined has no entry, so looking it up yields the empty string.
var StateName = map[CandidateState]string{
StateLeader: "LEADER",
StateNotLeader: "NOT_LEADER",
}
// electForceState is the --elect-force-state command-line flag. getForceState
// accepts "" (no forcing), "leader", and "notleader" (also spelled
// "not-leader" or "not_leader"), and panics on any other value.
var electForceState = flag.String("elect-force-state", "", "'', 'leader', 'notleader'")
// NewCandidate creates a Candidate, starts its background election loop in a
// new goroutine, and returns it.
//
// ctx and ec are used for logging by the background loop; ec is also kept on
// the Candidate for use by ServeHTTP. numVoters is the total number of voters,
// from which the majority threshold is derived. signingKey is the key used to
// verify incoming votes and to sign responses.
//
// Each Candidate gets a new CandidateID from uniuri.New. If the
// --elect-force-state flag is set, the candidate starts in the forced state
// and logs that fact; an invalid flag value panics (see getForceState).
//
// Call Stop to shut the background loop down.
func NewCandidate(ctx context.Context, ec *event.Client, numVoters int, signingKey string) *Candidate {
	// A single buffered channel, exposed receive-only as C and kept send-only
	// as c.
	updates := make(chan CandidateState, 100)
	forced := getForceState()

	cand := &Candidate{
		C:          updates,
		c:          updates,
		ec:         ec,
		numVoters:  numVoters,
		signingKey: []byte(signingKey),
		votes:      map[string]*vote{},
		state:      StateNotLeader,
		forceState: forced,
		stop:       make(chan bool),
		done:       make(chan bool),
		resp: voteResponse{
			CandidateID: uniuri.New(),
		},
	}

	if forced != StateUndefined {
		cand.log(ctx, ec,
			"event", "state forced",
			"newState", StateName[forced],
		)

		cand.state = forced
	}

	go cand.loop(ctx, ec)

	return cand
}
// Stop shuts down the background election loop and blocks until it has
// exited. It closes the stop channel, so it must be called at most once:
// a second call would panic on the repeated close.
func (c *Candidate) Stop() {
// Closing stop wakes the select in loop.
close(c.stop)
// loop closes done as it returns, which releases this receive.
<-c.done
}
// State returns the candidate's current election state. It takes the read
// lock, so it is safe to call while votes are being processed.
func (c *Candidate) State() CandidateState {
	c.mu.RLock()
	current := c.state
	c.mu.RUnlock()

	return current
}
// IsLeader reports whether the candidate's current state is StateLeader.
func (c *Candidate) IsLeader() bool {
	current := c.State()

	return current == StateLeader
}
// ServeHTTP receives a signed vote from a voter, replies with a signed
// response carrying this candidate's ID, and then feeds the vote into the
// election.
//
// A request is rejected unless it is a POST with a non-empty Signature header,
// a Content-Type of exactly application/json, a body whose MAC under the
// signing key equals the Signature header, a body that parses as a vote, and
// a VoteSent timestamp within 15 seconds of the local clock (in either
// direction). Rejected requests never reach the vote tally.
func (c *Candidate) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(
			w,
			fmt.Sprintf("method %s not supported", r.Method),
			http.StatusMethodNotAllowed,
		)

		return
	}

	sig := r.Header.Get("Signature")
	if sig == "" {
		http.Error(
			w,
			"missing Signature header",
			http.StatusBadRequest,
		)

		return
	}

	if contentType := r.Header.Get("Content-Type"); contentType != "application/json" {
		http.Error(
			w,
			fmt.Sprintf("Content-Type %s not supported", contentType),
			http.StatusUnsupportedMediaType,
		)

		return
	}

	js, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(
			w,
			fmt.Sprintf("can't read request body: %s", err),
			http.StatusBadRequest,
		)

		return
	}

	// Verify the signature over the raw bytes before parsing them, using a
	// constant-time comparison.
	if !hmac.Equal([]byte(sig), []byte(mac(js, c.signingKey))) {
		http.Error(
			w,
			"Signature verification failed",
			http.StatusBadRequest,
		)

		return
	}

	v := &vote{}

	err = json.Unmarshal(js, v)
	if err != nil {
		http.Error(
			w,
			fmt.Sprintf("can't parse request body: %s", err),
			http.StatusBadRequest,
		)

		return
	}

	if skew := time.Since(v.VoteSent); skew.Abs() > 15*time.Second {
		http.Error(
			w,
			fmt.Sprintf("excessive time difference (%.1f seconds); delay, replay, or clock skew", skew.Seconds()),
			http.StatusBadRequest,
		)

		// http.Error does not end the request. Without this return the
		// handler would go on to send a signed response and count the
		// stale or replayed vote.
		return
	}

	// Copy the template so the per-request timestamp never touches c.resp.
	resp := c.resp
	resp.ResponseSent = time.Now().UTC()

	js = lo.Must(json.Marshal(resp))

	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("Signature", mac(js, c.signingKey))

	_, err = w.Write(js)
	if err != nil {
		// Best effort only: the status line has already been sent by the
		// failed Write, so this is unlikely to reach the client. The vote
		// is deliberately not counted when the reply could not be sent.
		http.Error(
			w,
			fmt.Sprintf("can't write response: %s", err),
			http.StatusInternalServerError,
		)

		return
	}

	c.elect(r.Context(), c.ec, v)
}
// elect recomputes whether this candidate is the leader and publishes any
// change of state.
//
// v is a newly received vote to record before counting, or nil to re-evaluate
// the votes already held (as the periodic tick in loop does). ctx and ec are
// used only for logging.
//
// Each non-stale vote falls into exactly one bucket:
//   - hard no: the voter last saw a different, non-empty candidate ID;
//   - soft no: not a hard no, but NumPollsSinceChange is still below 10;
//   - yes: everything else.
//
// The candidate wins a round when there are no hard noes and at least
// numVoters/2+1 yes votes. It becomes leader only once it has been winning
// continuously for leadershipWait. When a state is forced, that state is
// used and no votes are recorded or counted.
func (c *Candidate) elect(ctx context.Context, ec *event.Client, v *vote) {
	c.mu.Lock()
	defer c.mu.Unlock()

	state := StateNotLeader
	no := 0
	yes := 0

	// Runs on every return path, and before the deferred Unlock above, so the
	// write to c.state happens under the lock. It reads the final values of
	// state, no and yes.
	defer func() {
		if c.state == state {
			return
		}

		// TODO: goCamelCase event names (throughout)
		c.log(ctx, ec,
			"event", "transitioning state",
			"oldState", StateName[c.state],
			"newState", StateName[state],
			"noVotes", no,
			"yesVotes", yes,
			"maxNo", 0,
			"minYes", c.numVoters/2+1,
		)

		c.state = state

		// Non-blocking send: if the channel's buffer is full the update is
		// dropped rather than stalling the election.
		select {
		case c.c <- state:
		default:
		}
	}()

	if c.forceState != StateUndefined {
		state = c.forceState
		return
	}

	if v != nil {
		v.received = time.Now()
		c.votes[v.VoterID] = v
	}

	for key, cur := range c.votes {
		if time.Since(cur.received) > voteTimeout {
			// Remove stale vote from consideration
			delete(c.votes, key)
			continue
		}

		if cur.LastSeenCandidateID != "" && cur.LastSeenCandidateID != c.resp.CandidateID {
			// Hard no; voted for someone else
			no++

			if c.state == StateLeader {
				c.log(ctx, ec,
					"event", "noVoteWhileLeader",
					"voterID", cur.VoterID,
					"otherCandidateID", cur.LastSeenCandidateID,
					"numPollsSincechange", cur.NumPollsSinceChange,
				)
			}

			// A vote for another candidate must not also be tallied as a
			// yes below, or the logged yes count would include it.
			continue
		}

		if cur.NumPollsSinceChange < 10 {
			// Soft no; voted for us but not enough times in a row
			continue
		}

		yes++
	}

	if no > 0 || yes < c.numVoters/2+1 {
		// We lost the vote
		c.firstYes = time.Time{}
		state = StateNotLeader

		return
	}

	if c.firstYes.IsZero() {
		// First round of "yes" voting since the last "no" vote
		c.firstYes = time.Now()
	}

	if time.Since(c.firstYes) < leadershipWait {
		// Not enough time in "yes" state
		state = StateNotLeader
		return
	}

	// All checks passed
	state = StateLeader
}
// loop is the candidate's background goroutine. It logs a "start" event, then
// re-runs the election once per second with no new vote, so that stale votes
// expire and the leadership wait can elapse even when no requests arrive. It
// returns when the stop channel is closed.
func (c *Candidate) loop(ctx context.Context, ec *event.Client) {
	ticker := time.NewTicker(time.Second)

	// Deferred calls run last-in-first-out: log "stop", close done (which
	// releases Stop), then stop the ticker.
	defer ticker.Stop()
	defer close(c.done)
	defer c.log(ctx, ec, "event", "stop")

	c.log(ctx, ec, "event", "start")

	for {
		select {
		case <-ticker.C:
			c.elect(ctx, ec, nil)

		case <-c.stop:
			return
		}
	}
}
// log emits an event through ec, prefixing the caller's key/value pairs with
// the fields common to every candidate event: library, subsystem and this
// candidate's ID.
func (c *Candidate) log(ctx context.Context, ec *event.Client, vals ...any) {
	fields := make([]any, 0, 6+len(vals))
	fields = append(fields,
		"library", "elect",
		"subsystem", "candidate",
		"candidateID", c.resp.CandidateID,
	)
	fields = append(fields, vals...)

	ec.Log(ctx, fields...)
}
// getForceState translates the --elect-force-state flag into a
// CandidateState. An empty flag yields StateUndefined (nothing forced),
// "leader" yields StateLeader, and "notleader", "not-leader" or "not_leader"
// yield StateNotLeader. Any other value panics.
func getForceState() CandidateState {
	switch *electForceState {
	case "":
		return StateUndefined

	case "leader":
		return StateLeader

	case "not-leader", "not_leader", "notleader":
		return StateNotLeader
	}

	panic("invalid --elect-force-state")
}