Factor out Store & Bus
This commit is contained in:
21
api.go
21
api.go
@@ -6,20 +6,21 @@ import "log"
|
||||
import "net/http"
|
||||
import "time"
|
||||
|
||||
import "github.com/firestuff/storebus"
|
||||
import "github.com/google/uuid"
|
||||
import "github.com/gorilla/mux"
|
||||
|
||||
type API struct {
|
||||
router *mux.Router
|
||||
store *Store
|
||||
bus *Bus
|
||||
store *storebus.Store
|
||||
bus *storebus.Bus
|
||||
}
|
||||
|
||||
func NewAPI(storePath string) *API {
|
||||
api := &API{
|
||||
router: mux.NewRouter(),
|
||||
store: NewStore(storePath),
|
||||
bus: NewBus(),
|
||||
store: storebus.NewStore(storePath),
|
||||
bus: storebus.NewBus(),
|
||||
}
|
||||
|
||||
api.router.HandleFunc("/template", returnError(jsonOutput(api.createTemplate))).Methods("POST").Headers("Content-Type", "application/json")
|
||||
@@ -34,7 +35,7 @@ func (api *API) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
api.router.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
func (api *API) createTemplate(r *http.Request) (Object, string, int) {
|
||||
func (api *API) createTemplate(r *http.Request) (storebus.Object, string, int) {
|
||||
log.Printf("createTemplate")
|
||||
|
||||
template := NewTemplate()
|
||||
@@ -100,7 +101,7 @@ func (api *API) streamTemplate(w http.ResponseWriter, r *http.Request) (string,
|
||||
return "", 0
|
||||
}
|
||||
|
||||
func (api *API) getTemplate(r *http.Request) (Object, string, int) {
|
||||
func (api *API) getTemplate(r *http.Request) (storebus.Object, string, int) {
|
||||
log.Printf("getTemplate %s", mux.Vars(r))
|
||||
|
||||
template := NewTemplate()
|
||||
@@ -114,7 +115,7 @@ func (api *API) getTemplate(r *http.Request) (Object, string, int) {
|
||||
return template, "", 0
|
||||
}
|
||||
|
||||
func (api *API) updateTemplate(r *http.Request) (Object, string, int) {
|
||||
func (api *API) updateTemplate(r *http.Request) (storebus.Object, string, int) {
|
||||
log.Printf("updateTemplate %s", mux.Vars(r))
|
||||
|
||||
patch := NewTemplate()
|
||||
@@ -151,7 +152,7 @@ func (api *API) updateTemplate(r *http.Request) (Object, string, int) {
|
||||
|
||||
}
|
||||
|
||||
func readJson(r *http.Request, out Object) (string, int) {
|
||||
func readJson(r *http.Request, out storebus.Object) (string, int) {
|
||||
dec := json.NewDecoder(r.Body)
|
||||
dec.DisallowUnknownFields()
|
||||
|
||||
@@ -172,7 +173,7 @@ func returnError(wrapped func(http.ResponseWriter, *http.Request) (string, int))
|
||||
}
|
||||
}
|
||||
|
||||
func jsonOutput(wrapped func(*http.Request) (Object, string, int)) func(http.ResponseWriter, *http.Request) (string, int) {
|
||||
func jsonOutput(wrapped func(*http.Request) (storebus.Object, string, int)) func(http.ResponseWriter, *http.Request) (string, int) {
|
||||
return func(w http.ResponseWriter, r *http.Request) (string, int) {
|
||||
out, msg, code := wrapped(r)
|
||||
if code != 0 {
|
||||
@@ -189,7 +190,7 @@ func jsonOutput(wrapped func(*http.Request) (Object, string, int)) func(http.Res
|
||||
}
|
||||
}
|
||||
|
||||
func writeEvent(w http.ResponseWriter, in Object) (string, int) {
|
||||
func writeEvent(w http.ResponseWriter, in storebus.Object) (string, int) {
|
||||
data, err := json.Marshal(in)
|
||||
if err != nil {
|
||||
return fmt.Sprintf("Failed to encode JSON: %s", err), http.StatusInternalServerError
|
||||
|
||||
50
bus.go
50
bus.go
@@ -1,50 +0,0 @@
|
||||
package main
|
||||
|
||||
import "sync"
|
||||
|
||||
type Bus struct {
|
||||
mu sync.Mutex
|
||||
chans map[string][]chan Object
|
||||
}
|
||||
|
||||
func NewBus() *Bus {
|
||||
return &Bus{
|
||||
chans: map[string][]chan Object{},
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Bus) Announce(obj Object) {
|
||||
key := ObjectKey(obj)
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
chans := b.chans[key]
|
||||
newChans := []chan Object{}
|
||||
|
||||
for _, ch := range chans {
|
||||
select {
|
||||
case ch <- obj:
|
||||
newChans = append(newChans, ch)
|
||||
default:
|
||||
close(ch)
|
||||
}
|
||||
}
|
||||
|
||||
if len(chans) != len(newChans) {
|
||||
b.chans[key] = newChans
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Bus) Subscribe(obj Object) chan Object {
|
||||
key := ObjectKey(obj)
|
||||
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
ch := make(chan Object, 100)
|
||||
|
||||
b.chans[key] = append(b.chans[key], ch)
|
||||
|
||||
return ch
|
||||
}
|
||||
98
bus_test.go
98
bus_test.go
@@ -1,98 +0,0 @@
|
||||
package main
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestBus(t *testing.T) {
|
||||
bus := NewBus()
|
||||
|
||||
// Announce with no subscribers
|
||||
bus.Announce(&busTest1{
|
||||
Id: "id-nosub",
|
||||
})
|
||||
|
||||
// Complex subscription layout
|
||||
ch1a := bus.Subscribe(&busTest1{
|
||||
Id: "id-overlap",
|
||||
})
|
||||
|
||||
ch2a := bus.Subscribe(&busTest2{
|
||||
Id: "id-overlap",
|
||||
})
|
||||
|
||||
ch2b := bus.Subscribe(&busTest2{
|
||||
Id: "id-dupe",
|
||||
})
|
||||
|
||||
ch2c := bus.Subscribe(&busTest2{
|
||||
Id: "id-dupe",
|
||||
})
|
||||
|
||||
// Overlapping IDs but not types
|
||||
bus.Announce(&busTest1{
|
||||
Id: "id-overlap",
|
||||
})
|
||||
|
||||
msg := <-ch1a
|
||||
if msg.(*busTest1).Id != "id-overlap" {
|
||||
t.Errorf("%+v", msg)
|
||||
}
|
||||
|
||||
select {
|
||||
case msg := <-ch2a:
|
||||
t.Errorf("%+v", msg)
|
||||
default:
|
||||
}
|
||||
|
||||
bus.Announce(&busTest2{
|
||||
Id: "id-overlap",
|
||||
})
|
||||
|
||||
select {
|
||||
case msg := <-ch1a:
|
||||
t.Errorf("%+v", msg)
|
||||
default:
|
||||
}
|
||||
|
||||
msg = <-ch2a
|
||||
if msg.(*busTest2).Id != "id-overlap" {
|
||||
t.Errorf("%+v", msg)
|
||||
}
|
||||
|
||||
bus.Announce(&busTest2{
|
||||
Id: "id-dupe",
|
||||
})
|
||||
|
||||
msg = <-ch2b
|
||||
if msg.(*busTest2).Id != "id-dupe" {
|
||||
t.Errorf("%+v", msg)
|
||||
}
|
||||
|
||||
msg = <-ch2c
|
||||
if msg.(*busTest2).Id != "id-dupe" {
|
||||
t.Errorf("%+v", msg)
|
||||
}
|
||||
}
|
||||
|
||||
type busTest1 struct {
|
||||
Id string
|
||||
}
|
||||
|
||||
func (bt *busTest1) GetType() string {
|
||||
return "busTest1"
|
||||
}
|
||||
|
||||
func (bt *busTest1) GetId() string {
|
||||
return bt.Id
|
||||
}
|
||||
|
||||
type busTest2 struct {
|
||||
Id string
|
||||
}
|
||||
|
||||
func (bt *busTest2) GetType() string {
|
||||
return "busTest2"
|
||||
}
|
||||
|
||||
func (bt *busTest2) GetId() string {
|
||||
return bt.Id
|
||||
}
|
||||
1
go.mod
1
go.mod
@@ -3,6 +3,7 @@ module github.com/firestuff/checky
|
||||
go 1.16
|
||||
|
||||
require (
|
||||
github.com/firestuff/storebus v0.0.0-20220320234918-5e1588edb2eb
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/gorilla/mux v1.8.0
|
||||
)
|
||||
|
||||
2
go.sum
2
go.sum
@@ -1,3 +1,5 @@
|
||||
github.com/firestuff/storebus v0.0.0-20220320234918-5e1588edb2eb h1:UWxwtE1DbFqGdw6hAZVtBM1cX5HKvRPygwF2MnhEa1g=
|
||||
github.com/firestuff/storebus v0.0.0-20220320234918-5e1588edb2eb/go.mod h1:GfDVrwTVW/pVlgb7Qg3SJ1hXI4aE3SO/IfYz7btihys=
|
||||
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
|
||||
|
||||
17
object.go
17
object.go
@@ -1,17 +0,0 @@
|
||||
package main
|
||||
|
||||
import "encoding/hex"
|
||||
import "fmt"
|
||||
|
||||
type Object interface {
|
||||
GetType() string
|
||||
GetId() string
|
||||
}
|
||||
|
||||
func ObjectSafeId(obj Object) string {
|
||||
return hex.EncodeToString([]byte(obj.GetId()))
|
||||
}
|
||||
|
||||
func ObjectKey(obj Object) string {
|
||||
return fmt.Sprintf("%s:%s", obj.GetType(), ObjectSafeId(obj))
|
||||
}
|
||||
73
store.go
73
store.go
@@ -1,73 +0,0 @@
|
||||
package main
|
||||
|
||||
import "encoding/json"
|
||||
import "fmt"
|
||||
import "os"
|
||||
import "path/filepath"
|
||||
|
||||
type Store struct {
|
||||
root string
|
||||
}
|
||||
|
||||
func NewStore(root string) *Store {
|
||||
return &Store{
|
||||
root: root,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) Write(obj Object) error {
|
||||
dir := filepath.Join(s.root, obj.GetType())
|
||||
filename := ObjectSafeId(obj)
|
||||
|
||||
err := os.MkdirAll(dir, 0700)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tmp, err := os.CreateTemp(dir, fmt.Sprintf("%s.*", filename))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tmp.Close()
|
||||
|
||||
enc := json.NewEncoder(tmp)
|
||||
enc.SetEscapeHTML(false)
|
||||
|
||||
err = enc.Encode(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = tmp.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = os.Rename(tmp.Name(), filepath.Join(dir, filename))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) Read(obj Object) error {
|
||||
dir := filepath.Join(s.root, obj.GetType())
|
||||
filename := ObjectSafeId(obj)
|
||||
|
||||
fh, err := os.Open(filepath.Join(dir, filename))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer fh.Close()
|
||||
|
||||
dec := json.NewDecoder(fh)
|
||||
dec.DisallowUnknownFields()
|
||||
|
||||
err = dec.Decode(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1,69 +0,0 @@
|
||||
package main
|
||||
|
||||
import "os"
|
||||
import "testing"
|
||||
|
||||
func TestStore(t *testing.T) {
|
||||
dir, err := os.MkdirTemp("", "")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
store := NewStore(dir)
|
||||
|
||||
err = store.Write(&storeTest{
|
||||
Id: "id1",
|
||||
Opaque: "foo",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = store.Write(&storeTest{
|
||||
Id: "id2",
|
||||
Opaque: "bar",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
out1 := &storeTest{
|
||||
Id: "id1",
|
||||
}
|
||||
|
||||
err = store.Read(out1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if out1.Opaque != "foo" {
|
||||
t.Errorf("%+v", out1)
|
||||
}
|
||||
|
||||
out2 := &storeTest{
|
||||
Id: "id2",
|
||||
}
|
||||
|
||||
err = store.Read(out2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if out2.Opaque != "bar" {
|
||||
t.Errorf("%+v", out2)
|
||||
}
|
||||
}
|
||||
|
||||
type storeTest struct {
|
||||
Id string
|
||||
Opaque string
|
||||
}
|
||||
|
||||
func (st *storeTest) GetType() string {
|
||||
return "storeTest"
|
||||
}
|
||||
|
||||
func (st *storeTest) GetId() string {
|
||||
return st.Id
|
||||
}
|
||||
Reference in New Issue
Block a user