From 33ebd9d0f5abcb363ebb26195c5349dbfaa11c91 Mon Sep 17 00:00:00 2001
From: William Ayd
Date: Tue, 2 Jul 2024 19:57:59 -0400
Subject: [PATCH] feat(c/driver/postgresql): UInt(8/16/32) Writer (#1961)

More progress towards https://github.com/apache/arrow-adbc/issues/1950
---
 .../copy/postgres_copy_writer_test.cc         | 100 ++++++++++++++++++
 c/driver/postgresql/copy/writer.h             |   3 +
 2 files changed, 103 insertions(+)

diff --git a/c/driver/postgresql/copy/postgres_copy_writer_test.cc b/c/driver/postgresql/copy/postgres_copy_writer_test.cc
index 8777f1c816..1a31126958 100644
--- a/c/driver/postgresql/copy/postgres_copy_writer_test.cc
+++ b/c/driver/postgresql/copy/postgres_copy_writer_test.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <limits>
 #include <optional>
 #include <tuple>
 
@@ -188,6 +189,105 @@ TEST(PostgresCopyUtilsTest, PostgresCopyWriteInt64) {
   }
 }
 
+// COPY (SELECT CAST("col" AS SMALLINT) AS "col" FROM ( VALUES (0), (255),
+// (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
+static const uint8_t kTestPgCopyUInt8[] = {
+    0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
+    0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02,
+    0x00, 0xff, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
+
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteUInt8) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_UINT8}}),
+            ADBC_STATUS_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<uint8_t>(
+                &schema.value, &array.value, &na_error,
+                {0, (std::numeric_limits<uint8_t>::max)(), std::nullopt}),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+  ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);
+
+  const struct ArrowBuffer buf = tester.WriteBuffer();
+  // The last 2 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  constexpr size_t buf_size = sizeof(kTestPgCopyUInt8) - 2;
+  ASSERT_EQ(buf.size_bytes, buf_size);
+  for (size_t i = 0; i < buf_size; i++) {
+    ASSERT_EQ(buf.data[i], kTestPgCopyUInt8[i]);
+  }
+}
+
+// COPY (SELECT CAST("col" AS INTEGER) AS "col" FROM ( VALUES (0), (65535),
+// (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
+static const uint8_t kTestPgCopyUInt16[] = {
+    0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
+    0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00,
+    0x00, 0xff, 0xff, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
+
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteUInt16) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_UINT16}}),
+            ADBC_STATUS_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<uint16_t>(
+                &schema.value, &array.value, &na_error,
+                {0, (std::numeric_limits<uint16_t>::max)(), std::nullopt}),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+  ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);
+
+  const struct ArrowBuffer buf = tester.WriteBuffer();
+  // The last 2 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  constexpr size_t buf_size = sizeof(kTestPgCopyUInt16) - 2;
+  ASSERT_EQ(buf.size_bytes, buf_size);
+  for (size_t i = 0; i < buf_size; i++) {
+    ASSERT_EQ(buf.data[i], kTestPgCopyUInt16[i]);
+  }
+}
+
+// COPY (SELECT CAST("col" AS BIGINT) AS "col" FROM ( VALUES (0), (2^32-1),
+// (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary);
+static const uint8_t kTestPgCopyUInt32[] = {
+    0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00,
+    0x00, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
+
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteUInt32) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_UINT32}}),
+            ADBC_STATUS_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<uint32_t>(
+                &schema.value, &array.value, &na_error,
+                {0, (std::numeric_limits<uint32_t>::max)(), std::nullopt}),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+  ASSERT_EQ(tester.WriteAll(nullptr), ENODATA);
+
+  const struct ArrowBuffer buf = tester.WriteBuffer();
+  // The last 2 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  constexpr size_t buf_size = sizeof(kTestPgCopyUInt32) - 2;
+  ASSERT_EQ(buf.size_bytes, buf_size);
+  for (size_t i = 0; i < buf_size; i++) {
+    ASSERT_EQ(buf.data[i], kTestPgCopyUInt32[i]);
+  }
+}
+
 TEST(PostgresCopyUtilsTest, PostgresCopyWriteReal) {
   adbc_validation::Handle<struct ArrowSchema> schema;
   adbc_validation::Handle<struct ArrowArray> array;
diff --git a/c/driver/postgresql/copy/writer.h b/c/driver/postgresql/copy/writer.h
index 99791ad433..d7acf3cc6f 100644
--- a/c/driver/postgresql/copy/writer.h
+++ b/c/driver/postgresql/copy/writer.h
@@ -506,12 +506,15 @@ static inline ArrowErrorCode MakeCopyFieldWriter(
       return NANOARROW_OK;
     case NANOARROW_TYPE_INT8:
     case NANOARROW_TYPE_INT16:
+    case NANOARROW_TYPE_UINT8:
       *out = std::make_unique<PostgresCopyNetworkEndianFieldWriter<int16_t>>();
       return NANOARROW_OK;
     case NANOARROW_TYPE_INT32:
+    case NANOARROW_TYPE_UINT16:
       *out = std::make_unique<PostgresCopyNetworkEndianFieldWriter<int32_t>>();
       return NANOARROW_OK;
     case NANOARROW_TYPE_INT64:
+    case NANOARROW_TYPE_UINT32:
       *out = std::make_unique<PostgresCopyNetworkEndianFieldWriter<int64_t>>();
       return NANOARROW_OK;
     case NANOARROW_TYPE_DATE32: {