Add keepalive comment messages, per spec

This commit is contained in:
flamingcow
2019-05-12 11:18:59 -07:00
parent b079f48367
commit 9162799be0
9 changed files with 140 additions and 8 deletions

View File

@@ -5,7 +5,7 @@ FIRE_LDLIBS ?= -lgflags -lglog -lpthread
all: firesse.a firesse.o firesse.so example_clock all: firesse.a firesse.o firesse.so example_clock
objects = server.o index.o stream.o objects = server.o keepalive.o index.o stream.o
firecgi/firecgi.o: firecgi/firecgi.o:
$(MAKE) --directory=firecgi firecgi.o $(MAKE) --directory=firecgi firecgi.o

View File

@@ -47,4 +47,48 @@ void Index::Freshen(Stream* stream) {
Add(stream); Add(stream);
} }
std::chrono::nanoseconds Index::WithStalest(std::function<void(Stream*)> callback, const std::chrono::nanoseconds& min_stale) {
Stream* stalest = nullptr;
std::chrono::nanoseconds ret;
const auto now = std::chrono::steady_clock::now();
const auto latest = now - min_stale;
{
std::lock_guard l(mu_);
if (!stalest_) {
return min_stale;
}
if (stalest_->last_message_time_ > latest) {
return stalest_->last_message_time_ - latest;
}
// stalest_ is valid for callback
stalest = stalest_;
if (stalest->fresher_) {
if (stalest->fresher_->last_message_time_ > latest) {
ret = stalest_->fresher_->last_message_time_ - latest;
}
// Otherwise ret is 0 for immediate cycle
} else {
ret = min_stale;
}
if (!stalest->mu_.try_lock()) {
// We're acquiring mutexes in the wrong order here; normally it's
// (Stream, Index), but we're doing (Index, Stream). That means we
// may deadlock. We take the lower-priority path and fail in case
// of deadlock.
return {};
}
Freshen(stalest);
}
callback(stalest);
stalest->mu_.unlock();
return ret;
}
} // namespace firesse } // namespace firesse

10
index.h
View File

