diff --git a/api.go b/api.go index f3083dd..ec22082 100644 --- a/api.go +++ b/api.go @@ -4,6 +4,7 @@ import "encoding/json" import "fmt" import "log" import "net/http" +import "time" import "github.com/google/uuid" import "github.com/gorilla/mux" @@ -22,7 +23,7 @@ func NewAPI(storePath string) *API { } api.router.HandleFunc("/template", returnError(jsonOutput(api.createTemplate))).Methods("POST").Headers("Content-Type", "application/json") - api.router.HandleFunc("/template/{id}", api.streamTemplate).Methods("GET").Headers("Accept", "text/event-stream") + api.router.HandleFunc("/template/{id}", returnError(api.streamTemplate)).Methods("GET").Headers("Accept", "text/event-stream") api.router.HandleFunc("/template/{id}", returnError(jsonOutput(api.getTemplate))).Methods("GET") api.router.HandleFunc("/template/{id}", returnError(jsonOutput(api.updateTemplate))).Methods("PATCH").Headers("Content-Type", "application/json") @@ -33,7 +34,7 @@ func (api *API) ServeHTTP(w http.ResponseWriter, r *http.Request) { api.router.ServeHTTP(w, r) } -func (api *API) createTemplate(r *http.Request) (interface{}, string, int) { +func (api *API) createTemplate(r *http.Request) (Object, string, int) { log.Printf("createTemplate") template := NewTemplate() @@ -56,29 +57,50 @@ func (api *API) createTemplate(r *http.Request) (interface{}, string, int) { return template, "", 0 } -func (api *API) streamTemplate(w http.ResponseWriter, r *http.Request) { +func (api *API) streamTemplate(w http.ResponseWriter, r *http.Request) (string, int) { log.Printf("streamTemplate %s", mux.Vars(r)) - flusher, ok := w.(http.Flusher) + _, ok := w.(http.Flusher) if !ok { - http.Error(w, "streaming unsupported", http.StatusBadRequest) - return + return "Streaming unsupported", http.StatusBadRequest } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") + template := NewTemplate() + template.Id = mux.Vars(r)["id"] + + err := api.store.Read(template) + if err != nil { + return fmt.Sprintf("Template %s not found", template.Id), http.StatusNotFound + } + + writeEvent(w, template) + closeChan := w.(http.CloseNotifier).CloseNotify() + msgChan := api.bus.Subscribe(template) + ticker := time.NewTicker(5 * time.Second) - flusher.Flush() - - <-closeChan + connected := true + for connected { + select { + case <-closeChan: + connected = false + case msg := <-msgChan: + writeEvent(w, msg) + case <-ticker.C: + writeEvent(w, NewHeartbeat()) + } + } log.Printf("streamTemplate %s end", mux.Vars(r)) + + return "", 0 } -func (api *API) getTemplate(r *http.Request) (interface{}, string, int) { +func (api *API) getTemplate(r *http.Request) (Object, string, int) { log.Printf("getTemplate %s", mux.Vars(r)) template := NewTemplate() @@ -92,7 +114,7 @@ func (api *API) getTemplate(r *http.Request) (interface{}, string, int) { return template, "", 0 } -func (api *API) updateTemplate(r *http.Request) (interface{}, string, int) { +func (api *API) updateTemplate(r *http.Request) (Object, string, int) { log.Printf("updateTemplate %s", mux.Vars(r)) patch := NewTemplate() @@ -129,7 +151,7 @@ func (api *API) updateTemplate(r *http.Request) (interface{}, string, int) { } -func readJson(r *http.Request, out interface{}) (string, int) { +func readJson(r *http.Request, out Object) (string, int) { dec := json.NewDecoder(r.Body) dec.DisallowUnknownFields() @@ -150,7 +172,7 @@ func returnError(wrapped func(http.ResponseWriter, *http.Request) (string, int)) } } -func jsonOutput(wrapped func(*http.Request) (interface{}, string, int)) func(http.ResponseWriter, *http.Request) (string, int) { +func jsonOutput(wrapped func(*http.Request) (Object, string, int)) func(http.ResponseWriter, *http.Request) (string, int) { return func(w http.ResponseWriter, r *http.Request) (string, int) { out, msg, code := wrapped(r) if code != 0 { @@ -166,3 +188,15 @@ func jsonOutput(wrapped func(*http.Request) (interface{}, string, int)) func(htt return "", 0 } } + +func writeEvent(w http.ResponseWriter, in Object) (string, int) { + data, err := json.Marshal(in) + if err != nil { + return fmt.Sprintf("Failed to encode JSON: %s", err), http.StatusInternalServerError + } + + fmt.Fprintf(w, "event: %s\ndata: %s\n\n", in.GetType(), data) + w.(http.Flusher).Flush() + + return "", 0 +} diff --git a/heartbeat.go b/heartbeat.go new file mode 100644 index 0000000..4e4351a --- /dev/null +++ b/heartbeat.go @@ -0,0 +1,16 @@ +package main + +type Heartbeat struct { +} + +func NewHeartbeat() *Heartbeat { + return &Heartbeat{} +} + +func (h *Heartbeat) GetType() string { + return "heartbeat" +} + +func (h *Heartbeat) GetId() string { + return "" +}