Skip to content

Commit

Permalink
CXXCBC-598: Columnar - propagate bootstrap errors to HTTP operations
Browse files Browse the repository at this point in the history
Changes
=======
* Create and error_union type of std::variant<std::monostate,
  std::error_code, impl::bootstrap_error> that can be passed around to
Columnar callbacks
* Allow the http_session_manager drain queue to accept an error_union
* Update error message in certain contexts to indicate a bootstrap error
  is the reason the HTTP operation failed.
* Confirm columnar testing passes
  • Loading branch information
thejcfactor committed Oct 1, 2024
1 parent 51f4775 commit f8ae7aa
Show file tree
Hide file tree
Showing 19 changed files with 279 additions and 98 deletions.
8 changes: 8 additions & 0 deletions core/agent_group.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ class agent_group_impl

auto free_form_http_request(const http_request& request,
free_form_http_request_callback&& callback)
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
-> tl::expected<std::shared_ptr<pending_operation>, error_union>
#else
-> tl::expected<std::shared_ptr<pending_operation>, std::error_code>
#endif
{
return cluster_agent_.free_form_http_request(request, std::move(callback));
}
Expand Down Expand Up @@ -221,7 +225,11 @@ agent_group::search_query(const search_query_options& options, search_query_call
auto
agent_group::free_form_http_request(const http_request& request,
free_form_http_request_callback&& callback)
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
-> tl::expected<std::shared_ptr<pending_operation>, error_union>
#else
-> tl::expected<std::shared_ptr<pending_operation>, std::error_code>
#endif
{
return impl_->free_form_http_request(request, std::move(callback));
}
Expand Down
4 changes: 4 additions & 0 deletions core/agent_group.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ public:

auto free_form_http_request(const http_request& request,
free_form_http_request_callback&& callback)
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
-> tl::expected<std::shared_ptr<pending_operation>, error_union>;
#else
-> tl::expected<std::shared_ptr<pending_operation>, std::error_code>;
#endif

auto wait_until_ready(std::chrono::milliseconds timeout,
const wait_until_ready_options& options,
Expand Down
8 changes: 8 additions & 0 deletions core/cluster_agent.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ class cluster_agent_impl

auto free_form_http_request(const http_request& request,
free_form_http_request_callback&& callback)
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
-> tl::expected<std::shared_ptr<pending_operation>, error_union>
#else
-> tl::expected<std::shared_ptr<pending_operation>, std::error_code>
#endif
{
return http_.do_http_request(request, std::move(callback));
}
Expand All @@ -61,7 +65,11 @@ cluster_agent::cluster_agent(asio::io_context& io, cluster_agent_config config)
auto
cluster_agent::free_form_http_request(const couchbase::core::http_request& request,
couchbase::core::free_form_http_request_callback&& callback)
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
-> tl::expected<std::shared_ptr<pending_operation>, error_union>
#else
-> tl::expected<std::shared_ptr<pending_operation>, std::error_code>
#endif
{
return impl_->free_form_http_request(request, std::move(callback));
}
Expand Down
4 changes: 4 additions & 0 deletions core/cluster_agent.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ public:

auto free_form_http_request(const http_request& request,
free_form_http_request_callback&& callback)
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
-> tl::expected<std::shared_ptr<pending_operation>, error_union>;
#else
-> tl::expected<std::shared_ptr<pending_operation>, std::error_code>;
#endif

private:
std::shared_ptr<cluster_agent_impl> impl_;
Expand Down
4 changes: 2 additions & 2 deletions core/columnar/agent.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class agent_impl

auto free_form_http_request(const http_request& request,
free_form_http_request_callback&& callback)
-> tl::expected<std::shared_ptr<pending_operation>, std::error_code>
-> tl::expected<std::shared_ptr<pending_operation>, error_union>
{
return http_.do_http_request(request, std::move(callback));
}
Expand Down Expand Up @@ -102,7 +102,7 @@ agent::agent(asio::io_context& io, couchbase::core::columnar::agent_config confi
auto
agent::free_form_http_request(const http_request& request,
free_form_http_request_callback&& callback)
-> tl::expected<std::shared_ptr<pending_operation>, std::error_code>
-> tl::expected<std::shared_ptr<pending_operation>, error_union>
{
return impl_->free_form_http_request(request, std::move(callback));
}
Expand Down
2 changes: 1 addition & 1 deletion core/columnar/agent.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public:

auto free_form_http_request(const http_request& request,
free_form_http_request_callback&& callback)
-> tl::expected<std::shared_ptr<pending_operation>, std::error_code>;
-> tl::expected<std::shared_ptr<pending_operation>, error_union>;

auto free_form_http_request_buffered(const http_request& request,
buffered_free_form_http_request_callback&& callback)
Expand Down
38 changes: 32 additions & 6 deletions core/columnar/query_component.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,27 @@ class pending_query_operation

auto dispatch() -> error
{
auto op =
http_.do_http_request(http_req_, [self = shared_from_this()](auto resp, auto ec) mutable {
auto op = http_.do_http_request(
http_req_, [self = shared_from_this()](auto resp, error_union err) mutable {
std::shared_ptr<pending_operation> op;
{
const std::scoped_lock lock{ self->pending_op_mutex_ };
std::swap(op, self->pending_op_);
}
if (ec) {
self->invoke_callback(
{},
{ maybe_convert_error_code(ec), "Failed to execute the HTTP request for the query" });
if (!std::holds_alternative<std::monostate>(err)) {
if (std::holds_alternative<impl::bootstrap_error>(err)) {
auto bootstrap_error = std::get<impl::bootstrap_error>(err);
auto message = fmt::format(
"Failed to execute the HTTP request for the query due to a bootstrap error. "
"See logs for further details. bootstrap_error.message={}",
bootstrap_error.error_message);
self->invoke_callback({}, { maybe_convert_error_code(bootstrap_error.ec), message });
} else {
auto ec = std::get<std::error_code>(err);
self->invoke_callback(
{},
{ maybe_convert_error_code(ec), "Failed to execute the HTTP request for the query" });
}
return;
}
// op can be null if the pending_query_operation was cancelled.
Expand Down Expand Up @@ -127,7 +137,23 @@ class pending_query_operation
pending_op_ = op.value();
return {};
}
retry_timer_.cancel();
deadline_.cancel();
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
if (std::holds_alternative<impl::bootstrap_error>(op.error())) {
auto bootstrap_error = std::get<impl::bootstrap_error>(op.error());
auto message =
fmt::format("Failed to create the HTTP pending operation due to a bootstrap error. "
"See logs for further details. bootstrap_error.message={}",
bootstrap_error.error_message);
return error{ bootstrap_error.ec, message };
} else {
return error{ std::get<std::error_code>(op.error()),
"Failed to create the HTTP pending operation." };
}
#else
return error{ op.error() };
#endif
}

auto start(query_callback&& callback) -> error
Expand Down
1 change: 1 addition & 0 deletions core/columnar/query_component.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include "core/impl/bootstrap_error.hxx"
#include "core/pending_operation.hxx"
#include "error.hxx"
#include "query_options.hxx"
Expand Down
8 changes: 8 additions & 0 deletions core/free_form_http_request.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

#pragma once

#include <couchbase/build_config.hxx>

#include "core/impl/bootstrap_error.hxx"
#include "service_type.hxx"
#include "utils/movable_function.hxx"

Expand Down Expand Up @@ -101,8 +104,13 @@ private:
std::shared_ptr<http_response_impl> impl_;
};

#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
using free_form_http_request_callback =
utils::movable_function<void(http_response response, couchbase::core::error_union err)>;
#else
using free_form_http_request_callback =
utils::movable_function<void(http_response response, std::error_code ec)>;
#endif

class buffered_http_response_impl;

Expand Down
79 changes: 55 additions & 24 deletions core/http_component.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,22 @@ class pending_http_operation
invoke_response_handler(errc::common::request_canceled, {});
}

void invoke_response_handler(std::error_code ec, io::http_streaming_response resp)
{
deadline_.cancel();
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
void invoke_response_handler(error_union err, io::http_streaming_response resp)
{
dispatch_deadline_.cancel();
#else
void invoke_response_handler(std::error_code err, io::http_streaming_response resp)
{
#endif
deadline_.cancel();
free_form_http_request_callback callback{};
{
const std::scoped_lock lock(callback_mutex_);
std::swap(callback, callback_);
}
if (callback) {
callback(http_response{ std::move(resp) }, ec);
callback(http_response{ std::move(resp) }, err);
}
}

Expand All @@ -157,12 +160,22 @@ class pending_http_operation
auto start_op = [self = shared_from_this()]() {
self->session_->write_and_stream(
self->encoded_,
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
[self](error_union err, io::http_streaming_response resp) {
if (std::holds_alternative<std::error_code>(err) &&
std::get<std::error_code>(err) == asio::error::operation_aborted) {
return;
}
self->invoke_response_handler(err, std::move(resp));
},
#else
[self](std::error_code ec, io::http_streaming_response resp) {
if (ec == asio::error::operation_aborted) {
return;
}
self->invoke_response_handler(ec, std::move(resp));
},
#endif
[self]() {
self->stream_end_callback_();
});
Expand Down Expand Up @@ -409,8 +422,13 @@ class http_component_impl
{
}

#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
auto do_http_request(const http_request& request, free_form_http_request_callback&& callback)
-> tl::expected<std::shared_ptr<pending_operation>, error_union>
#else
auto do_http_request(const http_request& request, free_form_http_request_callback&& callback)
-> tl::expected<std::shared_ptr<pending_operation>, std::error_code>
#endif
{
std::shared_ptr<io::http_session_manager> session_manager;
{
Expand All @@ -436,9 +454,9 @@ class http_component_impl
auto op =
std::make_shared<pending_http_operation>(io_, request, session_manager->dispatch_timeout());
if (!session_manager->is_configured()) {
auto ec = defer_command(op, session_manager, credentials, std::move(callback));
if (ec) {
return tl::unexpected{ ec };
auto err = defer_command(op, session_manager, credentials, std::move(callback));
if (!std::holds_alternative<std::monostate>(err)) {
return tl::unexpected{ err };
}
return op;
}
Expand Down Expand Up @@ -478,8 +496,11 @@ class http_component_impl
auto op = std::make_shared<pending_buffered_http_operation>(
io_, request, session_manager->dispatch_timeout());
if (!session_manager->is_configured()) {
auto ec = defer_command(op, session_manager, credentials, std::move(callback));
if (ec) {
auto err = defer_command(op, session_manager, credentials, std::move(callback));
if (!std::holds_alternative<std::monostate>(err)) {
auto ec = std::holds_alternative<impl::bootstrap_error>(err)
? std::get<impl::bootstrap_error>(err).ec
: std::get<std::error_code>(err);
return tl::unexpected{ ec };
}
return op;
Expand All @@ -498,16 +519,19 @@ class http_component_impl
const couchbase::core::cluster_credentials& credentials,
free_form_http_request_callback&& callback)
{
op->start([callback = std::move(callback)](auto resp, auto ec) mutable {
callback(std::move(resp), ec);
});

#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
op->start([callback = std::move(callback)](auto resp, error_union err) mutable {
callback(std::move(resp), err);
});
// don't do anything if the op wasn't dispatched or has already timed out
auto now = std::chrono::steady_clock::now();
if (op->dispatch_deadline_expiry() < now || op->deadline_expiry() < now) {
return;
}
#else
op->start([callback = std::move(callback)](auto resp, auto ec) mutable {
callback(std::move(resp), ec);
});
#endif
std::shared_ptr<io::http_session> session;
{
Expand Down Expand Up @@ -592,10 +616,10 @@ class http_component_impl
auto defer_command(std::shared_ptr<PendingHttpOp> pending_op,
const std::shared_ptr<io::http_session_manager>& session_manager,
const couchbase::core::cluster_credentials& credentials,
Callback&& callback) -> std::error_code
Callback&& callback) -> error_union
{
if (auto last_error = session_manager->last_bootstrap_error(); last_error.has_value()) {
return last_error->ec;
return last_error.value();
}
CB_LOG_DEBUG(
R"(Adding pending HTTP operation to deferred queue: service={}, client_context_id={})",
Expand All @@ -605,26 +629,26 @@ class http_component_impl
callback = std::forward<Callback>(callback),
op = std::move(pending_op),
session_manager,
credentials](std::error_code ec) mutable {
if (ec) {
credentials](error_union err) mutable {
if (!std::holds_alternative<std::monostate>(err)) {
// The deferred operation was cancelled - currently this can happen due to closing the
// cluster
return callback({}, ec);
return callback({}, err);
}

return send_http_operation(
op, session_manager, credentials, std::forward<Callback>(callback));
});
return {};
return std::monostate{};
}

auto defer_command(std::shared_ptr<pending_buffered_http_operation> pending_op,
const std::shared_ptr<io::http_session_manager>& session_manager,
const couchbase::core::cluster_credentials& credentials,
buffered_free_form_http_request_callback&& callback) -> std::error_code
buffered_free_form_http_request_callback&& callback) -> error_union
{
if (auto last_error = session_manager->last_bootstrap_error(); last_error.has_value()) {
return last_error->ec;
return last_error.value();
}
CB_LOG_DEBUG(
R"(Adding pending HTTP operation to deferred queue: service={}, client_context_id={})",
Expand All @@ -634,16 +658,19 @@ class http_component_impl
callback = std::move(callback),
op = std::move(pending_op),
session_manager,
credentials](std::error_code ec) mutable {
if (ec) {
credentials](error_union err) mutable {
if (!std::holds_alternative<std::monostate>(err)) {
auto ec = std::holds_alternative<impl::bootstrap_error>(err)
? std::get<impl::bootstrap_error>(err).ec
: std::get<std::error_code>(err);
// The deferred operation was cancelled - currently this can happen due to closing the
// cluster
return callback({}, ec);
}

return send_http_operation(op, session_manager, credentials, std::move(callback));
});
return {};
return std::monostate{};
}
#endif

Expand All @@ -664,7 +691,11 @@ http_component::http_component(asio::io_context& io,
auto
http_component::do_http_request(const http_request& request,
free_form_http_request_callback&& callback)
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
-> tl::expected<std::shared_ptr<pending_operation>, error_union>
#else
-> tl::expected<std::shared_ptr<pending_operation>, std::error_code>
#endif
{
return impl_->do_http_request(request, std::move(callback));
}
Expand Down
4 changes: 4 additions & 0 deletions core/http_component.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ public:
std::shared_ptr<retry_strategy> default_retry_strategy = {});

auto do_http_request(const http_request& request, free_form_http_request_callback&& callback)
#ifdef COUCHBASE_CXX_CLIENT_COLUMNAR
-> tl::expected<std::shared_ptr<pending_operation>, error_union>;
#else
-> tl::expected<std::shared_ptr<pending_operation>, std::error_code>;
#endif

auto do_http_request_buffered(const http_request& request,
buffered_free_form_http_request_callback&& callback)
Expand Down
Loading

0 comments on commit f8ae7aa

Please sign in to comment.