Clean up locking, remove Voter/Candidate entanglement

This commit is contained in:
Ian Gulliver
2023-06-07 22:48:38 -07:00
parent 0bcd5a0b06
commit 38f232f7aa
4 changed files with 34 additions and 70 deletions

View File

@@ -26,14 +26,14 @@ type Candidate struct {
votes map[string]*vote
state CandidateState
firstYes time.Time
mu sync.Mutex
mu sync.RWMutex
}
type CandidateState string
type CandidateState int
var (
StateLeader CandidateState = "LEADER"
StateNotLeader CandidateState = "NOT_LEADER"
const (
StateLeader CandidateState = iota
StateNotLeader
)
const (
@@ -52,6 +52,7 @@ func NewCandidate(numVoters int, signingKey string) *Candidate {
numVoters: numVoters,
signingKey: []byte(signingKey),
votes: map[string]*vote{},
state: StateNotLeader,
stop: make(chan bool),
done: make(chan bool),
c: change,
@@ -66,29 +67,22 @@ func NewCandidate(numVoters int, signingKey string) *Candidate {
}
func (c *Candidate) Stop() {
// Lock not required
close(c.stop)
<-c.done
}
func (c *Candidate) State() CandidateState {
c.mu.Lock()
defer c.mu.Unlock()
c.mu.RLock()
defer c.mu.RUnlock()
return c.state
}
func (c *Candidate) IsLeader() bool {
// Lock not required
return c.State() == StateLeader
}
func (c *Candidate) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c.mu.Lock()
defer c.mu.Unlock()
if r.Method != http.MethodPost {
http.Error(
w,
@@ -162,9 +156,9 @@ func (c *Candidate) ServeHTTP(w http.ResponseWriter, r *http.Request) {
)
}
c.resp.ResponseSent = time.Now().UTC()
js = lo.Must(json.Marshal(c.resp))
resp := c.resp
resp.ResponseSent = time.Now().UTC()
js = lo.Must(json.Marshal(resp))
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Signature", mac(js, c.signingKey))
@@ -180,32 +174,29 @@ func (c *Candidate) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
c.vote(v)
c.elect(v)
}
func (c *Candidate) VoteIfNo(v vote) {
func (c *Candidate) elect(v *vote) {
c.mu.Lock()
defer c.mu.Unlock()
if v.LastSeenCandidateID == c.resp.CandidateID {
return
state := StateNotLeader
defer func() {
if c.state == state {
return
}
c.state = state
c.c <- state
}()
if v != nil {
v.received = time.Now()
c.votes[v.VoterID] = v
}
c.vote(&v)
}
func (c *Candidate) vote(v *vote) {
// Must hold lock to call
v.received = time.Now()
c.votes[v.VoterID] = v
c.elect()
}
func (c *Candidate) elect() {
// Must hold lock to call
no := 0
yes := 0
@@ -232,7 +223,7 @@ func (c *Candidate) elect() {
if no > 0 || yes <= c.numVoters/2 {
// We lost the vote
c.firstYes = time.Time{}
c.update(StateNotLeader)
state = StateNotLeader
return
}
@@ -244,28 +235,15 @@ func (c *Candidate) elect() {
if time.Since(c.firstYes) < leadershipWait {
// Not enough time in "yes" state
c.update(StateNotLeader)
state = StateNotLeader
return
}
// All checks passed
c.update(StateLeader)
}
func (c *Candidate) update(state CandidateState) {
// Must hold lock to call
if c.state == state {
return
}
c.state = state
c.c <- state
state = StateLeader
}
func (c *Candidate) loop() {
// Lock not required
t := time.NewTicker(1 * time.Second)
defer t.Stop()
defer close(c.done)
@@ -276,14 +254,7 @@ func (c *Candidate) loop() {
return
case <-t.C:
c.lockElect()
c.elect(nil)
}
}
}
func (c *Candidate) lockElect() {
c.mu.Lock()
defer c.mu.Unlock()
c.elect()
}

View File

@@ -66,7 +66,7 @@ func TestPartialVotes(t *testing.T) {
ts := NewTestSystem(t, 3)
defer ts.Stop()
ts.Voter(0).Stop()
ts.Proxy(0).SetRefuse(true)
require.False(t, ts.Candidate(0).IsLeader())
require.False(t, ts.Candidate(1).IsLeader())

View File

@@ -63,10 +63,7 @@ func NewTestSystem(t *testing.T, num int) *TestSystem {
for i := 0; i < num; i++ {
ts.servers = append(ts.servers, NewTestServer(t, ts.signingKey))
ts.proxies = append(ts.proxies, proxy.NewProxy(t, ts.Server(0).Addr()))
ts.voters = append(ts.voters, elect.NewVoter(ts.Proxy(i).HTTP(), ts.signingKey, ts.Candidate(i)))
}
for i := 0; i < num; i++ {
ts.voters = append(ts.voters, elect.NewVoter(ts.Proxy(i).HTTP(), ts.signingKey))
}
return ts

View File

@@ -20,7 +20,6 @@ type Voter struct {
// used by loop() goroutine only
client *resty.Client
signingKey []byte
candidate *Candidate
vote vote
period time.Duration
}
@@ -40,13 +39,12 @@ type voteResponse struct {
ResponseSent time.Time `json:"responseSent"`
}
func NewVoter(url string, signingKey string, candidate *Candidate) *Voter {
func NewVoter(url string, signingKey string) *Voter {
v := &Voter{
client: resty.New().
SetCloseConnection(true).
SetBaseURL(url),
signingKey: []byte(signingKey),
candidate: candidate,
update: make(chan time.Duration),
done: make(chan bool),
vote: vote{
@@ -112,8 +110,6 @@ func (v *Voter) poll() bool {
func (v *Voter) sendVote() {
v.vote.VoteSent = time.Now().UTC()
v.candidate.VoteIfNo(v.vote)
js := lo.Must(json.Marshal(v.vote))
resp, err := v.client.R().