diff --git a/csharp/Apache.Arrow.sln b/csharp/Apache.Arrow.sln index 7e7f7c6331e88..0e569de1d6c8f 100644 --- a/csharp/Apache.Arrow.sln +++ b/csharp/Apache.Arrow.sln @@ -27,6 +27,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.Sql.Tes EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.Sql", "src\Apache.Arrow.Flight.Sql\Apache.Arrow.Flight.Sql.csproj", "{2ADE087A-B424-4895-8CC5-10170D10BA62}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.IntegrationTest", "test\Apache.Arrow.Flight.IntegrationTest\Apache.Arrow.Flight.IntegrationTest.csproj", "{7E66CBB4-D921-41E7-A98A-7C6DEA521696}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -81,6 +83,10 @@ Global {2ADE087A-B424-4895-8CC5-10170D10BA62}.Debug|Any CPU.Build.0 = Debug|Any CPU {2ADE087A-B424-4895-8CC5-10170D10BA62}.Release|Any CPU.ActiveCfg = Release|Any CPU {2ADE087A-B424-4895-8CC5-10170D10BA62}.Release|Any CPU.Build.0 = Release|Any CPU + {7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/csharp/src/Apache.Arrow.Flight/FlightRecordBatchStreamWriter.cs b/csharp/src/Apache.Arrow.Flight/FlightRecordBatchStreamWriter.cs index f76f08224541f..7a8a6fd677c68 100644 --- a/csharp/src/Apache.Arrow.Flight/FlightRecordBatchStreamWriter.cs +++ b/csharp/src/Apache.Arrow.Flight/FlightRecordBatchStreamWriter.cs @@ -64,7 +64,7 @@ protected virtual void Dispose(bool disposing) { if (!_disposed) { - _flightDataStream.Dispose(); + _flightDataStream?.Dispose(); _disposed = true; } } diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/Apache.Arrow.Flight.IntegrationTest.csproj b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Apache.Arrow.Flight.IntegrationTest.csproj new file mode 100644 index 0000000000000..34030621b4bde --- /dev/null +++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Apache.Arrow.Flight.IntegrationTest.csproj @@ -0,0 +1,18 @@ + + + + + Exe + net8.0 + Apache.Arrow.Flight.IntegrationTest + + + + + + + + + + + diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs new file mode 100644 index 0000000000000..d9e0ff5230611 --- /dev/null +++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.IO; +using System.Threading.Tasks; + +namespace Apache.Arrow.Flight.IntegrationTest; + +public class FlightClientCommand +{ + private readonly int _port; + private readonly string _scenario; + private readonly FileInfo _jsonFileInfo; + + public FlightClientCommand(int port, string scenario, FileInfo jsonFileInfo) + { + _port = port; + _scenario = scenario; + _jsonFileInfo = jsonFileInfo; + } + + public async Task Execute() + { + if (!string.IsNullOrEmpty(_scenario)) + { + // No named scenarios are currently implemented + throw new Exception($"Scenario '{_scenario}' is not supported."); + } + + if (!(_jsonFileInfo?.Exists ?? false)) + { + throw new Exception($"Invalid JSON file path '{_jsonFileInfo?.FullName}'"); + } + + var scenario = new JsonTestScenario(_port, _jsonFileInfo); + await scenario.RunClient().ConfigureAwait(false); + } +} diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs new file mode 100644 index 0000000000000..c3a7694485b69 --- /dev/null +++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Net; +using System.Threading.Tasks; +using Apache.Arrow.Flight.TestWeb; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Hosting.Server; +using Microsoft.AspNetCore.Hosting.Server.Features; +using Microsoft.AspNetCore.Server.Kestrel.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace Apache.Arrow.Flight.IntegrationTest; + +public class FlightServerCommand +{ + private readonly string _scenario; + + public FlightServerCommand(string scenario) + { + _scenario = scenario; + } + + public async Task Execute() + { + if (!string.IsNullOrEmpty(_scenario)) + { + // No named scenarios are currently implemented + throw new Exception($"Scenario '{_scenario}' is not supported."); + } + + var host = Host.CreateDefaultBuilder() + .ConfigureWebHostDefaults(webBuilder => + { + webBuilder + .ConfigureKestrel(options => + { + options.Listen(IPEndPoint.Parse("127.0.0.1:0"), l => l.Protocols = HttpProtocols.Http2); + }) + .UseStartup(); + }) + .Build(); + + await host.StartAsync().ConfigureAwait(false); + + var addresses = host.Services.GetService().Features.Get().Addresses; + foreach (var address in addresses) + { + Console.WriteLine($"Server listening on {address}"); + } + + await host.WaitForShutdownAsync().ConfigureAwait(false); + } +} diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/GrpcResolver.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/GrpcResolver.cs new file mode 100644 index 0000000000000..44b1075e7abf2 --- /dev/null +++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/GrpcResolver.cs @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Grpc.Net.Client.Balancer; + +namespace Apache.Arrow.Flight.IntegrationTest; + +/// +/// The Grpc.Net.Client library doesn't know how to handle the "grpc+tcp" scheme used by Arrow Flight. +/// This ResolverFactory passes these through to the standard Static Resolver used for the http scheme. +/// +public class GrpcTcpResolverFactory : ResolverFactory +{ + public override string Name => "grpc+tcp"; + + public override Resolver Create(ResolverOptions options) + { + return new StaticResolverFactory( + uri => new[] { new BalancerAddress(options.Address.Host, options.Address.Port) }) + .Create(options); + } +} diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs new file mode 100644 index 0000000000000..f4f3ac28bfa1b --- /dev/null +++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Apache.Arrow.Flight.Client; +using Apache.Arrow.IntegrationTest; +using Apache.Arrow.Tests; +using Apache.Arrow.Types; +using Google.Protobuf; +using Grpc.Net.Client; +using Grpc.Core; +using Grpc.Net.Client.Balancer; +using Microsoft.Extensions.DependencyInjection; + +namespace Apache.Arrow.Flight.IntegrationTest; + +/// +/// A test scenario defined using a JSON data file +/// +internal class JsonTestScenario +{ + private readonly int _serverPort; + private readonly FileInfo _jsonFile; + private readonly ServiceProvider _serviceProvider; + + public JsonTestScenario(int serverPort, FileInfo jsonFile) + { + _serverPort = serverPort; + _jsonFile = jsonFile; + + var services = new ServiceCollection(); + services.AddSingleton(new GrpcTcpResolverFactory()); + _serviceProvider = services.BuildServiceProvider(); + } + + public async Task RunClient() + { + var address = $"grpc+tcp://localhost:{_serverPort}"; + using var channel = GrpcChannel.ForAddress( + address, + new GrpcChannelOptions + { + ServiceProvider = _serviceProvider, + Credentials = ChannelCredentials.Insecure + }); + var client = new FlightClient(channel); + + var descriptor = FlightDescriptor.CreatePathDescriptor(_jsonFile.FullName); + + var jsonFile = await JsonFile.ParseAsync(_jsonFile).ConfigureAwait(false); + var schema = jsonFile.GetSchemaAndDictionaries(out Func dictionaries); + var batches = jsonFile.Batches.Select(batch => batch.ToArrow(schema, dictionaries)).ToArray(); + + // 1. Put the data to the server. + await UploadBatches(client, descriptor, batches).ConfigureAwait(false); + + // 2. Get the ticket for the data. + var info = await client.GetInfo(descriptor).ConfigureAwait(false); + if (info.Endpoints.Count == 0) + { + throw new Exception("No endpoints received"); + } + + // 3. Stream data from the server, comparing individual batches. + foreach (var endpoint in info.Endpoints) + { + var locations = endpoint.Locations.ToArray(); + if (locations.Length == 0) + { + // Can read with existing client + await ConsumeFlightLocation(client, endpoint.Ticket, batches).ConfigureAwait(false); + } + else + { + foreach (var location in locations) + { + using var readChannel = GrpcChannel.ForAddress( + location.Uri, + new GrpcChannelOptions + { + ServiceProvider = _serviceProvider, + Credentials = ChannelCredentials.Insecure + }); + var readClient = new FlightClient(readChannel); + await ConsumeFlightLocation(readClient, endpoint.Ticket, batches).ConfigureAwait(false); + } + } + } + } + + private static async Task UploadBatches(FlightClient client, FlightDescriptor descriptor, RecordBatch[] batches) + { + using var putCall = client.StartPut(descriptor); + using var writer = putCall.RequestStream; + + try + { + var counter = 0; + foreach (var batch in batches) + { + var metadata = $"{counter}"; + + await writer.WriteAsync(batch, ByteString.CopyFromUtf8(metadata)).ConfigureAwait(false); + + // Verify server has acknowledged the write request + await putCall.ResponseStream.MoveNext().ConfigureAwait(false); + var responseString = putCall.ResponseStream.Current.ApplicationMetadata.ToStringUtf8(); + + if (responseString != metadata) + { + throw new Exception($"Response metadata '{responseString}' does not match expected metadata '{metadata}'"); + } + + counter++; + } + } + finally + { + await writer.CompleteAsync().ConfigureAwait(false); + } + + // Drain the response stream to ensure the server has stored the data + var hasMore = await putCall.ResponseStream.MoveNext().ConfigureAwait(false); + if (hasMore) + { + throw new Exception("Expected to have reached the end of the response stream"); + } + } + + private static async Task ConsumeFlightLocation(FlightClient client, FlightTicket ticket, RecordBatch[] batches) + { + using var readStream = client.GetStream(ticket); + var counter = 0; + foreach (var originalBatch in batches) + { + if (!await readStream.ResponseStream.MoveNext().ConfigureAwait(false)) + { + throw new Exception($"Expected {batches.Length} batches but received {counter}"); + } + + var batch = readStream.ResponseStream.Current; + ArrowReaderVerifier.CompareBatches(originalBatch, batch, strictCompare: false); + + counter++; + } + + if (await readStream.ResponseStream.MoveNext().ConfigureAwait(false)) + { + throw new Exception($"Expected to reach the end of the response stream after {batches.Length} batches"); + } + } +} diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/Program.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Program.cs new file mode 100644 index 0000000000000..24d39de28a731 --- /dev/null +++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Program.cs @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.CommandLine; +using System.IO; +using System.Threading.Tasks; + +namespace Apache.Arrow.Flight.IntegrationTest; + +public static class Program +{ + public static async Task Main(string[] args) + { + var portOption = new Option( + new[] { "--port", "-p" }, + description: "Port the Flight server is listening on"); + var scenarioOption = new Option( + new[] { "--scenario", "-s" }, + "The name of the scenario to run"); + var pathOption = new Option( + new[] { "--path", "-j" }, + "Path to a JSON file of test data"); + + var rootCommand = new RootCommand( + "Integration test application for Apache.Arrow .NET Flight."); + + var clientCommand = new Command("client", "Run the Flight client") + { + portOption, + scenarioOption, + pathOption, + }; + rootCommand.AddCommand(clientCommand); + + clientCommand.SetHandler(async (port, scenario, jsonFile) => + { + var command = new FlightClientCommand(port, scenario, jsonFile); + await command.Execute().ConfigureAwait(false); + }, portOption, scenarioOption, pathOption); + + var serverCommand = new Command("server", "Run the Flight server") + { + scenarioOption, + }; + rootCommand.AddCommand(serverCommand); + + serverCommand.SetHandler(async scenario => + { + var command = new FlightServerCommand(scenario); + await command.Execute().ConfigureAwait(false); + }, scenarioOption); + + return await rootCommand.InvokeAsync(args).ConfigureAwait(false); + } +} diff --git a/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs b/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs index 97c1af2f06cb8..d1cfe9e445808 100644 --- a/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs +++ b/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs @@ -13,15 +13,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Console; namespace Apache.Arrow.Flight.TestWeb { @@ -35,6 +33,11 @@ public void ConfigureServices(IServiceCollection services) .AddFlightServer(); services.AddSingleton(new FlightStore()); + + // The integration tests rely on the port being written to the first line of stdout, + // so send all logging to stderr. + services.Configure( + o => o.LogToStandardErrorThreshold = LogLevel.Debug); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. diff --git a/csharp/test/Apache.Arrow.Flight.TestWeb/TestFlightServer.cs b/csharp/test/Apache.Arrow.Flight.TestWeb/TestFlightServer.cs index 4a72b73274f1e..46c5460912d8c 100644 --- a/csharp/test/Apache.Arrow.Flight.TestWeb/TestFlightServer.cs +++ b/csharp/test/Apache.Arrow.Flight.TestWeb/TestFlightServer.cs @@ -67,14 +67,16 @@ public override async Task DoPut(FlightServerRecordBatchStreamReader requestStre if(!_flightStore.Flights.TryGetValue(flightDescriptor, out var flightHolder)) { - flightHolder = new FlightHolder(flightDescriptor, await requestStream.Schema, $"http://{context.Host}"); + flightHolder = new FlightHolder(flightDescriptor, await requestStream.Schema, $"grpc+tcp://{context.Host}"); _flightStore.Flights.Add(flightDescriptor, flightHolder); } while (await requestStream.MoveNext()) { - flightHolder.AddBatch(new RecordBatchWithMetadata(requestStream.Current, requestStream.ApplicationMetadata.FirstOrDefault())); - await responseStream.WriteAsync(FlightPutResult.Empty); + var applicationMetadata = requestStream.ApplicationMetadata.FirstOrDefault(); + flightHolder.AddBatch(new RecordBatchWithMetadata(requestStream.Current, applicationMetadata)); + await responseStream.WriteAsync( + applicationMetadata == null ? FlightPutResult.Empty : new FlightPutResult(applicationMetadata)); } } diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index 9f86d172ddbcf..bc862963405f2 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -25,7 +25,7 @@ import numpy as np from .util import frombytes, tobytes, random_bytes, random_utf8 -from .util import SKIP_C_SCHEMA, SKIP_C_ARRAY +from .util import SKIP_C_SCHEMA, SKIP_C_ARRAY, SKIP_FLIGHT def metadata_key_values(pairs): @@ -1890,7 +1890,10 @@ def _temp_path(): return file_objs = [ - generate_primitive_case([], name='primitive_no_batches'), + generate_primitive_case([], name='primitive_no_batches') + # TODO(https://github.com/apache/arrow/issues/44363) + .skip_format(SKIP_FLIGHT, 'C#'), + generate_primitive_case([17, 20], name='primitive'), generate_primitive_case([0, 0, 0], name='primitive_zerolength'), @@ -1952,16 +1955,22 @@ def _temp_path(): generate_dictionary_case() # TODO(https://github.com/apache/arrow-nanoarrow/issues/622) - .skip_tester('nanoarrow'), + .skip_tester('nanoarrow') + # TODO(https://github.com/apache/arrow/issues/38045) + .skip_format(SKIP_FLIGHT, 'C#'), generate_dictionary_unsigned_case() .skip_tester('nanoarrow') - .skip_tester('Java'), # TODO(ARROW-9377) + .skip_tester('Java') # TODO(ARROW-9377) + # TODO(https://github.com/apache/arrow/issues/38045) + .skip_format(SKIP_FLIGHT, 'C#'), generate_nested_dictionary_case() # TODO(https://github.com/apache/arrow-nanoarrow/issues/622) .skip_tester('nanoarrow') - .skip_tester('Java'), # TODO(ARROW-7779) + .skip_tester('Java') # TODO(ARROW-7779) + # TODO(https://github.com/apache/arrow/issues/38045) + .skip_format(SKIP_FLIGHT, 'C#'), generate_run_end_encoded_case() .skip_tester('C#') @@ -1988,7 +1997,9 @@ def _temp_path(): .skip_tester('nanoarrow') # TODO: ensure the extension is registered in the C++ entrypoint .skip_format(SKIP_C_SCHEMA, 'C++') - .skip_format(SKIP_C_ARRAY, 'C++'), + .skip_format(SKIP_C_ARRAY, 'C++') + # TODO(https://github.com/apache/arrow/issues/38045) + .skip_format(SKIP_FLIGHT, 'C#'), ] generated_paths = [] diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index e276738846371..378b17d75fdce 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -631,10 +631,13 @@ def append_tester(implementation, tester): flight_scenarios = [ Scenario( "auth:basic_proto", - description="Authenticate using the BasicAuth protobuf."), + description="Authenticate using the BasicAuth protobuf.", + skip_testers={"C#"}, + ), Scenario( "middleware", description="Ensure headers are propagated via middleware.", + skip_testers={"C#"}, ), Scenario( "ordered", @@ -689,12 +692,12 @@ def append_tester(implementation, tester): Scenario( "flight_sql", description="Ensure Flight SQL protocol is working as expected.", - skip_testers={"Rust"} + skip_testers={"Rust", "C#"} ), Scenario( "flight_sql:extension", description="Ensure Flight SQL extensions work as expected.", - skip_testers={"Rust"} + skip_testers={"Rust", "C#"} ), Scenario( "flight_sql:ingestion", diff --git a/dev/archery/archery/integration/tester_csharp.py b/dev/archery/archery/integration/tester_csharp.py index 02ced0701deaf..50b3499fbf285 100644 --- a/dev/archery/archery/integration/tester_csharp.py +++ b/dev/archery/archery/integration/tester_csharp.py @@ -17,6 +17,7 @@ from contextlib import contextmanager import os +import subprocess from . import cdata from .tester import Tester, CDataExporter, CDataImporter @@ -25,12 +26,20 @@ _ARTIFACTS_PATH = os.path.join(ARROW_ROOT_DEFAULT, "csharp/artifacts") +_BUILD_SUBDIR = "Debug/net8.0" _EXE_PATH = os.path.join(_ARTIFACTS_PATH, "Apache.Arrow.IntegrationTest", - "Debug/net8.0/Apache.Arrow.IntegrationTest", + _BUILD_SUBDIR, + "Apache.Arrow.IntegrationTest", ) +_FLIGHT_EXE_PATH = os.path.join(_ARTIFACTS_PATH, + "Apache.Arrow.Flight.IntegrationTest", + _BUILD_SUBDIR, + "Apache.Arrow.Flight.IntegrationTest", + ) + _clr_loaded = False @@ -44,10 +53,10 @@ def _load_clr(): import clr clr.AddReference( f"{_ARTIFACTS_PATH}/Apache.Arrow.IntegrationTest/" - f"Debug/net8.0/Apache.Arrow.IntegrationTest.dll") + f"{_BUILD_SUBDIR}/Apache.Arrow.IntegrationTest.dll") clr.AddReference( f"{_ARTIFACTS_PATH}/Apache.Arrow.Tests/" - f"Debug/net8.0/Apache.Arrow.Tests.dll") + f"{_BUILD_SUBDIR}/Apache.Arrow.Tests.dll") from Apache.Arrow.IntegrationTest import CDataInterface CDataInterface.Initialize() @@ -146,6 +155,8 @@ def run_gc(self): class CSharpTester(Tester): PRODUCER = True CONSUMER = True + FLIGHT_SERVER = True + FLIGHT_CLIENT = True C_DATA_SCHEMA_EXPORTER = True C_DATA_SCHEMA_IMPORTER = True C_DATA_ARRAY_EXPORTER = True @@ -192,3 +203,43 @@ def make_c_data_exporter(self): def make_c_data_importer(self): return CSharpCDataImporter(self.debug, self.args) + + def flight_request(self, port, json_path=None, scenario_name=None): + cmd = [_FLIGHT_EXE_PATH, 'client', '--port', f'{port}'] + if json_path: + cmd.extend(['--path', json_path]) + elif scenario_name: + cmd.extend(['--scenario', scenario_name]) + else: + raise TypeError("Must provide one of json_path or scenario_name") + + if self.debug: + log(' '.join(cmd)) + run_cmd(cmd) + + @contextmanager + def flight_server(self, scenario_name=None): + cmd = [_FLIGHT_EXE_PATH, 'server'] + if scenario_name: + cmd.extend(['--scenario', scenario_name]) + if self.debug: + log(' '.join(cmd)) + server = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + try: + output = server.stdout.readline().decode() + if not output.startswith("Server listening on "): + server.kill() + out, err = server.communicate() + raise RuntimeError( + '.NET Flight server did not start properly, ' + 'stdout: \n{}\n\nstderr:\n{}\n'.format( + output + out.decode(), err.decode() + ) + ) + port = int(output.split(':')[-1]) + yield port + finally: + server.kill() + server.wait(5) diff --git a/dev/release/rat_exclude_files.txt b/dev/release/rat_exclude_files.txt index e149c179813a0..dda1d36dc1aeb 100644 --- a/dev/release/rat_exclude_files.txt +++ b/dev/release/rat_exclude_files.txt @@ -116,6 +116,7 @@ csharp/src/Apache.Arrow.Flight.AspNetCore/Apache.Arrow.Flight.AspNetCore.csproj csharp/src/Apache.Arrow.Compression/Apache.Arrow.Compression.csproj csharp/src/Apache.Arrow.Flight.Sql/Apache.Arrow.Flight.Sql.csproj csharp/test/Apache.Arrow.Benchmarks/Apache.Arrow.Benchmarks.csproj +csharp/test/Apache.Arrow.Flight.IntegrationTest/Apache.Arrow.Flight.IntegrationTest.csproj csharp/test/Apache.Arrow.Flight.Tests/Apache.Arrow.Flight.Tests.csproj csharp/test/Apache.Arrow.Flight.Sql.Tests/Apache.Arrow.Flight.Sql.Tests.csproj csharp/test/Apache.Arrow.Flight.TestWeb/Apache.Arrow.Flight.TestWeb.csproj