From 6d46cf96fe015ae9354659547e3dea51f510a49e Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Thu, 20 Apr 2023 18:29:49 +0000 Subject: [PATCH] Initial commit --- .gitignore | 2 + .golangci.yaml | 38 +++++++++++++ bus.go | 136 ++++++++++++++++++++++++++++++++++++++++++++ bus_test.go | 149 +++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 16 ++++++ go.sum | 26 +++++++++ justfile | 18 ++++++ pkg_test.go | 11 ++++ 8 files changed, 396 insertions(+) create mode 100644 .gitignore create mode 100644 .golangci.yaml create mode 100644 bus.go create mode 100644 bus_test.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 justfile create mode 100644 pkg_test.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6754c7d --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +cover.out +cover.html diff --git a/.golangci.yaml b/.golangci.yaml new file mode 100644 index 0000000..9aa3e2e --- /dev/null +++ b/.golangci.yaml @@ -0,0 +1,38 @@ +linters: + enable-all: true + disable: + # re-enable when working + - rowserrcheck + - wastedassign + # maybe enable these + - wrapcheck + # leave these disabled + - cyclop + - deadcode + - dupl + - exhaustivestruct + - exhaustruct + - forbidigo + - forcetypeassert + - funlen + - gochecknoglobals + - gocognit + - goconst + - godox + - golint + - gomnd + - ifshort + - interfacer + - lll + - maintidx + - maligned + - nilnil + - nestif + - nlreturn + - nolintlint + - nosnakecase + - scopelint + - structcheck + - thelper + - varcheck + - varnamelen diff --git a/bus.go b/bus.go new file mode 100644 index 0000000..6b6eef6 --- /dev/null +++ b/bus.go @@ -0,0 +1,136 @@ +package bus + +import ( + "fmt" + "reflect" + "sync" + + "github.com/gopatchy/metadata" +) + +type Bus struct { + mu sync.Mutex + keyViews map[string]map[uintptr]chan<- any + typeViews map[string]map[uintptr]chan<- any +} + +func NewBus() *Bus { + return &Bus{ + keyViews: map[string]map[uintptr]chan<- any{}, + typeViews: map[string]map[uintptr]chan<- any{}, + } +} + +func (b *Bus) Announce(t string, obj any) { + key := getObjKey(t, obj) + + b.mu.Lock() + defer b.mu.Unlock() + + announce(obj, b.keyViews[key]) + announce(obj, b.typeViews[t]) +} + +func (b *Bus) Delete(t string, id string) { + key := getKey(t, id) + + b.mu.Lock() + defer b.mu.Unlock() + + for _, c := range b.keyViews[key] { + close(c) + } + + delete(b.keyViews, key) + + announce(id, b.typeViews[t]) +} + +func (b *Bus) SubscribeKey(t, id string, initial any) <-chan any { + key := getKey(t, id) + + b.mu.Lock() + defer b.mu.Unlock() + + ret := make(chan any, 100) + + ret <- initial + + if _, has := b.keyViews[key]; !has { + b.keyViews[key] = map[uintptr]chan<- any{} + } + + b.keyViews[key][chanID(ret)] = ret + + return ret +} + +func (b *Bus) SubscribeType(t string, initial any) <-chan any { + b.mu.Lock() + defer b.mu.Unlock() + + ret := make(chan any, 100) + + ret <- initial + + if _, has := b.typeViews[t]; !has { + b.typeViews[t] = map[uintptr]chan<- any{} + } + + b.typeViews[t][chanID(ret)] = ret + + return ret +} + +func (b *Bus) UnsubscribeKey(t, id string, c <-chan any) { + key := getKey(t, id) + + b.mu.Lock() + defer b.mu.Unlock() + + if cw, has := b.keyViews[key][chanID(c)]; has { + close(cw) + delete(b.keyViews[key], chanID(c)) + } + + if len(b.keyViews[key]) == 0 { + delete(b.keyViews, key) + } +} + +func (b *Bus) UnsubscribeType(t string, c <-chan any) { + b.mu.Lock() + defer b.mu.Unlock() + + if cw, has := b.typeViews[t][chanID(c)]; has { + close(cw) + delete(b.typeViews[t], chanID(c)) + } + + if len(b.typeViews[t]) == 0 { + delete(b.typeViews, t) + } +} + +func getObjKey(t string, obj any) string { + return getKey(t, metadata.GetMetadata(obj).ID) +} + +func getKey(t string, id string) string { + return fmt.Sprintf("%s:%s", t, id) +} + +func announce(obj any, chans map[uintptr]chan<- any) { + for id, c := range chans { + select { + case c <- obj: + default: + close(c) + delete(chans, id) + } + } +} + +func chanID(c <-chan any) uintptr { + return reflect.ValueOf(c).Pointer() +} diff --git a/bus_test.go b/bus_test.go new file mode 100644 index 0000000..bd00a64 --- /dev/null +++ b/bus_test.go @@ -0,0 +1,149 @@ +package bus_test + +import ( + "testing" + + "github.com/gopatchy/bus" + "github.com/gopatchy/metadata" + "github.com/stretchr/testify/require" +) + +func TestBus(t *testing.T) { + t.Parallel() + + bus := bus.NewBus() + + // Announce with no subscribers + bus.Announce("busTest1", &busTest{ + Metadata: metadata.Metadata{ + ID: "id-nosub", + }, + }) + + // Complex subscription layout + c1a := bus.SubscribeKey("busTest1", "id-overlap", nil) + require.Nil(t, <-c1a) + + defer bus.UnsubscribeKey("busTest1", "id-overlap", c1a) + + c2a := bus.SubscribeKey("busTest2", "id-overlap", nil) + require.Nil(t, <-c2a) + + defer bus.UnsubscribeKey("busTest2", "id-overlap", c2a) + + c2b := bus.SubscribeKey("busTest2", "id-dupe", nil) + require.Nil(t, <-c2b) + + defer bus.UnsubscribeKey("busTest2", "id-dupe", c2b) + + c2c := bus.SubscribeKey("busTest2", "id-dupe", nil) + require.Nil(t, <-c2c) + + defer bus.UnsubscribeKey("busTest2", "id-dupe", c2c) + + ct1 := bus.SubscribeType("busTest1", nil) + require.Nil(t, <-ct1) + + defer bus.UnsubscribeType("busTest1", ct1) + + ct2 := bus.SubscribeType("busTest2", nil) + require.Nil(t, <-ct2) + + defer bus.UnsubscribeType("busTest2", ct2) + + // Overlapping IDs but not types + bus.Announce("busTest1", &busTest{ + Metadata: metadata.Metadata{ + ID: "id-overlap", + }, + }) + + msg := <-c1a + require.Equal(t, "id-overlap", msg.(*busTest).ID) + + msg = <-ct1 + require.Equal(t, "id-overlap", msg.(*busTest).ID) + + select { + case msg := <-c2a: + require.Fail(t, "unexpected message", msg) + case msg := <-ct2: + require.Fail(t, "unexpected message", msg) + default: + } + + bus.Announce("busTest2", &busTest{ + Metadata: metadata.Metadata{ + ID: "id-overlap", + }, + }) + + select { + case msg := <-c1a: + require.Fail(t, "unexpected message", msg) + case msg := <-ct1: + require.Fail(t, "unexpected message", msg) + default: + } + + msg = <-c2a + require.Equal(t, "id-overlap", msg.(*busTest).ID) + + msg = <-ct2 + require.Equal(t, "id-overlap", msg.(*busTest).ID) + + bus.Announce("busTest2", &busTest{ + Metadata: metadata.Metadata{ + ID: "id-dupe", + }, + }) + + msg = <-c2b + require.Equal(t, "id-dupe", msg.(*busTest).ID) + + msg = <-c2c + require.Equal(t, "id-dupe", msg.(*busTest).ID) + + msg = <-ct2 + require.Equal(t, "id-dupe", msg.(*busTest).ID) +} + +func TestBusDelete(t *testing.T) { + t.Parallel() + + bus := bus.NewBus() + + c := bus.SubscribeKey("busTest", "id1", nil) + require.Nil(t, <-c) + + defer bus.UnsubscribeKey("busTest", "id1", c) + + ct := bus.SubscribeType("busTest", nil) + require.Nil(t, <-ct) + + defer bus.UnsubscribeType("busTest", ct) + + bus.Announce("busTest", &busTest{ + Metadata: metadata.Metadata{ + ID: "id1", + }, + }) + + msg := <-c + require.Equal(t, "id1", msg.(*busTest).ID) + + msg = <-ct + require.Equal(t, "id1", msg.(*busTest).ID) + + bus.Delete("busTest", "id1") + + _, ok := <-c + require.False(t, ok) + + id := (<-ct).(string) + require.Equal(t, "id1", id) +} + +type busTest struct { + metadata.Metadata +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..0c137ef --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module github.com/gopatchy/bus + +go 1.19 + +require ( + github.com/gopatchy/metadata v0.0.0-20230420053349-25837551c11d + github.com/stretchr/testify v1.8.2 + go.uber.org/goleak v1.2.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..0c4f789 --- /dev/null +++ b/go.sum @@ -0,0 +1,26 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gopatchy/metadata v0.0.0-20230420053349-25837551c11d h1:chunoM47vkWSanIvLx4uRSkLMG6chDZOy09L2tt/bv8= +github.com/gopatchy/metadata v0.0.0-20230420053349-25837551c11d/go.mod h1:VgD33raUShjDePCDBo55aj+eSXFtUEpMzs+Ie39g2zo= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.8.1-0.20211023094830-115ce09fd6b4 h1:Ha8xCaq6ln1a+R91Km45Oq6lPXj2Mla6CRJYcuV2h1w= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/justfile b/justfile new file mode 100644 index 0000000..d607f93 --- /dev/null +++ b/justfile @@ -0,0 +1,18 @@ +go := env_var_or_default('GOCMD', 'go') + +default: tidy test + +tidy: + {{go}} mod tidy + goimports -l -w . + gofumpt -l -w . + {{go}} fmt ./... + +test: + {{go}} vet ./... + golangci-lint run ./... + {{go}} test -race -coverprofile=cover.out -timeout=60s -parallel=10 ./... + {{go}} tool cover -html=cover.out -o=cover.html + +todo: + -git grep -e TODO --and --not -e ignoretodo diff --git a/pkg_test.go b/pkg_test.go new file mode 100644 index 0000000..d16e17b --- /dev/null +++ b/pkg_test.go @@ -0,0 +1,11 @@ +package bus_test + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +}