Track streams

This commit is contained in:
flamingcow
2019-05-11 22:27:33 -07:00
parent 1f84aa25b6
commit 79709c5b25
5 changed files with 50 additions and 9 deletions

Submodule firecgi updated: ec99454756...0ba446bacb

View File

@@ -26,6 +26,19 @@ void Server::OnRequest(firecgi::Request* request) {
request->WriteHeader("X-Accel-Buffering", "no"); request->WriteHeader("X-Accel-Buffering", "no");
request->WriteBody(""); request->WriteBody("");
auto stream = new Stream(request); auto stream = new Stream(request);
{
std::lock_guard l(mu_);
streams_.insert(stream);
request->OnClose([this, stream]() {
std::lock_guard l(mu_);
stream->Close();
streams_.erase(stream);
delete stream;
});
}
callback_(stream); callback_(stream);
} }

View File

@@ -1,6 +1,6 @@
#pragma once #pragma once
#include <queue> #include <set>
#include "firecgi/server.h" #include "firecgi/server.h"
#include "stream.h" #include "stream.h"
@@ -19,6 +19,9 @@ class Server {
std::function<void(Stream*)> callback_; std::function<void(Stream*)> callback_;
firecgi::Server firecgi_server_; firecgi::Server firecgi_server_;
std::mutex mu_;
std::set<Stream*, IsFresherStream> streams_;
}; };
} // namespace firesse } // namespace firesse

View File

@@ -3,19 +3,18 @@
namespace firesse { namespace firesse {
Stream::Stream(firecgi::Request* request) Stream::Stream(firecgi::Request* request)
: request_(request) { : request_(request) {}
request->OnClose([this]() {
if (on_close_) {
on_close_();
}
});
}
void Stream::OnClose(const std::function<void()>& callback) { void Stream::OnClose(const std::function<void()>& callback) {
on_close_ = callback; on_close_ = callback;
} }
bool Stream::WriteEvent(const std::string& data, uint64_t id, const std::string& type) { bool Stream::WriteEvent(const std::string& data, uint64_t id, const std::string& type) {
{
std::lock_guard l(mu_);
last_message_time_ = std::chrono::steady_clock::now();
}
return request_->InTransaction<bool>([=]() { return request_->InTransaction<bool>([=]() {
if (id) { if (id) {
request_->WriteBody("id: ", std::to_string(id), "\n"); request_->WriteBody("id: ", std::to_string(id), "\n");
@@ -32,4 +31,19 @@ bool Stream::End() {
return request_->End(); return request_->End();
} }
std::chrono::steady_clock::time_point Stream::LastMessageTime() {
std::lock_guard l(mu_);
return last_message_time_;
}
void Stream::Close() {
if (on_close_) {
on_close_();
}
}
bool IsFresherStream::operator() (Stream* a, Stream* b) const {
return a->LastMessageTime() > b->LastMessageTime();
}
} // namespace firesse } // namespace firesse

View File

@@ -15,10 +15,21 @@ class Stream {
bool WriteEvent(const std::string& data, uint64_t id=0, const std::string& type=""); bool WriteEvent(const std::string& data, uint64_t id=0, const std::string& type="");
bool End(); bool End();
std::chrono::steady_clock::time_point LastMessageTime();
void Close();
private: private:
firecgi::Request* request_; firecgi::Request* request_;
std::function<void()> on_close_; std::function<void()> on_close_;
std::mutex mu_;
std::chrono::steady_clock::time_point last_message_time_;
};
class IsFresherStream {
public:
bool operator() (Stream* a, Stream* b) const;
}; };
} // namespace firesse } // namespace firesse