Files
elect/candidate.go

339 lines
5.7 KiB
Go
Raw Normal View History

2023-05-31 22:40:48 -07:00
package elect
import (
2023-06-17 15:05:42 -07:00
"context"
2023-05-31 22:40:48 -07:00
"crypto/hmac"
"encoding/json"
2023-06-11 19:14:34 -07:00
"flag"
2023-05-31 22:40:48 -07:00
"fmt"
"io"
"net/http"
"sync"
"time"
"github.com/dchest/uniuri"
2023-06-17 15:05:42 -07:00
"github.com/gopatchy/event"
2023-06-03 21:12:48 -07:00
"github.com/samber/lo"
2023-05-31 22:40:48 -07:00
)
type Candidate struct {
C <-chan CandidateState
2023-06-17 15:05:42 -07:00
ec *event.Client
2023-05-31 22:40:48 -07:00
numVoters int
signingKey []byte
stop chan bool
done chan bool
2023-05-31 22:40:48 -07:00
resp voteResponse
c chan<- CandidateState
2023-06-11 19:38:39 -07:00
votes map[string]*vote
state CandidateState
forceState CandidateState
firstYes time.Time
mu sync.RWMutex
2023-05-31 22:40:48 -07:00
}
// CandidateState is the election state of a Candidate: one of StateUndefined,
// StateLeader, or StateNotLeader.
type CandidateState int
2023-05-31 22:40:48 -07:00
const (
2023-06-11 19:38:39 -07:00
StateUndefined CandidateState = iota
StateLeader
StateNotLeader
2023-05-31 22:40:48 -07:00
maxVotePeriod = 5 * time.Second
voteTimeout = 10 * time.Second
leadershipWait = 15 * time.Second
maxFastVotePeriod = 100 * time.Millisecond
)
2023-06-07 23:22:53 -07:00
// StateName maps a CandidateState to the name used for it in log records.
// StateUndefined has no entry, so looking it up yields the empty string.
var StateName = map[CandidateState]string{
StateLeader: "LEADER",
StateNotLeader: "NOT_LEADER",
}
2023-06-11 19:14:34 -07:00
// electForceState is the --elect-force-state flag. When non-empty it pins
// every Candidate to a fixed state (see getForceState, which also accepts the
// spellings "not-leader" and "not_leader").
var electForceState = flag.String("elect-force-state", "", "'', 'leader', 'notleader'")
2023-06-17 15:05:42 -07:00
func NewCandidate(ctx context.Context, ec *event.Client, numVoters int, signingKey string) *Candidate {
2023-05-31 22:40:48 -07:00
change := make(chan CandidateState, 100)
c := &Candidate{
C: change,
2023-06-17 15:05:42 -07:00
ec: ec,
2023-05-31 22:40:48 -07:00
numVoters: numVoters,
signingKey: []byte(signingKey),
votes: map[string]*vote{},
state: StateNotLeader,
2023-06-11 19:38:39 -07:00
forceState: getForceState(),
stop: make(chan bool),
done: make(chan bool),
2023-05-31 22:40:48 -07:00
c: change,
resp: voteResponse{
CandidateID: uniuri.New(),
},
}
2023-06-11 19:38:39 -07:00
if c.forceState != StateUndefined {
2023-06-17 15:05:42 -07:00
c.log(ctx, ec,
"event", "state forced",
"newState", StateName[c.forceState],
)
2023-06-11 19:38:39 -07:00
c.state = c.forceState
}
2023-06-17 15:05:42 -07:00
go c.loop(ctx, ec)
2023-05-31 22:40:48 -07:00
return c
}
// Stop asks the background loop to exit by closing c.stop, then blocks until
// the loop has closed c.done. It must be called at most once: a second call
// would close c.stop again, which panics.
func (c *Candidate) Stop() {
close(c.stop)
<-c.done
}
func (c *Candidate) State() CandidateState {
c.mu.RLock()
defer c.mu.RUnlock()
2023-05-31 22:40:48 -07:00
return c.state
}
2023-06-01 06:43:35 -07:00
// IsLeader reports whether the candidate's current state is StateLeader.
func (c *Candidate) IsLeader() bool {
return c.State() == StateLeader
}
2023-05-31 22:40:48 -07:00
func (c *Candidate) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(
w,
fmt.Sprintf("method %s not supported", r.Method),
http.StatusMethodNotAllowed,
)
return
}
sig := r.Header.Get("Signature")
if sig == "" {
http.Error(
w,
"missing Signature header",
http.StatusBadRequest,
)
return
}
if r.Header.Get("Content-Type") != "application/json" {
http.Error(
w,
fmt.Sprintf("Content-Type %s not supported", r.Header.Get("Content-Type")),
http.StatusUnsupportedMediaType,
)
return
}
js, err := io.ReadAll(r.Body)
if err != nil {
http.Error(
w,
fmt.Sprintf("can't read request body: %s", err),
http.StatusBadRequest,
)
return
}
if !hmac.Equal([]byte(sig), []byte(mac(js, c.signingKey))) {
http.Error(
w,
"Signature verification failed",
http.StatusBadRequest,
)
return
}
v := &vote{}
err = json.Unmarshal(js, v)
if err != nil {
http.Error(
w,
fmt.Sprintf("can't parse request body: %s", err),
http.StatusBadRequest,
)
return
}
2023-06-03 21:51:07 -07:00
if time.Since(v.VoteSent).Abs() > 15*time.Second {
http.Error(
w,
fmt.Sprintf("excessive time difference (%.1f seconds); delay, replay, or clock skew", time.Since(v.VoteSent).Seconds()),
http.StatusBadRequest,
)
}
resp := c.resp
resp.ResponseSent = time.Now().UTC()
js = lo.Must(json.Marshal(resp))
2023-06-01 06:43:35 -07:00
2023-06-03 21:12:48 -07:00
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Signature", mac(js, c.signingKey))
2023-05-31 22:40:48 -07:00
2023-06-03 21:12:48 -07:00
_, err = w.Write(js)
2023-05-31 22:40:48 -07:00
if err != nil {
http.Error(
w,
fmt.Sprintf("can't write response: %s", err),
http.StatusInternalServerError,
)
return
}
2023-06-17 15:05:42 -07:00
c.elect(r.Context(), c.ec, v)
2023-05-31 22:40:48 -07:00
}
2023-06-17 15:05:42 -07:00
func (c *Candidate) elect(ctx context.Context, ec *event.Client, v *vote) {
2023-06-04 13:44:26 -07:00
c.mu.Lock()
defer c.mu.Unlock()
2023-05-31 22:40:48 -07:00
state := StateNotLeader
2023-06-07 23:22:53 -07:00
no := 0
yes := 0
2023-05-31 22:40:48 -07:00
defer func() {
if c.state == state {
return
}
2023-06-04 13:44:26 -07:00
2023-06-17 15:05:42 -07:00
c.log(ctx, ec,
"event", "transitioning state",
"oldState", StateName[c.state],
"newState", StateName[state],
"noVotes", no,
"yesVotes", yes,
"maxNo", 0,
"minYes", c.numVoters/2+1,
2023-06-07 23:22:53 -07:00
)
c.state = state
select {
case c.c <- state:
default:
}
}()
2023-05-31 22:40:48 -07:00
2023-06-11 19:38:39 -07:00
if c.forceState != StateUndefined {
2023-06-17 08:41:22 -07:00
state = c.forceState
2023-06-11 19:14:34 -07:00
return
}
if v != nil {
v.received = time.Now()
c.votes[v.VoterID] = v
}
2023-06-04 13:44:26 -07:00
2023-05-31 22:40:48 -07:00
for key, vote := range c.votes {
if time.Since(vote.received) > voteTimeout {
// Remove stale vote from consideration
2023-05-31 22:40:48 -07:00
delete(c.votes, key)
continue
}
if vote.LastSeenCandidateID != c.resp.CandidateID {
// Hard no; voted for someone else
2023-05-31 22:40:48 -07:00
no++
}
if vote.NumPollsSinceChange < 10 {
// Soft no; voted for us but not enough times in a row
2023-05-31 22:40:48 -07:00
continue
}
yes++
}
2023-06-07 23:22:53 -07:00
if no > 0 || yes < c.numVoters/2+1 {
// We lost the vote
c.firstYes = time.Time{}
state = StateNotLeader
return
2023-05-31 22:40:48 -07:00
}
if c.firstYes.IsZero() {
// First round of "yes" voting since the last "no" vote
c.firstYes = time.Now()
}
if time.Since(c.firstYes) < leadershipWait {
// Not enough time in "yes" state
state = StateNotLeader
return
}
// All checks passed
state = StateLeader
2023-05-31 22:40:48 -07:00
}
2023-06-17 15:05:42 -07:00
func (c *Candidate) loop(ctx context.Context, ec *event.Client) {
2023-05-31 22:40:48 -07:00
t := time.NewTicker(1 * time.Second)
defer t.Stop()
defer close(c.done)
2023-05-31 22:40:48 -07:00
2023-06-17 15:09:33 -07:00
defer func() {
c.log(ctx, ec,
"event", "stop",
)
}()
c.log(ctx, ec,
"event", "start",
)
2023-05-31 22:40:48 -07:00
for {
select {
case <-c.stop:
2023-05-31 22:40:48 -07:00
return
case <-t.C:
2023-06-17 15:05:42 -07:00
c.elect(ctx, ec, nil)
2023-05-31 22:40:48 -07:00
}
}
}
2023-06-11 19:38:39 -07:00
2023-06-17 15:05:42 -07:00
func (c *Candidate) log(ctx context.Context, ec *event.Client, vals ...any) {
ec.Log(ctx, append([]any{
"library", "elect",
"subsystem", "candidate",
}, vals...)...)
2023-06-11 20:07:01 -07:00
}
2023-06-11 19:38:39 -07:00
// getForceState translates the --elect-force-state flag into a
// CandidateState. The empty string means no forcing (StateUndefined),
// "leader" forces StateLeader, and "not-leader", "not_leader", or "notleader"
// force StateNotLeader. Any other value panics.
func getForceState() CandidateState {
	switch *electForceState {
	case "":
		return StateUndefined

	case "leader":
		return StateLeader

	case "not-leader", "not_leader", "notleader":
		return StateNotLeader

	default:
		panic("invalid --elect-force-state")
	}
}