From 88c24f1a40895918d35a994a38c7473b096038a5 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Tue, 15 Mar 2022 16:27:52 +0000 Subject: [PATCH] Initial Bus implementation --- bus.go | 50 ++++++++++++++++++++++++++++++ main.go | 25 ++++++++------- object.go | 17 +++++----- store.go | 89 ++++++++++++++++++++++++++--------------------------- template.go | 8 ++--- 5 files changed, 120 insertions(+), 69 deletions(-) create mode 100644 bus.go diff --git a/bus.go b/bus.go new file mode 100644 index 0000000..50b9902 --- /dev/null +++ b/bus.go @@ -0,0 +1,50 @@ +package main + +import "sync" + +type Bus struct { + mu sync.Mutex + chans map[string][]chan Object +} + +func NewBus() *Bus { + return &Bus{ + chans: map[string][]chan Object{}, + } +} + +func (b *Bus) Announce(obj Object) { + key := ObjectKey(obj) + + b.mu.Lock() + defer b.mu.Unlock() + + chans := b.chans[key] + newChans := []chan Object{} + + for _, ch := range chans { + select { + case ch <- obj: + newChans = append(newChans, ch) + default: + close(ch) + } + } + + if len(chans) != len(newChans) { + b.chans[key] = newChans + } +} + +func (b *Bus) Subscribe(obj Object) chan Object { + key := ObjectKey(obj) + + b.mu.Lock() + defer b.mu.Unlock() + + ch := make(chan Object, 100) + + b.chans[key] = append(b.chans[key], ch) + + return ch +} diff --git a/main.go b/main.go index f730b9a..0476815 100644 --- a/main.go +++ b/main.go @@ -5,20 +5,23 @@ import "fmt" import "github.com/google/uuid" func main() { - store := NewStore("foo") + store := NewStore("foo") - out := &Template{ - Id: uuid.NewString(), - Test: "round trip", - } + out := &Template{ + Id: uuid.NewString(), + Test: "round trip", + } - store.Write(out) + store.Write(out) - in := &Template{ - Id: out.Id, - } + in := &Template{ + Id: out.Id, + } - store.Read(in) + store.Read(in) - fmt.Printf("%+v\n", in) + fmt.Printf("%+v\n", in) + + bus := NewBus() + bus.Announce(in) } diff --git a/object.go b/object.go index 2537072..3403180 100644 --- a/object.go +++ b/object.go @@ -1,18 +1,17 @@ package main +import "encoding/hex" import "fmt" type Object interface { - GetType() string - GetId() string + GetType() string + GetId() string +} + +func ObjectSafeId(obj Object) string { + return hex.EncodeToString([]byte(obj.GetId())) } func ObjectKey(obj Object) string { - return fmt.Sprintf( - "%d:%s:%d:%s", - len(obj.GetType()), - obj.GetType(), - len(obj.GetId()), - obj.GetId(), - ) + return fmt.Sprintf("%s:%s", obj.GetType(), ObjectSafeId(obj)) } diff --git a/store.go b/store.go index 0c3ad6a..33ca58d 100644 --- a/store.go +++ b/store.go @@ -1,74 +1,73 @@ package main -import "encoding/hex" import "encoding/json" import "fmt" import "os" import "path/filepath" type Store struct { - root string + root string } func NewStore(root string) *Store { - return &Store{ - root: root, - } + return &Store{ + root: root, + } } func (s *Store) Write(obj Object) error { - dir := filepath.Join(s.root, obj.GetType()) - filename := hex.EncodeToString([]byte(obj.GetId())) + dir := filepath.Join(s.root, obj.GetType()) + filename := ObjectSafeId(obj) - err := os.MkdirAll(dir, 0700) - if err != nil { - return err - } + err := os.MkdirAll(dir, 0700) + if err != nil { + return err + } - tmp, err := os.CreateTemp(dir, fmt.Sprintf("%s.*", filename)) - if err != nil { - return err - } - defer tmp.Close() + tmp, err := os.CreateTemp(dir, fmt.Sprintf("%s.*", filename)) + if err != nil { + return err + } + defer tmp.Close() - enc := json.NewEncoder(tmp) - enc.SetEscapeHTML(false) + enc := json.NewEncoder(tmp) + enc.SetEscapeHTML(false) - err = enc.Encode(obj) - if err != nil { - return err - } + err = enc.Encode(obj) + if err != nil { + return err + } - err = tmp.Close() - if err != nil { - return err - } + err = tmp.Close() + if err != nil { + return err + } - err = os.Rename(tmp.Name(), filepath.Join(dir, filename)) - if err != nil { - return err - } + err = os.Rename(tmp.Name(), filepath.Join(dir, filename)) + if err != nil { + return err + } - return nil + return nil } func (s *Store) Read(obj Object) error { - dir := filepath.Join(s.root, obj.GetType()) - filename := hex.EncodeToString([]byte(obj.GetId())) + dir := filepath.Join(s.root, obj.GetType()) + filename := ObjectSafeId(obj) - fh, err := os.Open(filepath.Join(dir, filename)) - if err != nil { - return err - } - defer fh.Close() + fh, err := os.Open(filepath.Join(dir, filename)) + if err != nil { + return err + } + defer fh.Close() - dec := json.NewDecoder(fh) - dec.DisallowUnknownFields() + dec := json.NewDecoder(fh) + dec.DisallowUnknownFields() - err = dec.Decode(obj) - if err != nil { - return err - } + err = dec.Decode(obj) + if err != nil { + return err + } - return nil + return nil } diff --git a/template.go b/template.go index 35f6d29..6ab4702 100644 --- a/template.go +++ b/template.go @@ -1,14 +1,14 @@ package main type Template struct { - Id string - Test string + Id string + Test string } func (t *Template) GetType() string { - return "template" + return "template" } func (t *Template) GetId() string { - return t.Id + return t.Id }