Template streaming

This commit is contained in:
Ian Gulliver
2022-03-18 05:51:09 +00:00
parent 61ac021269
commit 7f822c1631
2 changed files with 63 additions and 13 deletions

60
api.go
View File

@@ -4,6 +4,7 @@ import "encoding/json"
import "fmt" import "fmt"
import "log" import "log"
import "net/http" import "net/http"
import "time"
import "github.com/google/uuid" import "github.com/google/uuid"
import "github.com/gorilla/mux" import "github.com/gorilla/mux"
@@ -22,7 +23,7 @@ func NewAPI(storePath string) *API {
} }
api.router.HandleFunc("/template", returnError(jsonOutput(api.createTemplate))).Methods("POST").Headers("Content-Type", "application/json") api.router.HandleFunc("/template", returnError(jsonOutput(api.createTemplate))).Methods("POST").Headers("Content-Type", "application/json")
api.router.HandleFunc("/template/{id}", api.streamTemplate).Methods("GET").Headers("Accept", "text/event-stream") api.router.HandleFunc("/template/{id}", returnError(api.streamTemplate)).Methods("GET").Headers("Accept", "text/event-stream")
api.router.HandleFunc("/template/{id}", returnError(jsonOutput(api.getTemplate))).Methods("GET") api.router.HandleFunc("/template/{id}", returnError(jsonOutput(api.getTemplate))).Methods("GET")
api.router.HandleFunc("/template/{id}", returnError(jsonOutput(api.updateTemplate))).Methods("PATCH").Headers("Content-Type", "application/json") api.router.HandleFunc("/template/{id}", returnError(jsonOutput(api.updateTemplate))).Methods("PATCH").Headers("Content-Type", "application/json")
@@ -33,7 +34,7 @@ func (api *API) ServeHTTP(w http.ResponseWriter, r *http.Request) {
api.router.ServeHTTP(w, r) api.router.ServeHTTP(w, r)
} }
func (api *API) createTemplate(r *http.Request) (interface{}, string, int) { func (api *API) createTemplate(r *http.Request) (Object, string, int) {
log.Printf("createTemplate") log.Printf("createTemplate")
template := NewTemplate() template := NewTemplate()
@@ -56,29 +57,50 @@ func (api *API) createTemplate(r *http.Request) (interface{}, string, int) {
return template, "", 0 return template, "", 0
} }
func (api *API) streamTemplate(w http.ResponseWriter, r *http.Request) { func (api *API) streamTemplate(w http.ResponseWriter, r *http.Request) (string, int) {
log.Printf("streamTemplate %s", mux.Vars(r)) log.Printf("streamTemplate %s", mux.Vars(r))
flusher, ok := w.(http.Flusher) _, ok := w.(http.Flusher)
if !ok { if !ok {
http.Error(w, "streaming unsupported", http.StatusBadRequest) return "Streaming unsupported", http.StatusBadRequest
return
} }
w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
template := NewTemplate()
template.Id = mux.Vars(r)["id"]
err := api.store.Read(template)
if err != nil {
return fmt.Sprintf("Template %s not found", template.Id), http.StatusNotFound
}
writeEvent(w, template)
closeChan := w.(http.CloseNotifier).CloseNotify() closeChan := w.(http.CloseNotifier).CloseNotify()
msgChan := api.bus.Subscribe(template)
ticker := time.NewTicker(5 * time.Second)
flusher.Flush() connected := true
for connected {
<-closeChan select {
case <-closeChan:
connected = false
case msg := <-msgChan:
writeEvent(w, msg)
case <-ticker.C:
writeEvent(w, NewHeartbeat())
}
}
log.Printf("streamTemplate %s end", mux.Vars(r)) log.Printf("streamTemplate %s end", mux.Vars(r))
return "", 0
} }
func (api *API) getTemplate(r *http.Request) (interface{}, string, int) { func (api *API) getTemplate(r *http.Request) (Object, string, int) {
log.Printf("getTemplate %s", mux.Vars(r)) log.Printf("getTemplate %s", mux.Vars(r))
template := NewTemplate() template := NewTemplate()
@@ -92,7 +114,7 @@ func (api *API) getTemplate(r *http.Request) (interface{}, string, int) {
return template, "", 0 return template, "", 0
} }
func (api *API) updateTemplate(r *http.Request) (interface{}, string, int) { func (api *API) updateTemplate(r *http.Request) (Object, string, int) {
log.Printf("updateTemplate %s", mux.Vars(r)) log.Printf("updateTemplate %s", mux.Vars(r))
patch := NewTemplate() patch := NewTemplate()
@@ -129,7 +151,7 @@ func (api *API) updateTemplate(r *http.Request) (interface{}, string, int) {
} }
func readJson(r *http.Request, out interface{}) (string, int) { func readJson(r *http.Request, out Object) (string, int) {
dec := json.NewDecoder(r.Body) dec := json.NewDecoder(r.Body)
dec.DisallowUnknownFields() dec.DisallowUnknownFields()
@@ -150,7 +172,7 @@ func returnError(wrapped func(http.ResponseWriter, *http.Request) (string, int))
} }
} }
func jsonOutput(wrapped func(*http.Request) (interface{}, string, int)) func(http.ResponseWriter, *http.Request) (string, int) { func jsonOutput(wrapped func(*http.Request) (Object, string, int)) func(http.ResponseWriter, *http.Request) (string, int) {
return func(w http.ResponseWriter, r *http.Request) (string, int) { return func(w http.ResponseWriter, r *http.Request) (string, int) {
out, msg, code := wrapped(r) out, msg, code := wrapped(r)
if code != 0 { if code != 0 {
@@ -166,3 +188,15 @@ func jsonOutput(wrapped func(*http.Request) (interface{}, string, int)) func(htt
return "", 0 return "", 0
} }
} }
func writeEvent(w http.ResponseWriter, in Object) (string, int) {
data, err := json.Marshal(in)
if err != nil {
return fmt.Sprintf("Failed to encode JSON: %s", err), http.StatusInternalServerError
}
fmt.Fprintf(w, "event: %s\ndata: %s\n\n", in.GetType(), data)
w.(http.Flusher).Flush()
return "", 0
}

16
heartbeat.go Normal file
View File

@@ -0,0 +1,16 @@
package main
type Heartbeat struct {
}
func NewHeartbeat() *Heartbeat {
return &Heartbeat{}
}
func (h *Heartbeat) GetType() string {
return "heartbeat"
}
func (h *Heartbeat) GetId() string {
return ""
}