diff --git a/connection.cc b/connection.cc index 764b91a..1e19768 100644 --- a/connection.cc +++ b/connection.cc @@ -147,4 +147,8 @@ int Connection::Read() { return -1; } +uint64_t Connection::Requests() const { + return requests_; +} + } // namespace firecgi diff --git a/connection.h b/connection.h index 241d5fa..2fd869e 100644 --- a/connection.h +++ b/connection.h @@ -19,6 +19,8 @@ class Connection { [[nodiscard]] int Read(); [[nodiscard]] bool Write(const std::vector& vecs); + [[nodiscard]] uint64_t Requests() const; + private: const int sock_; const std::function& callback_; diff --git a/example_simple.cc b/example_simple.cc index b7bac4a..9a441c8 100644 --- a/example_simple.cc +++ b/example_simple.cc @@ -15,5 +15,6 @@ int main(int argc, char *argv[]) { request->WriteBody("Hello world"); request->End(); }, FLAGS_threads); + server.RegisterSignalHandlers(); server.Serve(); } diff --git a/request.cc b/request.cc index 4db52fd..cc40259 100644 --- a/request.cc +++ b/request.cc @@ -28,7 +28,7 @@ void Request::NewRequest(uint16_t request_id) { body_written_ = false; } -uint16_t Request::RequestId() { +uint16_t Request::RequestId() const { return request_id_; } @@ -40,7 +40,7 @@ void Request::SetBody(const std::string_view& body) { body_ = body; } -const std::string_view& Request::GetParam(const std::string_view& key) { +const std::string_view& Request::GetParam(const std::string_view& key) const { auto iter = params_.find(key); if (iter == params_.end()) { static const std::string_view none; @@ -49,7 +49,7 @@ const std::string_view& Request::GetParam(const std::string_view& key) { return iter->second; } -const std::string_view& Request::GetBody() { +const std::string_view& Request::GetBody() const { return body_; } diff --git a/request.h b/request.h index 2cd185b..35ad412 100644 --- a/request.h +++ b/request.h @@ -18,13 +18,13 @@ class Request { void NewRequest(uint16_t request_id); - uint16_t RequestId(); + uint16_t RequestId() const; void AddParam(const std::string_view& key, const std::string_view& value); void SetBody(const std::string_view& in); - const std::string_view& GetParam(const std::string_view& key); - const std::string_view& GetBody(); + const std::string_view& GetParam(const std::string_view& key) const; + const std::string_view& GetBody() const; void WriteHeader(const std::string_view& name, const std::string_view& value); void WriteBody(const std::string_view& body); diff --git a/server.cc b/server.cc index 123e7b7..a6d0bff 100644 --- a/server.cc +++ b/server.cc @@ -1,8 +1,11 @@ #include +#include #include #include #include #include +#include +#include #include #include @@ -16,9 +19,11 @@ Server::Server(int port, const std::function& callback, int thre callback_(callback), threads_(threads), headers_(headers), - max_request_len_(max_request_len) { - LOG(INFO) << "listening on [::1]:" << port_; + max_request_len_(max_request_len), + close_fd_(eventfd(0, 0)) { + CHECK_GE(close_fd_, 0); + LOG(INFO) << "listening on [::1]:" << port_; signal(SIGPIPE, SIG_IGN); } @@ -28,6 +33,29 @@ void Server::Serve() { threads.emplace_back([this]() { ServeInt(); }); } ServeInt(); + for (auto& thread : threads) { + thread.join(); + } + LOG(INFO) << "all threads shut down"; +} + +void Server::Shutdown() { + uint64_t shutdown = 1; + PCHECK(write(close_fd_, &shutdown, sizeof(shutdown)) == sizeof(shutdown)); +} + +namespace { +Server* shutdown_server = nullptr; +} // namespace + +void Server::RegisterSignalHandlers() { + shutdown_server = this; + for (auto sig : {SIGINT, SIGTERM}) { + signal(sig, [](int signum) { + LOG(INFO) << "received " << strsignal(signum); + shutdown_server->Shutdown(); + }); + } } void Server::ServeInt() { @@ -36,16 +64,31 @@ void Server::ServeInt() { auto listen_sock = NewListenSock(); + char new_conn; { struct epoll_event ev{ .events = EPOLLIN, .data = { - .ptr = nullptr, + .ptr = &new_conn, }, }; PCHECK(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_sock, &ev) == 0); } + char shutdown; + { + struct epoll_event ev{ + .events = EPOLLIN, + .data = { + .ptr = &shutdown, + }, + }; + PCHECK(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, close_fd_, &ev) == 0); + } + + std::unordered_set connections; + uint64_t requests = 0; + while (true) { constexpr auto max_events = 256; struct epoll_event events[max_events]; @@ -56,13 +99,33 @@ void Server::ServeInt() { PCHECK(num_fd > 0) << "epoll_wait()"; for (auto i = 0; i < num_fd; ++i) { - if (events[i].data.ptr == nullptr) { - NewConn(listen_sock, epoll_fd); + if (events[i].data.ptr == &new_conn) { + connections.insert(CHECK_NOTNULL(NewConn(listen_sock, epoll_fd))); + } else if (events[i].data.ptr == &shutdown) { + for (auto& conn : connections) { + requests += conn->Requests(); + delete conn; + } + PCHECK(close(listen_sock) == 0); + PCHECK(close(epoll_fd) == 0); + + rusage usage; + PCHECK(getrusage(RUSAGE_THREAD, &usage) == 0); + + LOG(INFO) << std::setfill('0') + << "thread shutting down (" + << "handled " << requests << " requests, " + << usage.ru_utime.tv_sec << "." << std::setw(6) << usage.ru_utime.tv_usec << " user seconds, " << std::setw(0) + << usage.ru_stime.tv_sec << "." << std::setw(6) << usage.ru_stime.tv_usec << " system seconds" << std::setw(0) + << ")"; + return; } else { auto conn = static_cast(events[i].data.ptr); auto fd = conn->Read(); if (fd != -1) { PCHECK(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr) == 0); + requests += conn->Requests(); + connections.erase(conn); delete conn; } } @@ -70,7 +133,7 @@ void Server::ServeInt() { } } -void Server::NewConn(int listen_sock, int epoll_fd) { +Connection* Server::NewConn(int listen_sock, int epoll_fd) { sockaddr_in6 client_addr; socklen_t client_addr_len = sizeof(client_addr); @@ -81,8 +144,8 @@ void Server::NewConn(int listen_sock, int epoll_fd) { int flags = 1; PCHECK(setsockopt(client_sock, SOL_TCP, TCP_NODELAY, &flags, sizeof(flags)) == 0); + auto *conn = new Connection(client_sock, client_addr, callback_, headers_, max_request_len_); { - auto *conn = new Connection(client_sock, client_addr, callback_, headers_, max_request_len_); struct epoll_event ev{ .events = EPOLLIN, .data = { @@ -91,6 +154,8 @@ void Server::NewConn(int listen_sock, int epoll_fd) { }; PCHECK(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_sock, &ev) == 0); } + + return conn; } int Server::NewListenSock() { diff --git a/server.h b/server.h index 0f964a1..42dec9b 100644 --- a/server.h +++ b/server.h @@ -12,9 +12,11 @@ class Server { public: Server(int port, const std::function& callback, int threads=1, const std::unordered_set& headers={}, int max_request_len=(16*1024)); void Serve(); + void Shutdown(); + void RegisterSignalHandlers(); private: - void NewConn(int listen_sock, int epoll_fd); + Connection *NewConn(int listen_sock, int epoll_fd); int NewListenSock(); void ServeInt(); @@ -23,6 +25,8 @@ class Server { const int threads_; const std::unordered_set headers_; const int max_request_len_; + + int close_fd_; }; } // firecgi