From 551ae094900d4f76185df10a90c54029fc8e370b Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Wed, 24 May 2023 08:00:11 +0000 Subject: [PATCH 01/15] Draft --- clickhouse/CMakeLists.txt | 1 + clickhouse/base/hosts_iterator.cpp | 46 +++++++++++++++++++++++ clickhouse/base/hosts_iterator.h | 44 ++++++++++++++++++++++ clickhouse/base/socket.cpp | 6 +-- clickhouse/base/socket.h | 5 ++- clickhouse/client.cpp | 60 ++++++++++++++++++++++++------ clickhouse/client.h | 3 ++ tests/simple/main.cpp | 14 ++++++- 8 files changed, 161 insertions(+), 18 deletions(-) create mode 100644 clickhouse/base/hosts_iterator.cpp create mode 100644 clickhouse/base/hosts_iterator.h diff --git a/clickhouse/CMakeLists.txt b/clickhouse/CMakeLists.txt index 67663ec5..a9fbdb9d 100644 --- a/clickhouse/CMakeLists.txt +++ b/clickhouse/CMakeLists.txt @@ -5,6 +5,7 @@ SET ( clickhouse-cpp-lib-src base/platform.cpp base/socket.cpp base/wire_format.cpp + base/hosts_iterator.cpp columns/array.cpp columns/column.cpp diff --git a/clickhouse/base/hosts_iterator.cpp b/clickhouse/base/hosts_iterator.cpp new file mode 100644 index 00000000..033669f4 --- /dev/null +++ b/clickhouse/base/hosts_iterator.cpp @@ -0,0 +1,46 @@ +#include "hosts_iterator.h" +#include "../client.h" + +namespace clickhouse { + +RoundRobinHostsIterator::RoundRobinHostsIterator(const ClientOptions& opts) : + hosts (opts.hosts) + , ports (opts.ports) + , current_index (0) + , reseted (true) + , iteration_counter(0) +{ + +} + +const std::string RoundRobinHostsIterator::getHostAddr() const +{ + return hosts[current_index]; +} + +unsigned int RoundRobinHostsIterator::getPort() const +{ + return ports[current_index]; +} + +void RoundRobinHostsIterator::ResetIterations() +{ + reseted = true; + iteration_counter = 0; +} + +void RoundRobinHostsIterator::next() +{ + current_index = (current_index + 1) % hosts.size(); + iteration_counter++; +} + +bool RoundRobinHostsIterator::nextIsExist() const +{ + return iteration_counter < hosts.size(); +} + +RoundRobinHostsIterator::~RoundRobinHostsIterator() = default; + + +} \ No newline at end of file diff --git a/clickhouse/base/hosts_iterator.h b/clickhouse/base/hosts_iterator.h new file mode 100644 index 00000000..57941278 --- /dev/null +++ b/clickhouse/base/hosts_iterator.h @@ -0,0 +1,44 @@ +#pragma once + +#include "../client.h" +#include + +namespace clickhouse { + +struct ClientOptions; + +class HostsIteratorBase +{ + public: + virtual ~HostsIteratorBase() = default; + + virtual void next() = 0; + virtual const std::string getHostAddr() const = 0; + virtual unsigned int getPort() const = 0; + virtual void ResetIterations() = 0; + virtual bool nextIsExist() const = 0; +}; + + +class RoundRobinHostsIterator : public HostsIteratorBase +{ + public: + RoundRobinHostsIterator(const ClientOptions& opts); + const std::string getHostAddr() const override; + unsigned int getPort() const override; + void ResetIterations() override; + bool nextIsExist() const override; + void next() override; + + ~RoundRobinHostsIterator() override; + + private: + + const std::vector& hosts; + const std::vector& ports; + int current_index; + bool reseted; + size_t iteration_counter; +}; + +} \ No newline at end of file diff --git a/clickhouse/base/socket.cpp b/clickhouse/base/socket.cpp index 48e90c73..36fb8e74 100644 --- a/clickhouse/base/socket.cpp +++ b/clickhouse/base/socket.cpp @@ -390,9 +390,9 @@ std::unique_ptr Socket::makeOutputStream() const { NonSecureSocketFactory::~NonSecureSocketFactory() {} -std::unique_ptr NonSecureSocketFactory::connect(const ClientOptions &opts) { - const auto address = NetworkAddress(opts.host, std::to_string(opts.port)); - +std::unique_ptr NonSecureSocketFactory::connect(const ClientOptions &opts, const std::shared_ptr hosts_iterator) { + + const auto address = NetworkAddress(hosts_iterator->getHostAddr(), std::to_string(hosts_iterator->getPort())); auto socket = doConnect(address, opts); setSocketOptions(*socket, opts); diff --git a/clickhouse/base/socket.h b/clickhouse/base/socket.h index 694d0d69..631da7d1 100644 --- a/clickhouse/base/socket.h +++ b/clickhouse/base/socket.h @@ -3,6 +3,7 @@ #include "platform.h" #include "input.h" #include "output.h" +#include "hosts_iterator.h" #include #include @@ -88,7 +89,7 @@ class SocketFactory { // TODO: move connection-related options to ConnectionOptions structure. - virtual std::unique_ptr connect(const ClientOptions& opts) = 0; + virtual std::unique_ptr connect(const ClientOptions& opts, const std::shared_ptr hosts_iterator) = 0; virtual void sleepFor(const std::chrono::milliseconds& duration); }; @@ -135,7 +136,7 @@ class NonSecureSocketFactory : public SocketFactory { public: ~NonSecureSocketFactory() override; - std::unique_ptr connect(const ClientOptions& opts) override; + std::unique_ptr connect(const ClientOptions& opts, const std::shared_ptr hosts_iterator) override; protected: virtual std::unique_ptr doConnect(const NetworkAddress& address, const ClientOptions& opts); diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index e4b0c7ef..b9ca4a5d 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -14,7 +14,7 @@ #include #include #include - +#include #if defined(WITH_OPENSSL) #include "base/sslsocket.h" #endif @@ -161,6 +161,8 @@ class Client::Impl { /// In case of network errors tries to reconnect to server and /// call fuc several times. void RetryGuard(std::function func); + + void RetryToConstEndpoint(std::function func); private: class EnsureNull { @@ -194,30 +196,52 @@ class Client::Impl { std::unique_ptr input_; std::unique_ptr output_; std::unique_ptr socket_; + std::shared_ptr hosts_iterator; ServerInfo server_info_; }; +ClientOptions modifyClientOptions(ClientOptions opts) +{ + if (!opts.host.empty()) + opts.hosts.insert(opts.hosts.begin(), opts.host); + + opts.ports.insert(opts.ports.begin(), opts.port); + return opts; +} Client::Impl::Impl(const ClientOptions& opts) : Impl(opts, GetSocketFactory(opts)) {} Client::Impl::Impl(const ClientOptions& opts, std::unique_ptr socket_factory) - : options_(opts) + : options_(modifyClientOptions(opts)) , events_(nullptr) , socket_factory_(std::move(socket_factory)) + , hosts_iterator(new RoundRobinHostsIterator(options_)) { - for (unsigned int i = 0; ; ) { - try { - ResetConnection(); - break; + auto init_connection_with_host = [&](){ + for (unsigned int i = 0; ; ) { + try { + ResetConnection(); + break; + } catch (const std::system_error&) { + if (++i > options_.send_retries) { + throw; + } + socket_factory_->sleepFor(options_.retry_timeout); + } + } + }; + + for (; hosts_iterator->nextIsExist(); hosts_iterator->next()) + { + try + { + init_connection_with_host(); } catch (const std::system_error&) { - if (++i > options_.send_retries) { + if(!hosts_iterator->nextIsExist()) throw; - } - - socket_factory_->sleepFor(options_.retry_timeout); } } @@ -329,7 +353,7 @@ void Client::Impl::Ping() { } void Client::Impl::ResetConnection() { - InitializeStreams(socket_factory_->connect(options_)); + InitializeStreams(socket_factory_->connect(options_, hosts_iterator)); if (!Handshake()) { throw ProtocolError("fail to connect to " + options_.host); @@ -861,6 +885,20 @@ bool Client::Impl::ReceiveHello() { } void Client::Impl::RetryGuard(std::function func) { + for(hosts_iterator->ResetIterations(); ; hosts_iterator->next()) + { + try + { + RetryToConstEndpoint(func); + return; + } catch (const std::system_error&) { + if (!hosts_iterator->nextIsExist()) + throw; + } + } +} + +void Client::Impl::RetryToConstEndpoint(std::function func) { for (unsigned int i = 0; ; ++i) { try { func(); diff --git a/clickhouse/client.h b/clickhouse/client.h index 63177313..f5e1fc42 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -58,6 +58,9 @@ struct ClientOptions { /// Service port. DECLARE_FIELD(port, unsigned int, SetPort, 9000); + DECLARE_FIELD(hosts, std::vector, SetHosts, std::vector()); + DECLARE_FIELD(ports, std::vector, SetPorts, std::vector()); + /// Default database. DECLARE_FIELD(default_database, std::string, SetDefaultDatabase, "default"); /// User name. diff --git a/tests/simple/main.cpp b/tests/simple/main.cpp index 51340a86..4379250e 100644 --- a/tests/simple/main.cpp +++ b/tests/simple/main.cpp @@ -496,8 +496,18 @@ static void RunTests(Client& client) { int main() { try { const auto localHostEndpoint = ClientOptions() - .SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost")) - .SetPort( getEnvOrDefault("CLICKHOUSE_PORT", "9000")) + // .SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost")) + // .SetPort( getEnvOrDefault("CLICKHOUSE_PORT", "9000")) + .SetHosts({getEnvOrDefault("CLICKHOUSE_HOST", "asasdasd"), + getEnvOrDefault("CLICKHOUSE_HOST", "localhost"), + getEnvOrDefault("CLICKHOUSE_HOST", "noalocalhost"), + getEnvOrDefault("CLICKHOUSE_HOST", "localhost"), + }) + .SetPorts({static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), + static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "1212")), + static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), + static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), + }) .SetUser( getEnvOrDefault("CLICKHOUSE_USER", "default")) .SetPassword( getEnvOrDefault("CLICKHOUSE_PASSWORD", "")) .SetDefaultDatabase(getEnvOrDefault("CLICKHOUSE_DB", "default")); From 08975773530e917b34774bf01811e1e9aa8518f9 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Wed, 24 May 2023 09:28:01 +0000 Subject: [PATCH 02/15] Rename to --- clickhouse/CMakeLists.txt | 2 +- clickhouse/base/endpoints_iterator.cpp | 45 ++++++++++++++++++ ...{hosts_iterator.h => endpoints_iterator.h} | 10 ++-- clickhouse/base/hosts_iterator.cpp | 46 ------------------- clickhouse/base/socket.cpp | 4 +- clickhouse/base/socket.h | 6 +-- clickhouse/client.cpp | 18 ++++---- tests/simple/main.cpp | 2 +- 8 files changed, 66 insertions(+), 67 deletions(-) create mode 100644 clickhouse/base/endpoints_iterator.cpp rename clickhouse/base/{hosts_iterator.h => endpoints_iterator.h} (74%) delete mode 100644 clickhouse/base/hosts_iterator.cpp diff --git a/clickhouse/CMakeLists.txt b/clickhouse/CMakeLists.txt index a9fbdb9d..4b0f018d 100644 --- a/clickhouse/CMakeLists.txt +++ b/clickhouse/CMakeLists.txt @@ -5,7 +5,7 @@ SET ( clickhouse-cpp-lib-src base/platform.cpp base/socket.cpp base/wire_format.cpp - base/hosts_iterator.cpp + base/endpoints_iterator.cpp columns/array.cpp columns/column.cpp diff --git a/clickhouse/base/endpoints_iterator.cpp b/clickhouse/base/endpoints_iterator.cpp new file mode 100644 index 00000000..dd89ad1f --- /dev/null +++ b/clickhouse/base/endpoints_iterator.cpp @@ -0,0 +1,45 @@ +#include "endpoints_iterator.h" +#include "../client.h" + +namespace clickhouse { + +RoundRobinEndpointsIterator::RoundRobinEndpointsIterator(const ClientOptions& opts) : + hosts (opts.hosts) + , ports (opts.ports) + , current_index (0) + , reseted (true) + , iteration_counter(0) +{ + +} + +const std::string RoundRobinEndpointsIterator::getHostAddr() const +{ + return hosts[current_index]; +} + +unsigned int RoundRobinEndpointsIterator::getPort() const +{ + return ports[current_index]; +} + +void RoundRobinEndpointsIterator::ResetIterations() +{ + reseted = true; + iteration_counter = 0; +} + +void RoundRobinEndpointsIterator::next() +{ + current_index = (current_index + 1) % hosts.size(); + iteration_counter++; +} + +bool RoundRobinEndpointsIterator::nextIsExist() const +{ + return iteration_counter + 1 < hosts.size(); +} + +RoundRobinEndpointsIterator::~RoundRobinEndpointsIterator() = default; + +} \ No newline at end of file diff --git a/clickhouse/base/hosts_iterator.h b/clickhouse/base/endpoints_iterator.h similarity index 74% rename from clickhouse/base/hosts_iterator.h rename to clickhouse/base/endpoints_iterator.h index 57941278..033d9b83 100644 --- a/clickhouse/base/hosts_iterator.h +++ b/clickhouse/base/endpoints_iterator.h @@ -7,10 +7,10 @@ namespace clickhouse { struct ClientOptions; -class HostsIteratorBase +class EndpointsIteratorBase { public: - virtual ~HostsIteratorBase() = default; + virtual ~EndpointsIteratorBase() = default; virtual void next() = 0; virtual const std::string getHostAddr() const = 0; @@ -20,17 +20,17 @@ class HostsIteratorBase }; -class RoundRobinHostsIterator : public HostsIteratorBase +class RoundRobinEndpointsIterator : public EndpointsIteratorBase { public: - RoundRobinHostsIterator(const ClientOptions& opts); + RoundRobinEndpointsIterator(const ClientOptions& opts); const std::string getHostAddr() const override; unsigned int getPort() const override; void ResetIterations() override; bool nextIsExist() const override; void next() override; - ~RoundRobinHostsIterator() override; + ~RoundRobinEndpointsIterator() override; private: diff --git a/clickhouse/base/hosts_iterator.cpp b/clickhouse/base/hosts_iterator.cpp deleted file mode 100644 index 033669f4..00000000 --- a/clickhouse/base/hosts_iterator.cpp +++ /dev/null @@ -1,46 +0,0 @@ -#include "hosts_iterator.h" -#include "../client.h" - -namespace clickhouse { - -RoundRobinHostsIterator::RoundRobinHostsIterator(const ClientOptions& opts) : - hosts (opts.hosts) - , ports (opts.ports) - , current_index (0) - , reseted (true) - , iteration_counter(0) -{ - -} - -const std::string RoundRobinHostsIterator::getHostAddr() const -{ - return hosts[current_index]; -} - -unsigned int RoundRobinHostsIterator::getPort() const -{ - return ports[current_index]; -} - -void RoundRobinHostsIterator::ResetIterations() -{ - reseted = true; - iteration_counter = 0; -} - -void RoundRobinHostsIterator::next() -{ - current_index = (current_index + 1) % hosts.size(); - iteration_counter++; -} - -bool RoundRobinHostsIterator::nextIsExist() const -{ - return iteration_counter < hosts.size(); -} - -RoundRobinHostsIterator::~RoundRobinHostsIterator() = default; - - -} \ No newline at end of file diff --git a/clickhouse/base/socket.cpp b/clickhouse/base/socket.cpp index 36fb8e74..4bc67ba3 100644 --- a/clickhouse/base/socket.cpp +++ b/clickhouse/base/socket.cpp @@ -390,9 +390,9 @@ std::unique_ptr Socket::makeOutputStream() const { NonSecureSocketFactory::~NonSecureSocketFactory() {} -std::unique_ptr NonSecureSocketFactory::connect(const ClientOptions &opts, const std::shared_ptr hosts_iterator) { +std::unique_ptr NonSecureSocketFactory::connect(const ClientOptions &opts, const std::shared_ptr endpoints_iterator) { - const auto address = NetworkAddress(hosts_iterator->getHostAddr(), std::to_string(hosts_iterator->getPort())); + const auto address = NetworkAddress(endpoints_iterator->getHostAddr(), std::to_string(endpoints_iterator->getPort())); auto socket = doConnect(address, opts); setSocketOptions(*socket, opts); diff --git a/clickhouse/base/socket.h b/clickhouse/base/socket.h index 631da7d1..8cad298f 100644 --- a/clickhouse/base/socket.h +++ b/clickhouse/base/socket.h @@ -3,7 +3,7 @@ #include "platform.h" #include "input.h" #include "output.h" -#include "hosts_iterator.h" +#include "endpoints_iterator.h" #include #include @@ -89,7 +89,7 @@ class SocketFactory { // TODO: move connection-related options to ConnectionOptions structure. - virtual std::unique_ptr connect(const ClientOptions& opts, const std::shared_ptr hosts_iterator) = 0; + virtual std::unique_ptr connect(const ClientOptions& opts, const std::shared_ptr endpoints_iterator) = 0; virtual void sleepFor(const std::chrono::milliseconds& duration); }; @@ -136,7 +136,7 @@ class NonSecureSocketFactory : public SocketFactory { public: ~NonSecureSocketFactory() override; - std::unique_ptr connect(const ClientOptions& opts, const std::shared_ptr hosts_iterator) override; + std::unique_ptr connect(const ClientOptions& opts, const std::shared_ptr endpoints_iterator) override; protected: virtual std::unique_ptr doConnect(const NetworkAddress& address, const ClientOptions& opts); diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index b9ca4a5d..bf04fe4f 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -196,7 +196,7 @@ class Client::Impl { std::unique_ptr input_; std::unique_ptr output_; std::unique_ptr socket_; - std::shared_ptr hosts_iterator; + std::shared_ptr endpoints_iterator; ServerInfo server_info_; }; @@ -218,9 +218,9 @@ Client::Impl::Impl(const ClientOptions& opts, : options_(modifyClientOptions(opts)) , events_(nullptr) , socket_factory_(std::move(socket_factory)) - , hosts_iterator(new RoundRobinHostsIterator(options_)) + , endpoints_iterator(new RoundRobinEndpointsIterator(options_)) { - auto init_connection_with_host = [&](){ + auto try_make_connection_with_endpoind = [this]() { for (unsigned int i = 0; ; ) { try { ResetConnection(); @@ -234,13 +234,13 @@ Client::Impl::Impl(const ClientOptions& opts, } }; - for (; hosts_iterator->nextIsExist(); hosts_iterator->next()) + for (endpoints_iterator->ResetIterations(); ; endpoints_iterator->next()) { try { - init_connection_with_host(); + try_make_connection_with_endpoind(); } catch (const std::system_error&) { - if(!hosts_iterator->nextIsExist()) + if(!endpoints_iterator->nextIsExist()) throw; } } @@ -353,7 +353,7 @@ void Client::Impl::Ping() { } void Client::Impl::ResetConnection() { - InitializeStreams(socket_factory_->connect(options_, hosts_iterator)); + InitializeStreams(socket_factory_->connect(options_, endpoints_iterator)); if (!Handshake()) { throw ProtocolError("fail to connect to " + options_.host); @@ -885,14 +885,14 @@ bool Client::Impl::ReceiveHello() { } void Client::Impl::RetryGuard(std::function func) { - for(hosts_iterator->ResetIterations(); ; hosts_iterator->next()) + for(endpoints_iterator->ResetIterations(); ; endpoints_iterator->next()) { try { RetryToConstEndpoint(func); return; } catch (const std::system_error&) { - if (!hosts_iterator->nextIsExist()) + if (!endpoints_iterator->nextIsExist()) throw; } } diff --git a/tests/simple/main.cpp b/tests/simple/main.cpp index 4379250e..d07aa40a 100644 --- a/tests/simple/main.cpp +++ b/tests/simple/main.cpp @@ -504,7 +504,7 @@ int main() { getEnvOrDefault("CLICKHOUSE_HOST", "localhost"), }) .SetPorts({static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), - static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "1212")), + static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), }) From f964bdbba0e8109af73bbf8dc12661da82c8af38 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Wed, 24 May 2023 10:02:35 +0000 Subject: [PATCH 03/15] Fix --- clickhouse/client.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index bf04fe4f..9bb97fe6 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -220,11 +220,11 @@ Client::Impl::Impl(const ClientOptions& opts, , socket_factory_(std::move(socket_factory)) , endpoints_iterator(new RoundRobinEndpointsIterator(options_)) { - auto try_make_connection_with_endpoind = [this]() { + auto try_make_connection_with_endpoint = [this]() { for (unsigned int i = 0; ; ) { try { ResetConnection(); - break; + return; } catch (const std::system_error&) { if (++i > options_.send_retries) { throw; @@ -233,12 +233,13 @@ Client::Impl::Impl(const ClientOptions& opts, } } }; - + for (endpoints_iterator->ResetIterations(); ; endpoints_iterator->next()) { try { - try_make_connection_with_endpoind(); + try_make_connection_with_endpoint(); + break; } catch (const std::system_error&) { if(!endpoints_iterator->nextIsExist()) throw; From 1c6aa2fbf6ca1625cf69ae1bb8e84d1b7d008153 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Wed, 24 May 2023 10:32:40 +0000 Subject: [PATCH 04/15] Equal sizes exeption --- clickhouse/client.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 9bb97fe6..8de45b4c 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -203,9 +203,11 @@ class Client::Impl { ClientOptions modifyClientOptions(ClientOptions opts) { + if (opts.hosts.size() != opts.ports.size()) + throw ValidationError("The sizes of lists of ports and hosts must match be equal."); if (!opts.host.empty()) opts.hosts.insert(opts.hosts.begin(), opts.host); - + opts.ports.insert(opts.ports.begin(), opts.port); return opts; } From fc27ec66c11c9d32fb958510550b84651c2edcaa Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Wed, 24 May 2023 10:44:14 +0000 Subject: [PATCH 05/15] fixes --- clickhouse/client.cpp | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 8de45b4c..05d845c3 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -14,7 +14,6 @@ #include #include #include -#include #if defined(WITH_OPENSSL) #include "base/sslsocket.h" #endif @@ -205,10 +204,10 @@ ClientOptions modifyClientOptions(ClientOptions opts) { if (opts.hosts.size() != opts.ports.size()) throw ValidationError("The sizes of lists of ports and hosts must match be equal."); - if (!opts.host.empty()) + if (!opts.host.empty()) { opts.hosts.insert(opts.hosts.begin(), opts.host); - - opts.ports.insert(opts.ports.begin(), opts.port); + opts.ports.insert(opts.ports.begin(), opts.port); + } return opts; } From 615d120d8cd6fb77ffec2cac0bab6d69ade0355f Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Wed, 24 May 2023 14:53:16 +0000 Subject: [PATCH 06/15] Added unit tests --- ut/client_ut.cpp | 84 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/ut/client_ut.cpp b/ut/client_ut.cpp index dfabcba9..5f453d93 100644 --- a/ut/client_ut.cpp +++ b/ut/client_ut.cpp @@ -1210,3 +1210,87 @@ INSTANTIATE_TEST_SUITE_P(ClientLocalFailed, ConnectionFailedClientTest, ExpectingException{"Authentication failed: password is incorrect"} } )); + + +class ConnectionSuccessTestCase : public testing::TestWithParam {}; + +TEST_P(ConnectionSuccessTestCase, SuccessConnectionEstablished) { + const auto & client_options = GetParam(); + std::unique_ptr client; + + try { + client = std::make_unique(client_options); + SUCCEED(); + } catch (const std::exception & e) { + FAIL() << "Got an unexpected exception : " << e.what(); + } +} + + +INSTANTIATE_TEST_SUITE_P(ClientMultipleEndpoints, ConnectionSuccessTestCase, + ::testing::Values(ClientCase::ParamType{ + ClientOptions() + .SetHosts({ + getEnvOrDefault("CLICKHOUSE_HOST", "somedeadhost"), + getEnvOrDefault("CLICKHOUSE_HOST", "deadaginghost"), + getEnvOrDefault("CLICKHOUSE_HOST", "localhost"), + getEnvOrDefault("CLICKHOUSE_HOST", "noalocalhost"), + }) + .SetPorts( { + static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), + static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "1245")), + static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), + static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "6784")), + }) + .SetUser( getEnvOrDefault("CLICKHOUSE_USER", "default")) + .SetPassword( getEnvOrDefault("CLICKHOUSE_PASSWORD", "")) + .SetDefaultDatabase(getEnvOrDefault("CLICKHOUSE_DB", "default")) + .SetPingBeforeQuery(true) + .SetConnectionConnectTimeout(std::chrono::milliseconds(200)) + .SetRetryTimeout(std::chrono::seconds(1)), + } +)); + +INSTANTIATE_TEST_SUITE_P(MultipleEndpointsFailed, ConnectionFailedClientTest, + ::testing::Values(ConnectionFailedClientTest::ParamType{ + ClientOptions() + .SetHosts({ + getEnvOrDefault("CLICKHOUSE_HOST", "somedeadhost"), + getEnvOrDefault("CLICKHOUSE_HOST", "deadaginghost"), + getEnvOrDefault("CLICKHOUSE_HOST", "noalocalhost") + }) + .SetPorts( { + static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), + static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "1245")), + static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "6784")), + }) + .SetUser( getEnvOrDefault("CLICKHOUSE_USER", "default")) + .SetPassword( getEnvOrDefault("CLICKHOUSE_PASSWORD", "")) + .SetDefaultDatabase(getEnvOrDefault("CLICKHOUSE_DB", "default")) + .SetPingBeforeQuery(true) + .SetConnectionConnectTimeout(std::chrono::milliseconds(200)) + .SetRetryTimeout(std::chrono::seconds(1)), + ExpectingException{"Temporary failure in name resolution"} + } +)); + +INSTANTIATE_TEST_SUITE_P(MultipleEndpointsNonValidConfig, ConnectionFailedClientTest, + ::testing::Values(ConnectionFailedClientTest::ParamType{ + ClientOptions() + .SetHosts({ + getEnvOrDefault("CLICKHOUSE_HOST", "somedeadhost"), + getEnvOrDefault("CLICKHOUSE_HOST", "localhost"), + getEnvOrDefault("CLICKHOUSE_HOST", "noalocalhost"), + }) + .SetPorts( { + static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), + }) + .SetUser( getEnvOrDefault("CLICKHOUSE_USER", "default")) + .SetPassword( getEnvOrDefault("CLICKHOUSE_PASSWORD", "")) + .SetDefaultDatabase(getEnvOrDefault("CLICKHOUSE_DB", "default")) + .SetPingBeforeQuery(true) + .SetConnectionConnectTimeout(std::chrono::milliseconds(200)) + .SetRetryTimeout(std::chrono::seconds(1)), + ExpectingException{"The sizes of lists of ports and hosts must match be equal."} + } +)); From 289343edc175ab3cb60517681e2128eaf04204bf Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Wed, 24 May 2023 15:08:58 +0000 Subject: [PATCH 07/15] minor changes --- clickhouse/base/endpoints_iterator.cpp | 4 ++-- clickhouse/base/endpoints_iterator.h | 2 +- clickhouse/base/socket.cpp | 2 +- clickhouse/client.cpp | 9 +++++---- tests/simple/main.cpp | 20 ++++++++++---------- 5 files changed, 19 insertions(+), 18 deletions(-) diff --git a/clickhouse/base/endpoints_iterator.cpp b/clickhouse/base/endpoints_iterator.cpp index dd89ad1f..d8ec0d76 100644 --- a/clickhouse/base/endpoints_iterator.cpp +++ b/clickhouse/base/endpoints_iterator.cpp @@ -10,7 +10,7 @@ RoundRobinEndpointsIterator::RoundRobinEndpointsIterator(const ClientOptions& op , reseted (true) , iteration_counter(0) { - + } const std::string RoundRobinEndpointsIterator::getHostAddr() const @@ -42,4 +42,4 @@ bool RoundRobinEndpointsIterator::nextIsExist() const RoundRobinEndpointsIterator::~RoundRobinEndpointsIterator() = default; -} \ No newline at end of file +} diff --git a/clickhouse/base/endpoints_iterator.h b/clickhouse/base/endpoints_iterator.h index 033d9b83..aed800d8 100644 --- a/clickhouse/base/endpoints_iterator.h +++ b/clickhouse/base/endpoints_iterator.h @@ -41,4 +41,4 @@ class RoundRobinEndpointsIterator : public EndpointsIteratorBase size_t iteration_counter; }; -} \ No newline at end of file +} diff --git a/clickhouse/base/socket.cpp b/clickhouse/base/socket.cpp index 4bc67ba3..786bb3eb 100644 --- a/clickhouse/base/socket.cpp +++ b/clickhouse/base/socket.cpp @@ -391,7 +391,7 @@ std::unique_ptr Socket::makeOutputStream() const { NonSecureSocketFactory::~NonSecureSocketFactory() {} std::unique_ptr NonSecureSocketFactory::connect(const ClientOptions &opts, const std::shared_ptr endpoints_iterator) { - + const auto address = NetworkAddress(endpoints_iterator->getHostAddr(), std::to_string(endpoints_iterator->getPort())); auto socket = doConnect(address, opts); setSocketOptions(*socket, opts); diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 05d845c3..f0d647aa 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -14,6 +14,7 @@ #include #include #include + #if defined(WITH_OPENSSL) #include "base/sslsocket.h" #endif @@ -161,7 +162,7 @@ class Client::Impl { /// call fuc several times. void RetryGuard(std::function func); - void RetryToConstEndpoint(std::function func); + void RetryConnectToTheEndpoint(std::function& func); private: class EnsureNull { @@ -242,7 +243,7 @@ Client::Impl::Impl(const ClientOptions& opts, try_make_connection_with_endpoint(); break; } catch (const std::system_error&) { - if(!endpoints_iterator->nextIsExist()) + if(!endpoints_iterator->nextIsExist()) throw; } } @@ -891,7 +892,7 @@ void Client::Impl::RetryGuard(std::function func) { { try { - RetryToConstEndpoint(func); + RetryConnectToTheEndpoint(func); return; } catch (const std::system_error&) { if (!endpoints_iterator->nextIsExist()) @@ -900,7 +901,7 @@ void Client::Impl::RetryGuard(std::function func) { } } -void Client::Impl::RetryToConstEndpoint(std::function func) { +void Client::Impl::RetryConnectToTheEndpoint(std::function& func) { for (unsigned int i = 0; ; ++i) { try { func(); diff --git a/tests/simple/main.cpp b/tests/simple/main.cpp index d07aa40a..d0d6162e 100644 --- a/tests/simple/main.cpp +++ b/tests/simple/main.cpp @@ -496,17 +496,17 @@ static void RunTests(Client& client) { int main() { try { const auto localHostEndpoint = ClientOptions() - // .SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost")) - // .SetPort( getEnvOrDefault("CLICKHOUSE_PORT", "9000")) - .SetHosts({getEnvOrDefault("CLICKHOUSE_HOST", "asasdasd"), - getEnvOrDefault("CLICKHOUSE_HOST", "localhost"), - getEnvOrDefault("CLICKHOUSE_HOST", "noalocalhost"), - getEnvOrDefault("CLICKHOUSE_HOST", "localhost"), + .SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost")) + .SetPort( getEnvOrDefault("CLICKHOUSE_PORT", "9000")) + .SetHosts({ getEnvOrDefault("CLICKHOUSE_HOST", "asasdasd"), + getEnvOrDefault("CLICKHOUSE_HOST", "localhost"), + getEnvOrDefault("CLICKHOUSE_HOST", "noalocalhost"), + getEnvOrDefault("CLICKHOUSE_HOST", "localhost"), }) - .SetPorts({static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), - static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), - static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), - static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), + .SetPorts({ static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), + static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), + // static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "1234")), + // static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "5678")), }) .SetUser( getEnvOrDefault("CLICKHOUSE_USER", "default")) .SetPassword( getEnvOrDefault("CLICKHOUSE_PASSWORD", "")) From f0bf6312fb7fd664620a03d945c1ed7941957d9c Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Thu, 25 May 2023 07:58:25 +0000 Subject: [PATCH 08/15] Added knob --- clickhouse/base/endpoints_iterator.h | 14 +++++++++++++- clickhouse/client.cpp | 9 ++++++++- clickhouse/client.h | 12 ++++++++++++ tests/simple/main.cpp | 4 ++-- 4 files changed, 35 insertions(+), 4 deletions(-) diff --git a/clickhouse/base/endpoints_iterator.h b/clickhouse/base/endpoints_iterator.h index aed800d8..6a97541b 100644 --- a/clickhouse/base/endpoints_iterator.h +++ b/clickhouse/base/endpoints_iterator.h @@ -7,19 +7,31 @@ namespace clickhouse { struct ClientOptions; +/** + * Base class for iterating through endpoints. +*/ class EndpointsIteratorBase { public: virtual ~EndpointsIteratorBase() = default; virtual void next() = 0; + // Get the address of current endpoint. virtual const std::string getHostAddr() const = 0; + + // Get the port of current endpoint. virtual unsigned int getPort() const = 0; + + // Reset iterations. virtual void ResetIterations() = 0; virtual bool nextIsExist() const = 0; }; - +/** + * Client tries to connect to those endpoints one by one, on the round-robin basis: + * first default enpoint, then each of endpoints, from begin() to end(), + * if previous are inaccessible. + */ class RoundRobinEndpointsIterator : public EndpointsIteratorBase { public: diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index f0d647aa..8109c568 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -111,6 +111,13 @@ std::unique_ptr GetSocketFactory(const ClientOptions& opts) { return std::make_unique(); } +std::unique_ptr GetEndpointsIterator(const ClientOptions& opts) { + if (opts.iteration_algo == EndpointsIterationAlgorithm::RoundRobin) { + return std::make_unique(opts); + } else + throw UnimplementedError("Unimplemented endpoints iteration alorithm."); +} + } class Client::Impl { @@ -220,7 +227,7 @@ Client::Impl::Impl(const ClientOptions& opts, : options_(modifyClientOptions(opts)) , events_(nullptr) , socket_factory_(std::move(socket_factory)) - , endpoints_iterator(new RoundRobinEndpointsIterator(options_)) + , endpoints_iterator(GetEndpointsIterator(options_)) { auto try_make_connection_with_endpoint = [this]() { for (unsigned int i = 0; ; ) { diff --git a/clickhouse/client.h b/clickhouse/client.h index f5e1fc42..accdb263 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -44,6 +44,10 @@ enum class CompressionMethod { LZ4 = 1, }; +enum class EndpointsIterationAlgorithm { + RoundRobin = 0, +}; + struct ClientOptions { // Setter goes first, so it is possible to apply 'deprecated' annotation safely. #define DECLARE_FIELD(name, type, setter, default_value) \ @@ -58,9 +62,17 @@ struct ClientOptions { /// Service port. DECLARE_FIELD(port, unsigned int, SetPort, 9000); + /* + * Client tries to connect to those endpoints one by one, on chosing alorithm basis: + * first default enpoint (set via SetHost() + SetPort()), then each of endpoints, from begin() to end(), + * the first one to establish connection is used for the rest of the session. + * If port part is not specified, default port (@see SetPort()) is used. + */ DECLARE_FIELD(hosts, std::vector, SetHosts, std::vector()); DECLARE_FIELD(ports, std::vector, SetPorts, std::vector()); + DECLARE_FIELD(iteration_algo, EndpointsIterationAlgorithm, SetEndpointsIterationAlgorithm, EndpointsIterationAlgorithm::RoundRobin); + /// Default database. DECLARE_FIELD(default_database, std::string, SetDefaultDatabase, "default"); /// User name. diff --git a/tests/simple/main.cpp b/tests/simple/main.cpp index d0d6162e..29381ff8 100644 --- a/tests/simple/main.cpp +++ b/tests/simple/main.cpp @@ -505,8 +505,8 @@ int main() { }) .SetPorts({ static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), - // static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "1234")), - // static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "5678")), + static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "1234")), + static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "5678")), }) .SetUser( getEnvOrDefault("CLICKHOUSE_USER", "default")) .SetPassword( getEnvOrDefault("CLICKHOUSE_PASSWORD", "")) From 6833281a5e242d4cdf3486a681f42113483cfd14 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Thu, 25 May 2023 08:46:18 +0000 Subject: [PATCH 09/15] comments' --- clickhouse/base/endpoints_iterator.h | 5 ----- clickhouse/client.cpp | 12 ++++++++---- clickhouse/client.h | 16 +++++++++------- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/clickhouse/base/endpoints_iterator.h b/clickhouse/base/endpoints_iterator.h index 6a97541b..f5011c7a 100644 --- a/clickhouse/base/endpoints_iterator.h +++ b/clickhouse/base/endpoints_iterator.h @@ -27,11 +27,6 @@ class EndpointsIteratorBase virtual bool nextIsExist() const = 0; }; -/** - * Client tries to connect to those endpoints one by one, on the round-robin basis: - * first default enpoint, then each of endpoints, from begin() to end(), - * if previous are inaccessible. - */ class RoundRobinEndpointsIterator : public EndpointsIteratorBase { public: diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 8109c568..e2b69f20 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -64,8 +64,12 @@ struct ClientInfo { }; std::ostream& operator<<(std::ostream& os, const ClientOptions& opt) { - os << "Client(" << opt.user << '@' << opt.host << ":" << opt.port - << " ping_before_query:" << opt.ping_before_query + os << "Client("; + for (size_t i = 0; i < opt.hosts.size(); i++) + os << opt.user << '@' << opt.hosts[i] << ":" << opt.ports[i] + << ((i == opt.hosts[i].size() - 1) ? "" : ", "); + + os << " ping_before_query:" << opt.ping_before_query << " send_retries:" << opt.send_retries << " retry_timeout:" << opt.retry_timeout.count() << " compression_method:" @@ -111,9 +115,9 @@ std::unique_ptr GetSocketFactory(const ClientOptions& opts) { return std::make_unique(); } -std::unique_ptr GetEndpointsIterator(const ClientOptions& opts) { +std::shared_ptr GetEndpointsIterator(const ClientOptions& opts) { if (opts.iteration_algo == EndpointsIterationAlgorithm::RoundRobin) { - return std::make_unique(opts); + return std::make_shared(opts); } else throw UnimplementedError("Unimplemented endpoints iteration alorithm."); } diff --git a/clickhouse/client.h b/clickhouse/client.h index accdb263..ee7cf1e8 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -45,6 +45,11 @@ enum class CompressionMethod { }; enum class EndpointsIterationAlgorithm { +/** + * Client tries to connect to those endpoints one by one, on the round-robin basis: + * first default enpoint, then each of endpoints, from begin() to end(), + * if previous are inaccessible. + */ RoundRobin = 0, }; @@ -62,15 +67,12 @@ struct ClientOptions { /// Service port. DECLARE_FIELD(port, unsigned int, SetPort, 9000); - /* - * Client tries to connect to those endpoints one by one, on chosing alorithm basis: - * first default enpoint (set via SetHost() + SetPort()), then each of endpoints, from begin() to end(), - * the first one to establish connection is used for the rest of the session. - * If port part is not specified, default port (@see SetPort()) is used. - */ + /// Hostnames of the servers. The next host to connect is selected according to the EndpointsIterationAlgorithm. + /// Note: If SetHost and SetHosts are setted, host will be placed at the beginning of the hosts vector. DECLARE_FIELD(hosts, std::vector, SetHosts, std::vector()); + /// Ports of the servers. DECLARE_FIELD(ports, std::vector, SetPorts, std::vector()); - + /// Algorithm for selecting the next endpoint for connection. DECLARE_FIELD(iteration_algo, EndpointsIterationAlgorithm, SetEndpointsIterationAlgorithm, EndpointsIterationAlgorithm::RoundRobin); /// Default database. From 1f52a687e55cf97f500f2d0e8fbf3d330f7692b6 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Thu, 25 May 2023 11:45:01 +0000 Subject: [PATCH 10/15] Remove unused --- clickhouse/base/endpoints_iterator.cpp | 4 +--- clickhouse/base/endpoints_iterator.h | 17 ++++++++--------- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/clickhouse/base/endpoints_iterator.cpp b/clickhouse/base/endpoints_iterator.cpp index d8ec0d76..8bee737b 100644 --- a/clickhouse/base/endpoints_iterator.cpp +++ b/clickhouse/base/endpoints_iterator.cpp @@ -7,13 +7,12 @@ RoundRobinEndpointsIterator::RoundRobinEndpointsIterator(const ClientOptions& op hosts (opts.hosts) , ports (opts.ports) , current_index (0) - , reseted (true) , iteration_counter(0) { } -const std::string RoundRobinEndpointsIterator::getHostAddr() const +std::string RoundRobinEndpointsIterator::getHostAddr() const { return hosts[current_index]; } @@ -25,7 +24,6 @@ unsigned int RoundRobinEndpointsIterator::getPort() const void RoundRobinEndpointsIterator::ResetIterations() { - reseted = true; iteration_counter = 0; } diff --git a/clickhouse/base/endpoints_iterator.h b/clickhouse/base/endpoints_iterator.h index f5011c7a..0afca930 100644 --- a/clickhouse/base/endpoints_iterator.h +++ b/clickhouse/base/endpoints_iterator.h @@ -17,7 +17,7 @@ class EndpointsIteratorBase virtual void next() = 0; // Get the address of current endpoint. - virtual const std::string getHostAddr() const = 0; + virtual std::string getHostAddr() const = 0; // Get the port of current endpoint. virtual unsigned int getPort() const = 0; @@ -30,21 +30,20 @@ class EndpointsIteratorBase class RoundRobinEndpointsIterator : public EndpointsIteratorBase { public: - RoundRobinEndpointsIterator(const ClientOptions& opts); - const std::string getHostAddr() const override; - unsigned int getPort() const override; - void ResetIterations() override; - bool nextIsExist() const override; - void next() override; + RoundRobinEndpointsIterator(const ClientOptions& opts); + std::string getHostAddr() const override; + unsigned int getPort() const override; + void ResetIterations() override; + bool nextIsExist() const override; + void next() override; - ~RoundRobinEndpointsIterator() override; + ~RoundRobinEndpointsIterator() override; private: const std::vector& hosts; const std::vector& ports; int current_index; - bool reseted; size_t iteration_counter; }; From 8255db67f9717fd70688d239f44707073dbe8d3d Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Thu, 25 May 2023 14:08:12 +0000 Subject: [PATCH 11/15] conservations resolving --- clickhouse/base/endpoints_iterator.cpp | 8 ++++---- clickhouse/base/endpoints_iterator.h | 16 ++++++++-------- clickhouse/base/socket.cpp | 4 ++-- clickhouse/base/socket.h | 4 ++-- clickhouse/client.cpp | 15 +++++++++------ tests/simple/main.cpp | 8 ++++---- ut/client_ut.cpp | 16 ++++++++-------- ut/utils.h | 3 ++- 8 files changed, 39 insertions(+), 35 deletions(-) diff --git a/clickhouse/base/endpoints_iterator.cpp b/clickhouse/base/endpoints_iterator.cpp index 8bee737b..4fe48119 100644 --- a/clickhouse/base/endpoints_iterator.cpp +++ b/clickhouse/base/endpoints_iterator.cpp @@ -12,12 +12,12 @@ RoundRobinEndpointsIterator::RoundRobinEndpointsIterator(const ClientOptions& op } -std::string RoundRobinEndpointsIterator::getHostAddr() const +std::string RoundRobinEndpointsIterator::GetHostAddr() const { return hosts[current_index]; } -unsigned int RoundRobinEndpointsIterator::getPort() const +unsigned int RoundRobinEndpointsIterator::GetPort() const { return ports[current_index]; } @@ -27,13 +27,13 @@ void RoundRobinEndpointsIterator::ResetIterations() iteration_counter = 0; } -void RoundRobinEndpointsIterator::next() +void RoundRobinEndpointsIterator::Next() { current_index = (current_index + 1) % hosts.size(); iteration_counter++; } -bool RoundRobinEndpointsIterator::nextIsExist() const +bool RoundRobinEndpointsIterator::NextIsExist() const { return iteration_counter + 1 < hosts.size(); } diff --git a/clickhouse/base/endpoints_iterator.h b/clickhouse/base/endpoints_iterator.h index 0afca930..76ce3cbb 100644 --- a/clickhouse/base/endpoints_iterator.h +++ b/clickhouse/base/endpoints_iterator.h @@ -15,27 +15,27 @@ class EndpointsIteratorBase public: virtual ~EndpointsIteratorBase() = default; - virtual void next() = 0; + virtual void Next() = 0; // Get the address of current endpoint. - virtual std::string getHostAddr() const = 0; + virtual std::string GetHostAddr() const = 0; // Get the port of current endpoint. - virtual unsigned int getPort() const = 0; + virtual unsigned int GetPort() const = 0; // Reset iterations. virtual void ResetIterations() = 0; - virtual bool nextIsExist() const = 0; + virtual bool NextIsExist() const = 0; }; class RoundRobinEndpointsIterator : public EndpointsIteratorBase { public: RoundRobinEndpointsIterator(const ClientOptions& opts); - std::string getHostAddr() const override; - unsigned int getPort() const override; + std::string GetHostAddr() const override; + unsigned int GetPort() const override; void ResetIterations() override; - bool nextIsExist() const override; - void next() override; + bool NextIsExist() const override; + void Next() override; ~RoundRobinEndpointsIterator() override; diff --git a/clickhouse/base/socket.cpp b/clickhouse/base/socket.cpp index 786bb3eb..13755a37 100644 --- a/clickhouse/base/socket.cpp +++ b/clickhouse/base/socket.cpp @@ -390,9 +390,9 @@ std::unique_ptr Socket::makeOutputStream() const { NonSecureSocketFactory::~NonSecureSocketFactory() {} -std::unique_ptr NonSecureSocketFactory::connect(const ClientOptions &opts, const std::shared_ptr endpoints_iterator) { +std::unique_ptr NonSecureSocketFactory::connect(const ClientOptions &opts, const std::string& host, const std::string& port) { - const auto address = NetworkAddress(endpoints_iterator->getHostAddr(), std::to_string(endpoints_iterator->getPort())); + const auto address = NetworkAddress(host, port); auto socket = doConnect(address, opts); setSocketOptions(*socket, opts); diff --git a/clickhouse/base/socket.h b/clickhouse/base/socket.h index 8cad298f..4953a4d1 100644 --- a/clickhouse/base/socket.h +++ b/clickhouse/base/socket.h @@ -89,7 +89,7 @@ class SocketFactory { // TODO: move connection-related options to ConnectionOptions structure. - virtual std::unique_ptr connect(const ClientOptions& opts, const std::shared_ptr endpoints_iterator) = 0; + virtual std::unique_ptr connect(const ClientOptions& opts, const std::string& host, const std::string& port) = 0; virtual void sleepFor(const std::chrono::milliseconds& duration); }; @@ -136,7 +136,7 @@ class NonSecureSocketFactory : public SocketFactory { public: ~NonSecureSocketFactory() override; - std::unique_ptr connect(const ClientOptions& opts, const std::shared_ptr endpoints_iterator) override; + std::unique_ptr connect(const ClientOptions& opts, const std::string& host, const std::string& port) override; protected: virtual std::unique_ptr doConnect(const NetworkAddress& address, const ClientOptions& opts); diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index e2b69f20..b967c0c9 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -216,7 +216,8 @@ ClientOptions modifyClientOptions(ClientOptions opts) { if (opts.hosts.size() != opts.ports.size()) throw ValidationError("The sizes of lists of ports and hosts must match be equal."); - if (!opts.host.empty()) { + if (!opts.host.empty() && std::find(opts.hosts.begin(), opts.hosts.end(), opts.host) == std::end(opts.hosts)) + { opts.hosts.insert(opts.hosts.begin(), opts.host); opts.ports.insert(opts.ports.begin(), opts.port); } @@ -247,14 +248,14 @@ Client::Impl::Impl(const ClientOptions& opts, } }; - for (endpoints_iterator->ResetIterations(); ; endpoints_iterator->next()) + for (endpoints_iterator->ResetIterations(); ; endpoints_iterator->Next()) { try { try_make_connection_with_endpoint(); break; } catch (const std::system_error&) { - if(!endpoints_iterator->nextIsExist()) + if(!endpoints_iterator->NextIsExist()) throw; } } @@ -367,7 +368,9 @@ void Client::Impl::Ping() { } void Client::Impl::ResetConnection() { - InitializeStreams(socket_factory_->connect(options_, endpoints_iterator)); + InitializeStreams(socket_factory_->connect(options_, endpoints_iterator->GetHostAddr(), + std::to_string(endpoints_iterator->GetPort()) + )); if (!Handshake()) { throw ProtocolError("fail to connect to " + options_.host); @@ -899,14 +902,14 @@ bool Client::Impl::ReceiveHello() { } void Client::Impl::RetryGuard(std::function func) { - for(endpoints_iterator->ResetIterations(); ; endpoints_iterator->next()) + for(endpoints_iterator->ResetIterations(); ; endpoints_iterator->Next()) { try { RetryConnectToTheEndpoint(func); return; } catch (const std::system_error&) { - if (!endpoints_iterator->nextIsExist()) + if (!endpoints_iterator->NextIsExist()) throw; } } diff --git a/tests/simple/main.cpp b/tests/simple/main.cpp index 29381ff8..f20602f4 100644 --- a/tests/simple/main.cpp +++ b/tests/simple/main.cpp @@ -503,10 +503,10 @@ int main() { getEnvOrDefault("CLICKHOUSE_HOST", "noalocalhost"), getEnvOrDefault("CLICKHOUSE_HOST", "localhost"), }) - .SetPorts({ static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), - static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), - static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "1234")), - static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "5678")), + .SetPorts({ getEnvOrDefault("CLICKHOUSE_PORT", "9000"), + getEnvOrDefault("CLICKHOUSE_PORT", "9000"), + getEnvOrDefault("CLICKHOUSE_PORT", "1234"), + getEnvOrDefault("CLICKHOUSE_PORT", "5678"), }) .SetUser( getEnvOrDefault("CLICKHOUSE_USER", "default")) .SetPassword( getEnvOrDefault("CLICKHOUSE_PASSWORD", "")) diff --git a/ut/client_ut.cpp b/ut/client_ut.cpp index 5f453d93..07e53fba 100644 --- a/ut/client_ut.cpp +++ b/ut/client_ut.cpp @@ -1237,10 +1237,10 @@ INSTANTIATE_TEST_SUITE_P(ClientMultipleEndpoints, ConnectionSuccessTestCase, getEnvOrDefault("CLICKHOUSE_HOST", "noalocalhost"), }) .SetPorts( { - static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), - static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "1245")), - static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), - static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "6784")), + getEnvOrDefault("CLICKHOUSE_PORT", "9000"), + getEnvOrDefault("CLICKHOUSE_PORT", "1245"), + getEnvOrDefault("CLICKHOUSE_PORT", "9000"), + getEnvOrDefault("CLICKHOUSE_PORT", "6784"), }) .SetUser( getEnvOrDefault("CLICKHOUSE_USER", "default")) .SetPassword( getEnvOrDefault("CLICKHOUSE_PASSWORD", "")) @@ -1260,9 +1260,9 @@ INSTANTIATE_TEST_SUITE_P(MultipleEndpointsFailed, ConnectionFailedClientTest, getEnvOrDefault("CLICKHOUSE_HOST", "noalocalhost") }) .SetPorts( { - static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), - static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "1245")), - static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "6784")), + getEnvOrDefault("CLICKHOUSE_PORT", "9000"), + getEnvOrDefault("CLICKHOUSE_PORT", "1245"), + getEnvOrDefault("CLICKHOUSE_PORT", "6784"), }) .SetUser( getEnvOrDefault("CLICKHOUSE_USER", "default")) .SetPassword( getEnvOrDefault("CLICKHOUSE_PASSWORD", "")) @@ -1283,7 +1283,7 @@ INSTANTIATE_TEST_SUITE_P(MultipleEndpointsNonValidConfig, ConnectionFailedClient getEnvOrDefault("CLICKHOUSE_HOST", "noalocalhost"), }) .SetPorts( { - static_cast(getEnvOrDefault("CLICKHOUSE_PORT", "9000")), + getEnvOrDefault("CLICKHOUSE_PORT", "9000"), }) .SetUser( getEnvOrDefault("CLICKHOUSE_USER", "default")) .SetPassword( getEnvOrDefault("CLICKHOUSE_PASSWORD", "")) diff --git a/ut/utils.h b/ut/utils.h index b9484626..41bec25c 100644 --- a/ut/utils.h +++ b/ut/utils.h @@ -51,7 +51,8 @@ auto getEnvOrDefault(const std::string& env, const char * default_val) { return std::stoll(value); } else if constexpr (std::is_unsigned_v) { if constexpr (sizeof(ResultType) <= sizeof(unsigned long)) - return std::stoul(value); + // For cases when ResultType is unsigned int. + return static_cast(std::stoul(value)); else if constexpr (sizeof(ResultType) <= sizeof(unsigned long long)) return std::stoull(value); } From a6795bbac55577871aed9c4686813b0770f316ec Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Thu, 25 May 2023 14:18:09 +0000 Subject: [PATCH 12/15] fix --- clickhouse/base/endpoints_iterator.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clickhouse/base/endpoints_iterator.h b/clickhouse/base/endpoints_iterator.h index 76ce3cbb..158f6246 100644 --- a/clickhouse/base/endpoints_iterator.h +++ b/clickhouse/base/endpoints_iterator.h @@ -30,7 +30,7 @@ class EndpointsIteratorBase class RoundRobinEndpointsIterator : public EndpointsIteratorBase { public: - RoundRobinEndpointsIterator(const ClientOptions& opts); + explicit RoundRobinEndpointsIterator(const ClientOptions& opts); std::string GetHostAddr() const override; unsigned int GetPort() const override; void ResetIterations() override; From cc40bbcca16e39e68d0ca4cf40bbc9d4c03b0363 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Tue, 6 Jun 2023 12:20:10 +0000 Subject: [PATCH 13/15] Reviews fixes --- clickhouse/base/endpoints_iterator.cpp | 15 ++-- clickhouse/base/endpoints_iterator.h | 24 +++-- clickhouse/client.cpp | 117 +++++++++++++++--------- clickhouse/client.h | 33 ++++--- tests/simple/main.cpp | 18 ++-- ut/client_ut.cpp | 119 ++++++++++++++++++------- 6 files changed, 210 insertions(+), 116 deletions(-) diff --git a/clickhouse/base/endpoints_iterator.cpp b/clickhouse/base/endpoints_iterator.cpp index 4fe48119..f687703d 100644 --- a/clickhouse/base/endpoints_iterator.cpp +++ b/clickhouse/base/endpoints_iterator.cpp @@ -1,11 +1,10 @@ #include "endpoints_iterator.h" -#include "../client.h" +#include namespace clickhouse { -RoundRobinEndpointsIterator::RoundRobinEndpointsIterator(const ClientOptions& opts) : - hosts (opts.hosts) - , ports (opts.ports) +RoundRobinEndpointsIterator::RoundRobinEndpointsIterator(const std::vector& _endpoints) : + endpoints (_endpoints) , current_index (0) , iteration_counter(0) { @@ -14,12 +13,12 @@ RoundRobinEndpointsIterator::RoundRobinEndpointsIterator(const ClientOptions& op std::string RoundRobinEndpointsIterator::GetHostAddr() const { - return hosts[current_index]; + return endpoints[current_index].host; } unsigned int RoundRobinEndpointsIterator::GetPort() const { - return ports[current_index]; + return endpoints[current_index].port; } void RoundRobinEndpointsIterator::ResetIterations() @@ -29,13 +28,13 @@ void RoundRobinEndpointsIterator::ResetIterations() void RoundRobinEndpointsIterator::Next() { - current_index = (current_index + 1) % hosts.size(); + current_index = (current_index + 1) % endpoints.size(); iteration_counter++; } bool RoundRobinEndpointsIterator::NextIsExist() const { - return iteration_counter + 1 < hosts.size(); + return iteration_counter + 1 < endpoints.size(); } RoundRobinEndpointsIterator::~RoundRobinEndpointsIterator() = default; diff --git a/clickhouse/base/endpoints_iterator.h b/clickhouse/base/endpoints_iterator.h index 158f6246..dcbdd86b 100644 --- a/clickhouse/base/endpoints_iterator.h +++ b/clickhouse/base/endpoints_iterator.h @@ -1,6 +1,6 @@ #pragma once -#include "../client.h" +#include "clickhouse/client.h" #include namespace clickhouse { @@ -30,21 +30,19 @@ class EndpointsIteratorBase class RoundRobinEndpointsIterator : public EndpointsIteratorBase { public: - explicit RoundRobinEndpointsIterator(const ClientOptions& opts); - std::string GetHostAddr() const override; - unsigned int GetPort() const override; - void ResetIterations() override; - bool NextIsExist() const override; - void Next() override; + explicit RoundRobinEndpointsIterator(const std::vector& opts); + std::string GetHostAddr() const override; + unsigned int GetPort() const override; + void ResetIterations() override; + bool NextIsExist() const override; + void Next() override; - ~RoundRobinEndpointsIterator() override; + ~RoundRobinEndpointsIterator() override; private: - - const std::vector& hosts; - const std::vector& ports; - int current_index; - size_t iteration_counter; + const std::vector& endpoints; + int current_index; + size_t iteration_counter; }; } diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index b967c0c9..27770c16 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -65,9 +65,9 @@ struct ClientInfo { std::ostream& operator<<(std::ostream& os, const ClientOptions& opt) { os << "Client("; - for (size_t i = 0; i < opt.hosts.size(); i++) - os << opt.user << '@' << opt.hosts[i] << ":" << opt.ports[i] - << ((i == opt.hosts[i].size() - 1) ? "" : ", "); + for (size_t i = 0; i < opt.endpoints.size(); i++) + os << opt.user << '@' << opt.endpoints[i].host << ":" << opt.endpoints[i].port + << ((i == opt.endpoints.size() - 1) ? "" : ", "); os << " ping_before_query:" << opt.ping_before_query << " send_retries:" << opt.send_retries @@ -115,11 +115,8 @@ std::unique_ptr GetSocketFactory(const ClientOptions& opts) { return std::make_unique(); } -std::shared_ptr GetEndpointsIterator(const ClientOptions& opts) { - if (opts.iteration_algo == EndpointsIterationAlgorithm::RoundRobin) { - return std::make_shared(opts); - } else - throw UnimplementedError("Unimplemented endpoints iteration alorithm."); +std::unique_ptr GetEndpointsIterator(const std::vector& endpoints) { + return std::make_unique(endpoints); } } @@ -141,8 +138,12 @@ class Client::Impl { void ResetConnection(); + void ResetConnectionEndpoint(); + const ServerInfo& GetServerInfo() const; + const std::optional& GetCurrentEndpoint() const; + private: bool Handshake(); @@ -166,6 +167,8 @@ class Client::Impl { void WriteBlock(const Block& block, OutputStream& output); + void CreateConnection(); + void InitializeStreams(std::unique_ptr&& socket); private: @@ -207,19 +210,21 @@ class Client::Impl { std::unique_ptr input_; std::unique_ptr output_; std::unique_ptr socket_; - std::shared_ptr endpoints_iterator; + std::unique_ptr endpoints_iterator; + + std::optional current_endpoint_; ServerInfo server_info_; }; ClientOptions modifyClientOptions(ClientOptions opts) { - if (opts.hosts.size() != opts.ports.size()) - throw ValidationError("The sizes of lists of ports and hosts must match be equal."); - if (!opts.host.empty() && std::find(opts.hosts.begin(), opts.hosts.end(), opts.host) == std::end(opts.hosts)) - { - opts.hosts.insert(opts.hosts.begin(), opts.host); - opts.ports.insert(opts.ports.begin(), opts.port); + if (opts.host.empty()) + return opts; + + Endpoint endpoint_single({opts.host, opts.port}); + if (std::find(opts.endpoints.begin(), opts.endpoints.end(), endpoint_single) == std::end(opts.endpoints)) { + opts.endpoints.emplace(opts.endpoints.begin(),endpoint_single); } return opts; } @@ -232,33 +237,9 @@ Client::Impl::Impl(const ClientOptions& opts, : options_(modifyClientOptions(opts)) , events_(nullptr) , socket_factory_(std::move(socket_factory)) - , endpoints_iterator(GetEndpointsIterator(options_)) + , endpoints_iterator(GetEndpointsIterator(options_.endpoints)) { - auto try_make_connection_with_endpoint = [this]() { - for (unsigned int i = 0; ; ) { - try { - ResetConnection(); - return; - } catch (const std::system_error&) { - if (++i > options_.send_retries) { - throw; - } - socket_factory_->sleepFor(options_.retry_timeout); - } - } - }; - - for (endpoints_iterator->ResetIterations(); ; endpoints_iterator->Next()) - { - try - { - try_make_connection_with_endpoint(); - break; - } catch (const std::system_error&) { - if(!endpoints_iterator->NextIsExist()) - throw; - } - } + CreateConnection(); if (options_.compression_method != CompressionMethod::None) { compression_ = CompressionState::Enable; @@ -377,10 +358,51 @@ void Client::Impl::ResetConnection() { } } +void Client::Impl::ResetConnectionEndpoint() { + endpoints_iterator->ResetIterations(); + endpoints_iterator->Next(); + CreateConnection(); +} + +void Client::Impl::CreateConnection() { + current_endpoint_.reset(); + auto try_make_connection_with_endpoint = [this]() { + for (unsigned int i = 0; ; ) { + try { + ResetConnection(); + return; + } catch (const std::system_error&) { + if (++i > options_.send_retries) { + throw; + } + socket_factory_->sleepFor(options_.retry_timeout); + } + } + }; + + for (endpoints_iterator->ResetIterations(); ; endpoints_iterator->Next()) + { + try + { + try_make_connection_with_endpoint(); + current_endpoint_ = {endpoints_iterator->GetHostAddr(), endpoints_iterator->GetPort()}; + break; + } catch (const std::system_error&) { + if(!endpoints_iterator->NextIsExist()) + throw; + } + } +} + const ServerInfo& Client::Impl::GetServerInfo() const { return server_info_; } + +const std::optional& Client::Impl::GetCurrentEndpoint() const { + return current_endpoint_; +} + bool Client::Impl::Handshake() { if (!SendHello()) { return false; @@ -907,10 +929,15 @@ void Client::Impl::RetryGuard(std::function func) { try { RetryConnectToTheEndpoint(func); + if (!current_endpoint_) { + current_endpoint_ = {endpoints_iterator->GetHostAddr(), endpoints_iterator->GetPort()}; + } return; } catch (const std::system_error&) { if (!endpoints_iterator->NextIsExist()) throw; + //If the exceptions was catched here, that's mean that we should change the current_endpoint. + current_endpoint_.reset(); } } } @@ -993,6 +1020,14 @@ void Client::ResetConnection() { impl_->ResetConnection(); } +void Client::ResetConnectionEndpoint() { + impl_->ResetConnectionEndpoint(); +} + +const std::optional& Client::GetCurrentEndpoint() const { + return impl_->GetCurrentEndpoint(); +} + const ServerInfo& Client::GetServerInfo() const { return impl_->GetServerInfo(); } diff --git a/clickhouse/client.h b/clickhouse/client.h index ee7cf1e8..c40254f2 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -44,12 +44,16 @@ enum class CompressionMethod { LZ4 = 1, }; +struct Endpoint { + std::string host; + unsigned int port = 9000; + + inline bool operator==(const Endpoint& right) const { + return host == right.host && port == right.port; + } +}; + enum class EndpointsIterationAlgorithm { -/** - * Client tries to connect to those endpoints one by one, on the round-robin basis: - * first default enpoint, then each of endpoints, from begin() to end(), - * if previous are inaccessible. - */ RoundRobin = 0, }; @@ -67,13 +71,13 @@ struct ClientOptions { /// Service port. DECLARE_FIELD(port, unsigned int, SetPort, 9000); - /// Hostnames of the servers. The next host to connect is selected according to the EndpointsIterationAlgorithm. - /// Note: If SetHost and SetHosts are setted, host will be placed at the beginning of the hosts vector. - DECLARE_FIELD(hosts, std::vector, SetHosts, std::vector()); - /// Ports of the servers. - DECLARE_FIELD(ports, std::vector, SetPorts, std::vector()); - /// Algorithm for selecting the next endpoint for connection. - DECLARE_FIELD(iteration_algo, EndpointsIterationAlgorithm, SetEndpointsIterationAlgorithm, EndpointsIterationAlgorithm::RoundRobin); + /** Set endpoints (host+port), only one is used. + * Client tries to connect to those endpoints one by one, on the round-robin basis: + * first default enpoint (set via SetHost() + SetPort()), then each of endpoints, from begin() to end(), + * the first one to establish connection is used for the rest of the session. + * If port isn't specified, default(9000) value will be used. + */ + DECLARE_FIELD(endpoints, std::vector, SetEndpoints, {}); /// Default database. DECLARE_FIELD(default_database, std::string, SetDefaultDatabase, "default"); @@ -257,6 +261,11 @@ class Client { const ServerInfo& GetServerInfo() const; + /// Get current connected endpoint. + /// In case when client is not connected to any endpoint, nullopt will returned. + const std::optional& GetCurrentEndpoint() const; + + void ResetConnectionEndpoint(); private: const ClientOptions options_; diff --git a/tests/simple/main.cpp b/tests/simple/main.cpp index f20602f4..2911e559 100644 --- a/tests/simple/main.cpp +++ b/tests/simple/main.cpp @@ -8,7 +8,8 @@ #include #include #include - +#include +#include #if defined(_MSC_VER) # pragma warning(disable : 4996) #endif @@ -498,16 +499,10 @@ int main() { const auto localHostEndpoint = ClientOptions() .SetHost( getEnvOrDefault("CLICKHOUSE_HOST", "localhost")) .SetPort( getEnvOrDefault("CLICKHOUSE_PORT", "9000")) - .SetHosts({ getEnvOrDefault("CLICKHOUSE_HOST", "asasdasd"), - getEnvOrDefault("CLICKHOUSE_HOST", "localhost"), - getEnvOrDefault("CLICKHOUSE_HOST", "noalocalhost"), - getEnvOrDefault("CLICKHOUSE_HOST", "localhost"), - }) - .SetPorts({ getEnvOrDefault("CLICKHOUSE_PORT", "9000"), - getEnvOrDefault("CLICKHOUSE_PORT", "9000"), - getEnvOrDefault("CLICKHOUSE_PORT", "1234"), - getEnvOrDefault("CLICKHOUSE_PORT", "5678"), - }) + .SetEndpoints({ {"asasdasd", 9000} + ,{"localhost"} + ,{"noalocalhost", 9000} + }) .SetUser( getEnvOrDefault("CLICKHOUSE_USER", "default")) .SetPassword( getEnvOrDefault("CLICKHOUSE_PASSWORD", "")) .SetDefaultDatabase(getEnvOrDefault("CLICKHOUSE_DB", "default")); @@ -516,6 +511,7 @@ int main() { Client client(ClientOptions(localHostEndpoint) .SetPingBeforeQuery(true)); RunTests(client); + std::cout << "current endpoint : " << client.GetCurrentEndpoint().value().host << "\n"; } { diff --git a/ut/client_ut.cpp b/ut/client_ut.cpp index 07e53fba..845bd801 100644 --- a/ut/client_ut.cpp +++ b/ut/client_ut.cpp @@ -1217,9 +1217,12 @@ class ConnectionSuccessTestCase : public testing::TestWithParam { TEST_P(ConnectionSuccessTestCase, SuccessConnectionEstablished) { const auto & client_options = GetParam(); std::unique_ptr client; - + try { client = std::make_unique(client_options); + auto endpoint = client->GetCurrentEndpoint().value(); + ASSERT_EQ("localhost", endpoint.host); + ASSERT_EQ(9000u, endpoint.port); SUCCEED(); } catch (const std::exception & e) { FAIL() << "Got an unexpected exception : " << e.what(); @@ -1230,17 +1233,29 @@ TEST_P(ConnectionSuccessTestCase, SuccessConnectionEstablished) { INSTANTIATE_TEST_SUITE_P(ClientMultipleEndpoints, ConnectionSuccessTestCase, ::testing::Values(ClientCase::ParamType{ ClientOptions() - .SetHosts({ - getEnvOrDefault("CLICKHOUSE_HOST", "somedeadhost"), - getEnvOrDefault("CLICKHOUSE_HOST", "deadaginghost"), - getEnvOrDefault("CLICKHOUSE_HOST", "localhost"), - getEnvOrDefault("CLICKHOUSE_HOST", "noalocalhost"), + .SetEndpoints({ + {"somedeadhost", 9000} + , {"deadaginghost", 1245} + , {"localhost", 9000} + , {"noalocalhost", 6784} }) - .SetPorts( { - getEnvOrDefault("CLICKHOUSE_PORT", "9000"), - getEnvOrDefault("CLICKHOUSE_PORT", "1245"), - getEnvOrDefault("CLICKHOUSE_PORT", "9000"), - getEnvOrDefault("CLICKHOUSE_PORT", "6784"), + .SetUser( getEnvOrDefault("CLICKHOUSE_USER", "default")) + .SetPassword( getEnvOrDefault("CLICKHOUSE_PASSWORD", "")) + .SetDefaultDatabase(getEnvOrDefault("CLICKHOUSE_DB", "default")) + .SetPingBeforeQuery(true) + .SetConnectionConnectTimeout(std::chrono::milliseconds(200)) + .SetRetryTimeout(std::chrono::seconds(1)), + } +)); + +INSTANTIATE_TEST_SUITE_P(ClientMultipleEndpointsWithDefaultPort, ConnectionSuccessTestCase, + ::testing::Values(ClientCase::ParamType{ + ClientOptions() + .SetEndpoints({ + {"somedeadhost"} + , {"deadaginghost", 1245} + , {"localhost"} + , {"noalocalhost", 6784} }) .SetUser( getEnvOrDefault("CLICKHOUSE_USER", "default")) .SetPassword( getEnvOrDefault("CLICKHOUSE_PASSWORD", "")) @@ -1254,15 +1269,10 @@ INSTANTIATE_TEST_SUITE_P(ClientMultipleEndpoints, ConnectionSuccessTestCase, INSTANTIATE_TEST_SUITE_P(MultipleEndpointsFailed, ConnectionFailedClientTest, ::testing::Values(ConnectionFailedClientTest::ParamType{ ClientOptions() - .SetHosts({ - getEnvOrDefault("CLICKHOUSE_HOST", "somedeadhost"), - getEnvOrDefault("CLICKHOUSE_HOST", "deadaginghost"), - getEnvOrDefault("CLICKHOUSE_HOST", "noalocalhost") - }) - .SetPorts( { - getEnvOrDefault("CLICKHOUSE_PORT", "9000"), - getEnvOrDefault("CLICKHOUSE_PORT", "1245"), - getEnvOrDefault("CLICKHOUSE_PORT", "6784"), + .SetEndpoints({ + {"deadaginghost", 9000} + ,{"somedeadhost", 1245} + ,{"noalocalhost", 6784} }) .SetUser( getEnvOrDefault("CLICKHOUSE_USER", "default")) .SetPassword( getEnvOrDefault("CLICKHOUSE_PASSWORD", "")) @@ -1274,23 +1284,70 @@ INSTANTIATE_TEST_SUITE_P(MultipleEndpointsFailed, ConnectionFailedClientTest, } )); -INSTANTIATE_TEST_SUITE_P(MultipleEndpointsNonValidConfig, ConnectionFailedClientTest, - ::testing::Values(ConnectionFailedClientTest::ParamType{ +class ResetConnectionTestCase : public testing::TestWithParam {}; + +TEST_P(ResetConnectionTestCase, ResetConnectionEndpointTest) { + const auto & client_options = GetParam(); + std::unique_ptr client; + + try { + client = std::make_unique(client_options); + auto endpoint = client->GetCurrentEndpoint().value(); + ASSERT_EQ("localhost", endpoint.host); + ASSERT_EQ(9000u, endpoint.port); + + client->ResetConnectionEndpoint(); + endpoint = client->GetCurrentEndpoint().value(); + ASSERT_EQ("127.0.0.1", endpoint.host); + ASSERT_EQ(9000u, endpoint.port); + + client->ResetConnectionEndpoint(); + + endpoint = client->GetCurrentEndpoint().value(); + ASSERT_EQ("localhost", endpoint.host); + ASSERT_EQ(9000u, endpoint.port); + + SUCCEED(); + } catch (const std::exception & e) { + FAIL() << "Got an unexpected exception : " << e.what(); + } +} + +TEST_P(ResetConnectionTestCase, ResetConnectionTest) { + const auto & client_options = GetParam(); + std::unique_ptr client; + + try { + client = std::make_unique(client_options); + auto endpoint = client->GetCurrentEndpoint().value(); + ASSERT_EQ("localhost", endpoint.host); + ASSERT_EQ(9000u, endpoint.port); + + client->ResetConnection(); + endpoint = client->GetCurrentEndpoint().value(); + ASSERT_EQ("localhost", endpoint.host); + ASSERT_EQ(9000u, endpoint.port); + + SUCCEED(); + } catch (const std::exception & e) { + FAIL() << "Got an unexpected exception : " << e.what(); + } +} + +INSTANTIATE_TEST_SUITE_P(ResetConnectionClientTest, ResetConnectionTestCase, + ::testing::Values(ResetConnectionTestCase::ParamType { ClientOptions() - .SetHosts({ - getEnvOrDefault("CLICKHOUSE_HOST", "somedeadhost"), - getEnvOrDefault("CLICKHOUSE_HOST", "localhost"), - getEnvOrDefault("CLICKHOUSE_HOST", "noalocalhost"), - }) - .SetPorts( { - getEnvOrDefault("CLICKHOUSE_PORT", "9000"), + .SetEndpoints({ + {"localhost", 9000} + ,{"somedeadhost", 1245} + ,{"noalocalhost", 6784} + ,{"127.0.0.1", 9000} }) .SetUser( getEnvOrDefault("CLICKHOUSE_USER", "default")) .SetPassword( getEnvOrDefault("CLICKHOUSE_PASSWORD", "")) .SetDefaultDatabase(getEnvOrDefault("CLICKHOUSE_DB", "default")) .SetPingBeforeQuery(true) .SetConnectionConnectTimeout(std::chrono::milliseconds(200)) - .SetRetryTimeout(std::chrono::seconds(1)), - ExpectingException{"The sizes of lists of ports and hosts must match be equal."} + .SetRetryTimeout(std::chrono::seconds(1)) } )); From 252503891bc1aed040a6766f5e96826163c0b35a Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Tue, 20 Jun 2023 14:33:13 +0000 Subject: [PATCH 14/15] Fix tests and reviews changes --- clickhouse/base/endpoints_iterator.cpp | 2 +- clickhouse/base/endpoints_iterator.h | 4 ++-- clickhouse/client.h | 5 ++--- ut/client_ut.cpp | 2 +- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/clickhouse/base/endpoints_iterator.cpp b/clickhouse/base/endpoints_iterator.cpp index f687703d..a1b4ccc7 100644 --- a/clickhouse/base/endpoints_iterator.cpp +++ b/clickhouse/base/endpoints_iterator.cpp @@ -16,7 +16,7 @@ std::string RoundRobinEndpointsIterator::GetHostAddr() const return endpoints[current_index].host; } -unsigned int RoundRobinEndpointsIterator::GetPort() const +uint16_t RoundRobinEndpointsIterator::GetPort() const { return endpoints[current_index].port; } diff --git a/clickhouse/base/endpoints_iterator.h b/clickhouse/base/endpoints_iterator.h index dcbdd86b..23d705d6 100644 --- a/clickhouse/base/endpoints_iterator.h +++ b/clickhouse/base/endpoints_iterator.h @@ -20,7 +20,7 @@ class EndpointsIteratorBase virtual std::string GetHostAddr() const = 0; // Get the port of current endpoint. - virtual unsigned int GetPort() const = 0; + virtual uint16_t GetPort() const = 0; // Reset iterations. virtual void ResetIterations() = 0; @@ -32,7 +32,7 @@ class RoundRobinEndpointsIterator : public EndpointsIteratorBase public: explicit RoundRobinEndpointsIterator(const std::vector& opts); std::string GetHostAddr() const override; - unsigned int GetPort() const override; + uint16_t GetPort() const override; void ResetIterations() override; bool NextIsExist() const override; void Next() override; diff --git a/clickhouse/client.h b/clickhouse/client.h index c40254f2..a345543f 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -46,8 +46,7 @@ enum class CompressionMethod { struct Endpoint { std::string host; - unsigned int port = 9000; - + uint16_t port = 9000; inline bool operator==(const Endpoint& right) const { return host == right.host && port == right.port; } @@ -69,7 +68,7 @@ struct ClientOptions { /// Hostname of the server. DECLARE_FIELD(host, std::string, SetHost, std::string()); /// Service port. - DECLARE_FIELD(port, unsigned int, SetPort, 9000); + DECLARE_FIELD(port, uint16_t, SetPort, 9000); /** Set endpoints (host+port), only one is used. * Client tries to connect to those endpoints one by one, on the round-robin basis: diff --git a/ut/client_ut.cpp b/ut/client_ut.cpp index 845bd801..3ed1092e 100644 --- a/ut/client_ut.cpp +++ b/ut/client_ut.cpp @@ -1280,7 +1280,7 @@ INSTANTIATE_TEST_SUITE_P(MultipleEndpointsFailed, ConnectionFailedClientTest, .SetPingBeforeQuery(true) .SetConnectionConnectTimeout(std::chrono::milliseconds(200)) .SetRetryTimeout(std::chrono::seconds(1)), - ExpectingException{"Temporary failure in name resolution"} + ExpectingException{""} } )); From 87804a0bd02e58dabe030f2c19d02592771af58c Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Mon, 26 Jun 2023 07:29:06 +0000 Subject: [PATCH 15/15] Review changes --- clickhouse/base/endpoints_iterator.cpp | 34 ++----- clickhouse/base/endpoints_iterator.h | 24 +---- clickhouse/base/socket.cpp | 4 +- clickhouse/base/socket.h | 4 +- clickhouse/client.cpp | 133 +++++++++++++------------ clickhouse/client.h | 1 + 6 files changed, 88 insertions(+), 112 deletions(-) diff --git a/clickhouse/base/endpoints_iterator.cpp b/clickhouse/base/endpoints_iterator.cpp index a1b4ccc7..30d3593e 100644 --- a/clickhouse/base/endpoints_iterator.cpp +++ b/clickhouse/base/endpoints_iterator.cpp @@ -3,38 +3,16 @@ namespace clickhouse { -RoundRobinEndpointsIterator::RoundRobinEndpointsIterator(const std::vector& _endpoints) : - endpoints (_endpoints) - , current_index (0) - , iteration_counter(0) +RoundRobinEndpointsIterator::RoundRobinEndpointsIterator(const std::vector& _endpoints) + : endpoints (_endpoints) + , current_index (endpoints.size() - 1ull) { - -} - -std::string RoundRobinEndpointsIterator::GetHostAddr() const -{ - return endpoints[current_index].host; -} - -uint16_t RoundRobinEndpointsIterator::GetPort() const -{ - return endpoints[current_index].port; -} - -void RoundRobinEndpointsIterator::ResetIterations() -{ - iteration_counter = 0; -} - -void RoundRobinEndpointsIterator::Next() -{ - current_index = (current_index + 1) % endpoints.size(); - iteration_counter++; } -bool RoundRobinEndpointsIterator::NextIsExist() const +Endpoint RoundRobinEndpointsIterator::Next() { - return iteration_counter + 1 < endpoints.size(); + current_index = (current_index + 1ull) % endpoints.size(); + return endpoints[current_index]; } RoundRobinEndpointsIterator::~RoundRobinEndpointsIterator() = default; diff --git a/clickhouse/base/endpoints_iterator.h b/clickhouse/base/endpoints_iterator.h index 23d705d6..ba6a850f 100644 --- a/clickhouse/base/endpoints_iterator.h +++ b/clickhouse/base/endpoints_iterator.h @@ -12,37 +12,23 @@ struct ClientOptions; */ class EndpointsIteratorBase { - public: + public: virtual ~EndpointsIteratorBase() = default; - virtual void Next() = 0; - // Get the address of current endpoint. - virtual std::string GetHostAddr() const = 0; - - // Get the port of current endpoint. - virtual uint16_t GetPort() const = 0; - - // Reset iterations. - virtual void ResetIterations() = 0; - virtual bool NextIsExist() const = 0; + virtual Endpoint Next() = 0; }; class RoundRobinEndpointsIterator : public EndpointsIteratorBase { public: explicit RoundRobinEndpointsIterator(const std::vector& opts); - std::string GetHostAddr() const override; - uint16_t GetPort() const override; - void ResetIterations() override; - bool NextIsExist() const override; - void Next() override; - + Endpoint Next() override; + ~RoundRobinEndpointsIterator() override; private: const std::vector& endpoints; - int current_index; - size_t iteration_counter; + size_t current_index; }; } diff --git a/clickhouse/base/socket.cpp b/clickhouse/base/socket.cpp index 13755a37..e6fee1f5 100644 --- a/clickhouse/base/socket.cpp +++ b/clickhouse/base/socket.cpp @@ -390,9 +390,9 @@ std::unique_ptr Socket::makeOutputStream() const { NonSecureSocketFactory::~NonSecureSocketFactory() {} -std::unique_ptr NonSecureSocketFactory::connect(const ClientOptions &opts, const std::string& host, const std::string& port) { +std::unique_ptr NonSecureSocketFactory::connect(const ClientOptions &opts, const Endpoint& endpoint) { - const auto address = NetworkAddress(host, port); + const auto address = NetworkAddress(endpoint.host, std::to_string(endpoint.port)); auto socket = doConnect(address, opts); setSocketOptions(*socket, opts); diff --git a/clickhouse/base/socket.h b/clickhouse/base/socket.h index 4953a4d1..9bd9ca34 100644 --- a/clickhouse/base/socket.h +++ b/clickhouse/base/socket.h @@ -89,7 +89,7 @@ class SocketFactory { // TODO: move connection-related options to ConnectionOptions structure. - virtual std::unique_ptr connect(const ClientOptions& opts, const std::string& host, const std::string& port) = 0; + virtual std::unique_ptr connect(const ClientOptions& opts, const Endpoint& endpoint) = 0; virtual void sleepFor(const std::chrono::milliseconds& duration); }; @@ -136,7 +136,7 @@ class NonSecureSocketFactory : public SocketFactory { public: ~NonSecureSocketFactory() override; - std::unique_ptr connect(const ClientOptions& opts, const std::string& host, const std::string& port) override; + std::unique_ptr connect(const ClientOptions& opts, const Endpoint& endpoint) override; protected: virtual std::unique_ptr doConnect(const NetworkAddress& address, const ClientOptions& opts); diff --git a/clickhouse/client.cpp b/clickhouse/client.cpp index 27770c16..41144940 100644 --- a/clickhouse/client.cpp +++ b/clickhouse/client.cpp @@ -64,7 +64,8 @@ struct ClientInfo { }; std::ostream& operator<<(std::ostream& os, const ClientOptions& opt) { - os << "Client("; + os << "Client(" << opt.user << '@' << opt.host << ":" << opt.port + << "Endpoints :"; for (size_t i = 0; i < opt.endpoints.size(); i++) os << opt.user << '@' << opt.endpoints[i].host << ":" << opt.endpoints[i].port << ((i == opt.endpoints.size() - 1) ? "" : ", "); @@ -115,8 +116,13 @@ std::unique_ptr GetSocketFactory(const ClientOptions& opts) { return std::make_unique(); } -std::unique_ptr GetEndpointsIterator(const std::vector& endpoints) { - return std::make_unique(endpoints); +std::unique_ptr GetEndpointsIterator(const ClientOptions& opts) { + if (opts.endpoints.empty()) + { + throw ValidationError("The list of endpoints is empty"); + } + + return std::make_unique(opts.endpoints); } } @@ -171,11 +177,16 @@ class Client::Impl { void InitializeStreams(std::unique_ptr&& socket); + inline size_t GetConnectionAttempts() const + { + return options_.endpoints.size() * options_.send_retries; + } + private: /// In case of network errors tries to reconnect to server and /// call fuc several times. void RetryGuard(std::function func); - + void RetryConnectToTheEndpoint(std::function& func); private: @@ -222,10 +233,8 @@ ClientOptions modifyClientOptions(ClientOptions opts) if (opts.host.empty()) return opts; - Endpoint endpoint_single({opts.host, opts.port}); - if (std::find(opts.endpoints.begin(), opts.endpoints.end(), endpoint_single) == std::end(opts.endpoints)) { - opts.endpoints.emplace(opts.endpoints.begin(),endpoint_single); - } + Endpoint default_endpoint({opts.host, opts.port}); + opts.endpoints.emplace(opts.endpoints.begin(), default_endpoint); return opts; } @@ -237,7 +246,7 @@ Client::Impl::Impl(const ClientOptions& opts, : options_(modifyClientOptions(opts)) , events_(nullptr) , socket_factory_(std::move(socket_factory)) - , endpoints_iterator(GetEndpointsIterator(options_.endpoints)) + , endpoints_iterator(GetEndpointsIterator(options_)) { CreateConnection(); @@ -349,9 +358,7 @@ void Client::Impl::Ping() { } void Client::Impl::ResetConnection() { - InitializeStreams(socket_factory_->connect(options_, endpoints_iterator->GetHostAddr(), - std::to_string(endpoints_iterator->GetPort()) - )); + InitializeStreams(socket_factory_->connect(options_, current_endpoint_.value())); if (!Handshake()) { throw ProtocolError("fail to connect to " + options_.host); @@ -359,37 +366,36 @@ void Client::Impl::ResetConnection() { } void Client::Impl::ResetConnectionEndpoint() { - endpoints_iterator->ResetIterations(); - endpoints_iterator->Next(); - CreateConnection(); -} - -void Client::Impl::CreateConnection() { current_endpoint_.reset(); - auto try_make_connection_with_endpoint = [this]() { - for (unsigned int i = 0; ; ) { - try { - ResetConnection(); - return; - } catch (const std::system_error&) { - if (++i > options_.send_retries) { - throw; - } - socket_factory_->sleepFor(options_.retry_timeout); + for (size_t i = 0; i < options_.endpoints.size();) + { + try + { + current_endpoint_ = endpoints_iterator->Next(); + ResetConnection(); + return; + } catch (const std::system_error&) { + if (++i == options_.endpoints.size()) + { + current_endpoint_.reset(); + throw; } } - }; + } +} - for (endpoints_iterator->ResetIterations(); ; endpoints_iterator->Next()) +void Client::Impl::CreateConnection() { + for (size_t i = 0; i < options_.send_retries;) { try { - try_make_connection_with_endpoint(); - current_endpoint_ = {endpoints_iterator->GetHostAddr(), endpoints_iterator->GetPort()}; - break; + ResetConnectionEndpoint(); + return; } catch (const std::system_error&) { - if(!endpoints_iterator->NextIsExist()) + if (++i == options_.send_retries) + { throw; + } } } } @@ -924,40 +930,45 @@ bool Client::Impl::ReceiveHello() { } void Client::Impl::RetryGuard(std::function func) { - for(endpoints_iterator->ResetIterations(); ; endpoints_iterator->Next()) + + if (current_endpoint_) { - try - { - RetryConnectToTheEndpoint(func); - if (!current_endpoint_) { - current_endpoint_ = {endpoints_iterator->GetHostAddr(), endpoints_iterator->GetPort()}; + for (unsigned int i = 0; ; ++i) { + try { + func(); + return; + } catch (const std::system_error&) { + bool ok = true; + + try { + socket_factory_->sleepFor(options_.retry_timeout); + ResetConnection(); + } catch (...) { + ok = false; + } + + if (!ok && i == options_.send_retries) { + break; + } } - return; - } catch (const std::system_error&) { - if (!endpoints_iterator->NextIsExist()) - throw; - //If the exceptions was catched here, that's mean that we should change the current_endpoint. - current_endpoint_.reset(); } } -} - -void Client::Impl::RetryConnectToTheEndpoint(std::function& func) { - for (unsigned int i = 0; ; ++i) { - try { + // Connectiong with current_endpoint_ are broken. + // Trying to establish with the another one from the list. + size_t connection_attempts_count = GetConnectionAttempts(); + for (size_t i = 0; i < connection_attempts_count;) + { + try + { + socket_factory_->sleepFor(options_.retry_timeout); + current_endpoint_ = endpoints_iterator->Next(); + ResetConnection(); func(); return; } catch (const std::system_error&) { - bool ok = true; - - try { - socket_factory_->sleepFor(options_.retry_timeout); - ResetConnection(); - } catch (...) { - ok = false; - } - - if (!ok && i == options_.send_retries) { + if (++i == connection_attempts_count) + { + current_endpoint_.reset(); throw; } } diff --git a/clickhouse/client.h b/clickhouse/client.h index a345543f..a66d59a6 100644 --- a/clickhouse/client.h +++ b/clickhouse/client.h @@ -264,6 +264,7 @@ class Client { /// In case when client is not connected to any endpoint, nullopt will returned. const std::optional& GetCurrentEndpoint() const; + // Try to connect to different endpoints one by one only one time. If it doesn't work, throw an exception. void ResetConnectionEndpoint(); private: const ClientOptions options_;