Skip to content

Commit

Permalink
fix(csharp/src/Drivers/Apache): remove interleaved async look-ahead c…
Browse files Browse the repository at this point in the history
…ode (#2273)

The attempt to use “look-ahead” buffering in the HiveServer2Reader is
not supported by the Thrift library. The issue is that the Thrift
library uses a shared buffer on the Client/Protocol/Transport object -so
interleaving Fetch and ExecuteStatement will fail because the buffer
will get closed unexpectedly.
  • Loading branch information
birschick-bq authored Oct 22, 2024
1 parent 25725fd commit 4639ca8
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 15 deletions.
17 changes: 3 additions & 14 deletions csharp/src/Drivers/Apache/Hive2/HiveServer2Reader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ internal class HiveServer2Reader : IArrowArrayStream
{ ArrowTypeId.Timestamp, ConvertToTimestamp },
};

// Look-ahead task.
Task<TFetchResultsResp>? _fetchResultResponseTask = default;

public HiveServer2Reader(
HiveServer2Statement statement,
Schema schema,
Expand All @@ -76,8 +73,6 @@ public HiveServer2Reader(
_statement = statement;
Schema = schema;
_dataTypeConversion = dataTypeConversion;
// Start the pre-fetch of the first batch
_fetchResultResponseTask = FetchNext(_statement, cancellationToken);
}

public Schema Schema { get; }
Expand All @@ -91,7 +86,7 @@ public HiveServer2Reader(
}

// Await the fetch response
TFetchResultsResp response = await (_fetchResultResponseTask ?? throw new InvalidOperationException("unexpected state - fetch result task should be set."));
TFetchResultsResp response = await FetchNext(_statement, cancellationToken);

// Build the current batch
RecordBatch result = CreateBatch(response, out int fetchedRows);
Expand All @@ -100,12 +95,6 @@ public HiveServer2Reader(
{
// This is the last batch
_statement = null;
_fetchResultResponseTask = null;
}
else
{
// Otherwise, start the pre-fetch of the next batch
_fetchResultResponseTask = FetchNext(_statement, cancellationToken);
}

// Return the current batch.
Expand All @@ -128,10 +117,10 @@ private RecordBatch CreateBatch(TFetchResultsResp response, out int length)
return new RecordBatch(Schema, columnData, length);
}

private static Task<TFetchResultsResp> FetchNext(HiveServer2Statement statement, CancellationToken cancellationToken = default)
private static async Task<TFetchResultsResp> FetchNext(HiveServer2Statement statement, CancellationToken cancellationToken = default)
{
var request = new TFetchResultsReq(statement.OperationHandle, TFetchOrientation.FETCH_NEXT, statement.BatchSize);
return statement.Connection.Client.FetchResults(request, cancellationToken);
return await statement.Connection.Client.FetchResults(request, cancellationToken);
}

public void Dispose()
Expand Down
2 changes: 1 addition & 1 deletion csharp/test/Drivers/Apache/Spark/DriverTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void CanExecuteUpdate()
for (int i = 0; i < queries.Length; i++)
{
string query = queries[i];
AdbcStatement statement = adbcConnection.CreateStatement();
using AdbcStatement statement = adbcConnection.CreateStatement();
statement.SqlQuery = query;

UpdateResult updateResult = statement.ExecuteUpdate();
Expand Down

0 comments on commit 4639ca8

Please sign in to comment.