Integrate event into voter
This commit is contained in:
1
go.mod
1
go.mod
@@ -5,6 +5,7 @@ go 1.20
|
|||||||
require (
|
require (
|
||||||
github.com/dchest/uniuri v1.2.0
|
github.com/dchest/uniuri v1.2.0
|
||||||
github.com/go-resty/resty/v2 v2.7.0
|
github.com/go-resty/resty/v2 v2.7.0
|
||||||
|
github.com/gopatchy/event v0.0.0-20230617212132-152b30a10ad7
|
||||||
github.com/gopatchy/proxy v0.0.0-20230608062432-7adc14d65656
|
github.com/gopatchy/proxy v0.0.0-20230608062432-7adc14d65656
|
||||||
github.com/samber/lo v1.38.1
|
github.com/samber/lo v1.38.1
|
||||||
github.com/stretchr/testify v1.8.4
|
github.com/stretchr/testify v1.8.4
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -5,6 +5,8 @@ github.com/dchest/uniuri v1.2.0 h1:koIcOUdrTIivZgSLhHQvKgqdWZq5d7KdMEWF1Ud6+5g=
|
|||||||
github.com/dchest/uniuri v1.2.0/go.mod h1:fSzm4SLHzNZvWLvWJew423PhAzkpNQYq+uNLq4kxhkY=
|
github.com/dchest/uniuri v1.2.0/go.mod h1:fSzm4SLHzNZvWLvWJew423PhAzkpNQYq+uNLq4kxhkY=
|
||||||
github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
|
github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
|
||||||
github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
|
github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
|
||||||
|
github.com/gopatchy/event v0.0.0-20230617212132-152b30a10ad7 h1:xSRLvyWogantNY6mybpX+mViJgJZaBmsI4fuyT94N+E=
|
||||||
|
github.com/gopatchy/event v0.0.0-20230617212132-152b30a10ad7/go.mod h1:SplGDUsTiPon8FasSqw3eogKiNgHuoV0kHu+jG2E2xo=
|
||||||
github.com/gopatchy/proxy v0.0.0-20230608062432-7adc14d65656 h1:q1ENffcg6/E0gbraIY39aGZ9To+1vGFxk8ZNbbqD/jI=
|
github.com/gopatchy/proxy v0.0.0-20230608062432-7adc14d65656 h1:q1ENffcg6/E0gbraIY39aGZ9To+1vGFxk8ZNbbqD/jI=
|
||||||
github.com/gopatchy/proxy v0.0.0-20230608062432-7adc14d65656/go.mod h1:k0bdf6Mh7yJIAeO1dogSND27OpXn6w+KS9TUPKUQAHI=
|
github.com/gopatchy/proxy v0.0.0-20230608062432-7adc14d65656/go.mod h1:k0bdf6Mh7yJIAeO1dogSND27OpXn6w+KS9TUPKUQAHI=
|
||||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package elect_test
|
package elect_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -8,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"github.com/dchest/uniuri"
|
"github.com/dchest/uniuri"
|
||||||
"github.com/gopatchy/elect"
|
"github.com/gopatchy/elect"
|
||||||
|
"github.com/gopatchy/event"
|
||||||
"github.com/gopatchy/proxy"
|
"github.com/gopatchy/proxy"
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@@ -60,6 +62,11 @@ func (ts *TestServer) Addr() *net.TCPAddr {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewTestSystem(t *testing.T, numCandidates, numVoters int) *TestSystem {
|
func NewTestSystem(t *testing.T, numCandidates, numVoters int) *TestSystem {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
ec := event.New()
|
||||||
|
defer ec.Close()
|
||||||
|
|
||||||
ts := &TestSystem{
|
ts := &TestSystem{
|
||||||
signingKey: uniuri.New(),
|
signingKey: uniuri.New(),
|
||||||
}
|
}
|
||||||
@@ -70,7 +77,7 @@ func NewTestSystem(t *testing.T, numCandidates, numVoters int) *TestSystem {
|
|||||||
|
|
||||||
for i := 0; i < numVoters; i++ {
|
for i := 0; i < numVoters; i++ {
|
||||||
ts.proxies = append(ts.proxies, proxy.NewProxy(t, ts.Server(0).Addr()))
|
ts.proxies = append(ts.proxies, proxy.NewProxy(t, ts.Server(0).Addr()))
|
||||||
ts.voters = append(ts.voters, elect.NewVoter(ts.Proxy(i).HTTP(), ts.signingKey))
|
ts.voters = append(ts.voters, elect.NewVoter(ctx, ec, ts.Proxy(i).HTTP(), ts.signingKey))
|
||||||
}
|
}
|
||||||
|
|
||||||
return ts
|
return ts
|
||||||
|
|||||||
53
voter.go
53
voter.go
@@ -1,6 +1,7 @@
|
|||||||
package elect
|
package elect
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/hmac"
|
"crypto/hmac"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"log"
|
"log"
|
||||||
@@ -8,6 +9,7 @@ import (
|
|||||||
|
|
||||||
"github.com/dchest/uniuri"
|
"github.com/dchest/uniuri"
|
||||||
"github.com/go-resty/resty/v2"
|
"github.com/go-resty/resty/v2"
|
||||||
|
"github.com/gopatchy/event"
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -38,7 +40,7 @@ type voteResponse struct {
|
|||||||
ResponseSent time.Time `json:"responseSent"`
|
ResponseSent time.Time `json:"responseSent"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewVoter(url string, signingKey string) *Voter {
|
func NewVoter(ctx context.Context, ec *event.Client, url string, signingKey string) *Voter {
|
||||||
v := &Voter{
|
v := &Voter{
|
||||||
client: resty.New().
|
client: resty.New().
|
||||||
SetCloseConnection(true).
|
SetCloseConnection(true).
|
||||||
@@ -52,7 +54,7 @@ func NewVoter(url string, signingKey string) *Voter {
|
|||||||
period: maxVotePeriod,
|
period: maxVotePeriod,
|
||||||
}
|
}
|
||||||
|
|
||||||
go v.loop()
|
go v.loop(ctx, ec)
|
||||||
|
|
||||||
return v
|
return v
|
||||||
}
|
}
|
||||||
@@ -67,17 +69,17 @@ func (v *Voter) Stop() {
|
|||||||
v.update = nil
|
v.update = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Voter) loop() {
|
func (v *Voter) loop(ctx context.Context, ec *event.Client) {
|
||||||
defer close(v.done)
|
defer close(v.done)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if !v.poll() {
|
if !v.poll(ctx, ec) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Voter) poll() bool {
|
func (v *Voter) poll(ctx context.Context, ec *event.Client) bool {
|
||||||
t := time.NewTimer(RandDurationN(v.period))
|
t := time.NewTimer(RandDurationN(v.period))
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
|
||||||
@@ -90,10 +92,10 @@ func (v *Voter) poll() bool {
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
v.sendVote()
|
v.sendVote(ctx, ec)
|
||||||
|
|
||||||
case <-t2.C:
|
case <-t2.C:
|
||||||
v.sendVote()
|
v.sendVote(ctx, ec)
|
||||||
|
|
||||||
case period, ok := <-v.update:
|
case period, ok := <-v.update:
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -106,7 +108,7 @@ func (v *Voter) poll() bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Voter) sendVote() {
|
func (v *Voter) sendVote(ctx context.Context, ec *event.Client) {
|
||||||
v.vote.VoteSent = time.Now().UTC()
|
v.vote.VoteSent = time.Now().UTC()
|
||||||
|
|
||||||
js := lo.Must(json.Marshal(v.vote))
|
js := lo.Must(json.Marshal(v.vote))
|
||||||
@@ -126,7 +128,10 @@ func (v *Voter) sendVote() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if resp.IsError() {
|
if resp.IsError() {
|
||||||
v.log("response: %d", resp.StatusCode())
|
v.log(ctx, ec,
|
||||||
|
"event", "error",
|
||||||
|
"response", resp.StatusCode(),
|
||||||
|
)
|
||||||
|
|
||||||
v.vote.NumPollsSinceChange = 0
|
v.vote.NumPollsSinceChange = 0
|
||||||
|
|
||||||
@@ -135,12 +140,20 @@ func (v *Voter) sendVote() {
|
|||||||
|
|
||||||
sig := resp.Header().Get("Signature")
|
sig := resp.Header().Get("Signature")
|
||||||
if sig == "" {
|
if sig == "" {
|
||||||
v.log("missing Signature response header")
|
v.log(ctx, ec,
|
||||||
|
"event", "error",
|
||||||
|
"error", "missing Signature response header",
|
||||||
|
)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if !hmac.Equal([]byte(sig), []byte(mac(resp.Body(), v.signingKey))) {
|
if !hmac.Equal([]byte(sig), []byte(mac(resp.Body(), v.signingKey))) {
|
||||||
v.log("invalid Signature response header")
|
v.log(ctx, ec,
|
||||||
|
"event", "error",
|
||||||
|
"error", "invalid Signature response header",
|
||||||
|
)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -148,12 +161,19 @@ func (v *Voter) sendVote() {
|
|||||||
|
|
||||||
err = json.Unmarshal(resp.Body(), vr)
|
err = json.Unmarshal(resp.Body(), vr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
v.log("invalid response: %s", resp.String())
|
v.log(ctx, ec,
|
||||||
|
"event", "error",
|
||||||
|
"error", err,
|
||||||
|
)
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if time.Since(vr.ResponseSent).Abs() > 15*time.Second {
|
if time.Since(vr.ResponseSent).Abs() > 15*time.Second {
|
||||||
v.log("excessive time difference (%.1f seconds); delay, replay, or clock skew", time.Since(vr.ResponseSent).Seconds())
|
v.log(ctx, ec,
|
||||||
|
"event", "error",
|
||||||
|
"error", "excessive time difference",
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
if vr.CandidateID == v.vote.LastSeenCandidateID {
|
if vr.CandidateID == v.vote.LastSeenCandidateID {
|
||||||
@@ -164,6 +184,9 @@ func (v *Voter) sendVote() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Voter) log(format string, args ...any) {
|
func (v *Voter) log(ctx context.Context, ec *event.Client, vals ...any) {
|
||||||
log.Printf("[voter] "+format, args...)
|
ec.Log(ctx, append([]any{
|
||||||
|
"library", "elect",
|
||||||
|
"subsystem", "voter",
|
||||||
|
}, vals...)...)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user