diff --git a/Makefile b/Makefile index 5964e76..94f0b28 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ FIRE_LDLIBS ?= -lgflags -lglog -lpthread all: firesse.a firesse.o firesse.so example_clock -objects = server.o stream.o +objects = server.o index.o stream.o firecgi/firecgi.o: $(MAKE) --directory=firecgi firecgi.o diff --git a/index.cc b/index.cc new file mode 100644 index 0000000..c4afa28 --- /dev/null +++ b/index.cc @@ -0,0 +1,39 @@ +#include "index.h" + +namespace firesse { + +void Index::Add(Stream* stream) { + std::lock_guard l(mu_); + + // Implicitly freshest + stream->fresher_ = nullptr; + stream->staler_ = freshest_; + if (stream->staler_) { + stream->staler_->fresher_ = stream; + } + freshest_ = stream; + if (!stalest_) { + stalest_ = stream; + } +} + +void Index::Remove(Stream* stream) { + std::lock_guard l(mu_); + + if (freshest_ == stream) { + freshest_ = stream->staler_; + } + if (stalest_ == stream) { + stalest_ = stream->fresher_; + } + if (stream->fresher_) { + stream->fresher_->staler_ = stream->staler_; + stream->fresher_ = nullptr; + } + if (stream->staler_) { + stream->staler_->fresher_ = stream->fresher_; + stream->staler_ = nullptr; + } +} + +} // namespace firesse diff --git a/index.h b/index.h new file mode 100644 index 0000000..64295b4 --- /dev/null +++ b/index.h @@ -0,0 +1,19 @@ +#pragma once + +#include "stream.h" + +namespace firesse { + +// Track live streams +class Index { + public: + void Add(Stream* stream); + void Remove(Stream* stream); + + private: + std::mutex mu_; + Stream* freshest_ = nullptr; + Stream* stalest_ = nullptr; +}; + +} // namespace firesse diff --git a/server.cc b/server.cc index dd9a81a..fdb896f 100644 --- a/server.cc +++ b/server.cc @@ -25,16 +25,11 @@ void Server::OnRequest(firecgi::Request* request) { request->WriteHeader("Cache-Control", "no-cache"); request->WriteHeader("X-Accel-Buffering", "no"); request->WriteBody(""); - auto stream = new Stream(request); + auto stream = new Stream(request, &index_); { - std::lock_guard l(mu_); - streams_.insert(stream); - - request->OnClose([this, stream]() { - std::lock_guard l(mu_); + request->OnClose([stream]() { stream->Close(); - streams_.erase(stream); delete stream; }); } diff --git a/server.h b/server.h index 662784f..1c97fce 100644 --- a/server.h +++ b/server.h @@ -3,6 +3,8 @@ #include #include "firecgi/server.h" + +#include "index.h" #include "stream.h" namespace firesse { @@ -18,10 +20,8 @@ class Server { void OnRequest(firecgi::Request* request); std::function callback_; + Index index_; firecgi::Server firecgi_server_; - - std::mutex mu_; - std::set streams_; }; } // namespace firesse diff --git a/stream.cc b/stream.cc index 9ac2a83..84e3721 100644 --- a/stream.cc +++ b/stream.cc @@ -1,9 +1,19 @@ #include "stream.h" +#include "index.h" + namespace firesse { -Stream::Stream(firecgi::Request* request) - : request_(request) {} +Stream::Stream(firecgi::Request* request, Index* index) + : request_(request), + index_(index) { + index_->Add(this); +} + +Stream::~Stream() { + std::lock_guard l(mu_); + index_->Remove(this); +} void Stream::OnClose(const std::function& callback) { on_close_ = callback; @@ -31,19 +41,10 @@ bool Stream::End() { return request_->End(); } -std::chrono::steady_clock::time_point Stream::LastMessageTime() { - std::lock_guard l(mu_); - return last_message_time_; -} - void Stream::Close() { if (on_close_) { on_close_(); } } -bool IsFresherStream::operator() (Stream* a, Stream* b) const { - return a->LastMessageTime() > b->LastMessageTime(); -} - } // namespace firesse diff --git a/stream.h b/stream.h index 339ea8e..1e09bb4 100644 --- a/stream.h +++ b/stream.h @@ -6,30 +6,31 @@ namespace firesse { +class Index; + class Stream { public: - Stream(firecgi::Request* request); + Stream(firecgi::Request* request, Index* index); + ~Stream(); void OnClose(const std::function& callback); bool WriteEvent(const std::string& data, uint64_t id=0, const std::string& type=""); bool End(); - - std::chrono::steady_clock::time_point LastMessageTime(); void Close(); private: firecgi::Request* request_; + Index* index_; std::function on_close_; std::mutex mu_; std::chrono::steady_clock::time_point last_message_time_; -}; + Stream* fresher_; + Stream* staler_; -class IsFresherStream { - public: - bool operator() (Stream* a, Stream* b) const; + friend class Index; }; } // namespace firesse