
Commit

fix merge
paleolimbot committed Aug 13, 2024
1 parent db0605e commit 0937125
Showing 2 changed files with 30 additions and 615 deletions.
51 changes: 30 additions & 21 deletions c/driver/postgresql/bind_stream.h
@@ -531,12 +531,6 @@ struct BindStream {
 
   AdbcStatusCode ExecuteCopy(PGconn* pg_conn, const PostgresTypeResolver& type_resolver,
                              int64_t* rows_affected, struct AdbcError* error) {
-    // https://github.com/apache/arrow-adbc/issues/1921: PostgreSQL has a max
-    // size for a single message that we need to respect (1 GiB - 1). Since
-    // the buffer can be chunked up as much as we want, go for 16 MiB as our
-    // limit.
-    // https://github.com/postgres/postgres/blob/23c5a0e7d43bc925c6001538f04a458933a11fc1/src/common/stringinfo.c#L28
-    constexpr int64_t kMaxCopyBufferSize = 0x1000000;
     if (rows_affected) *rows_affected = 0;
 
     PostgresCopyStreamWriter writer;
@@ -564,26 +558,15 @@ struct BindStream {
         return ADBC_STATUS_IO;
       }
 
-      ArrowBuffer buffer = writer.WriteBuffer();
-      {
-        auto* data = reinterpret_cast<char*>(buffer.data);
-        int64_t remaining = buffer.size_bytes;
-        while (remaining > 0) {
-          int64_t to_write = std::min<int64_t>(remaining, kMaxCopyBufferSize);
-          if (PQputCopyData(pg_conn, data, to_write) <= 0) {
-            SetError(error, "Error writing tuple field data: %s",
-                     PQerrorMessage(pg_conn));
-            return ADBC_STATUS_IO;
-          }
-          remaining -= to_write;
-          data += to_write;
-        }
-      }
+      RAISE_ADBC(FlushCopyWriterToConn(pg_conn, writer, error));
 
       if (rows_affected) *rows_affected += current->length;
       writer.Rewind();
     }
 
+    // If there were no arrays in the stream, we haven't flushed yet
+    RAISE_ADBC(FlushCopyWriterToConn(pg_conn, writer, error));
+
     if (PQputCopyEnd(pg_conn, NULL) <= 0) {
       SetError(error, "Error message returned by PQputCopyEnd: %s",
                PQerrorMessage(pg_conn));
@@ -603,5 +586,31 @@ struct BindStream {
     PQclear(result);
     return ADBC_STATUS_OK;
   }
+
+  AdbcStatusCode FlushCopyWriterToConn(PGconn* pg_conn,
+                                       const PostgresCopyStreamWriter& writer,
+                                       struct AdbcError* error) {
+    // https://github.com/apache/arrow-adbc/issues/1921: PostgreSQL has a max
+    // size for a single message that we need to respect (1 GiB - 1). Since
+    // the buffer can be chunked up as much as we want, go for 16 MiB as our
+    // limit.
+    // https://github.com/postgres/postgres/blob/23c5a0e7d43bc925c6001538f04a458933a11fc1/src/common/stringinfo.c#L28
+    constexpr int64_t kMaxCopyBufferSize = 0x1000000;
+    ArrowBuffer buffer = writer.WriteBuffer();
+
+    auto* data = reinterpret_cast<char*>(buffer.data);
+    int64_t remaining = buffer.size_bytes;
+    while (remaining > 0) {
+      int64_t to_write = std::min<int64_t>(remaining, kMaxCopyBufferSize);
+      if (PQputCopyData(pg_conn, data, to_write) <= 0) {
+        SetError(error, "Error writing tuple field data: %s", PQerrorMessage(pg_conn));
+        return ADBC_STATUS_IO;
+      }
+      remaining -= to_write;
+      data += to_write;
+    }
+
+    return ADBC_STATUS_OK;
+  }
 };
 } // namespace adbcpq
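
For reference, a minimal standalone sketch of the chunked COPY write that the new FlushCopyWriterToConn helper performs. It uses only the real libpq call PQputCopyData; the helper name PutCopyDataChunked, its signature, and the constant name kMaxChunk are illustrative (kMaxChunk mirrors the patch's kMaxCopyBufferSize), not part of the patch:

#include <algorithm>
#include <cstdint>

#include <libpq-fe.h>

// Sketch only: write `size` bytes to an open COPY ... FROM STDIN in chunks of
// at most 16 MiB, staying well under PostgreSQL's 1 GiB - 1 per-message cap.
// On failure the caller can inspect PQerrorMessage(conn).
static bool PutCopyDataChunked(PGconn* conn, const char* data, int64_t size) {
  constexpr int64_t kMaxChunk = 0x1000000;  // 16 MiB per PQputCopyData call
  while (size > 0) {
    const int to_write = static_cast<int>(std::min<int64_t>(size, kMaxChunk));
    if (PQputCopyData(conn, data, to_write) <= 0) {
      return false;
    }
    data += to_write;
    size -= to_write;
  }
  return true;
}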