@@ -5,12 +5,22 @@
namespace firesse { namespace firesse {
// Track live streams // Track live streams
// O(1) for Add(), Remove(), and Freshen()
// Manages this by only supporting insertion of the freshest element,
// and by requiring an iterator for removal.
// Avoids allocation and folds in iterators by using an intrusive list
// inside Stream.
class Index { class Index {
public: public:
void Add(Stream* stream); void Add(Stream* stream);
void Remove(Stream* stream); void Remove(Stream* stream);
void Freshen(Stream* stream); void Freshen(Stream* stream);
// Returns time to sleep until next stalest, or min_stale if none
// Only calls callback if stalest is at least min_stale
// Handles all locking and marks Stream as fresh after callback
std::chrono::nanoseconds WithStalest(std::function<void(Stream*)> callback, const std::chrono::nanoseconds& min_stale);
private: private:
std::recursive_mutex mu_; std::recursive_mutex mu_;
Stream* freshest_ = nullptr; Stream* freshest_ = nullptr;

42
keepalive.cc Normal file
View File

@@ -0,0 +1,42 @@
#include <poll.h>
#include <sys/eventfd.h>
#include "keepalive.h"
namespace firesse {
KeepAlive::KeepAlive(const std::chrono::nanoseconds& max_stale, Index* index)
: max_stale_(max_stale),
index_(index),
shutdown_(eventfd(0, 0)) {
PCHECK(shutdown_ >= 0) << "eventfd()";
}
void KeepAlive::Start() {
thread_ = std::thread([this]() {
int timeout = 0;
constexpr auto num_fds = 1;
pollfd fds[num_fds] = {
{
.fd = shutdown_,
.events = POLLIN,
},
};
while (poll(fds, num_fds, timeout) <= 0) {
auto sleep = index_->WithStalest([](Stream* stream) {
stream->WriteRaw(":\n");
}, max_stale_);
timeout = std::chrono::duration_cast<std::chrono::milliseconds>(sleep).count();
}
});
}
void KeepAlive::Stop() {
CHECK(thread_.joinable());
uint64_t shutdown = 1;
PCHECK(write(shutdown_, &shutdown, sizeof(shutdown)) == sizeof(shutdown));
thread_.join();
}
} // namespace firesse

23
keepalive.h Normal file
View File

@@ -0,0 +1,23 @@
#pragma once
#include <thread>
#include "index.h"
namespace firesse {
class KeepAlive {
public:
KeepAlive(const std::chrono::nanoseconds& max_stale, Index* index);
void Start();
void Stop();
private:
const std::chrono::nanoseconds max_stale_;
Index* index_;
int shutdown_;
std::thread thread_;
};
} // namespace firesse

View File

@@ -4,12 +4,15 @@ namespace firesse {
Server::Server(int port, const std::function<void(Stream*)>& callback) Server::Server(int port, const std::function<void(Stream*)>& callback)
: callback_(callback), : callback_(callback),
keep_alive_(std::chrono::seconds(15), &index_),
firecgi_server_(port, firecgi_server_(port,
[this](firecgi::Request* request) { OnRequest(request); }, [this](firecgi::Request* request) { OnRequest(request); },
1) {} 1) {}
void Server::Serve() { void Server::Serve() {
keep_alive_.Start();
firecgi_server_.Serve(); firecgi_server_.Serve();
keep_alive_.Stop();
} }
void Server::Shutdown() { void Server::Shutdown() {

View File

@@ -5,6 +5,7 @@
#include "firecgi/server.h" #include "firecgi/server.h"
#include "index.h" #include "index.h"
#include "keepalive.h"
#include "stream.h" #include "stream.h"
namespace firesse { namespace firesse {
@@ -21,6 +22,7 @@ class Server {
std::function<void(Stream*)> callback_; std::function<void(Stream*)> callback_;
Index index_; Index index_;
KeepAlive keep_alive_;
firecgi::Server firecgi_server_; firecgi::Server firecgi_server_;
}; };

View File

@@ -19,11 +19,10 @@ void Stream::OnClose(const std::function<void()>& callback) {
on_close_ = callback; on_close_ = callback;
} }
bool Stream::WriteEvent(const std::string& data, uint64_t id, const std::string& type) { bool Stream::WriteEvent(const std::string_view& data, uint64_t id, const std::string& type) {
{ std::lock_guard l(mu_);
std::lock_guard l(mu_);
index_->Freshen(this); index_->Freshen(this);
}
return request_->InTransaction<bool>([=]() { return request_->InTransaction<bool>([=]() {
if (id) { if (id) {
@@ -37,6 +36,13 @@ bool Stream::WriteEvent(const std::string& data, uint64_t id, const std::string&
}); });
} }
bool Stream::WriteRaw(const std::string_view& data) {
std::lock_guard l(mu_);
request_->WriteBody(data);
return request_->Flush();
}
bool Stream::End() { bool Stream::End() {
return request_->End(); return request_->End();
} }

View File

@@ -15,7 +15,8 @@ class Stream {
void OnClose(const std::function<void()>& callback); void OnClose(const std::function<void()>& callback);
bool WriteEvent(const std::string& data, uint64_t id=0, const std::string& type=""); bool WriteEvent(const std::string_view& data, uint64_t id=0, const std::string& type="");
bool WriteRaw(const std::string_view& data);
bool End(); bool End();
void Close(); void Close();
@@ -25,7 +26,8 @@ class Stream {
std::function<void()> on_close_; std::function<void()> on_close_;
std::mutex mu_; // TODO: What exactly is this protecting?
std::recursive_mutex mu_;
std::chrono::steady_clock::time_point last_message_time_; std::chrono::steady_clock::time_point last_message_time_;
Stream* fresher_; Stream* fresher_;
Stream* staler_; Stream* staler_;