diff --git a/main.go b/main.go index bfd3e0e..935f2f9 100644 --- a/main.go +++ b/main.go @@ -98,14 +98,7 @@ type room struct { roomId string timerStart time.Time clientById map[string]*client - present map[*presentState]bool -} - -type presentState struct { - responseWriter http.ResponseWriter - flusher http.Flusher - room *room - controlChan chan *controlEvent + present map[chan *controlEvent]bool } var key []byte @@ -363,8 +356,14 @@ func create(w http.ResponseWriter, r *http.Request) { } func present(w http.ResponseWriter, r *http.Request) { - ps := newPresentState(w, r) - if ps == nil { + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming unsupported", http.StatusBadRequest) + return + } + + room, controlChan := registerPresent(w, r) + if controlChan == nil { return } @@ -374,18 +373,16 @@ func present(w http.ResponseWriter, r *http.Request) { for { select { case <-closeChan: - ps.close() - return + mu.Lock() + delete(room.present, controlChan) + close(controlChan) + mu.Unlock() case <-ticker.C: - mu.Lock() - ps.sendHeartbeat() - mu.Unlock() + writePresentHeartbeat(w, flusher) - case ctrl := <-ps.controlChan: - mu.Lock() - ps.sendEvent(ctrl) - mu.Unlock() + case ctrl := <-controlChan: + writePresentEvent(ctrl, w, flusher) } } } @@ -450,12 +447,11 @@ func watch(w http.ResponseWriter, r *http.Request) { for { select { case <-closeChan: - close(eventChan) - mu.Lock() if client.eventChan == eventChan { client.eventChan = nil } + close(eventChan) mu.Unlock() case <-ticker.C: @@ -509,7 +505,7 @@ func newRoom(roomId string) *room { roomId: roomId, timerStart: time.Now(), clientById: map[string]*client{}, - present: map[*presentState]bool{}, + present: map[chan *controlEvent]bool{}, } } @@ -559,7 +555,7 @@ func (rm *room) sendAdminEvent(ae *adminEvent) { func (rm *room) sendControlEvent(ce *controlEvent) { for present, _ := range rm.present { - present.sendEvent(ce) + present <- ce } } @@ -635,52 +631,33 @@ func writeEvent(e *event, w http.ResponseWriter, flusher http.Flusher) { flusher.Flush() } -func newPresentState(w http.ResponseWriter, r *http.Request) *presentState { +func registerPresent(w http.ResponseWriter, r *http.Request) (*room, chan *controlEvent) { mu.Lock() defer mu.Unlock() - ps := &presentState{ - responseWriter: w, - controlChan: make(chan *controlEvent, 100), - } - - var ok bool - ps.flusher, ok = w.(http.Flusher) - if !ok { - http.Error(ps.responseWriter, "streaming unsupported", http.StatusBadRequest) - return nil - } + controlChan := make(chan *controlEvent, 100) roomId := r.URL.Query().Get("room_id") - ps.room = getRoom(roomId) + room := getRoom(roomId) - ps.room.present[ps] = true + room.present[controlChan] = true w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") - return ps + return room, controlChan } -func (ps *presentState) sendHeartbeat() { - fmt.Fprintf(ps.responseWriter, ":\n\n") - ps.flusher.Flush() +func writePresentHeartbeat(w http.ResponseWriter, flusher http.Flusher) { + writePresentEvent(&controlEvent{}, w, flusher) } -func (ps *presentState) sendEvent(e *controlEvent) { +func writePresentEvent(e *controlEvent, w http.ResponseWriter, flusher http.Flusher) { j, err := json.Marshal(e) if err != nil { log.Fatal(err) } - fmt.Fprintf(ps.responseWriter, "data: %s\n\n", j) - ps.flusher.Flush() -} - -func (ps *presentState) close() { - mu.Lock() - defer mu.Unlock() - - delete(ps.room.present, ps) - close(ps.controlChan) + fmt.Fprintf(w, "data: %s\n\n", j) + flusher.Flush() }