diff --git a/example_clock.cc b/example_clock.cc index bdb480d..06b31f1 100644 --- a/example_clock.cc +++ b/example_clock.cc @@ -1,6 +1,8 @@ +#include #include #include #include +#include #include "server.h" @@ -10,16 +12,46 @@ int main(int argc, char *argv[]) { google::InitGoogleLogging(argv[0]); gflags::ParseCommandLineFlags(&argc, &argv, true); - firesse::Server server(FLAGS_port, [](std::unique_ptr stream) { - while (true) { + std::mutex mu; + std::unordered_set streams; + std::atomic running = true; + + std::thread clock([&streams, &mu, &running]() { + while (running) { + sleep(1); + timeval tv; PCHECK(gettimeofday(&tv, nullptr) == 0); - uint64_t time_ms = tv.tv_sec * 1000 + tv.tv_usec / 1000; - if (!stream->WriteEvent(std::to_string(time_ms), 0, "time")) { - break; + const uint64_t time_ms = tv.tv_sec * 1000 + tv.tv_usec / 1000; + const auto time_str = std::to_string(time_ms); + + { + std::lock_guard l(mu); + for (auto* stream : streams) { + stream->WriteEvent(time_str, 0, "time"); + } } - sleep(1); } }); + + firesse::Server server(FLAGS_port, [&streams, &mu](firesse::Stream* stream) { + LOG(INFO) << "new stream: " << stream; + + std::lock_guard l(mu); + streams.insert(stream); + stream->OnClose([stream, &streams, &mu]() { + LOG(INFO) << "stream closed: " << stream; + + std::lock_guard l(mu); + streams.erase(stream); + }); + }); + server.RegisterSignalHandlers(); server.Serve(); + + running = false; + clock.join(); + + gflags::ShutDownCommandLineFlags(); + google::ShutdownGoogleLogging(); } diff --git a/firecgi b/firecgi index 89876f5..ec99454 160000 --- a/firecgi +++ b/firecgi @@ -1 +1 @@ -Subproject commit 89876f5bd683cc4adddb836188ec5606876c48e7 +Subproject commit ec99454756342da9adc84db90ff9b568cbc8a837 diff --git a/server.cc b/server.cc index 2a0a2fd..c0c4e4e 100644 --- a/server.cc +++ b/server.cc @@ -2,7 +2,7 @@ namespace firesse { -Server::Server(int port, const std::function)>& callback) +Server::Server(int port, const std::function& callback) : callback_(callback), firecgi_server_(port, [this](firecgi::Request* request) { OnRequest(request); }, @@ -12,12 +12,21 @@ void Server::Serve() { firecgi_server_.Serve(); } +void Server::Shutdown() { + firecgi_server_.Shutdown(); +} + +void Server::RegisterSignalHandlers() { + firecgi_server_.RegisterSignalHandlers(); +} + void Server::OnRequest(firecgi::Request* request) { request->WriteHeader("Content-Type", "text/event-stream; charset=utf-8"); request->WriteHeader("Cache-Control", "no-cache"); request->WriteHeader("X-Accel-Buffering", "no"); request->WriteBody(""); - callback_(std::make_unique(request)); + auto stream = new Stream(request); + callback_(stream); } } // namespace firesse diff --git a/server.h b/server.h index 1cd8046..580108c 100644 --- a/server.h +++ b/server.h @@ -1,5 +1,7 @@ #pragma once +#include + #include "firecgi/server.h" #include "stream.h" @@ -7,13 +9,15 @@ namespace firesse { class Server { public: - Server(int port, const std::function)>& callback); + Server(int port, const std::function& callback); void Serve(); + void Shutdown(); + void RegisterSignalHandlers(); private: void OnRequest(firecgi::Request* request); - std::function)> callback_; + std::function callback_; firecgi::Server firecgi_server_; }; diff --git a/stream.cc b/stream.cc index ededd93..320a271 100644 --- a/stream.cc +++ b/stream.cc @@ -3,7 +3,17 @@ namespace firesse { Stream::Stream(firecgi::Request* request) - : request_(request) {} + : request_(request) { + request->OnClose([this]() { + if (on_close_) { + on_close_(); + } + }); +} + +void Stream::OnClose(const std::function& callback) { + on_close_ = callback; +} bool Stream::WriteEvent(const std::string& data, uint64_t id, const std::string& type) { return request_->InTransaction([=]() { diff --git a/stream.h b/stream.h index 452dc0b..c35b525 100644 --- a/stream.h +++ b/stream.h @@ -10,11 +10,15 @@ class Stream { public: Stream(firecgi::Request* request); - [[nodiscard]] bool WriteEvent(const std::string& data, uint64_t id=0, const std::string& type=""); + void OnClose(const std::function& callback); + + bool WriteEvent(const std::string& data, uint64_t id=0, const std::string& type=""); bool End(); private: firecgi::Request* request_; + + std::function on_close_; }; } // namespace firesse