From 77fa45b615170a658440e6ce3828b9b74a1cd050 Mon Sep 17 00:00:00 2001 From: Mateusz Date: Wed, 2 Aug 2023 14:12:16 +0100 Subject: [PATCH] CXXCBC-341 Support for subdoc read from replica (#436) --- CMakeLists.txt | 4 + core/impl/lookup_in_all_replicas.cxx | 176 ++++++ core/impl/lookup_in_all_replicas.hxx | 80 +++ core/impl/lookup_in_any_replica.cxx | 164 ++++++ core/impl/lookup_in_any_replica.hxx | 75 +++ core/impl/lookup_in_replica.cxx | 97 ++++ core/impl/lookup_in_replica.hxx | 67 +++ core/meta/features.hxx | 6 + core/operations.hxx | 2 + .../document_lookup_in_all_replicas.hxx | 192 +++++++ .../document_lookup_in_any_replica.hxx | 185 ++++++ core/protocol/cmd_hello.hxx | 1 + core/protocol/cmd_lookup_in_replica.cxx | 107 ++++ core/protocol/cmd_lookup_in_replica.hxx | 137 +++++ core/protocol/hello_feature.hxx | 6 + core/protocol/hello_feature_fmt.hxx | 3 + core/topology/capabilities.hxx | 1 + core/topology/capabilities_fmt.hxx | 3 + core/topology/configuration.hxx | 5 + core/topology/configuration_json.hxx | 2 + couchbase/collection.hxx | 111 ++++ couchbase/lookup_in_all_replicas_options.hxx | 109 ++++ couchbase/lookup_in_any_replica_options.hxx | 101 ++++ couchbase/lookup_in_replica_result.hxx | 74 +++ test/test_integration_subdoc.cxx | 542 ++++++++++++++++++ 25 files changed, 2250 insertions(+) create mode 100644 core/impl/lookup_in_all_replicas.cxx create mode 100644 core/impl/lookup_in_all_replicas.hxx create mode 100644 core/impl/lookup_in_any_replica.cxx create mode 100644 core/impl/lookup_in_any_replica.hxx create mode 100644 core/impl/lookup_in_replica.cxx create mode 100644 core/impl/lookup_in_replica.hxx create mode 100644 core/operations/document_lookup_in_all_replicas.hxx create mode 100644 core/operations/document_lookup_in_any_replica.hxx create mode 100644 core/protocol/cmd_lookup_in_replica.cxx create mode 100644 core/protocol/cmd_lookup_in_replica.hxx create mode 100644 couchbase/lookup_in_all_replicas_options.hxx create mode 100644 couchbase/lookup_in_any_replica_options.hxx create mode 100644 couchbase/lookup_in_replica_result.hxx diff --git a/CMakeLists.txt b/CMakeLists.txt index d3c7bacd0..8fe7c4899 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -233,6 +233,7 @@ set(couchbase_cxx_client_FILES core/protocol/cmd_increment.cxx core/protocol/cmd_insert.cxx core/protocol/cmd_lookup_in.cxx + core/protocol/cmd_lookup_in_replica.cxx core/protocol/cmd_mutate_in.cxx core/protocol/cmd_noop.cxx core/protocol/cmd_observe_seqno.cxx @@ -311,6 +312,9 @@ set(couchbase_cxx_client_FILES core/impl/key_value_error_category.cxx core/impl/key_value_error_context.cxx core/impl/lookup_in.cxx + core/impl/lookup_in_replica.cxx + core/impl/lookup_in_all_replicas.cxx + core/impl/lookup_in_any_replica.cxx core/impl/management_error_category.cxx core/impl/manager_error_context.cxx core/impl/match_all_query.cxx diff --git a/core/impl/lookup_in_all_replicas.cxx b/core/impl/lookup_in_all_replicas.cxx new file mode 100644 index 000000000..ddb13238e --- /dev/null +++ b/core/impl/lookup_in_all_replicas.cxx @@ -0,0 +1,176 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2020-Present Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "lookup_in_all_replicas.hxx" +#include "lookup_in_replica.hxx" + +#include "core/cluster.hxx" +#include "core/error_context/key_value.hxx" +#include "core/operations/document_lookup_in.hxx" +#include "core/topology/configuration.hxx" + +namespace couchbase::core::impl +{ + +void +initiate_lookup_in_all_replicas_operation(std::shared_ptr core, + const std::string& bucket_name, + const std::string& scope_name, + const std::string& collection_name, + std::string document_key, + const std::vector& specs, + lookup_in_all_replicas_options::built options, + lookup_in_all_replicas_handler&& handler) +{ + return initiate_lookup_in_all_replicas_operation(std::move(core), + bucket_name, + scope_name, + collection_name, + std::move(document_key), + specs, + options.timeout, + movable_lookup_in_all_replicas_handler{ std::move(handler) }); +} + +void +initiate_lookup_in_all_replicas_operation(std::shared_ptr core, + const std::string& bucket_name, + const std::string& scope_name, + const std::string& collection_name, + std::string document_key, + const std::vector& specs, + std::optional timeout, + movable_lookup_in_all_replicas_handler&& handler) +{ + auto request = std::make_shared( + bucket_name, scope_name, collection_name, std::move(document_key), specs, timeout); + core->with_bucket_configuration( + bucket_name, + [core, r = std::move(request), h = std::move(handler)](std::error_code ec, const core::topology::configuration& config) mutable { + if (!config.supports_subdoc_read_replica()) { + ec = errc::common::feature_not_available; + } + + if (ec) { + std::optional first_error_path{}; + std::optional first_error_index{}; + return h( + make_subdocument_error_context(make_key_value_error_context(ec, r->id()), ec, first_error_path, first_error_index, false), + lookup_in_all_replicas_result{}); + } + struct replica_context { + replica_context(movable_lookup_in_all_replicas_handler handler, std::uint32_t expected_responses) + : handler_(std::move(handler)) + , expected_responses_(expected_responses) + { + } + + movable_lookup_in_all_replicas_handler handler_; + std::uint32_t expected_responses_; + bool done_{ false }; + std::mutex mutex_{}; + lookup_in_all_replicas_result result_{}; + }; + auto ctx = std::make_shared(std::move(h), config.num_replicas.value_or(0U) + 1U); + + for (std::size_t idx = 1U; idx <= config.num_replicas.value_or(0U); ++idx) { + document_id replica_id{ r->id() }; + replica_id.node_index(idx); + core->execute( + impl::lookup_in_replica_request{ std::move(replica_id), r->specs(), r->timeout() }, + [ctx](impl::lookup_in_replica_response&& resp) { + movable_lookup_in_all_replicas_handler local_handler{}; + { + std::scoped_lock lock(ctx->mutex_); + if (ctx->done_) { + return; + } + --ctx->expected_responses_; + if (resp.ctx.ec()) { + if (ctx->expected_responses_ > 0) { + // just ignore the response + return; + } + } else { + std::vector entries{}; + for (auto& field : resp.fields) { + lookup_in_replica_result::entry lookup_in_entry{}; + lookup_in_entry.path = field.path; + lookup_in_entry.value = field.value; + lookup_in_entry.exists = field.exists; + lookup_in_entry.original_index = field.original_index; + entries.emplace_back(lookup_in_entry); + } + ctx->result_.emplace_back(lookup_in_replica_result{ resp.cas, entries, resp.deleted, true /* replica */ }); + } + if (ctx->expected_responses_ == 0) { + ctx->done_ = true; + std::swap(local_handler, ctx->handler_); + } + } + if (local_handler) { + if (!ctx->result_.empty()) { + resp.ctx.override_ec({}); + } + return local_handler(std::move(resp.ctx), std::move(ctx->result_)); + } + }); + } + + core::operations::lookup_in_request active{ document_id{ r->id() } }; + active.specs = r->specs(); + active.timeout = r->timeout(); + core->execute(active, [ctx](core::operations::lookup_in_response&& resp) { + movable_lookup_in_all_replicas_handler local_handler{}; + { + std::scoped_lock lock(ctx->mutex_); + if (ctx->done_) { + return; + } + --ctx->expected_responses_; + if (resp.ctx.ec()) { + if (ctx->expected_responses_ > 0) { + // just ignore the response + return; + } + } else { + std::vector entries{}; + for (auto& field : resp.fields) { + lookup_in_replica_result::entry lookup_in_entry{}; + lookup_in_entry.path = field.path; + lookup_in_entry.value = field.value; + lookup_in_entry.exists = field.exists; + lookup_in_entry.original_index = field.original_index; + entries.emplace_back(lookup_in_entry); + } + ctx->result_.emplace_back(lookup_in_replica_result{ resp.cas, entries, resp.deleted, false /* active */ }); + } + if (ctx->expected_responses_ == 0) { + ctx->done_ = true; + std::swap(local_handler, ctx->handler_); + } + } + if (local_handler) { + if (!ctx->result_.empty()) { + resp.ctx.override_ec({}); + } + return local_handler(std::move(resp.ctx), std::move(ctx->result_)); + } + }); + }); +} +} // namespace couchbase::core::impl diff --git a/core/impl/lookup_in_all_replicas.hxx b/core/impl/lookup_in_all_replicas.hxx new file mode 100644 index 000000000..31b720db7 --- /dev/null +++ b/core/impl/lookup_in_all_replicas.hxx @@ -0,0 +1,80 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2020-Present Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "core/document_id.hxx" +#include "core/error_context/key_value.hxx" +#include "core/utils/movable_function.hxx" + +#include + +namespace couchbase::core::impl +{ + +class lookup_in_all_replicas_request +{ + public: + explicit lookup_in_all_replicas_request(std::string bucket_name, + std::string scope_name, + std::string collection_name, + std::string document_key, + std::vector specs, + std::optional timeout) + : id_{ std::move(bucket_name), std::move(scope_name), std::move(collection_name), std::move(document_key) } + , specs_{ std::move(specs) } + , timeout_{ timeout } + { + } + + [[nodiscard]] const auto& id() const + { + return id_; + } + + [[nodiscard]] const auto& specs() const + { + return specs_; + } + + [[nodiscard]] const auto& timeout() const + { + return timeout_; + } + + private: + core::document_id id_; + std::vector specs_; + std::optional timeout_{}; +}; + +using movable_lookup_in_all_replicas_handler = + utils::movable_function; + +void +initiate_lookup_in_all_replicas_operation(std::shared_ptr core, + const std::string& bucket_name, + const std::string& scope_name, + const std::string& collection_name, + std::string document_key, + const std::vector& specs, + std::optional timeout, + movable_lookup_in_all_replicas_handler&& handler); +} // namespace couchbase::core::impl diff --git a/core/impl/lookup_in_any_replica.cxx b/core/impl/lookup_in_any_replica.cxx new file mode 100644 index 000000000..49f96df25 --- /dev/null +++ b/core/impl/lookup_in_any_replica.cxx @@ -0,0 +1,164 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2020-Present Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "lookup_in_any_replica.hxx" +#include "lookup_in_replica.hxx" + +#include "core/cluster.hxx" +#include "core/error_context/key_value.hxx" +#include "core/operations/document_lookup_in.hxx" +#include "core/topology/configuration.hxx" + +namespace couchbase::core::impl +{ +void +initiate_lookup_in_any_replica_operation(std::shared_ptr core, + const std::string& bucket_name, + const std::string& scope_name, + const std::string& collection_name, + std::string document_key, + const std::vector& specs, + lookup_in_any_replica_options::built options, + lookup_in_any_replica_handler&& handler) +{ + return initiate_lookup_in_any_replica_operation(std::move(core), + bucket_name, + scope_name, + collection_name, + std::move(document_key), + specs, + options.timeout, + movable_lookup_in_any_replica_handler{ std::move(handler) }); +} + +void +initiate_lookup_in_any_replica_operation(std::shared_ptr core, + const std::string& bucket_name, + const std::string& scope_name, + const std::string& collection_name, + std::string document_key, + const std::vector& specs, + std::optional timeout, + movable_lookup_in_any_replica_handler&& handler) +{ + auto request = std::make_shared( + bucket_name, scope_name, collection_name, std::move(document_key), specs, timeout); + core->with_bucket_configuration( + bucket_name, + [core, r = std::move(request), h = std::move(handler)](std::error_code ec, const core::topology::configuration& config) mutable { + if (!config.supports_subdoc_read_replica()) { + ec = errc::common::feature_not_available; + } + if (ec) { + std::optional first_error_path{}; + std::optional first_error_index{}; + return h( + make_subdocument_error_context(make_key_value_error_context(ec, r->id()), ec, first_error_path, first_error_index, false), + lookup_in_replica_result{}); + } + struct replica_context { + replica_context(movable_lookup_in_any_replica_handler handler, std::uint32_t expected_responses) + : handler_(std::move(handler)) + , expected_responses_(expected_responses) + { + } + + movable_lookup_in_any_replica_handler handler_; + std::uint32_t expected_responses_; + bool done_{ false }; + std::mutex mutex_{}; + }; + auto ctx = std::make_shared(std::move(h), config.num_replicas.value_or(0U) + 1U); + + for (std::size_t idx = 1U; idx <= config.num_replicas.value_or(0U); ++idx) { + document_id replica_id{ r->id() }; + replica_id.node_index(idx); + core->execute(impl::lookup_in_replica_request{ std::move(replica_id), r->specs(), r->timeout() }, + [ctx](impl::lookup_in_replica_response&& resp) { + movable_lookup_in_any_replica_handler local_handler; + { + std::scoped_lock lock(ctx->mutex_); + if (ctx->done_) { + return; + } + --ctx->expected_responses_; + if (resp.ctx.ec()) { + if (ctx->expected_responses_ > 0) { + // just ignore the response + return; + } + // consider document irretrievable and give up + resp.ctx.override_ec(errc::key_value::document_irretrievable); + } + ctx->done_ = true; + std::swap(local_handler, ctx->handler_); + } + if (local_handler) { + std::vector entries; + for (auto& field : resp.fields) { + lookup_in_replica_result::entry entry{}; + entry.path = field.path; + entry.original_index = field.original_index; + entry.exists = field.exists; + entry.value = field.value; + entries.emplace_back(entry); + } + return local_handler(std::move(resp.ctx), + lookup_in_replica_result{ resp.cas, entries, resp.deleted, true /* replica */ }); + } + }); + } + + core::operations::lookup_in_request active{ document_id{ r->id() } }; + active.specs = r->specs(); + active.timeout = r->timeout(); + core->execute(active, [ctx](core::operations::lookup_in_response&& resp) { + movable_lookup_in_any_replica_handler local_handler{}; + { + std::scoped_lock lock(ctx->mutex_); + if (ctx->done_) { + return; + } + --ctx->expected_responses_; + if (resp.ctx.ec()) { + if (ctx->expected_responses_ > 0) { + // just ignore the response + return; + } + // consider document irretrievable and give up + resp.ctx.override_ec(errc::key_value::document_irretrievable); + } + ctx->done_ = true; + std::swap(local_handler, ctx->handler_); + } + if (local_handler) { + std::vector entries; + for (auto& field : resp.fields) { + lookup_in_replica_result::entry entry{}; + entry.path = field.path; + entry.original_index = field.original_index; + entry.exists = field.exists; + entry.value = field.value; + entries.emplace_back(entry); + } + return local_handler(std::move(resp.ctx), + lookup_in_replica_result{ resp.cas, entries, resp.deleted, false /* active */ }); + } + }); + }); +} +} // namespace couchbase::core::impl diff --git a/core/impl/lookup_in_any_replica.hxx b/core/impl/lookup_in_any_replica.hxx new file mode 100644 index 000000000..9f7dd25f4 --- /dev/null +++ b/core/impl/lookup_in_any_replica.hxx @@ -0,0 +1,75 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2020-Present Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "core/document_id.hxx" +#include "core/utils/movable_function.hxx" + +namespace couchbase::core::impl +{ +class lookup_in_any_replica_request +{ + public: + explicit lookup_in_any_replica_request(std::string bucket_name, + std::string scope_name, + std::string collection_name, + std::string document_key, + std::vector specs, + std::optional timeout) + : id_{ std::move(bucket_name), std::move(scope_name), std::move(collection_name), std::move(document_key) } + , specs_{ std::move(specs) } + , timeout_{ timeout } + { + } + + [[nodiscard]] const auto& id() const + { + return id_; + } + + [[nodiscard]] const auto& specs() const + { + return specs_; + } + + [[nodiscard]] const auto& timeout() const + { + return timeout_; + } + + private: + core::document_id id_; + std::vector specs_; + std::optional timeout_{}; +}; + +using movable_lookup_in_any_replica_handler = utils::movable_function; + +void +initiate_lookup_in_any_replica_operation(std::shared_ptr core, + const std::string& bucket_name, + const std::string& scope_name, + const std::string& collection_name, + std::string document_key, + const std::vector& specs, + std::optional timeout, + movable_lookup_in_any_replica_handler&& handler); +} // namespace couchbase::core::impl diff --git a/core/impl/lookup_in_replica.cxx b/core/impl/lookup_in_replica.cxx new file mode 100644 index 000000000..ace8347dc --- /dev/null +++ b/core/impl/lookup_in_replica.cxx @@ -0,0 +1,97 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2020-Present Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "lookup_in_replica.hxx" +#include "core/impl/subdoc/path_flags.hxx" + +#include + +namespace couchbase::core::impl +{ +std::error_code +lookup_in_replica_request::encode_to(lookup_in_replica_request::encoded_request_type& encoded, mcbp_context&& /* context */) +{ + for (std::size_t i = 0; i < specs.size(); ++i) { + specs[i].original_index_ = i; + } + std::stable_sort(specs.begin(), specs.end(), [](const auto& lhs, const auto& rhs) { + /* move XATTRs to the beginning of the vector */ + return core::impl::subdoc::has_xattr_path_flag(lhs.flags_) && !core::impl::subdoc::has_xattr_path_flag(rhs.flags_); + }); + + encoded.opaque(opaque); + encoded.partition(partition); + encoded.body().id(id); + encoded.body().read_replica(true); + encoded.body().specs(specs); + return {}; +} + +lookup_in_replica_response +lookup_in_replica_request::make_response(key_value_error_context&& ctx, const encoded_response_type& encoded) const +{ + + bool deleted = false; + couchbase::cas cas{}; + std::vector fields{}; + std::error_code ec = ctx.ec(); + std::optional first_error_index{}; + std::optional first_error_path{}; + + if (encoded.status() == key_value_status_code::subdoc_success_deleted || + encoded.status() == key_value_status_code::subdoc_multi_path_failure_deleted) { + deleted = true; + } + if (!ctx.ec()) { + fields.resize(specs.size()); + for (size_t i = 0; i < specs.size(); ++i) { + const auto& req_entry = specs[i]; + fields[i].original_index = req_entry.original_index_; + fields[i].path = req_entry.path_; + fields[i].opcode = static_cast(req_entry.opcode_); + fields[i].status = key_value_status_code::success; + } + for (size_t i = 0; i < encoded.body().fields().size(); ++i) { + const auto& res_entry = encoded.body().fields()[i]; + fields[i].status = res_entry.status; + fields[i].ec = + protocol::map_status_code(protocol::client_opcode::subdoc_multi_mutation, static_cast(res_entry.status)); + if (!fields[i].ec && !ctx.ec()) { + ec = fields[i].ec; + } + if (!first_error_index && !fields[i].ec) { + first_error_index = i; + first_error_path = fields[i].path; + } + fields[i].exists = + res_entry.status == key_value_status_code::success || res_entry.status == key_value_status_code::subdoc_success_deleted; + fields[i].value = utils::to_binary(res_entry.value); + } + if (!ec) { + cas = encoded.cas(); + } + std::sort(fields.begin(), fields.end(), [](const auto& lhs, const auto& rhs) { return lhs.original_index < rhs.original_index; }); + } + + return lookup_in_replica_response{ + make_subdocument_error_context(ctx, ec, first_error_path, first_error_index, deleted), + cas, + std::move(fields), + deleted, + }; +} +} // namespace couchbase::core::impl diff --git a/core/impl/lookup_in_replica.hxx b/core/impl/lookup_in_replica.hxx new file mode 100644 index 000000000..1d34cba76 --- /dev/null +++ b/core/impl/lookup_in_replica.hxx @@ -0,0 +1,67 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2020-Present Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "core/error_context/key_value.hxx" +#include "core/impl/subdoc/command.hxx" +#include "core/io/mcbp_context.hxx" +#include "core/io/retry_context.hxx" +#include "core/protocol/client_request.hxx" +#include "core/protocol/cmd_lookup_in_replica.hxx" +#include "core/public_fwd.hxx" +#include "core/timeout_defaults.hxx" + +#include +#include + +namespace couchbase::core::impl +{ +struct lookup_in_replica_response { + struct entry { + std::string path; + couchbase::codec::binary value; + std::size_t original_index; + bool exists; + protocol::subdoc_opcode opcode; + key_value_status_code status; + std::error_code ec{}; + }; + subdocument_error_context ctx{}; + couchbase::cas cas{}; + std::vector fields{}; + bool deleted{ false }; +}; + +struct lookup_in_replica_request { + using response_type = lookup_in_replica_response; + using encoded_request_type = protocol::client_request; + using encoded_response_type = protocol::client_response; + + document_id id; + std::vector specs{}; + std::optional timeout{}; + std::shared_ptr parent_span{ nullptr }; + std::uint16_t partition{}; + std::uint32_t opaque{}; + io::retry_context retries{}; + + [[nodiscard]] std::error_code encode_to(encoded_request_type& encoded, mcbp_context&& context); + + [[nodiscard]] lookup_in_replica_response make_response(key_value_error_context&& ctx, const encoded_response_type& encoded) const; +}; +} // namespace couchbase::core::impl diff --git a/core/meta/features.hxx b/core/meta/features.hxx index 259d4a599..d05206b15 100644 --- a/core/meta/features.hxx +++ b/core/meta/features.hxx @@ -48,3 +48,9 @@ * - couchbase::query_options::use_replica() */ #define COUCHBASE_CXX_CLIENT_QUERY_READ_FROM_REPLICA 1 + +/** + * Subdoc read from replica is available in the core + * couchbase::core::lookup_in_replica support + */ +#define COUCHBASE_CXX_CLIENT_CORE_HAS_SUBDOC_READ_REPLICA 1 diff --git a/core/operations.hxx b/core/operations.hxx index f68b42340..1fb60add9 100644 --- a/core/operations.hxx +++ b/core/operations.hxx @@ -30,6 +30,8 @@ #include "core/operations/document_increment.hxx" #include "core/operations/document_insert.hxx" #include "core/operations/document_lookup_in.hxx" +#include "core/operations/document_lookup_in_all_replicas.hxx" +#include "core/operations/document_lookup_in_any_replica.hxx" #include "core/operations/document_mutate_in.hxx" #include "core/operations/document_prepend.hxx" #include "core/operations/document_query.hxx" diff --git a/core/operations/document_lookup_in_all_replicas.hxx b/core/operations/document_lookup_in_all_replicas.hxx new file mode 100644 index 000000000..987cba097 --- /dev/null +++ b/core/operations/document_lookup_in_all_replicas.hxx @@ -0,0 +1,192 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2020-2021 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "core/error_context/key_value.hxx" +#include "core/impl/lookup_in_replica.hxx" +#include "core/impl/subdoc/command.hxx" +#include "core/operations/document_lookup_in.hxx" +#include "core/operations/operation_traits.hxx" +#include "core/utils/movable_function.hxx" +#include "couchbase/codec/encoded_value.hxx" +#include "couchbase/error_codes.hxx" + +#include +#include +#include + +namespace couchbase::core::operations +{ +struct lookup_in_all_replicas_response { + struct entry { + struct lookup_in_entry { + std::string path; + codec::binary value; + std::size_t original_index; + bool exists; + protocol::subdoc_opcode opcode; + key_value_status_code status; + std::error_code ec{}; + }; + std::vector fields{}; + couchbase::cas cas{}; + bool deleted{ false }; + bool is_replica{ true }; + }; + subdocument_error_context ctx{}; + std::vector entries{}; +}; + +struct lookup_in_all_replicas_request { + using response_type = lookup_in_all_replicas_response; + using encoded_request_type = core::protocol::client_request; + using encoded_response_type = core::protocol::client_response; + + core::document_id id; + std::vector specs{}; + std::optional timeout{}; + std::shared_ptr parent_span{ nullptr }; + + template + void execute(Core core, Handler handler) + { + core->with_bucket_configuration( + id.bucket(), + [core, id = id, timeout = timeout, specs = specs, parent_span = parent_span, h = std::forward(handler)]( + std::error_code ec, const topology::configuration& config) mutable { + if (!config.supports_subdoc_read_replica()) { + ec = errc::common::feature_not_available; + } + + if (ec) { + std::optional first_error_path{}; + std::optional first_error_index{}; + return h(response_type{ + make_subdocument_error_context(make_key_value_error_context(ec, id), ec, first_error_path, first_error_index, false) }); + } + using handler_type = utils::movable_function; + + struct replica_context { + replica_context(handler_type handler, std::uint32_t expected_responses) + : handler_(std::move(handler)) + , expected_responses_(expected_responses) + { + } + + handler_type handler_; + std::uint32_t expected_responses_; + bool done_{ false }; + std::mutex mutex_{}; + std::vector result_{}; + }; + auto ctx = std::make_shared(std::move(h), config.num_replicas.value_or(0U) + 1U); + + for (std::size_t idx = 1U; idx <= config.num_replicas.value_or(0U); ++idx) { + document_id replica_id{ id }; + replica_id.node_index(idx); + core->execute(impl::lookup_in_replica_request{ std::move(replica_id), specs, timeout, parent_span }, + [ctx](impl::lookup_in_replica_response&& resp) { + handler_type local_handler{}; + { + std::scoped_lock lock(ctx->mutex_); + if (ctx->done_) { + return; + } + --ctx->expected_responses_; + if (resp.ctx.ec()) { + if (ctx->expected_responses_ > 0) { + // just ignore the response + return; + } + } else { + lookup_in_all_replicas_response::entry top_entry{}; + top_entry.cas = resp.cas; + top_entry.deleted = resp.deleted; + top_entry.is_replica = true; + for (auto& field : resp.fields) { + lookup_in_all_replicas_response::entry::lookup_in_entry lookup_in_entry{}; + lookup_in_entry.path = field.path; + lookup_in_entry.value = field.value; + lookup_in_entry.status = field.status; + lookup_in_entry.ec = field.ec; + lookup_in_entry.exists = field.exists; + lookup_in_entry.original_index = field.original_index; + lookup_in_entry.opcode = field.opcode; + top_entry.fields.emplace_back(lookup_in_entry); + } + ctx->result_.emplace_back(lookup_in_all_replicas_response::entry{ top_entry }); + } + if (ctx->expected_responses_ == 0) { + ctx->done_ = true; + std::swap(local_handler, ctx->handler_); + } + } + if (local_handler) { + return local_handler({ std::move(resp.ctx), std::move(ctx->result_) }); + } + }); + } + + core->execute(lookup_in_request{ document_id{ id }, {}, {}, false, specs, timeout }, [ctx](lookup_in_response&& resp) { + handler_type local_handler{}; + { + std::scoped_lock lock(ctx->mutex_); + if (ctx->done_) { + return; + } + --ctx->expected_responses_; + if (resp.ctx.ec()) { + if (ctx->expected_responses_ > 0) { + // just ignore the response + return; + } + } else { + lookup_in_all_replicas_response::entry top_entry{}; + top_entry.cas = resp.cas; + top_entry.deleted = resp.deleted; + top_entry.is_replica = false; + for (auto& field : resp.fields) { + lookup_in_all_replicas_response::entry::lookup_in_entry lookup_in_entry{}; + lookup_in_entry.path = field.path; + lookup_in_entry.value = field.value; + lookup_in_entry.status = field.status; + lookup_in_entry.ec = field.ec; + lookup_in_entry.exists = field.exists; + lookup_in_entry.original_index = field.original_index; + lookup_in_entry.opcode = field.opcode; + top_entry.fields.emplace_back(lookup_in_entry); + } + ctx->result_.emplace_back(lookup_in_all_replicas_response::entry{ top_entry }); + } + if (ctx->expected_responses_ == 0) { + ctx->done_ = true; + std::swap(local_handler, ctx->handler_); + } + } + if (local_handler) { + return local_handler({ std::move(resp.ctx), std::move(ctx->result_) }); + } + }); + }); + } +}; + +template<> +struct is_compound_operation : public std::true_type { +}; +} // namespace couchbase::core::operations diff --git a/core/operations/document_lookup_in_any_replica.hxx b/core/operations/document_lookup_in_any_replica.hxx new file mode 100644 index 000000000..671ac21f8 --- /dev/null +++ b/core/operations/document_lookup_in_any_replica.hxx @@ -0,0 +1,185 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2020-2021 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "core/error_context/key_value.hxx" +#include "core/impl/lookup_in_replica.hxx" +#include "core/impl/subdoc/command.hxx" +#include "core/operations/document_lookup_in.hxx" +#include "core/operations/operation_traits.hxx" +#include "core/utils/movable_function.hxx" +#include "couchbase/codec/encoded_value.hxx" +#include "couchbase/error_codes.hxx" + +#include +#include +#include + +namespace couchbase::core::operations +{ +struct lookup_in_any_replica_response { + struct entry { + std::string path; + codec::binary value; + std::size_t original_index; + bool exists; + protocol::subdoc_opcode opcode; + key_value_status_code status; + std::error_code ec{}; + }; + subdocument_error_context ctx{}; + couchbase::cas cas{}; + std::vector fields{}; + bool deleted{ false }; + bool is_replica{ true }; +}; + +struct lookup_in_any_replica_request { + using response_type = lookup_in_any_replica_response; + using encoded_request_type = core::protocol::client_request; + using encoded_response_type = core::protocol::client_response; + + core::document_id id; + std::vector specs{}; + std::optional timeout{}; + std::shared_ptr parent_span{ nullptr }; + + template + void execute(Core core, Handler handler) + { + core->with_bucket_configuration( + id.bucket(), + [core, id = id, timeout = timeout, specs = specs, parent_span = parent_span, h = std::forward(handler)]( + std::error_code ec, const topology::configuration& config) mutable { + if (!config.supports_subdoc_read_replica()) { + ec = errc::common::feature_not_available; + } + + if (ec) { + std::optional first_error_path{}; + std::optional first_error_index{}; + return h(response_type{ + make_subdocument_error_context(make_key_value_error_context(ec, id), ec, first_error_path, first_error_index, false) }); + } + using handler_type = utils::movable_function; + + struct replica_context { + replica_context(handler_type&& handler, std::uint32_t expected_responses) + : handler_(std::move(handler)) + , expected_responses_(expected_responses) + { + } + + handler_type handler_; + std::uint32_t expected_responses_; + bool done_{ false }; + std::mutex mutex_{}; + }; + auto ctx = std::make_shared(std::move(h), config.num_replicas.value_or(0U) + 1U); + + for (std::size_t idx = 1U; idx <= config.num_replicas.value_or(0U); ++idx) { + document_id replica_id{ id }; + replica_id.node_index(idx); + core->execute(impl::lookup_in_replica_request{ std::move(replica_id), specs, timeout, parent_span }, + [ctx](impl::lookup_in_replica_response&& resp) { + handler_type local_handler; + { + std::scoped_lock lock(ctx->mutex_); + if (ctx->done_) { + return; + } + --ctx->expected_responses_; + if (resp.ctx.ec()) { + if (ctx->expected_responses_ > 0) { + // just ignore the response + return; + } + // consider document irretrievable and give up + resp.ctx.override_ec(errc::key_value::document_irretrievable); + } + ctx->done_ = true; + std::swap(local_handler, ctx->handler_); + } + if (local_handler) { + response_type res{}; + res.ctx = resp.ctx; + res.cas = resp.cas; + res.deleted = resp.deleted; + res.is_replica = true; + for (auto& field : resp.fields) { + auto lookup_in_entry = lookup_in_any_replica_response::entry{}; + lookup_in_entry.path = field.path; + lookup_in_entry.value = field.value; + lookup_in_entry.status = field.status; + lookup_in_entry.ec = field.ec; + lookup_in_entry.exists = field.exists; + lookup_in_entry.original_index = field.original_index; + lookup_in_entry.opcode = field.opcode; + res.fields.emplace_back(lookup_in_entry); + } + return local_handler(res); + } + }); + } + core->execute(lookup_in_request{ id, {}, {}, false, specs, timeout }, [ctx](lookup_in_response&& resp) { + handler_type local_handler{}; + { + std::scoped_lock lock(ctx->mutex_); + if (ctx->done_) { + return; + } + --ctx->expected_responses_; + if (resp.ctx.ec()) { + if (ctx->expected_responses_ > 0) { + // just ignore the response + return; + } + // consider document irretrievable and give up + resp.ctx.override_ec(errc::key_value::document_irretrievable); + } + ctx->done_ = true; + std::swap(local_handler, ctx->handler_); + } + if (local_handler) { + auto res = response_type{}; + res.ctx = resp.ctx; + res.cas = resp.cas; + res.deleted = resp.deleted; + res.is_replica = false; + for (auto& field : resp.fields) { + auto lookup_in_entry = lookup_in_any_replica_response::entry{}; + lookup_in_entry.path = field.path; + lookup_in_entry.value = field.value; + lookup_in_entry.status = field.status; + lookup_in_entry.ec = field.ec; + lookup_in_entry.exists = field.exists; + lookup_in_entry.original_index = field.original_index; + lookup_in_entry.opcode = field.opcode; + res.fields.emplace_back(lookup_in_entry); + } + return local_handler(res); + } + }); + }); + } +}; + +template<> +struct is_compound_operation : public std::true_type { +}; +} // namespace couchbase::core::operations diff --git a/core/protocol/cmd_hello.hxx b/core/protocol/cmd_hello.hxx index 68d26802e..a0a5c3a51 100644 --- a/core/protocol/cmd_hello.hxx +++ b/core/protocol/cmd_hello.hxx @@ -71,6 +71,7 @@ class hello_request_body hello_feature::collections, hello_feature::subdoc_create_as_deleted, hello_feature::preserve_ttl, + hello_feature::subdoc_replica_read, }; std::vector value_; diff --git a/core/protocol/cmd_lookup_in_replica.cxx b/core/protocol/cmd_lookup_in_replica.cxx new file mode 100644 index 000000000..4bb6c3ad4 --- /dev/null +++ b/core/protocol/cmd_lookup_in_replica.cxx @@ -0,0 +1,107 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2020-2021 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "cmd_lookup_in_replica.hxx" + +#include "core/utils/byteswap.hxx" +#include "core/utils/unsigned_leb128.hxx" + +#include +#include + +namespace couchbase::core::protocol +{ +bool +lookup_in_replica_response_body::parse(key_value_status_code status, + const header_buffer& header, + std::uint8_t framing_extras_size, + std::uint16_t key_size, + std::uint8_t extras_size, + const std::vector& body, + const cmd_info& /* info */) +{ + Expects(header[1] == static_cast(opcode)); + if (status == key_value_status_code::success || status == key_value_status_code::subdoc_multi_path_failure || + status == key_value_status_code::subdoc_success_deleted || status == key_value_status_code::subdoc_multi_path_failure_deleted) { + using offset_type = std::vector::difference_type; + offset_type offset = framing_extras_size + key_size + extras_size; + fields_.reserve(16); /* we won't have more than 16 entries anyway */ + while (static_cast(offset) < body.size()) { + lookup_in_field field; + + std::uint16_t entry_status = 0; + memcpy(&entry_status, body.data() + offset, sizeof(entry_status)); + entry_status = utils::byte_swap(entry_status); + Expects(is_valid_status(entry_status)); + field.status = static_cast(entry_status); + offset += static_cast(sizeof(entry_status)); + + std::uint32_t entry_size = 0; + memcpy(&entry_size, body.data() + offset, sizeof(entry_size)); + entry_size = utils::byte_swap(entry_size); + Expects(entry_size < 20 * 1024 * 1024); + offset += static_cast(sizeof(entry_size)); + + field.value.resize(entry_size); + memcpy(field.value.data(), body.data() + offset, entry_size); + offset += static_cast(entry_size); + + fields_.emplace_back(field); + } + return true; + } + return false; +} + +void +lookup_in_replica_request_body::id(const document_id& id) +{ + key_ = make_protocol_key(id); +} + +void +lookup_in_replica_request_body::fill_extras() +{ + if (flags_ != 0) { + extras_.resize(sizeof(flags_)); + extras_[0] = std::byte{ flags_ }; + } +} + +void +lookup_in_replica_request_body::fill_value() +{ + size_t value_size = 0; + for (const auto& spec : specs_) { + value_size += sizeof(spec.opcode_) + sizeof(std::uint8_t) + sizeof(std::uint16_t) + spec.path_.size(); + } + Expects(value_size > 0); + value_.resize(value_size); + std::vector::size_type offset = 0; + for (const auto& spec : specs_) { + value_[offset] = static_cast(spec.opcode_); + ++offset; + value_[offset] = spec.flags_; + ++offset; + std::uint16_t path_size = utils::byte_swap(gsl::narrow_cast(spec.path_.size())); + std::memcpy(value_.data() + offset, &path_size, sizeof(path_size)); + offset += sizeof(path_size); + std::memcpy(value_.data() + offset, spec.path_.data(), spec.path_.size()); + offset += spec.path_.size(); + } +} +} // namespace couchbase::core::protocol diff --git a/core/protocol/cmd_lookup_in_replica.hxx b/core/protocol/cmd_lookup_in_replica.hxx new file mode 100644 index 000000000..464c7eb94 --- /dev/null +++ b/core/protocol/cmd_lookup_in_replica.hxx @@ -0,0 +1,137 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2020-2021 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "client_opcode.hxx" +#include "cmd_info.hxx" +#include "core/document_id.hxx" +#include "core/impl/subdoc/command.hxx" +#include "core/io/mcbp_message.hxx" +#include "status.hxx" + +#include + +namespace couchbase::core::protocol +{ + +class lookup_in_replica_response_body +{ + public: + static const inline client_opcode opcode = client_opcode::subdoc_multi_lookup; + + struct lookup_in_field { + key_value_status_code status{}; + std::string value; + }; + + private: + std::vector fields_{}; + + public: + [[nodiscard]] const std::vector& fields() const + { + return fields_; + } + + [[nodiscard]] bool parse(key_value_status_code status, + const header_buffer& header, + std::uint8_t framing_extras_size, + std::uint16_t key_size, + std::uint8_t extras_size, + const std::vector& body, + const cmd_info& info); +}; + +class lookup_in_replica_request_body +{ + public: + using response_body_type = lookup_in_replica_response_body; + static const inline client_opcode opcode = client_opcode::subdoc_multi_lookup; + + /** + * Tells the server to operate on replica vbucket instead of active + */ + static const inline std::uint8_t doc_flag_replica_read = 0b0010'0000; + + private: + std::vector key_; + std::vector extras_{}; + std::vector value_{}; + + std::uint8_t flags_{ 0 }; + std::vector specs_; + + public: + void id(const document_id& id); + + void read_replica(bool value) + { + if (value) { + flags_ = flags_ | doc_flag_replica_read; + } + } + + void specs(const std::vector& specs) + { + specs_ = specs; + } + + [[nodiscard]] const auto& key() const + { + return key_; + } + + [[nodiscard]] const auto& framing_extras() const + { + return empty_buffer; + } + + [[nodiscard]] const auto& extras() + { + if (extras_.empty()) { + fill_extras(); + } + return extras_; + } + + [[nodiscard]] const auto& value() + { + if (value_.empty()) { + fill_value(); + } + return value_; + } + + [[nodiscard]] std::size_t size() + { + if (extras_.empty()) { + fill_extras(); + } + if (value_.empty()) { + fill_value(); + } + return key_.size() + extras_.size() + value_.size(); + } + + private: + void fill_extras(); + + void fill_value(); +}; + +} // namespace couchbase::core::protocol diff --git a/core/protocol/hello_feature.hxx b/core/protocol/hello_feature.hxx index 6bb9e4d14..9f9e7d1c9 100644 --- a/core/protocol/hello_feature.hxx +++ b/core/protocol/hello_feature.hxx @@ -155,6 +155,11 @@ enum class hello_feature : std::uint16_t { replace_body_with_xattr = 0x19, resource_units = 0x1a, + + /** + * Indicates support for subdoc lookup operations on replicas + */ + subdoc_replica_read = 0x1c, }; constexpr bool @@ -185,6 +190,7 @@ is_valid_hello_feature(std::uint16_t code) case hello_feature::subdoc_document_macro_support: case hello_feature::replace_body_with_xattr: case hello_feature::resource_units: + case hello_feature::subdoc_replica_read: return true; } return false; diff --git a/core/protocol/hello_feature_fmt.hxx b/core/protocol/hello_feature_fmt.hxx index 7311599c7..27ddb8389 100644 --- a/core/protocol/hello_feature_fmt.hxx +++ b/core/protocol/hello_feature_fmt.hxx @@ -106,6 +106,9 @@ struct fmt::formatter { case couchbase::core::protocol::hello_feature::resource_units: name = "resource_units"; break; + case couchbase::core::protocol::hello_feature::subdoc_replica_read: + name = "subdoc_replica_read"; + break; } return format_to(ctx.out(), "{}", name); } diff --git a/core/topology/capabilities.hxx b/core/topology/capabilities.hxx index 677b5299d..19a504458 100644 --- a/core/topology/capabilities.hxx +++ b/core/topology/capabilities.hxx @@ -32,6 +32,7 @@ enum class bucket_capability { durable_write, tombstoned_user_xattrs, range_scan, + replica_read, }; enum class cluster_capability { diff --git a/core/topology/capabilities_fmt.hxx b/core/topology/capabilities_fmt.hxx index 3345c7b6f..970177172 100644 --- a/core/topology/capabilities_fmt.hxx +++ b/core/topology/capabilities_fmt.hxx @@ -70,6 +70,9 @@ struct fmt::formatter { case couchbase::core::bucket_capability::range_scan: name = "range_scan"; break; + case couchbase::core::bucket_capability::replica_read: + name = "replica_read"; + break; } return format_to(ctx.out(), "{}", name); } diff --git a/core/topology/configuration.hxx b/core/topology/configuration.hxx index 3aa57c243..a8c1d1785 100644 --- a/core/topology/configuration.hxx +++ b/core/topology/configuration.hxx @@ -129,6 +129,11 @@ struct configuration { return bucket_capabilities.count(couchbase::core::bucket_capability::couchapi) == 0; } + [[nodiscard]] bool supports_subdoc_read_replica() const + { + return bucket_capabilities.find(bucket_capability::replica_read) != bucket_capabilities.end(); + } + [[nodiscard]] std::size_t index_for_this_node() const; [[nodiscard]] bool has_node(const std::string& network, service_type type, diff --git a/core/topology/configuration_json.hxx b/core/topology/configuration_json.hxx index 3e3463769..96e733847 100644 --- a/core/topology/configuration_json.hxx +++ b/core/topology/configuration_json.hxx @@ -235,6 +235,8 @@ struct traits { result.bucket_capabilities.insert(couchbase::core::bucket_capability::xattr); } else if (name == "rangeScan") { result.bucket_capabilities.insert(couchbase::core::bucket_capability::range_scan); + } else if (name == "subdoc.ReplicaRead") { + result.bucket_capabilities.insert(couchbase::core::bucket_capability::replica_read); } } } diff --git a/couchbase/collection.hxx b/couchbase/collection.hxx index 4e4abfcf4..2dabb92b5 100644 --- a/couchbase/collection.hxx +++ b/couchbase/collection.hxx @@ -28,6 +28,8 @@ #include #include #include +#include +#include #include #include #include @@ -856,6 +858,115 @@ class collection return future; } + /** + * Performs lookups to document fragments with default options from all replicas and the active node and returns the result as a vector. + * + * @tparam Handler type of the handler that implements @ref lookup_in_all_replicas_handler + * + * @param document_id the outer document ID + * @param specs an object that specifies the types of lookups to perform + * @param options custom options to modify the lookup options + * @param handler callable that implements @ref lookup_in_all_replicas_handler + * + * @exception errc::key_value::document_not_found the given document id is not found in the collection. + * @exception errc::common::ambiguous_timeout + * @exception errc::common::unambiguous_timeout + * + * @since 1.0.0 + * @committed + */ + template + void lookup_in_all_replicas(std::string document_id, + lookup_in_specs specs, + const lookup_in_all_replicas_options& options, + Handler&& handler) const + { + return core::impl::initiate_lookup_in_all_replicas_operation( + core_, bucket_name_, scope_name_, name_, std::move(document_id), specs.specs(), options.build(), std::forward(handler)); + } + + /** + * Performs lookups to document fragments with default options from all replicas and the active node and returns the result as a vector. + * + * @param document_id the outer document ID + * @param specs an object that specifies the types of lookups to perform + * @param options custom options to modify the lookup options + * @return future object that carries result of the operation + * + * @exception errc::key_value::document_not_found the given document id is not found in the collection. + * @exception errc::common::ambiguous_timeout + * @exception errc::common::unambiguous_timeout + * + * @since 1.0.0 + * @committed + */ + [[nodiscard]] auto lookup_in_all_replicas(std::string document_id, + lookup_in_specs specs, + const lookup_in_all_replicas_options& options = {}) const + -> std::future> + { + auto barrier = std::make_shared>>(); + auto future = barrier->get_future(); + lookup_in_all_replicas(std::move(document_id), std::move(specs), options, [barrier](auto ctx, auto result) { + barrier->set_value({ std::move(ctx), std::move(result) }); + }); + return future; + } + + /** + * Performs lookups to document fragments with default options from all replicas and returns the first found. + * + * @tparam Handler type of the handler that implements @ref lookup_in_any_replica_handler + * + * @param document_id the outer document ID + * @param specs an object that specifies the types of lookups to perform + * @param options custom options to modify the lookup options + * + * @exception errc::key_value::document_not_found the given document id is not found in the collection. + * @exception errc::common::ambiguous_timeout + * @exception errc::common::unambiguous_timeout + * + * @since 1.0.0 + * @committed + */ + template + void lookup_in_any_replica(std::string document_id, + lookup_in_specs specs, + const lookup_in_any_replica_options& options, + Handler&& handler) const + { + return core::impl::initiate_lookup_in_any_replica_operation( + core_, bucket_name_, scope_name_, name_, std::move(document_id), specs.specs(), options.build(), std::forward(handler)); + } + + /** + * Performs lookups to document fragments with default options from all replicas and returns the first found. + * + * @param document_id the outer document ID + * @param specs an object that specifies the types of lookups to perform + * @param options custom options to modify the lookup options + * @return future object that carries result of the operation + * + * @exception errc::key_value::document_not_found the given document id is not found in the collection. + * @exception errc::common::ambiguous_timeout + * @exception errc::common::unambiguous_timeout + * + * @since 1.0.0 + * @committed + */ + [[nodiscard]] auto lookup_in_any_replica(std::string document_id, + lookup_in_specs specs, + const lookup_in_any_replica_options& options = {}) const + -> std::future> + { + auto barrier = std::make_shared>>(); + auto future = barrier->get_future(); + lookup_in_any_replica(std::move(document_id), std::move(specs), options, [barrier](auto ctx, auto result) { + barrier->set_value({ std::move(ctx), std::move(result) }); + }); + return future; + } + /** * Gets a document for a given id and places a pessimistic lock on it for mutations * diff --git a/couchbase/lookup_in_all_replicas_options.hxx b/couchbase/lookup_in_all_replicas_options.hxx new file mode 100644 index 000000000..02aca77ae --- /dev/null +++ b/couchbase/lookup_in_all_replicas_options.hxx @@ -0,0 +1,109 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2020-Present Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace couchbase +{ +/** + * Options for @ref collection#lookup_in_all_replicas(). + * + * @since 1.0.0 + * @committed + */ +struct lookup_in_all_replicas_options : common_options { + /** + * Immutable value object representing consistent options. + * + * @since 1.0.0 + * @internal + */ + struct built : public common_options::built { + }; + + /** + * Validates options and returns them as an immutable value. + * + * @return consistent options as an immutable value + * + * @exception std::system_error with code errc::common::invalid_argument if the options are not valid + * + * @since 1.0.0 + * @internal + */ + [[nodiscard]] auto build() const -> built + { + return { build_common_options() }; + } +}; + +/** + * The result for the @ref collection#lookup_in_all_replicas() operation + * + * @since 1.0.0 + * @uncommitted + */ +using lookup_in_all_replicas_result = std::vector; + +/** + * The signature for the handler of the @ref collection#lookup_in_all_replicas() operation + * + * @since 1.0.0 + * @uncommitted + */ +using lookup_in_all_replicas_handler = std::function; + +#ifndef COUCHBASE_CXX_CLIENT_DOXYGEN +namespace core +{ +class cluster; +namespace impl +{ + +/** + * @since 1.0.0 + * @internal + */ +void +initiate_lookup_in_all_replicas_operation(std::shared_ptr core, + const std::string& bucket_name, + const std::string& scope_name, + const std::string& collection_name, + std::string document_key, + const std::vector& specs, + lookup_in_all_replicas_options::built options, + lookup_in_all_replicas_handler&& handler); +#endif +} // namespace impl +} // namespace core +} // namespace couchbase diff --git a/couchbase/lookup_in_any_replica_options.hxx b/couchbase/lookup_in_any_replica_options.hxx new file mode 100644 index 000000000..96f67cc06 --- /dev/null +++ b/couchbase/lookup_in_any_replica_options.hxx @@ -0,0 +1,101 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2020-Present Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace couchbase +{ +/** + * Options for @ref collection#lookup_in_any_replica(). + * + * @since 1.0.0 + * @committed + */ +struct lookup_in_any_replica_options : common_options { + /** + * Immutable value object representing consistent options. + * + * @since 1.0.0 + * @internal + */ + struct built : public common_options::built { + }; + + /** + * Validates options and returns them as an immutable value. + * + * @return consistent options as an immutable value + * + * @exception std::system_error with code errc::common::invalid_argument if the options are not valid + * + * @since 1.0.0 + * @internal + */ + [[nodiscard]] auto build() const -> built + { + return { build_common_options() }; + } +}; + +/** + * The signature for the handler of the @ref collection#lookup_in_any_replica() operation + * + * @since 1.0.0 + * @uncommitted + */ +using lookup_in_any_replica_handler = std::function; + +#ifndef COUCHBASE_CXX_CLIENT_DOXYGEN +namespace core +{ +class cluster; +namespace impl +{ + +/** + * @since 1.0.0 + * @internal + */ +void +initiate_lookup_in_any_replica_operation(std::shared_ptr core, + const std::string& bucket_name, + const std::string& scope_name, + const std::string& collection_name, + std::string document_key, + const std::vector& specs, + lookup_in_any_replica_options::built options, + lookup_in_any_replica_handler&& handler); +#endif +} // namespace impl +} // namespace core +} // namespace couchbase diff --git a/couchbase/lookup_in_replica_result.hxx b/couchbase/lookup_in_replica_result.hxx new file mode 100644 index 000000000..9b2fef8d2 --- /dev/null +++ b/couchbase/lookup_in_replica_result.hxx @@ -0,0 +1,74 @@ +/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +/* + * Copyright 2020-Present Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include + +namespace couchbase +{ + +/** + * Represents result of lookup_in_replica operations. + * + * @since 1.0.0 + * @committed + */ +class lookup_in_replica_result : public lookup_in_result +{ + public: + /** + * @since 1.0.0 + * @internal + */ + lookup_in_replica_result() = default; + + /** + * Constructs result for lookup_in_replica operation + * + * @param cas + * @param entries list of the fields returned by the server + * @param is_deleted + * @param is_replica true if document originates from replica node + * + * @since 1.0.0 + * @committed + */ + lookup_in_replica_result(couchbase::cas cas, std::vector entries, bool is_deleted, bool is_replica) + : lookup_in_result{ cas, std::move(entries), is_deleted } + , is_replica_{ is_replica } + { + } + + /** + * Returns whether this document originates from a replica node + * + * @return whether document originates from a replica node + * + * @since 1.0.0 + */ + [[nodiscard]] auto is_replica() const -> bool + { + return is_replica_; + } + + private: + bool is_replica_{ false }; +}; +} // namespace couchbase diff --git a/test/test_integration_subdoc.cxx b/test/test_integration_subdoc.cxx index 1967c9864..a16dbf2a5 100644 --- a/test/test_integration_subdoc.cxx +++ b/test/test_integration_subdoc.cxx @@ -65,6 +65,108 @@ assert_single_lookup_error(test::utils::integration_test_guard& integration, REQUIRE(resp.fields[0].ec == expected_ec); } +template +void +assert_single_lookup_any_replica_success(test::utils::integration_test_guard& integration, + const couchbase::core::document_id& id, + const SubdocumentOperation& spec, + std::optional expected_value = std::nullopt) +{ + couchbase::core::operations::lookup_in_any_replica_request req{ id }; + req.specs = couchbase::lookup_in_specs{ spec }.specs(); + auto resp = test::utils::execute(integration.cluster, req); + INFO(fmt::format("assert_single_lookup_all_replica_success(\"{}\", \"{}\")", id, req.specs[0].path_)); + REQUIRE_SUCCESS(resp.ctx.ec()); + REQUIRE_FALSE(resp.cas.empty()); + REQUIRE(resp.fields.size() == 1); + REQUIRE(resp.fields[0].exists); + REQUIRE(resp.fields[0].path == req.specs[0].path_); + REQUIRE(resp.fields[0].status == couchbase::key_value_status_code::success); + REQUIRE_SUCCESS(resp.fields[0].ec); + if (expected_value.has_value()) { + REQUIRE(couchbase::core::utils::to_binary(expected_value.value()) == resp.fields[0].value); + } +} + +template +void +assert_single_lookup_any_replica_error(test::utils::integration_test_guard& integration, + const couchbase::core::document_id& id, + const SubdocumentOperation& spec, + couchbase::key_value_status_code expected_status, + std::error_code expected_ec) +{ + couchbase::core::operations::lookup_in_any_replica_request req{ id }; + req.specs = couchbase::lookup_in_specs{ spec }.specs(); + auto resp = test::utils::execute(integration.cluster, req); + INFO(fmt::format("assert_single_lookup_all_replica_error(\"{}\", \"{}\")", id, req.specs[0].path_)); + REQUIRE_SUCCESS(resp.ctx.ec()); + REQUIRE_FALSE(resp.cas.empty()); + REQUIRE(resp.fields.size() == 1); + REQUIRE_FALSE(resp.fields[0].exists); + REQUIRE(resp.fields[0].path == req.specs[0].path_); + REQUIRE(resp.fields[0].value.empty()); + REQUIRE(resp.fields[0].status == expected_status); + REQUIRE(resp.fields[0].ec == expected_ec); +} + +template +void +assert_single_lookup_all_replica_success(test::utils::integration_test_guard& integration, + const couchbase::core::document_id& id, + const SubdocumentOperation& spec, + std::optional expected_value = std::nullopt) +{ + couchbase::core::operations::lookup_in_all_replicas_request req{ id }; + req.specs = couchbase::lookup_in_specs{ spec }.specs(); + auto response = test::utils::execute(integration.cluster, req); + INFO(fmt::format("assert_single_lookup_all_replica_success(\"{}\", \"{}\")", id, req.specs[0].path_)); + REQUIRE_SUCCESS(response.ctx.ec()); + REQUIRE(response.entries.size() == integration.number_of_replicas() + 1); + auto responses_from_active = + std::count_if(response.entries.begin(), response.entries.end(), [](const auto& r) { return !r.is_replica; }); + REQUIRE(responses_from_active == 1); + for (auto& resp : response.entries) { + REQUIRE_FALSE(resp.cas.empty()); + REQUIRE(resp.fields.size() == 1); + REQUIRE(resp.fields[0].exists); + REQUIRE(resp.fields[0].path == req.specs[0].path_); + REQUIRE(resp.fields[0].status == couchbase::key_value_status_code::success); + REQUIRE_SUCCESS(resp.fields[0].ec); + if (expected_value.has_value()) { + REQUIRE(couchbase::core::utils::to_binary(expected_value.value()) == resp.fields[0].value); + } + } +} + +template +void +assert_single_lookup_all_replica_error(test::utils::integration_test_guard& integration, + const couchbase::core::document_id& id, + const SubdocumentOperation& spec, + couchbase::key_value_status_code expected_status, + std::error_code expected_ec) +{ + couchbase::core::operations::lookup_in_all_replicas_request req{ id }; + req.specs = couchbase::lookup_in_specs{ spec }.specs(); + auto response = test::utils::execute(integration.cluster, req); + INFO(fmt::format("assert_single_lookup_all_replica_error(\"{}\", \"{}\")", id, req.specs[0].path_)); + REQUIRE_SUCCESS(response.ctx.ec()); + REQUIRE(response.entries.size() == integration.number_of_replicas() + 1); + auto responses_from_active = + std::count_if(response.entries.begin(), response.entries.end(), [](const auto& r) { return !r.is_replica; }); + REQUIRE(responses_from_active == 1); + for (auto& resp : response.entries) { + REQUIRE_FALSE(resp.cas.empty()); + REQUIRE(resp.fields.size() == 1); + REQUIRE_FALSE(resp.fields[0].exists); + REQUIRE(resp.fields[0].path == req.specs[0].path_); + REQUIRE(resp.fields[0].value.empty()); + REQUIRE(resp.fields[0].status == expected_status); + REQUIRE(resp.fields[0].ec == expected_ec); + } +} + void assert_single_mutate_success(couchbase::core::operations::mutate_in_response resp, const std::string& path, const std::string& value = "") { @@ -973,3 +1075,443 @@ TEST_CASE("integration: subdoc top level array", "[integration]") REQUIRE(resp.fields[0].value == couchbase::core::utils::to_binary("3")); } } + +TEST_CASE("integration: subdoc all replica reads", "[integration]") +{ + + test::utils::integration_test_guard integration; + + if (!integration.has_bucket_capability("subdoc.ReplicaRead")) { + SKIP("cluster does not support replica_read"); + } + + auto number_of_replicas = integration.number_of_replicas(); + + if (number_of_replicas == 0) { + SKIP("bucket has zero replicas"); + } + if (integration.number_of_nodes() <= number_of_replicas) { + SKIP(fmt::format("number of nodes ({}) is less or equal to number of replicas ({})", + integration.number_of_nodes(), + integration.number_of_replicas())); + } + + auto key = test::utils::uniq_id("lookup_in_any_replica"); + couchbase::core::document_id id{ integration.ctx.bucket, "_default", "_default", key }; + + { + auto value_json = couchbase::core::utils::to_binary(R"({"dictkey":"dictval","array":[1,2,3,4,[10,20,30,[100,200,300]]]})"); + couchbase::core::operations::insert_request req{ id, value_json }; + req.durability_level = couchbase::durability_level::majority_and_persist_to_active; + auto resp = test::utils::execute(integration.cluster, req); + REQUIRE_SUCCESS(resp.ctx.ec()); + } + + SECTION("dict get") + { + assert_single_lookup_all_replica_success(integration, id, couchbase::lookup_in_specs::get("dictkey"), R"("dictval")"); + } + + SECTION("dict exists") + { + assert_single_lookup_all_replica_success(integration, id, couchbase::lookup_in_specs::exists("dictkey")); + } + + SECTION("array get") + { + assert_single_lookup_all_replica_success( + integration, id, couchbase::lookup_in_specs::get("array"), "[1,2,3,4,[10,20,30,[100,200,300]]]"); + } + + SECTION("array exists") + { + assert_single_lookup_all_replica_success(integration, id, couchbase::lookup_in_specs::exists("array")); + } + + SECTION("array index get") + { + assert_single_lookup_all_replica_success(integration, id, couchbase::lookup_in_specs::get("array[0]"), "1"); + } + + SECTION("array index exists") + { + assert_single_lookup_all_replica_success(integration, id, couchbase::lookup_in_specs::exists("array[0]")); + } + + SECTION("non existent path get") + { + assert_single_lookup_all_replica_error(integration, + id, + couchbase::lookup_in_specs::get("non-exist"), + couchbase::key_value_status_code::subdoc_path_not_found, + couchbase::errc::key_value::path_not_found); + } + + SECTION("non existent path exists") + { + assert_single_lookup_all_replica_error(integration, + id, + couchbase::lookup_in_specs::exists("non-exist"), + couchbase::key_value_status_code::subdoc_path_not_found, + couchbase::errc::key_value::path_not_found); + } + + SECTION("non existent doc") + { + couchbase::core::document_id missing_id{ integration.ctx.bucket, "_default", "_default", "missing_key" }; + + SECTION("non existent doc get") + { + couchbase::core::operations::lookup_in_all_replicas_request req{ missing_id }; + req.specs = + couchbase::lookup_in_specs{ + couchbase::lookup_in_specs::get("non-exist"), + } + .specs(); + auto resp = test::utils::execute(integration.cluster, req); + REQUIRE(resp.ctx.ec() == couchbase::errc::key_value::document_not_found); + REQUIRE(resp.entries.empty()); + } + + SECTION("non existent doc exists") + { + couchbase::core::operations::lookup_in_all_replicas_request req{ missing_id }; + req.specs = + couchbase::lookup_in_specs{ + couchbase::lookup_in_specs::exists("non-exist"), + } + .specs(); + auto resp = test::utils::execute(integration.cluster, req); + REQUIRE(resp.ctx.ec() == couchbase::errc::key_value::document_not_found); + REQUIRE(resp.entries.empty()); + } + } + + SECTION("non json") + { + couchbase::core::document_id non_json_id{ integration.ctx.bucket, "_default", "_default", test::utils::uniq_id("non_json") }; + auto non_json_doc = couchbase::core::utils::to_binary("string"); + + { + couchbase::core::operations::insert_request req{ non_json_id, non_json_doc }; + auto resp = test::utils::execute(integration.cluster, req); + REQUIRE_SUCCESS(resp.ctx.ec()); + } + + SECTION("non json get") + { + if (integration.cluster_version().is_mock()) { + SKIP("GOCAVES does not handle subdocument operations for non-JSON documents. See " + "https://github.com/couchbaselabs/gocaves/issues/103"); + } + assert_single_lookup_all_replica_error(integration, + non_json_id, + couchbase::lookup_in_specs::get("non-exist"), + couchbase::key_value_status_code::subdoc_doc_not_json, + couchbase::errc::key_value::document_not_json); + } + + SECTION("non json exists") + { + if (integration.cluster_version().is_mock()) { + SKIP("GOCAVES does not handle subdocument operations for non-JSON documents. See " + "https://github.com/couchbaselabs/gocaves/issues/103"); + } + assert_single_lookup_all_replica_error(integration, + non_json_id, + couchbase::lookup_in_specs::exists("non-exist"), + couchbase::key_value_status_code::subdoc_doc_not_json, + couchbase::errc::key_value::document_not_json); + } + } + + SECTION("invalid path") + { + std::vector invalid_paths = { "invalid..path", "invalid[-2]" }; + for (const auto& path : invalid_paths) { + if (integration.cluster_version().is_mock()) { + assert_single_lookup_all_replica_error(integration, + id, + couchbase::lookup_in_specs::get(path), + couchbase::key_value_status_code::subdoc_path_not_found, + couchbase::errc::key_value::path_not_found); + } else { + assert_single_lookup_all_replica_error(integration, + id, + couchbase::lookup_in_specs::get(path), + couchbase::key_value_status_code::subdoc_path_invalid, + couchbase::errc::key_value::path_invalid); + } + } + } + + SECTION("negative paths") + { + assert_single_lookup_all_replica_success(integration, id, couchbase::lookup_in_specs::get("array[-1][-1][-1]"), "300"); + } + + SECTION("nested arrays") + { + assert_single_lookup_all_replica_success(integration, id, couchbase::lookup_in_specs::get("array[4][3][2]"), "300"); + } + + SECTION("path mismatch") + { + assert_single_lookup_all_replica_error(integration, + id, + couchbase::lookup_in_specs::get("array.key"), + couchbase::key_value_status_code::subdoc_path_mismatch, + couchbase::errc::key_value::path_mismatch); + } + + SECTION("public API") + { + auto collection = couchbase::cluster(integration.cluster).bucket(integration.ctx.bucket).scope("_default").collection("_default"); + + SECTION("lookup in all replicas") + { + auto specs = couchbase::lookup_in_specs{ couchbase::lookup_in_specs::get("dictkey"), + couchbase::lookup_in_specs::exists("array"), + couchbase::lookup_in_specs::count("array") }; + auto [ctx, result] = collection.lookup_in_all_replicas(key, specs).get(); + REQUIRE_SUCCESS(ctx.ec()); + REQUIRE(result.size() == number_of_replicas + 1); + auto responses_from_active = std::count_if(result.begin(), result.end(), [](const auto& r) { return !r.is_replica(); }); + REQUIRE(responses_from_active == 1); + for (auto& res : result) { + REQUIRE(!res.cas().empty()); + REQUIRE("dictval" == res.content_as(0)); + REQUIRE(res.exists("array")); + REQUIRE(5 == res.content_as(2)); + } + } + + SECTION("missing document") + { + auto specs = couchbase::lookup_in_specs{ + couchbase::lookup_in_specs::get("non-exists"), + }; + auto [ctx, result] = collection.lookup_in_all_replicas("missing-key", specs).get(); + REQUIRE(ctx.ec() == couchbase::errc::key_value::document_not_found); + REQUIRE(result.empty()); + } + } +} + +TEST_CASE("integration: subdoc any replica reads", "[integration]") +{ + test::utils::integration_test_guard integration; + + if (!integration.has_bucket_capability("subdoc.ReplicaRead")) { + SKIP("cluster does not support replica_read"); + } + + auto number_of_replicas = integration.number_of_replicas(); + + if (number_of_replicas == 0) { + SKIP("bucket has zero replicas"); + } + if (integration.number_of_nodes() <= number_of_replicas) { + SKIP(fmt::format("number of nodes ({}) is less or equal to number of replicas ({})", + integration.number_of_nodes(), + integration.number_of_replicas())); + } + + test::utils::open_bucket(integration.cluster, integration.ctx.bucket); + + auto key = test::utils::uniq_id("lookup_in_any_replica"); + couchbase::core::document_id id{ integration.ctx.bucket, "_default", "_default", key }; + + { + auto value_json = couchbase::core::utils::to_binary(R"({"dictkey":"dictval","array":[1,2,3,4,[10,20,30,[100,200,300]]]})"); + couchbase::core::operations::insert_request req{ id, value_json }; + req.durability_level = couchbase::durability_level::majority_and_persist_to_active; + auto resp = test::utils::execute(integration.cluster, req); + REQUIRE_SUCCESS(resp.ctx.ec()); + } + + SECTION("dict get") + { + assert_single_lookup_any_replica_success(integration, id, couchbase::lookup_in_specs::get("dictkey"), R"("dictval")"); + } + + SECTION("dict exists") + { + assert_single_lookup_any_replica_success(integration, id, couchbase::lookup_in_specs::exists("dictkey")); + } + + SECTION("array get") + { + assert_single_lookup_any_replica_success( + integration, id, couchbase::lookup_in_specs::get("array"), "[1,2,3,4,[10,20,30,[100,200,300]]]"); + } + + SECTION("array exists") + { + assert_single_lookup_any_replica_success(integration, id, couchbase::lookup_in_specs::exists("array")); + } + + SECTION("array index get") + { + assert_single_lookup_any_replica_success(integration, id, couchbase::lookup_in_specs::get("array[0]"), "1"); + } + + SECTION("array index exists") + { + assert_single_lookup_any_replica_success(integration, id, couchbase::lookup_in_specs::exists("array[0]")); + } + + SECTION("non existent path get") + { + assert_single_lookup_any_replica_error(integration, + id, + couchbase::lookup_in_specs::get("non-exist"), + couchbase::key_value_status_code::subdoc_path_not_found, + couchbase::errc::key_value::path_not_found); + } + + SECTION("non existent path exists") + { + assert_single_lookup_any_replica_error(integration, + id, + couchbase::lookup_in_specs::exists("non-exist"), + couchbase::key_value_status_code::subdoc_path_not_found, + couchbase::errc::key_value::path_not_found); + } + + SECTION("non existent doc") + { + couchbase::core::document_id missing_id{ integration.ctx.bucket, "_default", "_default", "missing_key" }; + + SECTION("non existent doc get") + { + couchbase::core::operations::lookup_in_any_replica_request req{ missing_id }; + req.specs = + couchbase::lookup_in_specs{ + couchbase::lookup_in_specs::get("non-exist"), + } + .specs(); + auto resp = test::utils::execute(integration.cluster, req); + REQUIRE(resp.ctx.ec() == couchbase::errc::key_value::document_irretrievable); + REQUIRE(resp.fields.empty()); + } + + SECTION("non existent doc exists") + { + couchbase::core::operations::lookup_in_any_replica_request req{ missing_id }; + req.specs = + couchbase::lookup_in_specs{ + couchbase::lookup_in_specs::exists("non-exist"), + } + .specs(); + auto resp = test::utils::execute(integration.cluster, req); + REQUIRE(resp.ctx.ec() == couchbase::errc::key_value::document_irretrievable); + REQUIRE(resp.fields.empty()); + } + } + + SECTION("non json") + { + couchbase::core::document_id non_json_id{ integration.ctx.bucket, "_default", "_default", test::utils::uniq_id("non_json") }; + auto non_json_doc = couchbase::core::utils::to_binary("string"); + + { + couchbase::core::operations::insert_request req{ non_json_id, non_json_doc }; + auto resp = test::utils::execute(integration.cluster, req); + REQUIRE_SUCCESS(resp.ctx.ec()); + } + + SECTION("non json get") + { + if (integration.cluster_version().is_mock()) { + SKIP("GOCAVES does not handle subdocument operations for non-JSON documents. See " + "https://github.com/couchbaselabs/gocaves/issues/103"); + } + assert_single_lookup_any_replica_error(integration, + non_json_id, + couchbase::lookup_in_specs::get("non-exist"), + couchbase::key_value_status_code::subdoc_doc_not_json, + couchbase::errc::key_value::document_not_json); + } + + SECTION("non json exists") + { + if (integration.cluster_version().is_mock()) { + SKIP("GOCAVES does not handle subdocument operations for non-JSON documents. See " + "https://github.com/couchbaselabs/gocaves/issues/103"); + } + assert_single_lookup_any_replica_error(integration, + non_json_id, + couchbase::lookup_in_specs::exists("non-exist"), + couchbase::key_value_status_code::subdoc_doc_not_json, + couchbase::errc::key_value::document_not_json); + } + } + + SECTION("invalid path") + { + std::vector invalid_paths = { "invalid..path", "invalid[-2]" }; + for (const auto& path : invalid_paths) { + if (integration.cluster_version().is_mock()) { + assert_single_lookup_any_replica_error(integration, + id, + couchbase::lookup_in_specs::get(path), + couchbase::key_value_status_code::subdoc_path_not_found, + couchbase::errc::key_value::path_not_found); + } else { + assert_single_lookup_any_replica_error(integration, + id, + couchbase::lookup_in_specs::get(path), + couchbase::key_value_status_code::subdoc_path_invalid, + couchbase::errc::key_value::path_invalid); + } + } + } + + SECTION("negative paths") + { + assert_single_lookup_any_replica_success(integration, id, couchbase::lookup_in_specs::get("array[-1][-1][-1]"), "300"); + } + + SECTION("nested arrays") + { + assert_single_lookup_any_replica_success(integration, id, couchbase::lookup_in_specs::get("array[4][3][2]"), "300"); + } + + SECTION("path mismatch") + { + assert_single_lookup_any_replica_error(integration, + id, + couchbase::lookup_in_specs::get("array.key"), + couchbase::key_value_status_code::subdoc_path_mismatch, + couchbase::errc::key_value::path_mismatch); + } + + SECTION("public API") + { + auto collection = couchbase::cluster(integration.cluster).bucket(integration.ctx.bucket).scope("_default").collection("_default"); + + SECTION("lookup in any replica") + { + auto specs = couchbase::lookup_in_specs{ couchbase::lookup_in_specs::get("dictkey"), + couchbase::lookup_in_specs::exists("array"), + couchbase::lookup_in_specs::count("array") }; + auto [ctx, result] = collection.lookup_in_any_replica(key, specs).get(); + REQUIRE_SUCCESS(ctx.ec()); + REQUIRE(!result.cas().empty()); + REQUIRE("dictval" == result.content_as(0)); + REQUIRE(result.exists("array")); + REQUIRE(5 == result.content_as(2)); + } + + SECTION("missing document") + { + auto specs = couchbase::lookup_in_specs{ + couchbase::lookup_in_specs::get("non-exists"), + }; + auto [ctx, result] = collection.lookup_in_any_replica("missing-key", specs).get(); + REQUIRE(ctx.ec() == couchbase::errc::key_value::document_irretrievable); + REQUIRE(result.cas().empty()); + } + } +}