From 38f232f7aa6e71c1a983a0690d11cc5ebfc9b311 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Wed, 7 Jun 2023 22:48:38 -0700 Subject: [PATCH] Clean up locking, remove Voter/Candidate entanglement --- candidate.go | 91 ++++++++++++++++++--------------------------------- elect_test.go | 2 +- lib_test.go | 5 +-- voter.go | 6 +--- 4 files changed, 34 insertions(+), 70 deletions(-) diff --git a/candidate.go b/candidate.go index 36e512a..1ee597f 100644 --- a/candidate.go +++ b/candidate.go @@ -26,14 +26,14 @@ type Candidate struct { votes map[string]*vote state CandidateState firstYes time.Time - mu sync.Mutex + mu sync.RWMutex } -type CandidateState string +type CandidateState int -var ( - StateLeader CandidateState = "LEADER" - StateNotLeader CandidateState = "NOT_LEADER" +const ( + StateLeader CandidateState = iota + StateNotLeader ) const ( @@ -52,6 +52,7 @@ func NewCandidate(numVoters int, signingKey string) *Candidate { numVoters: numVoters, signingKey: []byte(signingKey), votes: map[string]*vote{}, + state: StateNotLeader, stop: make(chan bool), done: make(chan bool), c: change, @@ -66,29 +67,22 @@ func NewCandidate(numVoters int, signingKey string) *Candidate { } func (c *Candidate) Stop() { - // Lock not required - close(c.stop) <-c.done } func (c *Candidate) State() CandidateState { - c.mu.Lock() - defer c.mu.Unlock() + c.mu.RLock() + defer c.mu.RUnlock() return c.state } func (c *Candidate) IsLeader() bool { - // Lock not required - return c.State() == StateLeader } func (c *Candidate) ServeHTTP(w http.ResponseWriter, r *http.Request) { - c.mu.Lock() - defer c.mu.Unlock() - if r.Method != http.MethodPost { http.Error( w, @@ -162,9 +156,9 @@ func (c *Candidate) ServeHTTP(w http.ResponseWriter, r *http.Request) { ) } - c.resp.ResponseSent = time.Now().UTC() - - js = lo.Must(json.Marshal(c.resp)) + resp := c.resp + resp.ResponseSent = time.Now().UTC() + js = lo.Must(json.Marshal(resp)) w.Header().Set("Content-Type", "application/json") w.Header().Set("Signature", mac(js, c.signingKey)) @@ -180,32 +174,29 @@ func (c *Candidate) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - c.vote(v) + c.elect(v) } -func (c *Candidate) VoteIfNo(v vote) { +func (c *Candidate) elect(v *vote) { c.mu.Lock() defer c.mu.Unlock() - if v.LastSeenCandidateID == c.resp.CandidateID { - return + state := StateNotLeader + + defer func() { + if c.state == state { + return + } + + c.state = state + c.c <- state + }() + + if v != nil { + v.received = time.Now() + c.votes[v.VoterID] = v } - c.vote(&v) -} - -func (c *Candidate) vote(v *vote) { - // Must hold lock to call - - v.received = time.Now() - c.votes[v.VoterID] = v - - c.elect() -} - -func (c *Candidate) elect() { - // Must hold lock to call - no := 0 yes := 0 @@ -232,7 +223,7 @@ func (c *Candidate) elect() { if no > 0 || yes <= c.numVoters/2 { // We lost the vote c.firstYes = time.Time{} - c.update(StateNotLeader) + state = StateNotLeader return } @@ -244,28 +235,15 @@ func (c *Candidate) elect() { if time.Since(c.firstYes) < leadershipWait { // Not enough time in "yes" state - c.update(StateNotLeader) + state = StateNotLeader return } // All checks passed - c.update(StateLeader) -} - -func (c *Candidate) update(state CandidateState) { - // Must hold lock to call - - if c.state == state { - return - } - - c.state = state - c.c <- state + state = StateLeader } func (c *Candidate) loop() { - // Lock not required - t := time.NewTicker(1 * time.Second) defer t.Stop() defer close(c.done) @@ -276,14 +254,7 @@ func (c *Candidate) loop() { return case <-t.C: - c.lockElect() + c.elect(nil) } } } - -func (c *Candidate) lockElect() { - c.mu.Lock() - defer c.mu.Unlock() - - c.elect() -} diff --git a/elect_test.go b/elect_test.go index 4fdd5cc..bf20311 100644 --- a/elect_test.go +++ b/elect_test.go @@ -66,7 +66,7 @@ func TestPartialVotes(t *testing.T) { ts := NewTestSystem(t, 3) defer ts.Stop() - ts.Voter(0).Stop() + ts.Proxy(0).SetRefuse(true) require.False(t, ts.Candidate(0).IsLeader()) require.False(t, ts.Candidate(1).IsLeader()) diff --git a/lib_test.go b/lib_test.go index ed9e534..deba06d 100644 --- a/lib_test.go +++ b/lib_test.go @@ -63,10 +63,7 @@ func NewTestSystem(t *testing.T, num int) *TestSystem { for i := 0; i < num; i++ { ts.servers = append(ts.servers, NewTestServer(t, ts.signingKey)) ts.proxies = append(ts.proxies, proxy.NewProxy(t, ts.Server(0).Addr())) - ts.voters = append(ts.voters, elect.NewVoter(ts.Proxy(i).HTTP(), ts.signingKey, ts.Candidate(i))) - } - - for i := 0; i < num; i++ { + ts.voters = append(ts.voters, elect.NewVoter(ts.Proxy(i).HTTP(), ts.signingKey)) } return ts diff --git a/voter.go b/voter.go index bc967f1..1e6a6c2 100644 --- a/voter.go +++ b/voter.go @@ -20,7 +20,6 @@ type Voter struct { // used by loop() goroutine only client *resty.Client signingKey []byte - candidate *Candidate vote vote period time.Duration } @@ -40,13 +39,12 @@ type voteResponse struct { ResponseSent time.Time `json:"responseSent"` } -func NewVoter(url string, signingKey string, candidate *Candidate) *Voter { +func NewVoter(url string, signingKey string) *Voter { v := &Voter{ client: resty.New(). SetCloseConnection(true). SetBaseURL(url), signingKey: []byte(signingKey), - candidate: candidate, update: make(chan time.Duration), done: make(chan bool), vote: vote{ @@ -112,8 +110,6 @@ func (v *Voter) poll() bool { func (v *Voter) sendVote() { v.vote.VoteSent = time.Now().UTC() - v.candidate.VoteIfNo(v.vote) - js := lo.Must(json.Marshal(v.vote)) resp, err := v.client.R().