diff --git a/connection.cc b/connection.cc new file mode 100644 index 0000000..d77418b --- /dev/null +++ b/connection.cc @@ -0,0 +1,147 @@ +#include +#include +#include + +#include "connection.h" + +#include "parse.h" +#include "request.h" + +namespace firecgi { + +Connection::Connection(int sock, const sockaddr_in6& client_addr, const std::function)>& callback, const std::unordered_set& headers) + : sock_(sock), + callback_(callback), + headers_(headers), + buf_(sock, fastcgi_max_record_len) { + char client_addr_str[INET6_ADDRSTRLEN]; + PCHECK(inet_ntop(AF_INET6, &client_addr.sin6_addr, client_addr_str, sizeof(client_addr_str))); + + LOG(INFO) << "new connection: [" << client_addr_str << "]:" << ntohs(client_addr.sin6_port); +} + +Connection::~Connection() { + PCHECK(close(sock_) == 0); + LOG(INFO) << "connection closed (handled " << requests_ << " requests)"; +} + +bool Connection::Write(const std::vector& vecs) { + ssize_t total_size = 0; + for (const auto& vec : vecs) { + total_size += vec.iov_len; + } + return writev(sock_, vecs.data(), vecs.size()) == total_size; +} + +int Connection::Read() { + if (!buf_.Refill()) { + return sock_; + } + + while (true) { + buf_.ResetRead(); + + const auto *header = buf_.ReadObj
(); + if (!header) { + break; + } + + if (header->version != 1) { + LOG(ERROR) << "invalid FastCGI protocol version: " << header->version; + return sock_; + } + + if (buf_.ReadMaxLen() < header->ContentLength()) { + break; + } + + switch (header->type) { + case 1: + { + if (header->ContentLength() != sizeof(BeginRequest)) { + LOG(ERROR) << "FCGI_BeginRequestBody is the wrong length: " << header->ContentLength(); + return sock_; + } + + const auto *begin_request = CHECK_NOTNULL(buf_.ReadObj()); + + if (begin_request->Role() != 1) { + LOG(ERROR) << "unsupported FastCGI role: " << begin_request->Role(); + return sock_; + } + + request_.reset(new Request(header->RequestId(), this)); + } + break; + + case 4: + { + if (request_ == nullptr || header->RequestId() != request_->RequestId()) { + LOG(ERROR) << "out of order FCGI_PARAMS record, or client is multiplexing requests (which we don't support)"; + return sock_; + } + + ConstBuffer param_buf(buf_.Read(header->ContentLength()), header->ContentLength()); + while (param_buf.ReadMaxLen() > 0) { + const auto *param_header = param_buf.ReadObj(); + if (!param_header) { + LOG(ERROR) << "FCGI_PARAMS missing header"; + return sock_; + } + + const auto *key_buf = param_buf.Read(param_header->key_length); + if (!key_buf) { + LOG(ERROR) << "FCGI_PARAMS missing key"; + return sock_; + } + std::string_view key(key_buf, param_header->key_length); + + const auto *value_buf = param_buf.Read(param_header->value_length); + if (!value_buf) { + LOG(ERROR) << "FCGI_PARAMS missing value"; + return sock_; + } + std::string_view value(value_buf, param_header->value_length); + + if (headers_.find(key) != headers_.end()) { + request_->AddParam(key, value); + } + } + } + break; + + case 5: + { + if (request_ == nullptr || header->RequestId() != request_->RequestId()) { + LOG(ERROR) << "out of order FCGI_STDIN record, or client is multiplexing requests (which we don't support)"; + return sock_; + } + + if (header->ContentLength() == 0) { + // Magic signal for completed request (mirrors the HTTP/1.1 protocol) + requests_++; + callback_(std::move(request_)); + } else { + std::string_view in(buf_.Read(header->ContentLength()), header->ContentLength()); + request_->AddIn(in); + } + } + break; + + default: + LOG(ERROR) << "unknown record type: " << header->type; + return sock_; + } + + if (!buf_.Discard(header->padding_length)) { + break; + } + + buf_.Commit(); // we've acted on the bytes read so far + } + + buf_.Consume(); + return -1; +} + +} // namespace firecgi diff --git a/connection.h b/connection.h new file mode 100644 index 0000000..22ac583 --- /dev/null +++ b/connection.h @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include +#include + +#include "request.h" +#include "stream_buffer.h" + +namespace firecgi { + +class Connection { + public: + Connection(int sock, const sockaddr_in6& client_addr, const std::function)>& callback, const std::unordered_set& headers); + ~Connection(); + + [[nodiscard]] int Read(); + [[nodiscard]] bool Write(const std::vector& vecs); + + private: + const int sock_; + const std::function)>& callback_; + const std::unordered_set& headers_; + + uint64_t requests_ = 0; + + StreamBuffer buf_; + + std::unique_ptr request_; +}; + +} // namespace firecgi diff --git a/firecgi.cc b/firecgi.cc new file mode 100644 index 0000000..eb8586e --- /dev/null +++ b/firecgi.cc @@ -0,0 +1,117 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "firecgi.h" +#include "connection.h" + +namespace firecgi { + +Server::Server(int port, const std::function)>& callback, int threads, const std::unordered_set& headers) + : port_(port), + callback_(callback), + threads_(threads), + headers_(headers) { + LOG(INFO) << "listening on [::1]:" << port_; + + signal(SIGPIPE, SIG_IGN); +} + +void Server::Serve() { + std::vector threads; + for (int i = 0; i < threads_ - 1; ++i) { + threads.emplace_back([this]() { ServeInt(); }); + } + ServeInt(); +} + +void Server::ServeInt() { + auto epoll_fd = epoll_create1(0); + PCHECK(epoll_fd >= 0) << "epoll_create()"; + + auto listen_sock = NewListenSock(); + + { + struct epoll_event ev{ + .events = EPOLLIN, + .data = { + .ptr = nullptr, + }, + }; + PCHECK(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_sock, &ev) == 0); + } + + while (true) { + constexpr auto max_events = 256; + struct epoll_event events[max_events]; + auto num_fd = epoll_wait(epoll_fd, events, max_events, -1); + if (num_fd == -1 && errno == EINTR) { + continue; + } + PCHECK(num_fd > 0) << "epoll_wait()"; + + for (auto i = 0; i < num_fd; ++i) { + if (events[i].data.ptr == nullptr) { + NewConn(listen_sock, epoll_fd); + } else { + auto conn = static_cast(events[i].data.ptr); + auto fd = conn->Read(); + if (fd != -1) { + PCHECK(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr) == 0); + delete conn; + } + } + } + } +} + +void Server::NewConn(int listen_sock, int epoll_fd) { + sockaddr_in6 client_addr; + socklen_t client_addr_len = sizeof(client_addr); + + auto client_sock = accept(listen_sock, (sockaddr*) &client_addr, &client_addr_len); + PCHECK(client_sock >= 0) << "accept()"; + CHECK_EQ(client_addr.sin6_family, AF_INET6); + + int flags = 1; + PCHECK(setsockopt(client_sock, SOL_TCP, TCP_NODELAY, &flags, sizeof(flags)) == 0); + + { + auto *conn = new Connection(client_sock, client_addr, callback_, headers_); + struct epoll_event ev{ + .events = EPOLLIN, + .data = { + .ptr = conn, + }, + }; + PCHECK(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_sock, &ev) == 0); + } +} + +int Server::NewListenSock() { + auto sock = socket(AF_INET6, SOCK_STREAM, 0); + PCHECK(sock >= 0) << "socket()"; + + { + int optval = 1; + PCHECK(setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)) == 0); + } + + { + sockaddr_in6 bind_addr = { + .sin6_family = AF_INET6, + .sin6_port = htons(port_), + .sin6_addr = IN6ADDR_LOOPBACK_INIT, + }; + PCHECK(bind(sock, (sockaddr*) &bind_addr, sizeof(bind_addr)) == 0); + } + + PCHECK(listen(sock, 128) == 0); + return sock; +} + +} // namespace firecgi diff --git a/firecgi.h b/firecgi.h index e0d4da4..8fdb4b9 100644 --- a/firecgi.h +++ b/firecgi.h @@ -4,7 +4,7 @@ #include #include -#include "fastcgi_request.h" +#include "request.h" namespace firecgi { diff --git a/parse.cc b/parse.cc new file mode 100644 index 0000000..815f946 --- /dev/null +++ b/parse.cc @@ -0,0 +1,11 @@ +#include "parse.h" + +namespace firecgi { + +Header::Header(uint8_t type_in, uint16_t request_id, uint16_t content_length) + : type(type_in) { + SetRequestId(request_id); + SetContentLength(content_length); +} + +} // namespace firecgi diff --git a/parse.h b/parse.h new file mode 100644 index 0000000..6fb2246 --- /dev/null +++ b/parse.h @@ -0,0 +1,51 @@ +#pragma once + +#include + +namespace firecgi { + +struct Header { + Header(uint8_t type_in, uint16_t request_id, uint16_t content_length); + + uint8_t version = 1; + uint8_t type; + private: + uint16_t request_id_; // network byte order + uint16_t content_length_; // network byte order + public: + uint8_t padding_length = 0; + uint8_t reserved = 0; + + uint16_t RequestId() const { return ntohs(request_id_); } + uint16_t ContentLength() const { return ntohs(content_length_); } + + void SetRequestId(uint16_t request_id) { request_id_ = htons(request_id); } + void SetContentLength(uint16_t content_length) { content_length_ = htons(content_length); } +}; + +struct BeginRequest { + private: + uint16_t role_; // network byte order + public: + uint8_t flags; + uint8_t reserved[5]; + + uint16_t Role() const { return ntohs(role_); } +}; + +struct EndRequest { + uint32_t app_status = htonl(0); // network byte order + uint8_t protocol_status; + uint8_t reserved[3] = {}; +}; + +struct ParamHeader { + uint8_t key_length; + uint8_t value_length; +}; + +constexpr auto max_content_len = 65535; +constexpr auto max_padding_len = 255; +constexpr auto max_record_len = sizeof(Header) + max_content_len + max_padding_len; + +} // namespace firecgi