Separate Index class to track streams via intrusive list

This commit is contained in:
flamingcow
2019-05-12 08:45:56 -07:00
parent 79709c5b25
commit d29157c1ac
7 changed files with 84 additions and 29 deletions

View File

@@ -5,7 +5,7 @@ FIRE_LDLIBS ?= -lgflags -lglog -lpthread
all: firesse.a firesse.o firesse.so example_clock all: firesse.a firesse.o firesse.so example_clock
objects = server.o stream.o objects = server.o index.o stream.o
firecgi/firecgi.o: firecgi/firecgi.o:
$(MAKE) --directory=firecgi firecgi.o $(MAKE) --directory=firecgi firecgi.o

39
index.cc Normal file
View File

@@ -0,0 +1,39 @@
#include "index.h"
namespace firesse {
void Index::Add(Stream* stream) {
std::lock_guard l(mu_);
// Implicitly freshest
stream->fresher_ = nullptr;
stream->staler_ = freshest_;
if (stream->staler_) {
stream->staler_->fresher_ = stream;
}
freshest_ = stream;
if (!stalest_) {
stalest_ = stream;
}
}
void Index::Remove(Stream* stream) {
std::lock_guard l(mu_);
if (freshest_ == stream) {
freshest_ = stream->staler_;
}
if (stalest_ == stream) {
stalest_ = stream->fresher_;
}
if (stream->fresher_) {
stream->fresher_->staler_ = stream->staler_;
stream->fresher_ = nullptr;
}
if (stream->staler_) {
stream->staler_->fresher_ = stream->fresher_;
stream->staler_ = nullptr;
}
}
} // namespace firesse

19
index.h Normal file
View File

@@ -0,0 +1,19 @@
#pragma once
#include "stream.h"
namespace firesse {
// Track live streams
class Index {
public:
void Add(Stream* stream);
void Remove(Stream* stream);
private:
std::mutex mu_;
Stream* freshest_ = nullptr;
Stream* stalest_ = nullptr;
};
} // namespace firesse

View File

@@ -25,16 +25,11 @@ void Server::OnRequest(firecgi::Request* request) {
request->WriteHeader("Cache-Control", "no-cache"); request->WriteHeader("Cache-Control", "no-cache");
request->WriteHeader("X-Accel-Buffering", "no"); request->WriteHeader("X-Accel-Buffering", "no");
request->WriteBody(""); request->WriteBody("");
auto stream = new Stream(request); auto stream = new Stream(request, &index_);
{ {
std::lock_guard l(mu_); request->OnClose([stream]() {
streams_.insert(stream);
request->OnClose([this, stream]() {
std::lock_guard l(mu_);
stream->Close(); stream->Close();
streams_.erase(stream);
delete stream; delete stream;
}); });
} }

View File

@@ -3,6 +3,8 @@
#include <set> #include <set>
#include "firecgi/server.h" #include "firecgi/server.h"
#include "index.h"
#include "stream.h" #include "stream.h"
namespace firesse { namespace firesse {
@@ -18,10 +20,8 @@ class Server {
void OnRequest(firecgi::Request* request); void OnRequest(firecgi::Request* request);
std::function<void(Stream*)> callback_; std::function<void(Stream*)> callback_;
Index index_;
firecgi::Server firecgi_server_; firecgi::Server firecgi_server_;
std::mutex mu_;
std::set<Stream*, IsFresherStream> streams_;
}; };
} // namespace firesse } // namespace firesse

View File

@@ -1,9 +1,19 @@
#include "stream.h" #include "stream.h"
#include "index.h"
namespace firesse { namespace firesse {
Stream::Stream(firecgi::Request* request) Stream::Stream(firecgi::Request* request, Index* index)
: request_(request) {} : request_(request),
index_(index) {
index_->Add(this);
}
Stream::~Stream() {
std::lock_guard l(mu_);
index_->Remove(this);
}
void Stream::OnClose(const std::function<void()>& callback) { void Stream::OnClose(const std::function<void()>& callback) {
on_close_ = callback; on_close_ = callback;
@@ -31,19 +41,10 @@ bool Stream::End() {
return request_->End(); return request_->End();
} }
std::chrono::steady_clock::time_point Stream::LastMessageTime() {
std::lock_guard l(mu_);
return last_message_time_;
}
void Stream::Close() { void Stream::Close() {
if (on_close_) { if (on_close_) {
on_close_(); on_close_();
} }
} }
bool IsFresherStream::operator() (Stream* a, Stream* b) const {
return a->LastMessageTime() > b->LastMessageTime();
}
} // namespace firesse } // namespace firesse

View File

@@ -6,30 +6,31 @@
namespace firesse { namespace firesse {
class Index;
class Stream { class Stream {
public: public:
Stream(firecgi::Request* request); Stream(firecgi::Request* request, Index* index);
~Stream();
void OnClose(const std::function<void()>& callback); void OnClose(const std::function<void()>& callback);
bool WriteEvent(const std::string& data, uint64_t id=0, const std::string& type=""); bool WriteEvent(const std::string& data, uint64_t id=0, const std::string& type="");
bool End(); bool End();
std::chrono::steady_clock::time_point LastMessageTime();
void Close(); void Close();
private: private:
firecgi::Request* request_; firecgi::Request* request_;
Index* index_;
std::function<void()> on_close_; std::function<void()> on_close_;
std::mutex mu_; std::mutex mu_;
std::chrono::steady_clock::time_point last_message_time_; std::chrono::steady_clock::time_point last_message_time_;
}; Stream* fresher_;
Stream* staler_;
class IsFresherStream { friend class Index;
public:
bool operator() (Stream* a, Stream* b) const;
}; };
} // namespace firesse } // namespace firesse