From 9162799be046539ee68b3d3fd5bccea63c4b5928 Mon Sep 17 00:00:00 2001 From: flamingcow Date: Sun, 12 May 2019 11:18:59 -0700 Subject: [PATCH] Add keepalive comment messages, per spec --- Makefile | 2 +- index.cc | 44 ++++++++++++++++++++++++++++++++++++++++++++ index.h | 10 ++++++++++ keepalive.cc | 42 ++++++++++++++++++++++++++++++++++++++++++ keepalive.h | 23 +++++++++++++++++++++++ server.cc | 3 +++ server.h | 2 ++ stream.cc | 16 +++++++++++----- stream.h | 6 ++++-- 9 files changed, 140 insertions(+), 8 deletions(-) create mode 100644 keepalive.cc create mode 100644 keepalive.h diff --git a/Makefile b/Makefile index 94f0b28..cec2783 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ FIRE_LDLIBS ?= -lgflags -lglog -lpthread all: firesse.a firesse.o firesse.so example_clock -objects = server.o index.o stream.o +objects = server.o keepalive.o index.o stream.o firecgi/firecgi.o: $(MAKE) --directory=firecgi firecgi.o diff --git a/index.cc b/index.cc index c395c8f..81c27c2 100644 --- a/index.cc +++ b/index.cc @@ -47,4 +47,48 @@ void Index::Freshen(Stream* stream) { Add(stream); } +std::chrono::nanoseconds Index::WithStalest(std::function callback, const std::chrono::nanoseconds& min_stale) { + Stream* stalest = nullptr; + std::chrono::nanoseconds ret; + const auto now = std::chrono::steady_clock::now(); + const auto latest = now - min_stale; + + { + std::lock_guard l(mu_); + if (!stalest_) { + return min_stale; + } + if (stalest_->last_message_time_ > latest) { + return stalest_->last_message_time_ - latest; + } + + // stalest_ is valid for callback + stalest = stalest_; + + if (stalest->fresher_) { + if (stalest->fresher_->last_message_time_ > latest) { + ret = stalest_->fresher_->last_message_time_ - latest; + } + // Otherwise ret is 0 for immediate cycle + } else { + ret = min_stale; + } + + if (!stalest->mu_.try_lock()) { + // We're acquiring mutexes in the wrong order here; normally it's + // (Stream, Index), but we're doing (Index, Stream). That means we + // may deadlock. We take the lower-priority path and fail in case + // of deadlock. + return {}; + } + + Freshen(stalest); + } + + callback(stalest); + stalest->mu_.unlock(); + + return ret; +} + } // namespace firesse diff --git a/index.h b/index.h index ca2e326..0c6c28f 100644 --- a/index.h +++ b/index.h @@ -5,12 +5,22 @@ namespace firesse { // Track live streams +// O(1) for Add(), Remove(), and Freshen() +// Manages this by only supporting insertion of the freshest element, +// and by requiring an iterator for removal. +// Avoids allocation and folds in iterators by using an intrusive list +// inside Stream. class Index { public: void Add(Stream* stream); void Remove(Stream* stream); void Freshen(Stream* stream); + // Returns time to sleep until next stalest, or min_stale if none + // Only calls callback if stalest is at least min_stale + // Handles all locking and marks Stream as fresh after callback + std::chrono::nanoseconds WithStalest(std::function callback, const std::chrono::nanoseconds& min_stale); + private: std::recursive_mutex mu_; Stream* freshest_ = nullptr; diff --git a/keepalive.cc b/keepalive.cc new file mode 100644 index 0000000..facbd25 --- /dev/null +++ b/keepalive.cc @@ -0,0 +1,42 @@ +#include +#include + +#include "keepalive.h" + +namespace firesse { + +KeepAlive::KeepAlive(const std::chrono::nanoseconds& max_stale, Index* index) + : max_stale_(max_stale), + index_(index), + shutdown_(eventfd(0, 0)) { + PCHECK(shutdown_ >= 0) << "eventfd()"; +} + +void KeepAlive::Start() { + thread_ = std::thread([this]() { + int timeout = 0; + constexpr auto num_fds = 1; + pollfd fds[num_fds] = { + { + .fd = shutdown_, + .events = POLLIN, + }, + }; + while (poll(fds, num_fds, timeout) <= 0) { + auto sleep = index_->WithStalest([](Stream* stream) { + stream->WriteRaw(":\n"); + }, max_stale_); + timeout = std::chrono::duration_cast(sleep).count(); + } + }); +} + +void KeepAlive::Stop() { + CHECK(thread_.joinable()); + + uint64_t shutdown = 1; + PCHECK(write(shutdown_, &shutdown, sizeof(shutdown)) == sizeof(shutdown)); + thread_.join(); +} + +} // namespace firesse diff --git a/keepalive.h b/keepalive.h new file mode 100644 index 0000000..94eea8a --- /dev/null +++ b/keepalive.h @@ -0,0 +1,23 @@ +#pragma once + +#include + +#include "index.h" + +namespace firesse { + +class KeepAlive { + public: + KeepAlive(const std::chrono::nanoseconds& max_stale, Index* index); + void Start(); + void Stop(); + + private: + const std::chrono::nanoseconds max_stale_; + Index* index_; + int shutdown_; + + std::thread thread_; +}; + +} // namespace firesse diff --git a/server.cc b/server.cc index fdb896f..1ee7521 100644 --- a/server.cc +++ b/server.cc @@ -4,12 +4,15 @@ namespace firesse { Server::Server(int port, const std::function& callback) : callback_(callback), + keep_alive_(std::chrono::seconds(15), &index_), firecgi_server_(port, [this](firecgi::Request* request) { OnRequest(request); }, 1) {} void Server::Serve() { + keep_alive_.Start(); firecgi_server_.Serve(); + keep_alive_.Stop(); } void Server::Shutdown() { diff --git a/server.h b/server.h index 1c97fce..ee3b938 100644 --- a/server.h +++ b/server.h @@ -5,6 +5,7 @@ #include "firecgi/server.h" #include "index.h" +#include "keepalive.h" #include "stream.h" namespace firesse { @@ -21,6 +22,7 @@ class Server { std::function callback_; Index index_; + KeepAlive keep_alive_; firecgi::Server firecgi_server_; }; diff --git a/stream.cc b/stream.cc index 21d18d0..19e4e37 100644 --- a/stream.cc +++ b/stream.cc @@ -19,11 +19,10 @@ void Stream::OnClose(const std::function& callback) { on_close_ = callback; } -bool Stream::WriteEvent(const std::string& data, uint64_t id, const std::string& type) { - { - std::lock_guard l(mu_); - index_->Freshen(this); - } +bool Stream::WriteEvent(const std::string_view& data, uint64_t id, const std::string& type) { + std::lock_guard l(mu_); + + index_->Freshen(this); return request_->InTransaction([=]() { if (id) { @@ -37,6 +36,13 @@ bool Stream::WriteEvent(const std::string& data, uint64_t id, const std::string& }); } +bool Stream::WriteRaw(const std::string_view& data) { + std::lock_guard l(mu_); + + request_->WriteBody(data); + return request_->Flush(); +} + bool Stream::End() { return request_->End(); } diff --git a/stream.h b/stream.h index 1e09bb4..b113562 100644 --- a/stream.h +++ b/stream.h @@ -15,7 +15,8 @@ class Stream { void OnClose(const std::function& callback); - bool WriteEvent(const std::string& data, uint64_t id=0, const std::string& type=""); + bool WriteEvent(const std::string_view& data, uint64_t id=0, const std::string& type=""); + bool WriteRaw(const std::string_view& data); bool End(); void Close(); @@ -25,7 +26,8 @@ class Stream { std::function on_close_; - std::mutex mu_; + // TODO: What exactly is this protecting? + std::recursive_mutex mu_; std::chrono::steady_clock::time_point last_message_time_; Stream* fresher_; Stream* staler_;