Working clock stream. Propagate up write errors

This commit is contained in:
Ian Gulliver
2019-04-29 00:11:07 +00:00
parent 4d4525b145
commit 205b8253b2
11 changed files with 92 additions and 13 deletions

View File

@@ -1,6 +1,6 @@
all: example_simple example_clock
objects = sse.o fastcgi.o fastcgi_conn.o fastcgi_request.o fastcgi_parse.o stream_buffer.o buffer.o
objects = sse.o sse_stream.o fastcgi.o fastcgi_conn.o fastcgi_request.o fastcgi_parse.o stream_buffer.o buffer.o
example_simple: example_simple.o $(objects) Makefile
clang++ -std=gnu++2a -o example_simple example_simple.o $(objects) -lgflags -lglog -lpthread

View File

@@ -1,5 +1,6 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <sys/time.h>
#include "sse.h"
@@ -10,6 +11,15 @@ int main(int argc, char *argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
SSEServer server(FLAGS_port, [](std::unique_ptr<SSEStream> stream) {
while (true) {
timeval tv;
PCHECK(gettimeofday(&tv, nullptr) == 0);
uint64_t time_ms = tv.tv_sec * 1000 + tv.tv_usec / 1000;
if (!stream->WriteEvent(std::to_string(time_ms), 0, "time")) {
break;
}
sleep(1);
}
});
server.Serve();
}

View File

@@ -1,5 +1,6 @@
#include <arpa/inet.h>
#include <netinet/in.h>
#include <signal.h>
#include <sys/socket.h>
#include <thread>
@@ -10,6 +11,8 @@ FastCGIServer::FastCGIServer(int port, const std::function<void(std::unique_ptr<
: callback_(callback) {
LOG(INFO) << "listening on [::1]:" << port;
signal(SIGPIPE, SIG_IGN);
listen_sock_ = socket(AF_INET6, SOCK_STREAM, 0);
PCHECK(listen_sock_ >= 0) << "socket()";

View File

@@ -22,12 +22,12 @@ FastCGIConn::~FastCGIConn() {
LOG(INFO) << "connection closed (handled " << requests_ << " requests)";
}
void FastCGIConn::Write(const std::vector<iovec>& vecs) {
bool FastCGIConn::Write(const std::vector<iovec>& vecs) {
size_t total_size = 0;
for (const auto& vec : vecs) {
total_size += vec.iov_len;
}
CHECK_EQ(writev(sock_, vecs.data(), vecs.size()), total_size);
return writev(sock_, vecs.data(), vecs.size()) == total_size;
}
void FastCGIConn::Serve() {

View File

@@ -15,7 +15,7 @@ class FastCGIConn {
void Serve();
void Write(const std::vector<iovec>& vecs);
[[nodiscard]] bool Write(const std::vector<iovec>& vecs);
private:
const int sock_;

View File

@@ -33,7 +33,12 @@ void FastCGIRequest::AddIn(const std::string_view& in) {
}
const std::string& FastCGIRequest::GetParam(const std::string& key) {
return params_.at(key);
auto iter = params_.find(key);
if (iter == params_.end()) {
static const std::string none;
return none;
}
return iter->second;
}
void FastCGIRequest::WriteHeader(const std::string_view& name, const std::string_view& value) {
@@ -53,7 +58,7 @@ void FastCGIRequest::WriteBody(const std::string_view& body) {
CHECK(out_buf_.Write(body));
}
void FastCGIRequest::Flush() {
bool FastCGIRequest::Flush() {
std::vector<iovec> vecs;
auto header = OutputHeader();
@@ -61,11 +66,14 @@ void FastCGIRequest::Flush() {
vecs.push_back(OutputVec());
conn_->Write(vecs);
if (!conn_->Write(vecs)) {
return false;
}
out_buf_.Commit();
return true;
}
void FastCGIRequest::End() {
bool FastCGIRequest::End() {
// Fully empty response not allowed
WriteBody("");
@@ -83,7 +91,7 @@ void FastCGIRequest::End() {
AppendVec(end_header, &vecs);
AppendVec(end, &vecs);
conn_->Write(vecs);
return conn_->Write(vecs);
}
iovec FastCGIRequest::OutputVec() {

View File

@@ -20,8 +20,8 @@ class FastCGIRequest {
void WriteHeader(const std::string_view& name, const std::string_view& value);
void WriteBody(const std::string_view& body);
void Flush();
void End();
[[nodiscard]] bool Flush();
bool End();
private:
FastCGIHeader OutputHeader();

17
sse.cc
View File

@@ -1,8 +1,23 @@
#include "sse.h"
SSEServer::SSEServer(int port, const std::function<void(std::unique_ptr<SSEStream>)>& callback)
: fastcgi_server_(port, [](std::unique_ptr<FastCGIRequest> request) {}) {}
: callback_(callback),
fastcgi_server_(port, [this](std::unique_ptr<FastCGIRequest> request) { OnRequest(std::move(request)); }) {}
void SSEServer::Serve() {
fastcgi_server_.Serve();
}
void SSEServer::OnRequest(std::unique_ptr<FastCGIRequest> request) {
if (request->GetParam("HTTP_ACCEPT") != "text/event-stream") {
LOG(WARNING) << "bad HTTP_ACCEPT: " << request->GetParam("HTTP_ACCEPT");
request->WriteHeader("Status", "400 Bad Request");
request->WriteHeader("Content-Type", "text-plain");
request->WriteBody("No \"Accept: text/event-stream\" header found in request. Please call this endpoint using EventSource.");
request->End();
return;
}
request->WriteHeader("Content-Type", "text/event-stream");
request->WriteBody("");
callback_(std::make_unique<SSEStream>(std::move(request)));
}

5
sse.h
View File

@@ -9,5 +9,8 @@ class SSEServer {
void Serve();
private:
FastCGIServer fastcgi_server_;
void OnRequest(std::unique_ptr<FastCGIRequest> request);
std::function<void(std::unique_ptr<SSEStream>)> callback_;
FastCGIServer fastcgi_server_;
};

28
sse_stream.cc Normal file
View File

@@ -0,0 +1,28 @@
#include "sse_stream.h"
#include "fastcgi_request.h"
SSEStream::SSEStream(std::unique_ptr<FastCGIRequest> request)
: request_(std::move(request)) {}
bool SSEStream::WriteEvent(const std::string& data, uint64_t id, const std::string& type) {
if (id) {
request_->WriteBody("id: ");
request_->WriteBody(std::to_string(id));
request_->WriteBody("\n");
}
if (!type.empty()) {
request_->WriteBody("event: ");
request_->WriteBody(type);
request_->WriteBody("\n");
}
request_->WriteBody("data: ");
request_->WriteBody(data);
request_->WriteBody("\n\n");
return request_->Flush();
}
bool SSEStream::End() {
return request_->End();
}

View File

@@ -1,4 +1,16 @@
#pragma once
#include <memory>
class FastCGIRequest;
class SSEStream {
public:
SSEStream(std::unique_ptr<FastCGIRequest> request);
[[nodiscard]] bool WriteEvent(const std::string& data, uint64_t id=0, const std::string& type="");
bool End();
private:
std::unique_ptr<FastCGIRequest> request_;
};