From 205b8253b2c807ac134352e7216bef1b51dc15d9 Mon Sep 17 00:00:00 2001 From: Ian Gulliver Date: Mon, 29 Apr 2019 00:11:07 +0000 Subject: [PATCH] Working clock stream. Propagate up write errors --- Makefile | 2 +- example_clock.cc | 10 ++++++++++ fastcgi.cc | 3 +++ fastcgi_conn.cc | 4 ++-- fastcgi_conn.h | 2 +- fastcgi_request.cc | 18 +++++++++++++----- fastcgi_request.h | 4 ++-- sse.cc | 17 ++++++++++++++++- sse.h | 5 ++++- sse_stream.cc | 28 ++++++++++++++++++++++++++++ sse_stream.h | 12 ++++++++++++ 11 files changed, 92 insertions(+), 13 deletions(-) create mode 100644 sse_stream.cc diff --git a/Makefile b/Makefile index e427bc6..fb93c94 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ all: example_simple example_clock -objects = sse.o fastcgi.o fastcgi_conn.o fastcgi_request.o fastcgi_parse.o stream_buffer.o buffer.o +objects = sse.o sse_stream.o fastcgi.o fastcgi_conn.o fastcgi_request.o fastcgi_parse.o stream_buffer.o buffer.o example_simple: example_simple.o $(objects) Makefile clang++ -std=gnu++2a -o example_simple example_simple.o $(objects) -lgflags -lglog -lpthread diff --git a/example_clock.cc b/example_clock.cc index f69adc5..4bfc00a 100644 --- a/example_clock.cc +++ b/example_clock.cc @@ -1,5 +1,6 @@ #include #include +#include #include "sse.h" @@ -10,6 +11,15 @@ int main(int argc, char *argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); SSEServer server(FLAGS_port, [](std::unique_ptr stream) { + while (true) { + timeval tv; + PCHECK(gettimeofday(&tv, nullptr) == 0); + uint64_t time_ms = tv.tv_sec * 1000 + tv.tv_usec / 1000; + if (!stream->WriteEvent(std::to_string(time_ms), 0, "time")) { + break; + } + sleep(1); + } }); server.Serve(); } diff --git a/fastcgi.cc b/fastcgi.cc index c8392bc..f5f27de 100644 --- a/fastcgi.cc +++ b/fastcgi.cc @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -10,6 +11,8 @@ FastCGIServer::FastCGIServer(int port, const std::function= 0) << "socket()"; diff --git a/fastcgi_conn.cc b/fastcgi_conn.cc index 90c7b66..5e13388 100644 --- a/fastcgi_conn.cc +++ b/fastcgi_conn.cc @@ -22,12 +22,12 @@ FastCGIConn::~FastCGIConn() { LOG(INFO) << "connection closed (handled " << requests_ << " requests)"; } -void FastCGIConn::Write(const std::vector& vecs) { +bool FastCGIConn::Write(const std::vector& vecs) { size_t total_size = 0; for (const auto& vec : vecs) { total_size += vec.iov_len; } - CHECK_EQ(writev(sock_, vecs.data(), vecs.size()), total_size); + return writev(sock_, vecs.data(), vecs.size()) == total_size; } void FastCGIConn::Serve() { diff --git a/fastcgi_conn.h b/fastcgi_conn.h index 127a1d1..3b7069a 100644 --- a/fastcgi_conn.h +++ b/fastcgi_conn.h @@ -15,7 +15,7 @@ class FastCGIConn { void Serve(); - void Write(const std::vector& vecs); + [[nodiscard]] bool Write(const std::vector& vecs); private: const int sock_; diff --git a/fastcgi_request.cc b/fastcgi_request.cc index 35abaff..5dad645 100644 --- a/fastcgi_request.cc +++ b/fastcgi_request.cc @@ -33,7 +33,12 @@ void FastCGIRequest::AddIn(const std::string_view& in) { } const std::string& FastCGIRequest::GetParam(const std::string& key) { - return params_.at(key); + auto iter = params_.find(key); + if (iter == params_.end()) { + static const std::string none; + return none; + } + return iter->second; } void FastCGIRequest::WriteHeader(const std::string_view& name, const std::string_view& value) { @@ -53,7 +58,7 @@ void FastCGIRequest::WriteBody(const std::string_view& body) { CHECK(out_buf_.Write(body)); } -void FastCGIRequest::Flush() { +bool FastCGIRequest::Flush() { std::vector vecs; auto header = OutputHeader(); @@ -61,11 +66,14 @@ void FastCGIRequest::Flush() { vecs.push_back(OutputVec()); - conn_->Write(vecs); + if (!conn_->Write(vecs)) { + return false; + } out_buf_.Commit(); + return true; } -void FastCGIRequest::End() { +bool FastCGIRequest::End() { // Fully empty response not allowed WriteBody(""); @@ -83,7 +91,7 @@ void FastCGIRequest::End() { AppendVec(end_header, &vecs); AppendVec(end, &vecs); - conn_->Write(vecs); + return conn_->Write(vecs); } iovec FastCGIRequest::OutputVec() { diff --git a/fastcgi_request.h b/fastcgi_request.h index 40fbb56..b1ee3f6 100644 --- a/fastcgi_request.h +++ b/fastcgi_request.h @@ -20,8 +20,8 @@ class FastCGIRequest { void WriteHeader(const std::string_view& name, const std::string_view& value); void WriteBody(const std::string_view& body); - void Flush(); - void End(); + [[nodiscard]] bool Flush(); + bool End(); private: FastCGIHeader OutputHeader(); diff --git a/sse.cc b/sse.cc index 8be369a..28db3fc 100644 --- a/sse.cc +++ b/sse.cc @@ -1,8 +1,23 @@ #include "sse.h" SSEServer::SSEServer(int port, const std::function)>& callback) - : fastcgi_server_(port, [](std::unique_ptr request) {}) {} + : callback_(callback), + fastcgi_server_(port, [this](std::unique_ptr request) { OnRequest(std::move(request)); }) {} void SSEServer::Serve() { fastcgi_server_.Serve(); } + +void SSEServer::OnRequest(std::unique_ptr request) { + if (request->GetParam("HTTP_ACCEPT") != "text/event-stream") { + LOG(WARNING) << "bad HTTP_ACCEPT: " << request->GetParam("HTTP_ACCEPT"); + request->WriteHeader("Status", "400 Bad Request"); + request->WriteHeader("Content-Type", "text-plain"); + request->WriteBody("No \"Accept: text/event-stream\" header found in request. Please call this endpoint using EventSource."); + request->End(); + return; + } + request->WriteHeader("Content-Type", "text/event-stream"); + request->WriteBody(""); + callback_(std::make_unique(std::move(request))); +} diff --git a/sse.h b/sse.h index 7146bd9..e3d51ff 100644 --- a/sse.h +++ b/sse.h @@ -9,5 +9,8 @@ class SSEServer { void Serve(); private: - FastCGIServer fastcgi_server_; + void OnRequest(std::unique_ptr request); + + std::function)> callback_; + FastCGIServer fastcgi_server_; }; diff --git a/sse_stream.cc b/sse_stream.cc new file mode 100644 index 0000000..a5a4500 --- /dev/null +++ b/sse_stream.cc @@ -0,0 +1,28 @@ +#include "sse_stream.h" + +#include "fastcgi_request.h" + +SSEStream::SSEStream(std::unique_ptr request) + : request_(std::move(request)) {} + +bool SSEStream::WriteEvent(const std::string& data, uint64_t id, const std::string& type) { + if (id) { + request_->WriteBody("id: "); + request_->WriteBody(std::to_string(id)); + request_->WriteBody("\n"); + } + if (!type.empty()) { + request_->WriteBody("event: "); + request_->WriteBody(type); + request_->WriteBody("\n"); + } + request_->WriteBody("data: "); + request_->WriteBody(data); + request_->WriteBody("\n\n"); + + return request_->Flush(); +} + +bool SSEStream::End() { + return request_->End(); +} diff --git a/sse_stream.h b/sse_stream.h index 11a3289..c8b5fb0 100644 --- a/sse_stream.h +++ b/sse_stream.h @@ -1,4 +1,16 @@ #pragma once +#include + +class FastCGIRequest; + class SSEStream { + public: + SSEStream(std::unique_ptr request); + + [[nodiscard]] bool WriteEvent(const std::string& data, uint64_t id=0, const std::string& type=""); + bool End(); + + private: + std::unique_ptr request_; };