diff --git a/cover.html b/cover.html
new file mode 100644
index 0000000..2f076a2
--- /dev/null
+++ b/cover.html
@@ -0,0 +1,280 @@
+
+
+
+
+
+
+
package storebus
+
+import (
+ "context"
+ "crypto/sha256"
+ "encoding/json"
+ "fmt"
+ "sync"
+
+ "github.com/gopatchy/bus"
+ "github.com/gopatchy/jsrest"
+ "github.com/gopatchy/metadata"
+ "github.com/gopatchy/store"
+)
+
+type StoreBus struct {
+ store *store.Store
+ bus *bus.Bus
+
+ // This lock ensures that no writes interleave with read/subscribe pairs
+ orderMu sync.RWMutex
+
+ chanMap map[<-chan []any]<-chan any
+ chanMapMu sync.Mutex
+}
+
+func NewStoreBus(dbname string) (*StoreBus, error) {
+ st, err := store.NewStore(dbname)
+ if err != nil {
+ return nil, err
+ }
+
+ return &StoreBus{
+ store: st,
+ bus: bus.NewBus(),
+ chanMap: map[<-chan []any]<-chan any{},
+ }, nil
+}
+
+func (sb *StoreBus) Close() {
+ sb.store.Close()
+}
+
+func (sb *StoreBus) Write(ctx context.Context, t string, obj any) error {
+ sb.orderMu.Lock()
+ defer sb.orderMu.Unlock()
+
+ if err := UpdateHash(obj); err != nil {
+ return jsrest.Errorf(jsrest.ErrInternalServerError, "hash update failed (%w)", err)
+ }
+
+ if err := sb.store.Write(ctx, t, obj); err != nil {
+ return jsrest.Errorf(jsrest.ErrInternalServerError, "write failed (%w)", err)
+ }
+
+ sb.bus.Announce(t, obj)
+
+ return nil
+}
+
+func (sb *StoreBus) Delete(ctx context.Context, t, id string) error {
+ sb.orderMu.Lock()
+ defer sb.orderMu.Unlock()
+
+ if err := sb.store.Delete(ctx, t, id); err != nil {
+ return jsrest.Errorf(jsrest.ErrInternalServerError, "delete failed (%w)", err)
+ }
+
+ sb.bus.Delete(t, id)
+
+ return nil
+}
+
+func (sb *StoreBus) Read(ctx context.Context, t, id string, factory func() any) (any, error) {
+ return sb.store.Read(ctx, t, id, factory)
+}
+
+func (sb *StoreBus) ReadStream(ctx context.Context, t, id string, factory func() any) (<-chan any, error) {
+ sb.orderMu.RLock()
+ defer sb.orderMu.RUnlock()
+
+ initial, err := sb.store.Read(ctx, t, id, factory)
+ if err != nil {
+ return nil, jsrest.Errorf(jsrest.ErrInternalServerError, "read failed (%w)", err)
+ }
+
+ c := sb.bus.SubscribeKey(t, id, initial)
+
+ return c, nil
+}
+
+func (sb *StoreBus) CloseReadStream(t, id string, c <-chan any) {
+ sb.bus.UnsubscribeKey(t, id, c)
+}
+
+func (sb *StoreBus) List(ctx context.Context, t string, factory func() any) ([]any, error) {
+ return sb.store.List(ctx, t, factory)
+}
+
+func (sb *StoreBus) ListStream(ctx context.Context, t string, factory func() any) (<-chan []any, error) {
+ sb.orderMu.RLock()
+ defer sb.orderMu.RUnlock()
+
+ initial, err := sb.store.List(ctx, t, factory)
+ if err != nil {
+ return nil, jsrest.Errorf(jsrest.ErrInternalServerError, "list failed (%w)", err)
+ }
+
+ c := sb.bus.SubscribeType(t, initial)
+
+ ret := make(chan []any, 100)
+
+ sb.registerChan(c, ret)
+
+ go func() {
+ defer close(ret)
+
+ for range c {
+ // List() results are always at least (but not exactly) as new as the write that triggered it
+ l, err := sb.store.List(ctx, t, factory)
+ if err != nil {
+ break
+ }
+
+ select {
+ case ret <- l:
+ default:
+ break
+ }
+ }
+ }()
+
+ return ret, nil
+}
+
+func (sb *StoreBus) CloseListStream(t string, c <-chan []any) {
+ sb.chanMapMu.Lock()
+ defer sb.chanMapMu.Unlock()
+
+ sb.bus.UnsubscribeType(t, sb.chanMap[c])
+
+ delete(sb.chanMap, c)
+}
+
+func (sb *StoreBus) registerChan(in <-chan any, out <-chan []any) {
+ sb.chanMapMu.Lock()
+ defer sb.chanMapMu.Unlock()
+
+ sb.chanMap[out] = in
+}
+
+func UpdateHash(obj any) error {
+ m := *metadata.GetMetadata(obj)
+ metadata.ClearMetadata(obj)
+
+ defer metadata.SetMetadata(obj, &m)
+
+ hash := sha256.New()
+ enc := json.NewEncoder(hash)
+
+ if err := enc.Encode(obj); err != nil {
+ return jsrest.Errorf(jsrest.ErrInternalServerError, "JSON encode failed (%w)", err)
+ }
+
+ m.ETag = fmt.Sprintf("etag:%x", hash.Sum(nil))
+
+ return nil
+}
+
+
+
+
+
+
diff --git a/cover.out b/cover.out
new file mode 100644
index 0000000..bfb7f8f
--- /dev/null
+++ b/cover.out
@@ -0,0 +1,34 @@
+mode: atomic
+github.com/gopatchy/storebus/storebus.go:27.52,29.16 2 2
+github.com/gopatchy/storebus/storebus.go:29.16,31.3 1 0
+github.com/gopatchy/storebus/storebus.go:33.2,37.8 1 2
+github.com/gopatchy/storebus/storebus.go:40.29,42.2 1 2
+github.com/gopatchy/storebus/storebus.go:44.73,48.40 3 4
+github.com/gopatchy/storebus/storebus.go:48.40,50.3 1 0
+github.com/gopatchy/storebus/storebus.go:52.2,52.52 1 4
+github.com/gopatchy/storebus/storebus.go:52.52,54.3 1 0
+github.com/gopatchy/storebus/storebus.go:56.2,58.12 2 4
+github.com/gopatchy/storebus/storebus.go:61.69,65.52 3 1
+github.com/gopatchy/storebus/storebus.go:65.52,67.3 1 0
+github.com/gopatchy/storebus/storebus.go:69.2,71.12 2 1
+github.com/gopatchy/storebus/storebus.go:74.94,76.2 1 1
+github.com/gopatchy/storebus/storebus.go:78.107,83.16 4 2
+github.com/gopatchy/storebus/storebus.go:83.16,85.3 1 0
+github.com/gopatchy/storebus/storebus.go:87.2,89.15 2 2
+github.com/gopatchy/storebus/storebus.go:92.65,94.2 1 2
+github.com/gopatchy/storebus/storebus.go:96.92,98.2 1 2
+github.com/gopatchy/storebus/storebus.go:100.105,105.16 4 1
+github.com/gopatchy/storebus/storebus.go:105.16,107.3 1 0
+github.com/gopatchy/storebus/storebus.go:109.2,115.12 4 1
+github.com/gopatchy/storebus/storebus.go:115.12,118.15 2 1
+github.com/gopatchy/storebus/storebus.go:118.15,121.18 2 3
+github.com/gopatchy/storebus/storebus.go:121.18,122.10 1 0
+github.com/gopatchy/storebus/storebus.go:125.4,125.11 1 3
+github.com/gopatchy/storebus/storebus.go:126.18,126.18 0 3
+github.com/gopatchy/storebus/storebus.go:127.12,128.10 1 0
+github.com/gopatchy/storebus/storebus.go:133.2,133.17 1 1
+github.com/gopatchy/storebus/storebus.go:136.63,143.2 4 1
+github.com/gopatchy/storebus/storebus.go:145.67,150.2 3 1
+github.com/gopatchy/storebus/storebus.go:152.32,161.40 6 4
+github.com/gopatchy/storebus/storebus.go:161.40,163.3 1 0
+github.com/gopatchy/storebus/storebus.go:165.2,167.12 2 4