diff --git a/firecgi b/firecgi index ec99454..0ba446b 160000 --- a/firecgi +++ b/firecgi @@ -1 +1 @@ -Subproject commit ec99454756342da9adc84db90ff9b568cbc8a837 +Subproject commit 0ba446bacb1cedaacea0f8347c0a068f652e04cf diff --git a/server.cc b/server.cc index c0c4e4e..dd9a81a 100644 --- a/server.cc +++ b/server.cc @@ -26,6 +26,19 @@ void Server::OnRequest(firecgi::Request* request) { request->WriteHeader("X-Accel-Buffering", "no"); request->WriteBody(""); auto stream = new Stream(request); + + { + std::lock_guard l(mu_); + streams_.insert(stream); + + request->OnClose([this, stream]() { + std::lock_guard l(mu_); + stream->Close(); + streams_.erase(stream); + delete stream; + }); + } + callback_(stream); } diff --git a/server.h b/server.h index 580108c..662784f 100644 --- a/server.h +++ b/server.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include "firecgi/server.h" #include "stream.h" @@ -19,6 +19,9 @@ class Server { std::function callback_; firecgi::Server firecgi_server_; + + std::mutex mu_; + std::set streams_; }; } // namespace firesse diff --git a/stream.cc b/stream.cc index 320a271..9ac2a83 100644 --- a/stream.cc +++ b/stream.cc @@ -3,19 +3,18 @@ namespace firesse { Stream::Stream(firecgi::Request* request) - : request_(request) { - request->OnClose([this]() { - if (on_close_) { - on_close_(); - } - }); -} + : request_(request) {} void Stream::OnClose(const std::function& callback) { on_close_ = callback; } bool Stream::WriteEvent(const std::string& data, uint64_t id, const std::string& type) { + { + std::lock_guard l(mu_); + last_message_time_ = std::chrono::steady_clock::now(); + } + return request_->InTransaction([=]() { if (id) { request_->WriteBody("id: ", std::to_string(id), "\n"); @@ -32,4 +31,19 @@ bool Stream::End() { return request_->End(); } +std::chrono::steady_clock::time_point Stream::LastMessageTime() { + std::lock_guard l(mu_); + return last_message_time_; +} + +void Stream::Close() { + if (on_close_) { + on_close_(); + } +} + +bool IsFresherStream::operator() (Stream* a, Stream* b) const { + return a->LastMessageTime() > b->LastMessageTime(); +} + } // namespace firesse diff --git a/stream.h b/stream.h index c35b525..339ea8e 100644 --- a/stream.h +++ b/stream.h @@ -15,10 +15,21 @@ class Stream { bool WriteEvent(const std::string& data, uint64_t id=0, const std::string& type=""); bool End(); + std::chrono::steady_clock::time_point LastMessageTime(); + void Close(); + private: firecgi::Request* request_; std::function on_close_; + + std::mutex mu_; + std::chrono::steady_clock::time_point last_message_time_; +}; + +class IsFresherStream { + public: + bool operator() (Stream* a, Stream* b) const; }; } // namespace firesse