Add close handling, move clock example to split threads

This commit is contained in:
flamingcow
2019-05-11 21:53:12 -07:00
parent c5f5d9f0cf
commit 1f84aa25b6
6 changed files with 72 additions and 13 deletions

View File

@@ -1,6 +1,8 @@
#include <atomic>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <sys/time.h>
#include <thread>
#include "server.h"
@@ -10,16 +12,46 @@ int main(int argc, char *argv[]) {
google::InitGoogleLogging(argv[0]);
gflags::ParseCommandLineFlags(&argc, &argv, true);
firesse::Server server(FLAGS_port, [](std::unique_ptr<firesse::Stream> stream) {
while (true) {
std::mutex mu;
std::unordered_set<firesse::Stream*> streams;
std::atomic<bool> running = true;
std::thread clock([&streams, &mu, &running]() {
while (running) {
sleep(1);
timeval tv;
PCHECK(gettimeofday(&tv, nullptr) == 0);
uint64_t time_ms = tv.tv_sec * 1000 + tv.tv_usec / 1000;
if (!stream->WriteEvent(std::to_string(time_ms), 0, "time")) {
break;
const uint64_t time_ms = tv.tv_sec * 1000 + tv.tv_usec / 1000;
const auto time_str = std::to_string(time_ms);
{
std::lock_guard l(mu);
for (auto* stream : streams) {
stream->WriteEvent(time_str, 0, "time");
}
}
sleep(1);
}
});
firesse::Server server(FLAGS_port, [&streams, &mu](firesse::Stream* stream) {
LOG(INFO) << "new stream: " << stream;
std::lock_guard l(mu);
streams.insert(stream);
stream->OnClose([stream, &streams, &mu]() {
LOG(INFO) << "stream closed: " << stream;
std::lock_guard l(mu);
streams.erase(stream);
});
});
server.RegisterSignalHandlers();
server.Serve();
running = false;
clock.join();
gflags::ShutDownCommandLineFlags();
google::ShutdownGoogleLogging();
}

Submodule firecgi updated: 89876f5bd6...ec99454756

View File

@@ -2,7 +2,7 @@
namespace firesse {
Server::Server(int port, const std::function<void(std::unique_ptr<Stream>)>& callback)
Server::Server(int port, const std::function<void(Stream*)>& callback)
: callback_(callback),
firecgi_server_(port,
[this](firecgi::Request* request) { OnRequest(request); },
@@ -12,12 +12,21 @@ void Server::Serve() {
firecgi_server_.Serve();
}
void Server::Shutdown() {
firecgi_server_.Shutdown();
}
void Server::RegisterSignalHandlers() {
firecgi_server_.RegisterSignalHandlers();
}
void Server::OnRequest(firecgi::Request* request) {
request->WriteHeader("Content-Type", "text/event-stream; charset=utf-8");
request->WriteHeader("Cache-Control", "no-cache");
request->WriteHeader("X-Accel-Buffering", "no");
request->WriteBody("");
callback_(std::make_unique<Stream>(request));
auto stream = new Stream(request);
callback_(stream);
}
} // namespace firesse

View File

@@ -1,5 +1,7 @@
#pragma once
#include <queue>
#include "firecgi/server.h"
#include "stream.h"
@@ -7,13 +9,15 @@ namespace firesse {
class Server {
public:
Server(int port, const std::function<void(std::unique_ptr<Stream>)>& callback);
Server(int port, const std::function<void(Stream*)>& callback);
void Serve();
void Shutdown();
void RegisterSignalHandlers();
private:
void OnRequest(firecgi::Request* request);
std::function<void(std::unique_ptr<Stream>)> callback_;
std::function<void(Stream*)> callback_;
firecgi::Server firecgi_server_;
};

View File

@@ -3,7 +3,17 @@
namespace firesse {
Stream::Stream(firecgi::Request* request)
: request_(request) {}
: request_(request) {
request->OnClose([this]() {
if (on_close_) {
on_close_();
}
});
}
void Stream::OnClose(const std::function<void()>& callback) {
on_close_ = callback;
}
bool Stream::WriteEvent(const std::string& data, uint64_t id, const std::string& type) {
return request_->InTransaction<bool>([=]() {

View File

@@ -10,11 +10,15 @@ class Stream {
public:
Stream(firecgi::Request* request);
[[nodiscard]] bool WriteEvent(const std::string& data, uint64_t id=0, const std::string& type="");
void OnClose(const std::function<void()>& callback);
bool WriteEvent(const std::string& data, uint64_t id=0, const std::string& type="");
bool End();
private:
firecgi::Request* request_;
std::function<void()> on_close_;
};
} // namespace firesse