diff --git a/c/driver/postgresql/connection.cc b/c/driver/postgresql/connection.cc index a918188bf3..b5c0ef161e 100644 --- a/c/driver/postgresql/connection.cc +++ b/c/driver/postgresql/connection.cc @@ -145,12 +145,10 @@ class PqGetObjectsHelper { params.push_back(db_schema_); } - auto result_helper = - PqResultHelper{conn_, std::string(query.buffer), params, error_}; + auto result_helper = PqResultHelper{conn_, std::string(query.buffer)}; StringBuilderReset(&query); - RAISE_ADBC(result_helper.Prepare()); - RAISE_ADBC(result_helper.Execute()); + RAISE_ADBC(result_helper.Execute(error_, params)); for (PqResultRow row : result_helper) { const char* schema_name = row[0].data; @@ -188,12 +186,10 @@ class PqGetObjectsHelper { params.push_back(catalog_); } - PqResultHelper result_helper = - PqResultHelper{conn_, std::string(query.buffer), params, error_}; + PqResultHelper result_helper = PqResultHelper{conn_, std::string(query.buffer)}; StringBuilderReset(&query); - RAISE_ADBC(result_helper.Prepare()); - RAISE_ADBC(result_helper.Execute()); + RAISE_ADBC(result_helper.Execute(error_, params)); for (PqResultRow row : result_helper) { const char* db_name = row[0].data; @@ -280,11 +276,10 @@ class PqGetObjectsHelper { } } - auto result_helper = PqResultHelper{conn_, query.buffer, params, error_}; + auto result_helper = PqResultHelper{conn_, query.buffer}; StringBuilderReset(&query); - RAISE_ADBC(result_helper.Prepare()); - RAISE_ADBC(result_helper.Execute()); + RAISE_ADBC(result_helper.Execute(error_, params)); for (PqResultRow row : result_helper) { const char* table_name = row[0].data; const char* table_type = row[1].data; @@ -341,11 +336,10 @@ class PqGetObjectsHelper { params.push_back(std::string(column_name_)); } - auto result_helper = PqResultHelper{conn_, query.buffer, params, error_}; + auto result_helper = PqResultHelper{conn_, query.buffer}; StringBuilderReset(&query); - RAISE_ADBC(result_helper.Prepare()); - RAISE_ADBC(result_helper.Execute()); + RAISE_ADBC(result_helper.Execute(error_, params)); for (PqResultRow row : result_helper) { const char* column_name = row[0].data; @@ -493,11 +487,10 @@ class PqGetObjectsHelper { params.push_back(std::string(column_name_)); } - auto result_helper = PqResultHelper{conn_, query.buffer, params, error_}; + auto result_helper = PqResultHelper{conn_, query.buffer}; StringBuilderReset(&query); - RAISE_ADBC(result_helper.Prepare()); - RAISE_ADBC(result_helper.Execute()); + RAISE_ADBC(result_helper.Execute(error_, params)); for (PqResultRow row : result_helper) { const char* constraint_name = row[0].data; @@ -655,9 +648,8 @@ AdbcStatusCode PostgresConnection::PostgresConnectionGetInfoImpl( break; case ADBC_INFO_VENDOR_VERSION: { const char* stmt = "SHOW server_version_num"; - auto result_helper = PqResultHelper{conn_, std::string(stmt), error}; - RAISE_ADBC(result_helper.Prepare()); - RAISE_ADBC(result_helper.Execute()); + auto result_helper = PqResultHelper{conn_, std::string(stmt)}; + RAISE_ADBC(result_helper.Execute(error)); auto it = result_helper.begin(); if (it == result_helper.end()) { SetError(error, "[libpq] PostgreSQL returned no rows for '%s'", stmt); @@ -760,9 +752,8 @@ AdbcStatusCode PostgresConnection::GetOption(const char* option, char* value, if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_CATALOG) == 0) { output = PQdb(conn_); } else if (std::strcmp(option, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 0) { - PqResultHelper result_helper{conn_, "SELECT CURRENT_SCHEMA", {}, error}; - RAISE_ADBC(result_helper.Prepare()); - RAISE_ADBC(result_helper.Execute()); + PqResultHelper result_helper{conn_, "SELECT CURRENT_SCHEMA"}; + RAISE_ADBC(result_helper.Execute(error)); auto it = result_helper.begin(); if (it == result_helper.end()) { SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT CURRENT_SCHEMA'"); @@ -931,10 +922,8 @@ AdbcStatusCode PostgresConnectionGetStatisticsImpl(PGconn* conn, const char* db_ std::string prev_table; { - PqResultHelper result_helper{ - conn, query, {db_schema, table_name ? table_name : "%"}, error}; - RAISE_ADBC(result_helper.Prepare()); - RAISE_ADBC(result_helper.Execute()); + PqResultHelper result_helper{conn, query}; + RAISE_ADBC(result_helper.Execute(error, {db_schema, table_name ? table_name : "%"})); for (PqResultRow row : result_helper) { auto reltuples = row[5].ParseDouble(); @@ -1166,11 +1155,9 @@ AdbcStatusCode PostgresConnection::GetTableSchema(const char* catalog, std::vector params = {table_name_str}; - PqResultHelper result_helper = - PqResultHelper{conn_, std::string(query.c_str()), params, error}; + PqResultHelper result_helper = PqResultHelper{conn_, std::string(query.c_str())}; - RAISE_ADBC(result_helper.Prepare()); - auto result = result_helper.Execute(); + auto result = result_helper.Execute(error, params); if (result != ADBC_STATUS_OK) { auto error_code = std::string(error->sqlstate, 5); if ((error_code == "42P01") || (error_code == "42602")) { @@ -1337,10 +1324,8 @@ AdbcStatusCode PostgresConnection::SetOption(const char* key, const char* value, return ADBC_STATUS_OK; } else if (std::strcmp(key, ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA) == 0) { // PostgreSQL doesn't accept a parameter here - PqResultHelper result_helper{ - conn_, std::string("SET search_path TO ") + value, {}, error}; - RAISE_ADBC(result_helper.Prepare()); - RAISE_ADBC(result_helper.Execute()); + PqResultHelper result_helper{conn_, std::string("SET search_path TO ") + value}; + RAISE_ADBC(result_helper.Execute(error)); return ADBC_STATUS_OK; } SetError(error, "%s%s", "[libpq] Unknown option ", key); diff --git a/c/driver/postgresql/postgresql_test.cc b/c/driver/postgresql/postgresql_test.cc index a6d4f7704c..ff3dc0b70b 100644 --- a/c/driver/postgresql/postgresql_test.cc +++ b/c/driver/postgresql/postgresql_test.cc @@ -1247,7 +1247,7 @@ TEST_F(PostgresStatementTest, UpdateInExecuteQuery) { ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, &reader.rows_affected, &error), IsOkStatus(&error)); - ASSERT_EQ(reader.rows_affected, -1); + ASSERT_EQ(reader.rows_affected, 2); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_EQ(reader.array->release, nullptr); @@ -1276,6 +1276,32 @@ TEST_F(PostgresStatementTest, UpdateInExecuteQuery) { } } +TEST_F(PostgresStatementTest, ExecuteSchemaParameterizedQuery) { + nanoarrow::UniqueSchema schema_bind; + ArrowSchemaInit(schema_bind.get()); + ASSERT_THAT(ArrowSchemaSetTypeStruct(schema_bind.get(), 1), + adbc_validation::IsOkErrno()); + ASSERT_THAT(ArrowSchemaSetType(schema_bind->children[0], NANOARROW_TYPE_STRING), + adbc_validation::IsOkErrno()); + + nanoarrow::UniqueArrayStream bind; + nanoarrow::EmptyArrayStream(schema_bind.get()).ToArrayStream(bind.get()); + + ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT $1", &error), + IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementBindStream(&statement, bind.get(), &error), IsOkStatus()); + + nanoarrow::UniqueSchema schema; + ASSERT_THAT(AdbcStatementExecuteSchema(&statement, schema.get(), &error), + IsOkStatus(&error)); + + ASSERT_EQ(1, schema->n_children); + ASSERT_STREQ("u", schema->children[0]->format); + + ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); +} + TEST_F(PostgresStatementTest, BatchSizeHint) { ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "batch_size_hint_test", &error), IsOkStatus(&error)); @@ -1345,16 +1371,13 @@ TEST_F(PostgresStatementTest, AdbcErrorBackwardsCompatibility) { TEST_F(PostgresStatementTest, Cancel) { ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); - for (const char* query : { - "DROP TABLE IF EXISTS test_cancel", - "CREATE TABLE test_cancel (ints INT)", - R"(INSERT INTO test_cancel (ints) - SELECT g :: INT FROM GENERATE_SERIES(1, 65536) temp(g))", - }) { - ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error), IsOkStatus(&error)); - ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error), - IsOkStatus(&error)); - } + const char* query = R"(DROP TABLE IF EXISTS test_cancel; + CREATE TABLE test_cancel (ints INT); + INSERT INTO test_cancel (ints) + SELECT g :: INT FROM GENERATE_SERIES(1, 65536) temp(g);)"; + ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error), IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error), + IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT * FROM test_cancel", &error), IsOkStatus(&error)); @@ -1381,6 +1404,91 @@ TEST_F(PostgresStatementTest, Cancel) { ASSERT_NE(0, AdbcErrorGetDetailCount(detail)); } +TEST_F(PostgresStatementTest, MultipleStatementsSingleQuery) { + ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); + + const char* query = R"(DROP TABLE IF EXISTS test_query_statements; + CREATE TABLE test_query_statements (ints INT); + INSERT INTO test_query_statements VALUES((1)); + INSERT INTO test_query_statements VALUES((2)); + INSERT INTO test_query_statements VALUES((3));)"; + ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error), IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error), + IsOkStatus(&error)); + + ASSERT_THAT( + AdbcStatementSetSqlQuery(&statement, "SELECT * FROM test_query_statements", &error), + IsOkStatus(&error)); + + adbc_validation::StreamReader reader; + ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, + &reader.rows_affected, &error), + IsOkStatus(&error)); + reader.GetSchema(); + ASSERT_THAT(reader.MaybeNext(), adbc_validation::IsOkErrno()); + ASSERT_EQ(reader.array->length, 3); +} + +TEST_F(PostgresStatementTest, SetUseCopyFalse) { + ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); + + const char* query = R"(DROP TABLE IF EXISTS test_query_set_copy_false; + CREATE TABLE test_query_set_copy_false (ints INT); + INSERT INTO test_query_set_copy_false VALUES((1)); + INSERT INTO test_query_set_copy_false VALUES((NULL)); + INSERT INTO test_query_set_copy_false VALUES((3));)"; + ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query, &error), IsOkStatus(&error)); + ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error), + IsOkStatus(&error)); + + // Check option setting/getting + ASSERT_EQ( + adbc_validation::StatementGetOption(&statement, "adbc.postgresql.use_copy", &error), + "true"); + + ASSERT_THAT(AdbcStatementSetOption(&statement, "adbc.postgresql.use_copy", + "not true or false", &error), + IsStatus(ADBC_STATUS_INVALID_ARGUMENT)); + + ASSERT_THAT(AdbcStatementSetOption(&statement, "adbc.postgresql.use_copy", + ADBC_OPTION_VALUE_ENABLED, &error), + IsOkStatus(&error)); + ASSERT_EQ( + adbc_validation::StatementGetOption(&statement, "adbc.postgresql.use_copy", &error), + "true"); + + ASSERT_THAT(AdbcStatementSetOption(&statement, "adbc.postgresql.use_copy", + ADBC_OPTION_VALUE_DISABLED, &error), + IsOkStatus(&error)); + ASSERT_EQ( + adbc_validation::StatementGetOption(&statement, "adbc.postgresql.use_copy", &error), + "false"); + + ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, + "SELECT * FROM test_query_set_copy_false", &error), + IsOkStatus(&error)); + + adbc_validation::StreamReader reader; + ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, + &reader.rows_affected, &error), + IsOkStatus(&error)); + + ASSERT_EQ(reader.rows_affected, 3); + + reader.GetSchema(); + ASSERT_EQ(reader.schema->n_children, 1); + ASSERT_STREQ(reader.schema->children[0]->format, "i"); + ASSERT_STREQ(reader.schema->children[0]->name, "ints"); + + ASSERT_THAT(reader.MaybeNext(), adbc_validation::IsOkErrno()); + ASSERT_EQ(reader.array->length, 3); + ASSERT_EQ(reader.array->n_children, 1); + ASSERT_EQ(reader.array->children[0]->null_count, 1); + + ASSERT_THAT(reader.MaybeNext(), adbc_validation::IsOkErrno()); + ASSERT_EQ(reader.array->release, nullptr); +} + struct TypeTestCase { std::string name; std::string sql_type; diff --git a/c/driver/postgresql/result_helper.cc b/c/driver/postgresql/result_helper.cc index ad5a54e00d..df890a7c51 100644 --- a/c/driver/postgresql/result_helper.cc +++ b/c/driver/postgresql/result_helper.cc @@ -17,24 +17,25 @@ #include "result_helper.h" +#include +#include + +#include "copy/reader.h" #include "driver/common/utils.h" #include "error.h" namespace adbcpq { -PqResultHelper::~PqResultHelper() { - if (result_ != nullptr) { - PQclear(result_); - } -} +PqResultHelper::~PqResultHelper() { ClearResult(); } -AdbcStatusCode PqResultHelper::Prepare() { +AdbcStatusCode PqResultHelper::PrepareInternal(int n_params, const Oid* param_oids, + struct AdbcError* error) { // TODO: make stmtName a unique identifier? PGresult* result = - PQprepare(conn_, /*stmtName=*/"", query_.c_str(), param_values_.size(), NULL); + PQprepare(conn_, /*stmtName=*/"", query_.c_str(), n_params, param_oids); if (PQresultStatus(result) != PGRES_COMMAND_OK) { AdbcStatusCode code = - SetError(error_, result, "[libpq] Failed to prepare query: %s\nQuery was:%s", + SetError(error, result, "[libpq] Failed to prepare query: %s\nQuery was:%s", PQerrorMessage(conn_), query_.c_str()); PQclear(result); return code; @@ -44,24 +45,327 @@ AdbcStatusCode PqResultHelper::Prepare() { return ADBC_STATUS_OK; } -AdbcStatusCode PqResultHelper::Execute() { - std::vector param_c_strs; +AdbcStatusCode PqResultHelper::Prepare(struct AdbcError* error) { + return PrepareInternal(0, nullptr, error); +} - for (size_t index = 0; index < param_values_.size(); index++) { - param_c_strs.push_back(param_values_[index].c_str()); +AdbcStatusCode PqResultHelper::Prepare(const std::vector& param_oids, + struct AdbcError* error) { + return PrepareInternal(param_oids.size(), param_oids.data(), error); +} + +AdbcStatusCode PqResultHelper::DescribePrepared(struct AdbcError* error) { + ClearResult(); + result_ = PQdescribePrepared(conn_, /*stmtName=*/""); + if (PQresultStatus(result_) != PGRES_COMMAND_OK) { + AdbcStatusCode code = SetError( + error, result_, "[libpq] Failed to describe prepared statement: %s\nQuery was:%s", + PQerrorMessage(conn_), query_.c_str()); + ClearResult(); + return code; } - result_ = - PQexecPrepared(conn_, "", param_values_.size(), param_c_strs.data(), NULL, NULL, 0); + return ADBC_STATUS_OK; +} + +AdbcStatusCode PqResultHelper::Execute(struct AdbcError* error, + const std::vector& params, + PostgresType* param_types) { + if (params.size() == 0 && param_types == nullptr && output_format_ == Format::kText) { + ClearResult(); + result_ = PQexec(conn_, query_.c_str()); + } else { + std::vector param_values; + std::vector param_lengths; + std::vector param_formats; + + for (const auto& param : params) { + param_values.push_back(param.data()); + param_lengths.push_back(static_cast(param.size())); + param_formats.push_back(static_cast(param_format_)); + } + + std::vector param_oids; + const Oid* param_oids_ptr = nullptr; + if (param_types != nullptr) { + param_oids.resize(params.size()); + for (size_t i = 0; i < params.size(); i++) { + param_oids[i] = param_types->child(i).oid(); + } + param_oids_ptr = param_oids.data(); + } + + ClearResult(); + result_ = PQexecParams(conn_, query_.c_str(), param_values.size(), param_oids_ptr, + param_values.data(), param_lengths.data(), + param_formats.data(), static_cast(output_format_)); + } ExecStatusType status = PQresultStatus(result_); if (status != PGRES_TUPLES_OK && status != PGRES_COMMAND_OK) { - AdbcStatusCode error = - SetError(error_, result_, "[libpq] Failed to execute query '%s': %s", + AdbcStatusCode status = + SetError(error, result_, "[libpq] Failed to execute query '%s': %s", query_.c_str(), PQerrorMessage(conn_)); - return error; + return status; + } + + return ADBC_STATUS_OK; +} + +AdbcStatusCode PqResultHelper::ExecuteCopy(struct AdbcError* error) { + // Remove trailing semicolon(s) from the query before feeding it into COPY + while (!query_.empty() && query_.back() == ';') { + query_.pop_back(); } + std::string copy_query = "COPY (" + query_ + ") TO STDOUT (FORMAT binary)"; + ClearResult(); + result_ = PQexecParams(conn_, copy_query.c_str(), /*nParams=*/0, + /*paramTypes=*/nullptr, /*paramValues=*/nullptr, + /*paramLengths=*/nullptr, /*paramFormats=*/nullptr, + static_cast(Format::kBinary)); + + if (PQresultStatus(result_) != PGRES_COPY_OUT) { + AdbcStatusCode code = SetError( + error, result_, + "[libpq] Failed to execute query: could not begin COPY: %s\nQuery was: %s", + PQerrorMessage(conn_), copy_query.c_str()); + ClearResult(); + return code; + } + + return ADBC_STATUS_OK; +} + +AdbcStatusCode PqResultHelper::ResolveParamTypes(PostgresTypeResolver& type_resolver, + PostgresType* param_types, + struct AdbcError* error) { + struct ArrowError na_error; + ArrowErrorInit(&na_error); + + const int num_params = PQnparams(result_); + PostgresType root_type(PostgresTypeId::kRecord); + + for (int i = 0; i < num_params; i++) { + const Oid pg_oid = PQparamtype(result_, i); + PostgresType pg_type; + if (type_resolver.Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) { + SetError(error, "%s%d%s%s%s%d", "[libpq] Parameter #", i + 1, " (\"", + PQfname(result_, i), "\") has unknown type code ", pg_oid); + ClearResult(); + return ADBC_STATUS_NOT_IMPLEMENTED; + } + + root_type.AppendChild(PQfname(result_, i), pg_type); + } + + *param_types = root_type; + return ADBC_STATUS_OK; +} + +AdbcStatusCode PqResultHelper::ResolveOutputTypes(PostgresTypeResolver& type_resolver, + PostgresType* result_types, + struct AdbcError* error) { + struct ArrowError na_error; + ArrowErrorInit(&na_error); + + const int num_fields = PQnfields(result_); + PostgresType root_type(PostgresTypeId::kRecord); + + for (int i = 0; i < num_fields; i++) { + const Oid pg_oid = PQftype(result_, i); + PostgresType pg_type; + if (type_resolver.Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) { + SetError(error, "%s%d%s%s%s%d", "[libpq] Column #", i + 1, " (\"", + PQfname(result_, i), "\") has unknown type code ", pg_oid); + ClearResult(); + return ADBC_STATUS_NOT_IMPLEMENTED; + } + + root_type.AppendChild(PQfname(result_, i), pg_type); + } + + *result_types = root_type; + return ADBC_STATUS_OK; +} + +PGresult* PqResultHelper::ReleaseResult() { + PGresult* out = result_; + result_ = nullptr; + return out; +} + +int64_t PqResultHelper::AffectedRows() { + if (result_ == nullptr) { + return -1; + } + + char* first = PQcmdTuples(result_); + char* last = first + strlen(first); + if ((last - first) == 0) { + return -1; + } + + int64_t out; + auto result = std::from_chars(first, last, out); + + if (result.ec == std::errc() && result.ptr == last) { + return out; + } else { + return -1; + } +} + +int PqResultArrayReader::GetSchema(struct ArrowSchema* out) { + ResetErrors(); + + if (schema_->release == nullptr) { + AdbcStatusCode status = Initialize(&error_); + if (status != ADBC_STATUS_OK) { + return EINVAL; + } + } + + return ArrowSchemaDeepCopy(schema_.get(), out); +} + +int PqResultArrayReader::GetNext(struct ArrowArray* out) { + ResetErrors(); + + if (schema_->release == nullptr) { + AdbcStatusCode status = Initialize(&error_); + if (status != ADBC_STATUS_OK) { + return EINVAL; + } + } + + if (!helper_.HasResult()) { + out->release = nullptr; + return NANOARROW_OK; + } + + nanoarrow::UniqueArray tmp; + NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromSchema(tmp.get(), schema_.get(), &na_error_)); + NANOARROW_RETURN_NOT_OK(ArrowArrayStartAppending(tmp.get())); + for (int i = 0; i < helper_.NumColumns(); i++) { + NANOARROW_RETURN_NOT_OK(field_readers_[i]->InitArray(tmp->children[i])); + } + + // TODO: If we get an EOVERFLOW here (e.g., big string data), we + // would need to keep track of what row number we're on and start + // from there instead of begin() on the next call. We could also + // respect the size hint here to chunk the batches. + struct ArrowBufferView item; + for (auto it = helper_.begin(); it != helper_.end(); it++) { + auto row = *it; + for (int i = 0; i < helper_.NumColumns(); i++) { + auto pg_item = row[i]; + item.data.data = pg_item.data; + + if (pg_item.is_null) { + item.size_bytes = -1; + } else { + item.size_bytes = pg_item.len; + } + + NANOARROW_RETURN_NOT_OK( + field_readers_[i]->Read(&item, item.size_bytes, tmp->children[i], &na_error_)); + } + } + + for (int i = 0; i < helper_.NumColumns(); i++) { + NANOARROW_RETURN_NOT_OK(field_readers_[i]->FinishArray(tmp->children[i], &na_error_)); + } + + tmp->length = helper_.NumRows(); + tmp->null_count = 0; + NANOARROW_RETURN_NOT_OK(ArrowArrayFinishBuildingDefault(tmp.get(), &na_error_)); + + // Ensure that the next call to GetNext() will signal the end of the stream + helper_.ClearResult(); + + // Canonically return zero-size results as an empty stream + if (tmp->length == 0) { + out->release = nullptr; + return NANOARROW_OK; + } + + ArrowArrayMove(tmp.get(), out); + return NANOARROW_OK; +} + +const char* PqResultArrayReader::GetLastError() { + if (error_.message != nullptr) { + return error_.message; + } else { + return na_error_.message; + } +} + +AdbcStatusCode PqResultArrayReader::Initialize(struct AdbcError* error) { + helper_.set_output_format(PqResultHelper::Format::kBinary); + RAISE_ADBC(helper_.Execute(error)); + + ArrowSchemaInit(schema_.get()); + CHECK_NA_DETAIL(INTERNAL, ArrowSchemaSetTypeStruct(schema_.get(), helper_.NumColumns()), + &na_error_, error); + + for (int i = 0; i < helper_.NumColumns(); i++) { + PostgresType child_type; + CHECK_NA_DETAIL(INTERNAL, + type_resolver_->Find(helper_.FieldType(i), &child_type, &na_error_), + &na_error_, error); + + CHECK_NA(INTERNAL, child_type.SetSchema(schema_->children[i]), error); + CHECK_NA(INTERNAL, ArrowSchemaSetName(schema_->children[i], helper_.FieldName(i)), + error); + + std::unique_ptr child_reader; + CHECK_NA_DETAIL( + INTERNAL, + MakeCopyFieldReader(child_type, schema_->children[i], &child_reader, &na_error_), + &na_error_, error); + + child_reader->Init(child_type); + CHECK_NA_DETAIL(INTERNAL, child_reader->InitSchema(schema_->children[i]), &na_error_, + error); + + field_readers_.push_back(std::move(child_reader)); + } + + return ADBC_STATUS_OK; +} + +AdbcStatusCode PqResultArrayReader::ToArrayStream(int64_t* affected_rows, + struct ArrowArrayStream* out, + struct AdbcError* error) { + if (out == nullptr) { + // If there is no output requested, we still need to execute and set + // affected_rows if needed. We don't need an output schema or to set + // up a copy reader, so we can skip those steps by going straight + // to Execute(). This also enables us to support queries with multiple + // statements because we can call PQexec() instead of PQexecParams(). + RAISE_ADBC(helper_.Execute(error)); + + if (affected_rows != nullptr) { + *affected_rows = helper_.AffectedRows(); + } + + return ADBC_STATUS_OK; + } + + // Execute eagerly. We need this to provide row counts for DELETE and + // CREATE TABLE queries as well as to provide more informative errors + // until this reader class is wired up to provide extended AdbcError + // information. + RAISE_ADBC(Initialize(error)); + if (affected_rows != nullptr) { + *affected_rows = helper_.AffectedRows(); + } + + nanoarrow::ArrayStreamFactory::InitArrayStream( + new PqResultArrayReader(this), out); + return ADBC_STATUS_OK; } diff --git a/c/driver/postgresql/result_helper.h b/c/driver/postgresql/result_helper.h index 25a79fad8d..43083b8bcb 100644 --- a/c/driver/postgresql/result_helper.h +++ b/c/driver/postgresql/result_helper.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include #include @@ -26,6 +27,8 @@ #include #include +#include "copy/reader.h" + namespace adbcpq { /// \brief A single column in a single row of a result set. @@ -73,25 +76,57 @@ class PqResultRow { // prior to iterating class PqResultHelper { public: - explicit PqResultHelper(PGconn* conn, std::string query, struct AdbcError* error) - : conn_(conn), query_(std::move(query)), error_(error) {} + enum class Format { + kText = 0, + kBinary = 1, + }; - explicit PqResultHelper(PGconn* conn, std::string query, - std::vector param_values, struct AdbcError* error) - : conn_(conn), - query_(std::move(query)), - param_values_(std::move(param_values)), - error_(error) {} + explicit PqResultHelper(PGconn* conn, std::string query) + : conn_(conn), query_(std::move(query)) {} + + PqResultHelper(PqResultHelper&& other) + : PqResultHelper(other.conn_, std::move(other.query_)) { + result_ = other.result_; + other.result_ = nullptr; + } ~PqResultHelper(); - AdbcStatusCode Prepare(); - AdbcStatusCode Execute(); + void set_param_format(Format format) { param_format_ = format; } + void set_output_format(Format format) { output_format_ = format; } + + AdbcStatusCode Prepare(struct AdbcError* error); + AdbcStatusCode Prepare(const std::vector& param_oids, struct AdbcError* error); + AdbcStatusCode DescribePrepared(struct AdbcError* error); + AdbcStatusCode Execute(struct AdbcError* error, + const std::vector& params = {}, + PostgresType* param_types = nullptr); + AdbcStatusCode ExecuteCopy(struct AdbcError* error); + AdbcStatusCode ResolveParamTypes(PostgresTypeResolver& type_resolver, + PostgresType* param_types, struct AdbcError* error); + AdbcStatusCode ResolveOutputTypes(PostgresTypeResolver& type_resolver, + PostgresType* result_types, struct AdbcError* error); + + bool HasResult() { return result_ != nullptr; } + + PGresult* ReleaseResult(); + + void ClearResult() { + PQclear(result_); + result_ = nullptr; + } + + int64_t AffectedRows(); int NumRows() const { return PQntuples(result_); } int NumColumns() const { return PQnfields(result_); } + const char* FieldName(int column_number) const { + return PQfname(result_, column_number); + } + Oid FieldType(int column_number) const { return PQftype(result_, column_number); } + class iterator { const PqResultHelper& outer_; int curr_row_ = 0; @@ -127,7 +162,58 @@ class PqResultHelper { PGresult* result_ = nullptr; PGconn* conn_; std::string query_; - std::vector param_values_; - struct AdbcError* error_; + Format param_format_ = Format::kText; + Format output_format_ = Format::kText; + + AdbcStatusCode PrepareInternal(int n_params, const Oid* param_oids, + struct AdbcError* error); }; + +class PqResultArrayReader { + public: + PqResultArrayReader(PGconn* conn, std::shared_ptr type_resolver, + std::string query) + : helper_(conn, std::move(query)), type_resolver_(type_resolver) { + ArrowErrorInit(&na_error_); + error_ = ADBC_ERROR_INIT; + } + + ~PqResultArrayReader() { ResetErrors(); } + + int GetSchema(struct ArrowSchema* out); + int GetNext(struct ArrowArray* out); + const char* GetLastError(); + + AdbcStatusCode ToArrayStream(int64_t* affected_rows, struct ArrowArrayStream* out, + struct AdbcError* error); + + AdbcStatusCode Initialize(struct AdbcError* error); + + private: + PqResultHelper helper_; + std::shared_ptr type_resolver_; + std::vector> field_readers_; + nanoarrow::UniqueSchema schema_; + struct AdbcError error_; + struct ArrowError na_error_; + + explicit PqResultArrayReader(PqResultArrayReader* other) + : helper_(std::move(other->helper_)), + type_resolver_(std::move(other->type_resolver_)), + field_readers_(std::move(other->field_readers_)), + schema_(std::move(other->schema_)) { + ArrowErrorInit(&na_error_); + error_ = ADBC_ERROR_INIT; + } + + void ResetErrors() { + ArrowErrorInit(&na_error_); + + if (error_.private_data != nullptr) { + error_.release(&error_); + } + error_ = ADBC_ERROR_INIT; + } +}; + } // namespace adbcpq diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc index 4443551235..0fa8a79b9d 100644 --- a/c/driver/postgresql/statement.cc +++ b/c/driver/postgresql/statement.cc @@ -82,30 +82,6 @@ struct OneValueStream { } }; -/// Build an PostgresType object from a PGresult* -AdbcStatusCode ResolvePostgresType(const PostgresTypeResolver& type_resolver, - PGresult* result, PostgresType* out, - struct AdbcError* error) { - ArrowError na_error; - const int num_fields = PQnfields(result); - PostgresType root_type(PostgresTypeId::kRecord); - - for (int i = 0; i < num_fields; i++) { - const Oid pg_oid = PQftype(result, i); - PostgresType pg_type; - if (type_resolver.Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) { - SetError(error, "%s%d%s%s%s%d", "[libpq] Column #", i + 1, " (\"", - PQfname(result, i), "\") has unknown type code ", pg_oid); - return ADBC_STATUS_NOT_IMPLEMENTED; - } - - root_type.AppendChild(PQfname(result, i), pg_type); - } - - *out = root_type; - return ADBC_STATUS_OK; -} - /// Helper to manage bind parameters with a prepared statement struct BindStream { Handle bind; @@ -1148,18 +1124,15 @@ AdbcStatusCode PostgresStatement::CreateBulkTable( return ADBC_STATUS_OK; } -AdbcStatusCode PostgresStatement::ExecutePreparedStatement( - struct ArrowArrayStream* stream, int64_t* rows_affected, struct AdbcError* error) { - if (!bind_.release) { - // TODO: set an empty stream just to unify the code paths - SetError(error, "%s", - "[libpq] Prepared statements without parameters are not implemented"); - return ADBC_STATUS_NOT_IMPLEMENTED; - } +AdbcStatusCode PostgresStatement::ExecuteBind(struct ArrowArrayStream* stream, + int64_t* rows_affected, + struct AdbcError* error) { if (stream) { // TODO: SetError(error, "%s", - "[libpq] Prepared statements returning result sets are not implemented"); + "[libpq] Prepared statements with parameters returning result sets are not " + "implemented"); + return ADBC_STATUS_NOT_IMPLEMENTED; } @@ -1178,27 +1151,10 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct ArrowArrayStream* stream, int64_t* rows_affected, struct AdbcError* error) { ClearResult(); - if (prepared_) { - if (bind_.release || !stream) { - return ExecutePreparedStatement(stream, rows_affected, error); - } - // XXX: don't use a prepared statement to execute a no-parameter - // result-set-returning query for now, since we can't easily get - // access to COPY there. (This might have to become sequential - // executions of COPY (EXECUTE ($n, ...)) TO STDOUT which won't - // get all the benefits of a prepared statement.) At preparation - // time we don't know whether the query will be used with a result - // set or not without analyzing the query (we could prepare both?) - // and https://stackoverflow.com/questions/69233792 suggests that - // you can't PREPARE a query containing COPY. - } - if (!stream && !ingest_.target.empty()) { - return ExecuteUpdateBulk(rows_affected, error); - } - // Remove trailing semicolon(s) from the query before feeding it into COPY - while (!query_.empty() && query_.back() == ';') { - query_.pop_back(); + // Use a dedicated path to handle bulk ingest + if (!ingest_.target.empty()) { + return ExecuteIngest(stream, rows_affected, error); } if (query_.empty()) { @@ -1206,53 +1162,53 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct ArrowArrayStream* stream, return ADBC_STATUS_INVALID_STATE; } - // 1. Prepare the query to get the schema - { - RAISE_ADBC(SetupReader(error)); - - // If the caller did not request a result set or if there are no - // inferred output columns (e.g. a CREATE or UPDATE), then don't - // use COPY (which would fail anyways) - if (!stream || reader_.copy_reader_->pg_type().n_children() == 0) { - RAISE_ADBC(ExecuteUpdateQuery(rows_affected, error)); - if (stream) { - struct ArrowSchema schema; - std::memset(&schema, 0, sizeof(schema)); - RAISE_NA(reader_.copy_reader_->GetSchema(&schema)); - nanoarrow::EmptyArrayStream::MakeUnique(&schema).move(stream); - } - return ADBC_STATUS_OK; - } + // Use a dedicated path to handle parameter binding + if (bind_.release != nullptr) { + return ExecuteBind(stream, rows_affected, error); + } - // This resolves the reader specific to each PostgresType -> ArrowSchema - // conversion. It is unlikely that this will fail given that we have just - // inferred these conversions ourselves. - struct ArrowError na_error; - int na_res = reader_.copy_reader_->InitFieldReaders(&na_error); - if (na_res != NANOARROW_OK) { - SetError(error, "[libpq] Failed to initialize field readers: %s", na_error.message); - return na_res; - } + // If we have been requested to avoid COPY or there is no output requested, + // execute using the PqResultArrayReader. + if (!stream || !use_copy_) { + PqResultArrayReader reader(connection_->conn(), type_resolver_, query_); + RAISE_ADBC(reader.ToArrayStream(rows_affected, stream, error)); + return ADBC_STATUS_OK; } - // 2. Execute the query with COPY to get binary tuples - { - std::string copy_query = "COPY (" + query_ + ") TO STDOUT (FORMAT binary)"; - reader_.result_ = - PQexecParams(connection_->conn(), copy_query.c_str(), /*nParams=*/0, - /*paramTypes=*/nullptr, /*paramValues=*/nullptr, - /*paramLengths=*/nullptr, /*paramFormats=*/nullptr, kPgBinaryFormat); - if (PQresultStatus(reader_.result_) != PGRES_COPY_OUT) { - AdbcStatusCode code = SetError( - error, reader_.result_, - "[libpq] Failed to execute query: could not begin COPY: %s\nQuery was: %s", - PQerrorMessage(connection_->conn()), copy_query.c_str()); - ClearResult(); - return code; - } - // Result is read from the connection, not the result, but we won't clear it here + PqResultHelper helper(connection_->conn(), query_); + RAISE_ADBC(helper.Prepare(error)); + RAISE_ADBC(helper.DescribePrepared(error)); + + // Initialize the copy reader and infer the output schema (i.e., error for + // unsupported types before issuing the COPY query). This could be lazier + // (i.e., executed on the first call to GetSchema() or GetNext()). + PostgresType root_type; + RAISE_ADBC(helper.ResolveOutputTypes(*type_resolver_, &root_type, error)); + + // If there will be no columns in the result, we can also avoid COPY + if (root_type.n_children() == 0) { + // Could/should move the helper into the reader instead of repreparing + PqResultArrayReader reader(connection_->conn(), type_resolver_, query_); + RAISE_ADBC(reader.ToArrayStream(rows_affected, stream, error)); + return ADBC_STATUS_OK; } + struct ArrowError na_error; + reader_.copy_reader_ = std::make_unique(); + CHECK_NA(INTERNAL, reader_.copy_reader_->Init(root_type), error); + CHECK_NA_DETAIL(INTERNAL, reader_.copy_reader_->InferOutputSchema(&na_error), &na_error, + error); + + CHECK_NA_DETAIL(INTERNAL, reader_.copy_reader_->InitFieldReaders(&na_error), &na_error, + error); + + // Execute the COPY query + RAISE_ADBC(helper.ExecuteCopy(error)); + + // We need the PQresult back for the reader + reader_.result_ = helper.ReleaseResult(); + + // Export to stream reader_.ExportTo(stream); if (rows_affected) *rows_affected = -1; return ADBC_STATUS_OK; @@ -1264,31 +1220,69 @@ AdbcStatusCode PostgresStatement::ExecuteSchema(struct ArrowSchema* schema, if (query_.empty()) { SetError(error, "%s", "[libpq] Must SetSqlQuery before ExecuteQuery"); return ADBC_STATUS_INVALID_STATE; - } else if (bind_.release) { - // TODO: if we have parameters, bind them (since they can affect the output schema) - SetError(error, "[libpq] ExecuteSchema with parameters is not implemented"); - return ADBC_STATUS_NOT_IMPLEMENTED; } - RAISE_ADBC(SetupReader(error)); - CHECK_NA(INTERNAL, reader_.copy_reader_->GetSchema(schema), error); + PqResultHelper helper(connection_->conn(), query_); + + if (bind_.release) { + nanoarrow::UniqueSchema schema; + struct ArrowError na_error; + ArrowErrorInit(&na_error); + CHECK_NA_DETAIL(INTERNAL, ArrowArrayStreamGetSchema(&bind_, schema.get(), &na_error), + &na_error, error); + + if (std::string(schema->format) != "+s") { + SetError(error, "%s", "[libpq] Bind parameters must have type STRUCT"); + return ADBC_STATUS_INVALID_STATE; + } + + std::vector param_oids(schema->n_children); + for (int64_t i = 0; i < schema->n_children; i++) { + PostgresType pg_type; + CHECK_NA_DETAIL(INTERNAL, + PostgresType::FromSchema(*type_resolver_, schema->children[i], + &pg_type, &na_error), + &na_error, error); + param_oids[i] = pg_type.oid(); + } + + RAISE_ADBC(helper.Prepare(param_oids, error)); + } else { + RAISE_ADBC(helper.Prepare(error)); + } + + RAISE_ADBC(helper.DescribePrepared(error)); + + PostgresType output_type; + RAISE_ADBC(helper.ResolveOutputTypes(*type_resolver_, &output_type, error)); + + nanoarrow::UniqueSchema tmp; + ArrowSchemaInit(tmp.get()); + CHECK_NA(INTERNAL, output_type.SetSchema(tmp.get()), error); + + tmp.move(schema); return ADBC_STATUS_OK; } -AdbcStatusCode PostgresStatement::ExecuteUpdateBulk(int64_t* rows_affected, - struct AdbcError* error) { +AdbcStatusCode PostgresStatement::ExecuteIngest(struct ArrowArrayStream* stream, + int64_t* rows_affected, + struct AdbcError* error) { if (!bind_.release) { SetError(error, "%s", "[libpq] Must Bind() before Execute() for bulk ingestion"); return ADBC_STATUS_INVALID_STATE; } + if (stream != nullptr) { + SetError(error, "%s", "[libpq] Bulk ingest with result set is not supported"); + return ADBC_STATUS_NOT_IMPLEMENTED; + } + // Need the current schema to avoid being shadowed by temp tables // This is a little unfortunate; we need another DB roundtrip std::string current_schema; { - PqResultHelper result_helper{connection_->conn(), "SELECT CURRENT_SCHEMA", {}, error}; - RAISE_ADBC(result_helper.Prepare()); - RAISE_ADBC(result_helper.Execute()); + PqResultHelper result_helper{connection_->conn(), "SELECT CURRENT_SCHEMA"}; + RAISE_ADBC(result_helper.Execute(error)); auto it = result_helper.begin(); if (it == result_helper.end()) { SetError(error, "[libpq] PostgreSQL returned no rows for 'SELECT CURRENT_SCHEMA'"); @@ -1329,37 +1323,6 @@ AdbcStatusCode PostgresStatement::ExecuteUpdateBulk(int64_t* rows_affected, return ADBC_STATUS_OK; } -AdbcStatusCode PostgresStatement::ExecuteUpdateQuery(int64_t* rows_affected, - struct AdbcError* error) { - // NOTE: must prepare first (used in ExecuteQuery) - PGresult* result = - PQexecPrepared(connection_->conn(), /*stmtName=*/"", /*nParams=*/0, - /*paramValues=*/nullptr, /*paramLengths=*/nullptr, - /*paramFormats=*/nullptr, /*resultFormat=*/kPgBinaryFormat); - ExecStatusType status = PQresultStatus(result); - if (status != PGRES_COMMAND_OK && status != PGRES_TUPLES_OK) { - AdbcStatusCode code = - SetError(error, result, "[libpq] Failed to execute query: %s\nQuery was:%s", - PQerrorMessage(connection_->conn()), query_.c_str()); - PQclear(result); - return code; - } - if (rows_affected) { - if (status == PGRES_TUPLES_OK) { - *rows_affected = PQntuples(reader_.result_); - } else { - // In theory, PQcmdTuples would work here, but experimentally it gives - // an empty string even for a DELETE. (Also, why does it return a - // string...) Possibly, it doesn't work because we use PQexecPrepared - // but the docstring is careful to specify it works on an EXECUTE of a - // prepared statement. - *rows_affected = -1; - } - } - PQclear(result); - return ADBC_STATUS_OK; -} - AdbcStatusCode PostgresStatement::GetOption(const char* key, char* value, size_t* length, struct AdbcError* error) { std::string result; @@ -1384,6 +1347,12 @@ AdbcStatusCode PostgresStatement::GetOption(const char* key, char* value, size_t } } else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) { result = std::to_string(reader_.batch_size_hint_bytes_); + } else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_USE_COPY) == 0) { + if (use_copy_) { + result = "true"; + } else { + result = "false"; + } } else { SetError(error, "[libpq] Unknown statement option '%s'", key); return ADBC_STATUS_NOT_FOUND; @@ -1503,6 +1472,15 @@ AdbcStatusCode PostgresStatement::SetOption(const char* key, const char* value, } this->reader_.batch_size_hint_bytes_ = int_value; + } else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_USE_COPY) == 0) { + if (std::strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) { + use_copy_ = true; + } else if (std::strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0) { + use_copy_ = false; + } else { + SetError(error, "[libpq] Invalid value '%s' for option '%s'", value, key); + return ADBC_STATUS_INVALID_ARGUMENT; + } } else { SetError(error, "[libpq] Unknown statement option '%s'", key); return ADBC_STATUS_NOT_IMPLEMENTED; @@ -1537,53 +1515,9 @@ AdbcStatusCode PostgresStatement::SetOptionInt(const char* key, int64_t value, return ADBC_STATUS_NOT_IMPLEMENTED; } -AdbcStatusCode PostgresStatement::SetupReader(struct AdbcError* error) { - // TODO: we should pipeline here and assume this will succeed - PGresult* result = PQprepare(connection_->conn(), /*stmtName=*/"", query_.c_str(), - /*nParams=*/0, nullptr); - if (PQresultStatus(result) != PGRES_COMMAND_OK) { - AdbcStatusCode code = - SetError(error, result, - "[libpq] Failed to execute query: could not infer schema: failed to " - "prepare query: %s\nQuery was:%s", - PQerrorMessage(connection_->conn()), query_.c_str()); - PQclear(result); - return code; - } - PQclear(result); - result = PQdescribePrepared(connection_->conn(), /*stmtName=*/""); - if (PQresultStatus(result) != PGRES_COMMAND_OK) { - AdbcStatusCode code = - SetError(error, result, - "[libpq] Failed to execute query: could not infer schema: failed to " - "describe prepared statement: %s\nQuery was:%s", - PQerrorMessage(connection_->conn()), query_.c_str()); - PQclear(result); - return code; - } - - // Resolve the information from the PGresult into a PostgresType - PostgresType root_type; - AdbcStatusCode status = ResolvePostgresType(*type_resolver_, result, &root_type, error); - PQclear(result); - if (status != ADBC_STATUS_OK) return status; - - // Initialize the copy reader and infer the output schema (i.e., error for - // unsupported types before issuing the COPY query) - reader_.copy_reader_ = std::make_unique(); - reader_.copy_reader_->Init(root_type); - struct ArrowError na_error; - int na_res = reader_.copy_reader_->InferOutputSchema(&na_error); - if (na_res != NANOARROW_OK) { - SetError(error, "[libpq] Failed to infer output schema: (%d) %s: %s", na_res, - std::strerror(na_res), na_error.message); - return ADBC_STATUS_INTERNAL; - } - return ADBC_STATUS_OK; -} - void PostgresStatement::ClearResult() { // TODO: we may want to synchronize here for safety reader_.Release(); } + } // namespace adbcpq diff --git a/c/driver/postgresql/statement.h b/c/driver/postgresql/statement.h index f2387a3ace..1cd60bff5f 100644 --- a/c/driver/postgresql/statement.h +++ b/c/driver/postgresql/statement.h @@ -33,6 +33,8 @@ #define ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES \ "adbc.postgresql.batch_size_hint_bytes" +#define ADBC_POSTGRESQL_OPTION_USE_COPY "adbc.postgresql.use_copy" + namespace adbcpq { class PostgresConnection; class PostgresStatement; @@ -90,7 +92,11 @@ class TupleReader final { class PostgresStatement { public: PostgresStatement() - : connection_(nullptr), query_(), prepared_(false), reader_(nullptr) { + : connection_(nullptr), + query_(), + prepared_(false), + use_copy_(true), + reader_(nullptr) { std::memset(&bind_, 0, sizeof(bind_)); } @@ -130,12 +136,10 @@ class PostgresStatement { const std::vector& source_schema_fields, std::string* escaped_table, std::string* escaped_field_list, struct AdbcError* error); - AdbcStatusCode ExecuteUpdateBulk(int64_t* rows_affected, struct AdbcError* error); - AdbcStatusCode ExecuteUpdateQuery(int64_t* rows_affected, struct AdbcError* error); - AdbcStatusCode ExecutePreparedStatement(struct ArrowArrayStream* stream, - int64_t* rows_affected, - struct AdbcError* error); - AdbcStatusCode SetupReader(struct AdbcError* error); + AdbcStatusCode ExecuteIngest(struct ArrowArrayStream* stream, int64_t* rows_affected, + struct AdbcError* error); + AdbcStatusCode ExecuteBind(struct ArrowArrayStream* stream, int64_t* rows_affected, + struct AdbcError* error); private: std::shared_ptr type_resolver_; @@ -154,6 +158,9 @@ class PostgresStatement { kCreateAppend, }; + // Options + bool use_copy_; + struct { std::string db_schema; std::string target; diff --git a/c/validation/adbc_validation_util.cc b/c/validation/adbc_validation_util.cc index b319e54951..54c18cce70 100644 --- a/c/validation/adbc_validation_util.cc +++ b/c/validation/adbc_validation_util.cc @@ -36,6 +36,20 @@ std::optional ConnectionGetOption(struct AdbcConnection* connection return std::string(buffer, buffer_size - 1); } +std::optional StatementGetOption(struct AdbcStatement* statement, + std::string_view option, + struct AdbcError* error) { + char buffer[128]; + size_t buffer_size = sizeof(buffer); + AdbcStatusCode status = + AdbcStatementGetOption(statement, option.data(), buffer, &buffer_size, error); + EXPECT_THAT(status, IsOkStatus(error)); + if (status != ADBC_STATUS_OK) return std::nullopt; + EXPECT_GT(buffer_size, 0); + if (buffer_size == 0) return std::nullopt; + return std::string(buffer, buffer_size - 1); +} + std::string StatusCodeToString(AdbcStatusCode code) { #define CASE(CONSTANT) \ case ADBC_STATUS_##CONSTANT: \ diff --git a/c/validation/adbc_validation_util.h b/c/validation/adbc_validation_util.h index 0dba6dafbc..21eca52de0 100644 --- a/c/validation/adbc_validation_util.h +++ b/c/validation/adbc_validation_util.h @@ -43,6 +43,10 @@ std::optional ConnectionGetOption(struct AdbcConnection* connection std::string_view option, struct AdbcError* error); +std::optional StatementGetOption(struct AdbcStatement* statement, + std::string_view option, + struct AdbcError* error); + // ------------------------------------------------------------ // Helpers to print values diff --git a/python/adbc_driver_postgresql/tests/test_dbapi.py b/python/adbc_driver_postgresql/tests/test_dbapi.py index 94cd9f82d0..9e2661d542 100644 --- a/python/adbc_driver_postgresql/tests/test_dbapi.py +++ b/python/adbc_driver_postgresql/tests/test_dbapi.py @@ -148,7 +148,7 @@ def test_query_execute_schema(postgres: dbapi.Connection) -> None: def test_query_invalid(postgres: dbapi.Connection) -> None: with postgres.cursor() as cur: with pytest.raises( - postgres.ProgrammingError, match="failed to prepare query" + postgres.ProgrammingError, match="Failed to prepare query" ) as excinfo: cur.execute("SELECT * FROM tabledoesnotexist")