Un-yuck the server side of the presenter API

This commit is contained in:
Ian Gulliver
2020-11-26 00:11:09 +00:00
parent 0ee3ca970e
commit cbc1321019

81
main.go
View File

@@ -98,14 +98,7 @@ type room struct {
roomId string roomId string
timerStart time.Time timerStart time.Time
clientById map[string]*client clientById map[string]*client
present map[*presentState]bool present map[chan *controlEvent]bool
}
type presentState struct {
responseWriter http.ResponseWriter
flusher http.Flusher
room *room
controlChan chan *controlEvent
} }
var key []byte var key []byte
@@ -363,8 +356,14 @@ func create(w http.ResponseWriter, r *http.Request) {
} }
func present(w http.ResponseWriter, r *http.Request) { func present(w http.ResponseWriter, r *http.Request) {
ps := newPresentState(w, r) flusher, ok := w.(http.Flusher)
if ps == nil { if !ok {
http.Error(w, "streaming unsupported", http.StatusBadRequest)
return
}
room, controlChan := registerPresent(w, r)
if controlChan == nil {
return return
} }
@@ -374,18 +373,16 @@ func present(w http.ResponseWriter, r *http.Request) {
for { for {
select { select {
case <-closeChan: case <-closeChan:
ps.close() mu.Lock()
return delete(room.present, controlChan)
close(controlChan)
mu.Unlock()
case <-ticker.C: case <-ticker.C:
mu.Lock() writePresentHeartbeat(w, flusher)
ps.sendHeartbeat()
mu.Unlock()
case ctrl := <-ps.controlChan: case ctrl := <-controlChan:
mu.Lock() writePresentEvent(ctrl, w, flusher)
ps.sendEvent(ctrl)
mu.Unlock()
} }
} }
} }
@@ -450,12 +447,11 @@ func watch(w http.ResponseWriter, r *http.Request) {
for { for {
select { select {
case <-closeChan: case <-closeChan:
close(eventChan)
mu.Lock() mu.Lock()
if client.eventChan == eventChan { if client.eventChan == eventChan {
client.eventChan = nil client.eventChan = nil
} }
close(eventChan)
mu.Unlock() mu.Unlock()
case <-ticker.C: case <-ticker.C:
@@ -509,7 +505,7 @@ func newRoom(roomId string) *room {
roomId: roomId, roomId: roomId,
timerStart: time.Now(), timerStart: time.Now(),
clientById: map[string]*client{}, clientById: map[string]*client{},
present: map[*presentState]bool{}, present: map[chan *controlEvent]bool{},
} }
} }
@@ -559,7 +555,7 @@ func (rm *room) sendAdminEvent(ae *adminEvent) {
func (rm *room) sendControlEvent(ce *controlEvent) { func (rm *room) sendControlEvent(ce *controlEvent) {
for present, _ := range rm.present { for present, _ := range rm.present {
present.sendEvent(ce) present <- ce
} }
} }
@@ -635,52 +631,33 @@ func writeEvent(e *event, w http.ResponseWriter, flusher http.Flusher) {
flusher.Flush() flusher.Flush()
} }
func newPresentState(w http.ResponseWriter, r *http.Request) *presentState { func registerPresent(w http.ResponseWriter, r *http.Request) (*room, chan *controlEvent) {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
ps := &presentState{ controlChan := make(chan *controlEvent, 100)
responseWriter: w,
controlChan: make(chan *controlEvent, 100),
}
var ok bool
ps.flusher, ok = w.(http.Flusher)
if !ok {
http.Error(ps.responseWriter, "streaming unsupported", http.StatusBadRequest)
return nil
}
roomId := r.URL.Query().Get("room_id") roomId := r.URL.Query().Get("room_id")
ps.room = getRoom(roomId) room := getRoom(roomId)
ps.room.present[ps] = true room.present[controlChan] = true
w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
return ps return room, controlChan
} }
func (ps *presentState) sendHeartbeat() { func writePresentHeartbeat(w http.ResponseWriter, flusher http.Flusher) {
fmt.Fprintf(ps.responseWriter, ":\n\n") writePresentEvent(&controlEvent{}, w, flusher)
ps.flusher.Flush()
} }
func (ps *presentState) sendEvent(e *controlEvent) { func writePresentEvent(e *controlEvent, w http.ResponseWriter, flusher http.Flusher) {
j, err := json.Marshal(e) j, err := json.Marshal(e)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
fmt.Fprintf(ps.responseWriter, "data: %s\n\n", j) fmt.Fprintf(w, "data: %s\n\n", j)
ps.flusher.Flush() flusher.Flush()
}
func (ps *presentState) close() {
mu.Lock()
defer mu.Unlock()
delete(ps.room.present, ps)
close(ps.controlChan)
} }