Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-43258: [C++][Flight] Use a Base CRTP type for the types used in RPC calls #43255

Merged
merged 13 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,8 @@ arrow::Result<std::unique_ptr<ResultStream>> FlightClient::DoAction(

arrow::Result<CancelFlightInfoResult> FlightClient::CancelFlightInfo(
const FlightCallOptions& options, const CancelFlightInfoRequest& request) {
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kCancelFlightInfo.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToBuffer());
Action action{ActionType::kCancelFlightInfo.type, std::move(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(auto cancel_result, CancelFlightInfoResult::Deserialize(
Expand All @@ -596,8 +596,8 @@ arrow::Result<CancelFlightInfoResult> FlightClient::CancelFlightInfo(

arrow::Result<FlightEndpoint> FlightClient::RenewFlightEndpoint(
const FlightCallOptions& options, const RenewFlightEndpointRequest& request) {
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kRenewFlightEndpoint.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToBuffer());
Action action{ActionType::kRenewFlightEndpoint.type, std::move(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(auto renewed_endpoint,
Expand Down Expand Up @@ -716,8 +716,8 @@ arrow::Result<FlightClient::DoExchangeResult> FlightClient::DoExchange(
::arrow::Result<SetSessionOptionsResult> FlightClient::SetSessionOptions(
const FlightCallOptions& options, const SetSessionOptionsRequest& request) {
RETURN_NOT_OK(CheckOpen());
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kSetSessionOptions.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToBuffer());
Action action{ActionType::kSetSessionOptions.type, std::move(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(
Expand All @@ -730,8 +730,8 @@ ::arrow::Result<SetSessionOptionsResult> FlightClient::SetSessionOptions(
::arrow::Result<GetSessionOptionsResult> FlightClient::GetSessionOptions(
const FlightCallOptions& options, const GetSessionOptionsRequest& request) {
RETURN_NOT_OK(CheckOpen());
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kGetSessionOptions.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToBuffer());
Action action{ActionType::kGetSessionOptions.type, std::move(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(
Expand All @@ -744,8 +744,8 @@ ::arrow::Result<GetSessionOptionsResult> FlightClient::GetSessionOptions(
::arrow::Result<CloseSessionResult> FlightClient::CloseSession(
const FlightCallOptions& options, const CloseSessionRequest& request) {
RETURN_NOT_OK(CheckOpen());
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kCloseSession.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToBuffer());
Action action{ActionType::kCloseSession.type, std::move(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(auto close_session_result,
Expand Down
41 changes: 22 additions & 19 deletions cpp/src/arrow/flight/flight_internals_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ void TestRoundtrip(const std::vector<FlightType>& values,
ASSERT_OK(internal::ToProto(values[i], &pb_value));

if constexpr (std::is_same_v<FlightType, FlightInfo>) {
ASSERT_OK_AND_ASSIGN(FlightInfo value, internal::FromProto(pb_value));
EXPECT_EQ(values[i], value);
FlightInfo::Data info_data;
ASSERT_OK(internal::FromProto(pb_value, &info_data));
EXPECT_EQ(values[i], FlightInfo{std::move(info_data)});
} else if constexpr (std::is_same_v<FlightType, SchemaResult>) {
std::string data;
ASSERT_OK(internal::FromProto(pb_value, &data));
Expand Down Expand Up @@ -152,9 +153,11 @@ TEST(FlightTypes, BasicAuth) {
}

TEST(FlightTypes, Criteria) {
std::vector<Criteria> values = {{""}, {"criteria"}};
std::vector<std::string> reprs = {"<Criteria expression=''>",
"<Criteria expression='criteria'>"};
std::vector<Criteria> values = {Criteria{""}, Criteria{"criteria"}};
std::vector<std::string> reprs = {
"<Criteria expression=''>",
"<Criteria expression='criteria'>",
};
ASSERT_NO_FATAL_FAILURE(TestRoundtrip<pb::Criteria>(values, reprs));
}

Expand Down Expand Up @@ -191,14 +194,14 @@ TEST(FlightTypes, FlightEndpoint) {
Timestamp expiration_time(
std::chrono::duration_cast<Timestamp::duration>(expiration_time_duration));
std::vector<FlightEndpoint> values = {
{{""}, {}, std::nullopt, {}},
{{"foo"}, {}, std::nullopt, {}},
{{"bar"}, {}, std::nullopt, {"\xDE\xAD\xBE\xEF"}},
{{"foo"}, {}, expiration_time, {}},
{{"foo"}, {location1}, std::nullopt, {}},
{{"bar"}, {location1}, std::nullopt, {}},
{{"foo"}, {location2}, std::nullopt, {}},
{{"foo"}, {location1, location2}, std::nullopt, {"\xba\xdd\xca\xfe"}},
{Ticket{""}, {}, std::nullopt, {}},
{Ticket{"foo"}, {}, std::nullopt, {}},
{Ticket{"bar"}, {}, std::nullopt, {"\xDE\xAD\xBE\xEF"}},
{Ticket{"foo"}, {}, expiration_time, {}},
{Ticket{"foo"}, {location1}, std::nullopt, {}},
{Ticket{"bar"}, {location1}, std::nullopt, {}},
{Ticket{"foo"}, {location2}, std::nullopt, {}},
{Ticket{"foo"}, {location1, location2}, std::nullopt, {"\xba\xdd\xca\xfe"}},
};
std::vector<std::string> reprs = {
"<FlightEndpoint ticket=<Ticket ticket=''> locations=[] "
Expand Down Expand Up @@ -299,9 +302,9 @@ TEST(FlightTypes, PollInfo) {

TEST(FlightTypes, Result) {
std::vector<Result> values = {
{Buffer::FromString("")},
{Buffer::FromString("foo")},
{Buffer::FromString("bar")},
Result{Buffer::FromString("")},
Result{Buffer::FromString("foo")},
Result{Buffer::FromString("bar")},
};
std::vector<std::string> reprs = {
"<Result body=(0 bytes)>",
Expand Down Expand Up @@ -333,9 +336,9 @@ TEST(FlightTypes, SchemaResult) {

TEST(FlightTypes, Ticket) {
std::vector<Ticket> values = {
{""},
{"foo"},
{"bar"},
Ticket{""},
Ticket{"foo"},
Ticket{"bar"},
};
std::vector<std::string> reprs = {
"<Ticket ticket=''>",
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/flight/flight_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,8 @@ TEST_F(TestFlightClient, ListFlights) {
}

TEST_F(TestFlightClient, ListFlightsWithCriteria) {
ASSERT_OK_AND_ASSIGN(auto listing, client_->ListFlights(FlightCallOptions(), {"foo"}));
ASSERT_OK_AND_ASSIGN(auto listing,
client_->ListFlights(FlightCallOptions{}, Criteria{"foo"}));
std::unique_ptr<FlightInfo> info;
ASSERT_OK_AND_ASSIGN(info, listing->Next());
ASSERT_TRUE(info == nullptr);
Expand Down
45 changes: 30 additions & 15 deletions cpp/src/arrow/flight/serialization_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -251,22 +251,28 @@ Status ToProto(const FlightDescriptor& descriptor, pb::FlightDescriptor* pb_desc

// FlightInfo

arrow::Result<FlightInfo> FromProto(const pb::FlightInfo& pb_info) {
FlightInfo::Data info;
RETURN_NOT_OK(FromProto(pb_info.flight_descriptor(), &info.descriptor));
Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info) {
RETURN_NOT_OK(FromProto(pb_info.flight_descriptor(), &info->descriptor));

info.schema = pb_info.schema();
info->schema = pb_info.schema();

info.endpoints.resize(pb_info.endpoint_size());
info->endpoints.resize(pb_info.endpoint_size());
for (int i = 0; i < pb_info.endpoint_size(); ++i) {
RETURN_NOT_OK(FromProto(pb_info.endpoint(i), &info.endpoints[i]));
RETURN_NOT_OK(FromProto(pb_info.endpoint(i), &info->endpoints[i]));
}

info.total_records = pb_info.total_records();
info.total_bytes = pb_info.total_bytes();
info.ordered = pb_info.ordered();
info.app_metadata = pb_info.app_metadata();
return FlightInfo(std::move(info));
info->total_records = pb_info.total_records();
info->total_bytes = pb_info.total_bytes();
info->ordered = pb_info.ordered();
info->app_metadata = pb_info.app_metadata();
return Status::OK();
}

Status FromProto(const pb::FlightInfo& pb_info, std::unique_ptr<FlightInfo>* info) {
FlightInfo::Data info_data;
RETURN_NOT_OK(FromProto(pb_info, &info_data));
*info = std::make_unique<FlightInfo>(std::move(info_data));
return Status::OK();
}

Status FromProto(const pb::BasicAuth& pb_basic_auth, BasicAuth* basic_auth) {
Expand Down Expand Up @@ -315,8 +321,9 @@ Status ToProto(const FlightInfo& info, pb::FlightInfo* pb_info) {

Status FromProto(const pb::PollInfo& pb_info, PollInfo* info) {
if (pb_info.has_info()) {
ARROW_ASSIGN_OR_RAISE(auto flight_info, FromProto(pb_info.info()));
info->info = std::make_unique<FlightInfo>(std::move(flight_info));
FlightInfo::Data info_data;
RETURN_NOT_OK(FromProto(pb_info.info(), &info_data));
info->info = std::make_unique<FlightInfo>(std::move(info_data));
}
if (pb_info.has_flight_descriptor()) {
FlightDescriptor descriptor;
Expand All @@ -340,6 +347,13 @@ Status FromProto(const pb::PollInfo& pb_info, PollInfo* info) {
return Status::OK();
}

Status FromProto(const pb::PollInfo& pb_info, std::unique_ptr<PollInfo>* info) {
PollInfo poll_info;
RETURN_NOT_OK(FromProto(pb_info, &poll_info));
*info = std::make_unique<PollInfo>(std::move(poll_info));
return Status::OK();
}

Status ToProto(const PollInfo& info, pb::PollInfo* pb_info) {
if (info.info) {
RETURN_NOT_OK(ToProto(*info.info, pb_info->mutable_info()));
Expand All @@ -360,8 +374,9 @@ Status ToProto(const PollInfo& info, pb::PollInfo* pb_info) {

Status FromProto(const pb::CancelFlightInfoRequest& pb_request,
CancelFlightInfoRequest* request) {
ARROW_ASSIGN_OR_RAISE(FlightInfo info, FromProto(pb_request.info()));
request->info = std::make_unique<FlightInfo>(std::move(info));
FlightInfo::Data info_data;
RETURN_NOT_OK(FromProto(pb_request.info(), &info_data));
request->info = std::make_unique<FlightInfo>(std::move(info_data));
return Status::OK();
}

Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/flight/serialization_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ Status FromProto(const pb::FlightDescriptor& pb_descr, FlightDescriptor* descr);
Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint);
Status FromProto(const pb::RenewFlightEndpointRequest& pb_request,
RenewFlightEndpointRequest* request);
arrow::Result<FlightInfo> FromProto(const pb::FlightInfo& pb_info);
Status FromProto(const pb::FlightInfo& pb_info, FlightInfo::Data* info);
Status FromProto(const pb::FlightInfo& pb_info, std::unique_ptr<FlightInfo>* info);
Status FromProto(const pb::PollInfo& pb_info, PollInfo* info);
Status FromProto(const pb::PollInfo& pb_info, std::unique_ptr<PollInfo>* info);
Status FromProto(const pb::CancelFlightInfoRequest& pb_request,
CancelFlightInfoRequest* request);
Status FromProto(const pb::SchemaResult& pb_result, std::string* result);
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/flight/sql/example/sqlite_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ arrow::Result<std::unique_ptr<FlightDataStream>> DoGetSQLiteQuery(
arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfoForCommand(
const FlightDescriptor& descriptor, const std::shared_ptr<Schema>& schema) {
std::vector<FlightEndpoint> endpoints{
FlightEndpoint{{descriptor.cmd}, {}, std::nullopt, ""}};
FlightEndpoint{Ticket{descriptor.cmd}, {}, std::nullopt, ""}};
ARROW_ASSIGN_OR_RAISE(auto result,
FlightInfo::Make(*schema, descriptor, endpoints, -1, -1, false))

Expand Down Expand Up @@ -389,7 +389,7 @@ class SQLiteFlightSqlServer::Impl {
const ServerCallContext& context, const GetTables& command,
const FlightDescriptor& descriptor) {
std::vector<FlightEndpoint> endpoints{
FlightEndpoint{{descriptor.cmd}, {}, std::nullopt, ""}};
FlightEndpoint{Ticket{descriptor.cmd}, {}, std::nullopt, ""}};

bool include_schema = command.include_schema;
ARROW_LOG(INFO) << "GetTables include_schema=" << include_schema;
Expand Down
35 changes: 9 additions & 26 deletions cpp/src/arrow/flight/sql/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,11 @@ arrow::Result<Result> PackActionResult(ActionBeginTransactionResult result) {
}

arrow::Result<Result> PackActionResult(CancelFlightInfoResult result) {
ARROW_ASSIGN_OR_RAISE(auto serialized, result.SerializeToString());
return Result{Buffer::FromString(std::move(serialized))};
return result.SerializeToBuffer();
}

arrow::Result<Result> PackActionResult(const FlightEndpoint& endpoint) {
ARROW_ASSIGN_OR_RAISE(auto serialized, endpoint.SerializeToString());
return Result{Buffer::FromString(std::move(serialized))};
return endpoint.SerializeToBuffer();
}

arrow::Result<Result> PackActionResult(CancelResult result) {
Expand Down Expand Up @@ -525,21 +523,6 @@ arrow::Result<Result> PackActionResult(ActionCreatePreparedStatementResult resul
return PackActionResult(pb_result);
}

arrow::Result<Result> PackActionResult(SetSessionOptionsResult result) {
ARROW_ASSIGN_OR_RAISE(auto serialized, result.SerializeToString());
return Result{Buffer::FromString(std::move(serialized))};
}

arrow::Result<Result> PackActionResult(GetSessionOptionsResult result) {
ARROW_ASSIGN_OR_RAISE(auto serialized, result.SerializeToString());
return Result{Buffer::FromString(std::move(serialized))};
}

arrow::Result<Result> PackActionResult(CloseSessionResult result) {
ARROW_ASSIGN_OR_RAISE(auto serialized, result.SerializeToString());
return Result{Buffer::FromString(std::move(serialized))};
}

} // namespace

arrow::Result<StatementQueryTicket> StatementQueryTicket::Deserialize(
Expand Down Expand Up @@ -908,23 +891,23 @@ Status FlightSqlServerBase::DoAction(const ServerCallContext& context,
std::string_view body(*action.body);
ARROW_ASSIGN_OR_RAISE(auto request, SetSessionOptionsRequest::Deserialize(body));
ARROW_ASSIGN_OR_RAISE(auto result, SetSessionOptions(context, request));
ARROW_ASSIGN_OR_RAISE(auto packed_result, PackActionResult(std::move(result)));
ARROW_ASSIGN_OR_RAISE(auto packed_result, result.SerializeToBuffer());

results.push_back(std::move(packed_result));
results.emplace_back(std::move(packed_result));
} else if (action.type == ActionType::kGetSessionOptions.type) {
std::string_view body(*action.body);
ARROW_ASSIGN_OR_RAISE(auto request, GetSessionOptionsRequest::Deserialize(body));
ARROW_ASSIGN_OR_RAISE(auto result, GetSessionOptions(context, request));
ARROW_ASSIGN_OR_RAISE(auto packed_result, PackActionResult(std::move(result)));
ARROW_ASSIGN_OR_RAISE(auto packed_result, result.SerializeToBuffer());

results.push_back(std::move(packed_result));
results.emplace_back(std::move(packed_result));
} else if (action.type == ActionType::kCloseSession.type) {
std::string_view body(*action.body);
ARROW_ASSIGN_OR_RAISE(auto request, CloseSessionRequest::Deserialize(body));
ARROW_ASSIGN_OR_RAISE(auto result, CloseSession(context, request));
ARROW_ASSIGN_OR_RAISE(auto packed_result, PackActionResult(std::move(result)));
ARROW_ASSIGN_OR_RAISE(auto packed_result, result.SerializeToBuffer());

results.push_back(std::move(packed_result));
results.emplace_back(std::move(packed_result));
} else {
google::protobuf::Any any;
if (!any.ParseFromArray(action.body->data(), static_cast<int>(action.body->size()))) {
Expand Down Expand Up @@ -1063,7 +1046,7 @@ arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlServerBase::GetFlightInfoSql
}

std::vector<FlightEndpoint> endpoints{
FlightEndpoint{{descriptor.cmd}, {}, std::nullopt, {}}};
FlightEndpoint{Ticket{descriptor.cmd}, {}, std::nullopt, {}}};
ARROW_ASSIGN_OR_RAISE(
auto result, FlightInfo::Make(*SqlSchema::GetSqlInfoSchema(), descriptor, endpoints,
-1, -1, false))
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/arrow/flight/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,11 @@ std::vector<FlightInfo> ExampleFlightInfo() {
Location location4 = *Location::ForGrpcTcp("foo4.bar.com", 12345);
Location location5 = *Location::ForGrpcTcp("foo5.bar.com", 12345);

FlightEndpoint endpoint1({{"ticket-ints-1"}, {location1}, std::nullopt, {}});
FlightEndpoint endpoint2({{"ticket-ints-2"}, {location2}, std::nullopt, {}});
FlightEndpoint endpoint3({{"ticket-cmd"}, {location3}, std::nullopt, {}});
FlightEndpoint endpoint4({{"ticket-dicts-1"}, {location4}, std::nullopt, {}});
FlightEndpoint endpoint5({{"ticket-floats-1"}, {location5}, std::nullopt, {}});
FlightEndpoint endpoint1({Ticket{"ticket-ints-1"}, {location1}, std::nullopt, {}});
FlightEndpoint endpoint2({Ticket{"ticket-ints-2"}, {location2}, std::nullopt, {}});
FlightEndpoint endpoint3({Ticket{"ticket-cmd"}, {location3}, std::nullopt, {}});
FlightEndpoint endpoint4({Ticket{"ticket-dicts-1"}, {location4}, std::nullopt, {}});
FlightEndpoint endpoint5({Ticket{"ticket-floats-1"}, {location5}, std::nullopt, {}});

FlightDescriptor descr1{FlightDescriptor::PATH, "", {"examples", "ints"}};
FlightDescriptor descr2{FlightDescriptor::CMD, "my_command", {}};
Expand Down
12 changes: 7 additions & 5 deletions cpp/src/arrow/flight/transport/grpc/grpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -648,10 +648,10 @@ class UnaryUnaryAsyncCall : public ::grpc::ClientUnaryReactor, public internal::

void OnDone(const ::grpc::Status& status) override {
if (status.ok()) {
auto result = internal::FromProto(pb_response);
client_status = result.status();
FlightInfo::Data info_data;
client_status = internal::FromProto(pb_response, &info_data);
if (client_status.ok()) {
listener->OnNext(std::move(result).MoveValueUnsafe());
listener->OnNext(FlightInfo{std::move(info_data)});
}
}
Finish(status);
Expand Down Expand Up @@ -889,7 +889,8 @@ class GrpcClientImpl : public internal::ClientTransport {

pb::FlightInfo pb_info;
while (!options.stop_token.IsStopRequested() && stream->Read(&pb_info)) {
ARROW_ASSIGN_OR_RAISE(FlightInfo info_data, internal::FromProto(pb_info));
FlightInfo::Data info_data;
RETURN_NOT_OK(internal::FromProto(pb_info, &info_data));
flights.emplace_back(std::move(info_data));
}
if (options.stop_token.IsStopRequested()) rpc.context.TryCancel();
Expand Down Expand Up @@ -939,7 +940,8 @@ class GrpcClientImpl : public internal::ClientTransport {
stub_->GetFlightInfo(&rpc.context, pb_descriptor, &pb_response), &rpc.context);
RETURN_NOT_OK(s);

ARROW_ASSIGN_OR_RAISE(auto info_data, internal::FromProto(pb_response));
FlightInfo::Data info_data;
RETURN_NOT_OK(internal::FromProto(pb_response, &info_data));
*info = std::make_unique<FlightInfo>(std::move(info_data));
return Status::OK();
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/flight/transport/ucx/ucx_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ class UcxServerImpl : public arrow::flight::internal::ServerTransport {
std::unique_ptr<FlightInfo> info;
std::string response;
SERVER_RETURN_NOT_OK(driver, base_->GetFlightInfo(context, descriptor, &info));
SERVER_RETURN_NOT_OK(driver, info->SerializeToString().Value(&response));
SERVER_RETURN_NOT_OK(driver, info->DoSerializeToString(&response));
RETURN_NOT_OK(driver->SendFrame(FrameType::kBuffer,
reinterpret_cast<const uint8_t*>(response.data()),
static_cast<int64_t>(response.size())));
Expand All @@ -397,7 +397,7 @@ class UcxServerImpl : public arrow::flight::internal::ServerTransport {
std::unique_ptr<PollInfo> info;
std::string response;
SERVER_RETURN_NOT_OK(driver, base_->PollFlightInfo(context, descriptor, &info));
SERVER_RETURN_NOT_OK(driver, info->SerializeToString().Value(&response));
SERVER_RETURN_NOT_OK(driver, info->DoSerializeToString(&response));
RETURN_NOT_OK(driver->SendFrame(FrameType::kBuffer,
reinterpret_cast<const uint8_t*>(response.data()),
static_cast<int64_t>(response.size())));
Expand Down
Loading
Loading