Remove watchState

This commit is contained in:
Ian Gulliver
2020-11-25 21:52:30 +00:00
parent 2619794b78
commit 462159c606

119
main.go
View File

@@ -100,15 +100,6 @@ type room struct {
present map[*presentState]bool present map[*presentState]bool
} }
type watchState struct {
responseWriter http.ResponseWriter
flusher http.Flusher
room *room
client *client
admin bool
eventChan chan *event
}
type presentState struct { type presentState struct {
responseWriter http.ResponseWriter responseWriter http.ResponseWriter
flusher http.Flusher flusher http.Flusher
@@ -422,31 +413,45 @@ func reset(w http.ResponseWriter, r *http.Request) {
} }
func watch(w http.ResponseWriter, r *http.Request) { func watch(w http.ResponseWriter, r *http.Request) {
ws := newWatchState(w, r) flusher, ok := w.(http.Flusher)
if ws == nil { if !ok {
http.Error(w, "streaming unsupported", http.StatusBadRequest)
return return
} }
client, eventChan := registerWatch(w, r)
if client == nil {
return
}
// TODO: refcount client so it stays alive from just a watch
// add in registerwatch, sub in defer here
closeChan := w.(http.CloseNotifier).CloseNotify() closeChan := w.(http.CloseNotifier).CloseNotify()
ticker := time.NewTicker(15 * time.Second) ticker := time.NewTicker(15 * time.Second)
ws.sendInitial() writeInitial(client, w, flusher)
for { for {
select { select {
case <-closeChan: case <-closeChan:
ws.close() close(eventChan)
return
mu.Lock()
if client.eventChan == eventChan {
client.eventChan = nil
}
mu.Unlock()
case <-ticker.C: case <-ticker.C:
mu.Lock() writeHeartbeat(w, flusher)
ws.sendHeartbeat()
mu.Unlock()
case event := <-ws.eventChan: case event, ok := <-eventChan:
mu.Lock() if ok {
ws.sendEvent(event) writeEvent(event, w, flusher)
mu.Unlock() } else {
return
}
} }
} }
} }
@@ -545,89 +550,71 @@ func (rm *room) updateAllClients() {
} }
} }
func newWatchState(w http.ResponseWriter, r *http.Request) *watchState { func registerWatch(w http.ResponseWriter, r *http.Request) (*client, chan *event) {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
ws := &watchState{
responseWriter: w,
eventChan: make(chan *event, 100),
}
var ok bool
ws.flusher, ok = w.(http.Flusher)
if !ok {
http.Error(ws.responseWriter, "streaming unsupported", http.StatusBadRequest)
return nil
}
roomId := r.URL.Query().Get("room_id") roomId := r.URL.Query().Get("room_id")
ws.room = getRoom(roomId) room := getRoom(roomId)
clientId := r.URL.Query().Get("client_id") clientId := r.URL.Query().Get("client_id")
ws.client = ws.room.getClient(clientId) client := room.getClient(clientId)
adminSecret := r.URL.Query().Get("admin_secret") adminSecret := r.URL.Query().Get("admin_secret")
if adminSecret != "" { if adminSecret != "" {
if adminSecret == ws.room.adminSecret() { if adminSecret == room.adminSecret() {
ws.admin = true client.Admin = true
} else { } else {
http.Error(w, "invalid admin_secret", http.StatusBadRequest) http.Error(w, "invalid admin_secret", http.StatusBadRequest)
return nil return nil, nil
} }
} }
ws.client.eventChan = ws.eventChan if client.eventChan != nil {
ws.client.update() close(client.eventChan)
}
client.eventChan = make(chan *event, 100)
client.update()
w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
return ws // Return eventChan because we're reading it with the lock held
return client, client.eventChan
} }
func (ws *watchState) sendInitial() { func writeInitial(client *client, w http.ResponseWriter, flusher http.Flusher) {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
if !ws.admin { if !client.Admin {
return return
} }
for _, client := range ws.room.clientById { for _, iter := range client.room.clientById {
ws.sendEvent(&event{ writeEvent(&event{
AdminEvent: &adminEvent{ AdminEvent: &adminEvent{
Client: client, Client: iter,
}, },
}) }, w, flusher)
} }
ws.flusher.Flush()
} }
func (ws *watchState) sendHeartbeat() { func writeHeartbeat(w http.ResponseWriter, flusher http.Flusher) {
fmt.Fprintf(ws.responseWriter, ":\n\n") fmt.Fprintf(w, ":\n\n")
ws.flusher.Flush() flusher.Flush()
} }
func (ws *watchState) sendEvent(e *event) { func writeEvent(e *event, w http.ResponseWriter, flusher http.Flusher) {
j, err := json.Marshal(e) j, err := json.Marshal(e)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
fmt.Fprintf(ws.responseWriter, "data: %s\n\n", j) fmt.Fprintf(w, "data: %s\n\n", j)
ws.flusher.Flush() flusher.Flush()
}
func (ws *watchState) close() {
mu.Lock()
defer mu.Unlock()
if ws.client.eventChan == ws.eventChan {
ws.client.eventChan = nil
close(ws.eventChan)
}
} }
func newPresentState(w http.ResponseWriter, r *http.Request) *presentState { func newPresentState(w http.ResponseWriter, r *http.Request) *presentState {