From b5b910e431a5e270bc8feb777541879c6119d871 Mon Sep 17 00:00:00 2001 From: fantasy-peak <1356346239@qq.com> Date: Sat, 14 Sep 2024 12:23:58 +0800 Subject: [PATCH] Add HttpClient pool --- CMakeLists.txt | 1 + examples/client_example/main.cc | 117 ++++++++-- lib/inc/drogon/utils/HttpClientPool.h | 321 ++++++++++++++++++++++++++ 3 files changed, 416 insertions(+), 23 deletions(-) create mode 100644 lib/inc/drogon/utils/HttpClientPool.h diff --git a/CMakeLists.txt b/CMakeLists.txt index c36d29a28c..1d0692506a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -717,6 +717,7 @@ install(FILES ${NOSQL_HEADERS} DESTINATION ${INSTALL_INCLUDE_DIR}/drogon/nosql) set(DROGON_UTIL_HEADERS lib/inc/drogon/utils/coroutine.h + lib/inc/drogon/utils/HttpClientPool.h lib/inc/drogon/utils/FunctionTraits.h lib/inc/drogon/utils/HttpConstraint.h lib/inc/drogon/utils/OStringStream.h diff --git a/examples/client_example/main.cc b/examples/client_example/main.cc index d783f54e0c..f20dc61df4 100644 --- a/examples/client_example/main.cc +++ b/examples/client_example/main.cc @@ -1,43 +1,113 @@ #include -#include +#include #include +#include +#include +#include +#include +#include #ifdef __linux__ #include #include #endif - +#include using namespace drogon; int nth_resp = 0; int main() { + auto func = [](int fd) { + std::cout << "setSockOptCallback:" << fd << std::endl; +#ifdef __linux__ + int optval = 10; + ::setsockopt(fd, + SOL_TCP, + TCP_KEEPCNT, + &optval, + static_cast(sizeof optval)); + ::setsockopt(fd, + SOL_TCP, + TCP_KEEPIDLE, + &optval, + static_cast(sizeof optval)); + ::setsockopt(fd, + SOL_TCP, + TCP_KEEPINTVL, + &optval, + static_cast(sizeof optval)); +#endif + }; trantor::Logger::setLogLevel(trantor::Logger::kTrace); +#ifdef __cpp_impl_coroutine + HttpClientPoolConfig cfg{ + .hostString = "http://www.baidu.com", + .useOldTLS = false, + .validateCert = false, + .size = 10, + .setCallback = + [func](auto &client) { + LOG_INFO << "setCallback"; + client->setSockOptCallback(func); + }, + .numOfThreads = 4, + .keepaliveRequests = 1000, + .idleTimeout = std::chrono::seconds(10), + .maxLifeTime = std::chrono::seconds(300), + .checkInterval = std::chrono::seconds(10), + }; + auto pool = std::make_unique(cfg); + auto req = HttpRequest::newHttpRequest(); + req->setMethod(drogon::Get); + req->setPath("/s"); + req->setParameter("wd", "wx"); + req->setParameter("oq", "wx"); + + for (int i = 0; i < 1; i++) + { + [](auto req, auto &pool) -> drogon::AsyncTask { + { + auto [result, resp] = co_await pool->sendRequestCoro(req, 10); + if (result == ReqResult::Ok) + LOG_INFO << "1:" << resp->getStatusCode(); + } + { + auto [result, resp] = co_await pool->sendRequestCoro(req, 10); + if (result == ReqResult::Ok) + LOG_INFO << "2:" << resp->getStatusCode(); + } + { + auto [result, resp] = co_await pool->sendRequestCoro(req, 10); + if (result == ReqResult::Ok) + LOG_INFO << "3:" << resp->getStatusCode(); + } + co_return; + }(req, pool); + } + + for (int i = 0; i < 10; i++) + { + pool->sendRequest( + req, + [](ReqResult result, const HttpResponsePtr &response) { + if (result != ReqResult::Ok) + { + LOG_ERROR + << "error while sending request to server! result: " + << result; + return; + } + LOG_INFO << "callback:" << response->getStatusCode(); + }, + 10); + } + std::this_thread::sleep_for(std::chrono::seconds(30)); +#else { auto client = HttpClient::newHttpClient("http://www.baidu.com"); - client->setSockOptCallback([](int fd) { - std::cout << "setSockOptCallback:" << fd << std::endl; -#ifdef __linux__ - int optval = 10; - ::setsockopt(fd, - SOL_TCP, - TCP_KEEPCNT, - &optval, - static_cast(sizeof optval)); - ::setsockopt(fd, - SOL_TCP, - TCP_KEEPIDLE, - &optval, - static_cast(sizeof optval)); - ::setsockopt(fd, - SOL_TCP, - TCP_KEEPINTVL, - &optval, - static_cast(sizeof optval)); -#endif - }); + client->setSockOptCallback(func); auto req = HttpRequest::newHttpRequest(); req->setMethod(drogon::Get); @@ -77,4 +147,5 @@ int main() } app().run(); +#endif } diff --git a/lib/inc/drogon/utils/HttpClientPool.h b/lib/inc/drogon/utils/HttpClientPool.h new file mode 100644 index 0000000000..6cb9c37e52 --- /dev/null +++ b/lib/inc/drogon/utils/HttpClientPool.h @@ -0,0 +1,321 @@ +/** + * + * @file HttpClientPool.h + * @author fantasy-peak + * + * Copyright 2024, fantasy-peak. All rights reserved. + * https://github.com/an-tao/drogon + * Use of this source code is governed by a MIT license + * that can be found in the License file. + * + * Drogon + * + */ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#ifdef __cpp_impl_coroutine +#include +#endif +#include +#include + +namespace drogon +{ + +struct HttpClientPoolConfig +{ + std::string hostString; + bool useOldTLS{false}; + bool validateCert{false}; + std::size_t size{100}; + std::function setCallback; + std::size_t numOfThreads{std::thread::hardware_concurrency()}; + std::optional keepaliveRequests; + std::optional idleTimeout; + std::optional maxLifeTime; + std::optional checkInterval; +}; + +class HttpClientPool final +{ + public: + HttpClientPool( + HttpClientPoolConfig cfg, + std::shared_ptr loopPool = nullptr, + std::shared_ptr dispatchPool = nullptr) + : cfg_(std::move(cfg)), + loopPool_(std::move(loopPool)), + dispatchPool_(std::move(dispatchPool)) + { + if (loopPool_ == nullptr) + { + loopPool_ = std::make_shared( + cfg_.numOfThreads); + loopPool_->start(); + } + if (dispatchPool_ == nullptr) + { + dispatchPool_ = std::make_shared(1); + dispatchPool_->start(); + } + loopPtr_ = dispatchPool_->getNextLoop(); + + for (std::size_t i = 0; i < cfg_.size; i++) + { + auto loopPtr = loopPool_->getNextLoop(); + auto func = [this, loopPtr]() mutable { + auto client = HttpClient::newHttpClient(cfg_.hostString, + loopPtr, + cfg_.useOldTLS, + cfg_.validateCert); + if (cfg_.setCallback) + cfg_.setCallback(client); + return client; + }; + httpClients_.emplace(std::make_shared(func, cfg_)); + } + LOG_DEBUG << "httpClients_ size:" << httpClients_.size(); + + if (cfg_.idleTimeout.has_value() && cfg_.checkInterval.has_value()) + { + timerId_ = loopPtr_->runEvery(cfg_.checkInterval.value(), [this] { + std::unique_lock lock(mutex_); + if (httpClients_.empty()) + return; + std::queue> clients; + while (!httpClients_.empty()) + { + auto connPtr = std::move(httpClients_.front()); + httpClients_.pop(); + if (connPtr->reachIdleTimeout()) + { + // close tcp connection + connPtr->resetClientPtr(); + } + clients.emplace(std::move(connPtr)); + } + httpClients_ = std::move(clients); + }); + } + } + + ~HttpClientPool() + { + if (timerId_) + { + std::promise done; + loopPtr_->runInLoop([&] { + loopPtr_->invalidateTimer(timerId_.value()); + done.set_value(); + }); + done.get_future().wait(); + } + [](auto &&...args) { + auto func = [](auto &poolPtr) { + if (poolPtr == nullptr) + return; + for (auto &ptr : poolPtr->getLoops()) + ptr->runInLoop([=] { ptr->quit(); }); + poolPtr->wait(); + poolPtr.reset(); + }; + (func(args), ...); + }(dispatchPool_, loopPool_); + std::unique_lock lock(mutex_); + std::queue> tmp; + httpClients_.swap(tmp); + } + + HttpClientPool(const HttpClientPool &) = delete; + HttpClientPool &operator=(const HttpClientPool &) = delete; + HttpClientPool(HttpClientPool &&) = delete; + HttpClientPool &operator=(HttpClientPool &&) = delete; + + void sendRequest(const HttpRequestPtr &req, + std::function cb, + double timeout = 0) + { + std::unique_lock lock(mutex_); + if (httpClients_.empty()) + { + httpRequest_.emplace(req, std::move(cb)); + } + else + { + auto connPtr = std::move(httpClients_.front()); + httpClients_.pop(); + lock.unlock(); + send(std::move(connPtr), req, std::move(cb), timeout); + } + return; + } + +#ifdef __cpp_impl_coroutine + + auto sendRequestCoro(HttpRequestPtr req, double timeout = 0) + { + struct Awaiter + : public CallbackAwaiter> + { + Awaiter(HttpClientPool *pool, HttpRequestPtr req, double timeout) + : pool_(pool), req_(std::move(req)), timeout_(timeout) + { + } + + void await_suspend(std::coroutine_handle<> handle) + { + pool_->sendRequest( + req_, + [this, handle](ReqResult result, + const HttpResponsePtr &ptr) { + setValue(std::make_tuple(result, ptr)); + handle.resume(); + }, + timeout_); + } + + private: + HttpClientPool *pool_; + HttpRequestPtr req_; + double timeout_; + }; + + return Awaiter{this, std::move(req), timeout}; + } + +#endif + + private: + struct Connection + { + Connection(std::function cb, + const HttpClientPoolConfig &cfg) + : createHttpClientFunc_(std::move(cb)), cfg_(cfg) + { + init(); + } + + void init() + { + clientPtr_ = createHttpClientFunc_(); + auto now = std::chrono::system_clock::now(); + timePoint_ = now; + startTimePoint_ = now; + counter_ = 0; + } + + void send(const HttpRequestPtr &req, + std::function cb, + double timeout) + { + if (isInvalid()) + { + init(); + } + assert(clientPtr_ != nullptr); + clientPtr_->sendRequest(req, std::move(cb), timeout); + ++counter_; + auto now = std::chrono::system_clock::now(); + timePoint_ = now; + } + + bool isInvalid() + { + auto now = std::chrono::system_clock::now(); + auto idleDut = now - timePoint_; + auto dut = now - startTimePoint_; + if ((clientPtr_ == nullptr) || + (cfg_.keepaliveRequests.has_value() && + counter_ >= cfg_.keepaliveRequests.value()) || + (cfg_.idleTimeout.has_value() && + idleDut >= cfg_.idleTimeout.value()) || + (cfg_.maxLifeTime.has_value() && + dut >= cfg_.maxLifeTime.value())) + { + return true; + } + return false; + } + + bool reachIdleTimeout() + { + auto now = std::chrono::system_clock::now(); + auto idleDut = now - timePoint_; + if (idleDut >= cfg_.idleTimeout.value()) + { + return true; + } + return false; + } + + void resetClientPtr() + { + clientPtr_ = nullptr; + } + + HttpClientPoolConfig cfg_; + std::function createHttpClientFunc_; + HttpClientPtr clientPtr_; + std::chrono::time_point timePoint_; + std::chrono::time_point startTimePoint_; + std::size_t counter_{0}; + }; + + void send(std::shared_ptr connPtr, + const HttpRequestPtr &req, + std::function cb, + double timeout) + { + connPtr->send( + req, + [connPtr, this, cb = std::move(cb), timeout]( + ReqResult result, const HttpResponsePtr &ptr) mutable { + cb(result, ptr); + std::unique_lock lock(mutex_); + if (httpRequest_.empty()) + { + httpClients_.emplace(std::move(connPtr)); + } + else + { + auto op = std::move(httpRequest_.front()); + httpRequest_.pop(); + lock.unlock(); + auto &[req, cb] = op; + send(std::move(connPtr), req, std::move(cb), timeout); + } + return; + }, + timeout); + } + + HttpClientPoolConfig cfg_; + std::shared_ptr loopPool_; + std::shared_ptr dispatchPool_; + trantor::EventLoop *loopPtr_; + std::mutex mutex_; + std::queue> httpClients_; + std::queue< + std::tuple>> + httpRequest_; + std::optional timerId_; +}; + +} // namespace drogon