Skip to content

Commit

Permalink
GH-36812: [C#] Fix C API support to work with .NET desktop framework (#…
Browse files Browse the repository at this point in the history
…36813)

### What changes are included in this PR?

The C API support in the C# library has been modified to work correctly on .NET 4.7.2.
The tests have been modified to work correctly on .NET 4.7.2, though that platform is disabled by default as the Python interop seem to cause a hang when unloading the xUnit AppDomain.

**This PR contains a "Critical Fix".**
* Closes: #36812

Authored-by: Curt Hagenlocher <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
  • Loading branch information
CurtHagenlocher authored Jul 21, 2023
1 parent f43bfd6 commit b557e85
Show file tree
Hide file tree
Showing 18 changed files with 200 additions and 88 deletions.
14 changes: 9 additions & 5 deletions csharp/src/Apache.Arrow/C/CArrowArray.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ public unsafe struct CArrowArray
public byte** buffers;
public CArrowArray** children;
public CArrowArray* dictionary;
internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#if NET5_0_OR_GREATER
internal delegate* unmanaged<CArrowArray*, void> release;
#else
internal IntPtr release;
#endif
<CArrowArray*, void> release;
public void* private_data;

/// <summary>
Expand All @@ -68,10 +68,14 @@ internal delegate* unmanaged
/// </remarks>
public static void Free(CArrowArray* array)
{
if (array->release != null)
if (array->release != default)
{
// Call release if not already called.
#if NET5_0_OR_GREATER
array->release(array);
#else
Marshal.GetDelegateForFunctionPointer<CArrowArrayExporter.ReleaseArrowArray>(array->release)(array);
#endif
}
Marshal.FreeHGlobal((IntPtr)array);
}
Expand Down
8 changes: 4 additions & 4 deletions csharp/src/Apache.Arrow/C/CArrowArrayExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ public static class CArrowArrayExporter
#if NET5_0_OR_GREATER
private static unsafe delegate* unmanaged<CArrowArray*, void> ReleaseArrayPtr => &ReleaseArray;
#else
private unsafe delegate void ReleaseArrowArray(CArrowArray* cArray);
internal unsafe delegate void ReleaseArrowArray(CArrowArray* cArray);
private static unsafe readonly NativeDelegate<ReleaseArrowArray> s_releaseArray = new NativeDelegate<ReleaseArrowArray>(ReleaseArray);
private static unsafe delegate* unmanaged[Cdecl]<CArrowArray*, void> ReleaseArrayPtr => (delegate* unmanaged[Cdecl]<CArrowArray*, void>)s_releaseArray.Pointer;
private static IntPtr ReleaseArrayPtr => s_releaseArray.Pointer;
#endif
/// <summary>
/// Export an <see cref="IArrowArray"/> to a <see cref="CArrowArray"/>. Whether or not the
Expand Down Expand Up @@ -93,7 +93,7 @@ public static unsafe void ExportRecordBatch(RecordBatch batch, CArrowArray* cArr
{
throw new ArgumentNullException(nameof(cArray));
}
if (cArray->release != null)
if (cArray->release != default)
{
throw new ArgumentException("Cannot export array to a struct that is already initialized.", nameof(cArray));
}
Expand Down Expand Up @@ -191,7 +191,7 @@ private unsafe static void ConvertRecordBatch(ExportedAllocationOwner sharedOwne
private unsafe static void ReleaseArray(CArrowArray* cArray)
{
Dispose(&cArray->private_data);
cArray->release = null;
cArray->release = default;
}

private unsafe static void* FromDisposable(IDisposable disposable)
Expand Down
11 changes: 8 additions & 3 deletions csharp/src/Apache.Arrow/C/CArrowArrayImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using Apache.Arrow.Memory;
using Apache.Arrow.Types;

Expand Down Expand Up @@ -104,21 +105,25 @@ public ImportedArrowArray(CArrowArray* cArray)
{
throw new ArgumentNullException(nameof(cArray));
}
if (cArray->release == null)
if (cArray->release == default)
{
throw new ArgumentException("Tried to import an array that has already been released.", nameof(cArray));
}
_cArray = *cArray;
cArray->release = null;
cArray->release = default;
}

protected override void FinalRelease()
{
if (_cArray.release != null)
if (_cArray.release != default)
{
fixed (CArrowArray* cArray = &_cArray)
{
#if NET5_0_OR_GREATER
cArray->release(cArray);
#else
Marshal.GetDelegateForFunctionPointer<CArrowArrayExporter.ReleaseArrowArray>(cArray->release)(cArray);
#endif
}
}
}
Expand Down
39 changes: 22 additions & 17 deletions csharp/src/Apache.Arrow/C/CArrowArrayStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,23 @@ public unsafe struct CArrowArrayStream
///
/// Return value: 0 if successful, an `errno`-compatible error code otherwise.
///</summary>
internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#if NET5_0_OR_GREATER
internal delegate* unmanaged <CArrowArrayStream*, CArrowSchema*, int> get_schema;
#else
internal IntPtr get_schema;
#endif
<CArrowArrayStream*, CArrowSchema*, int> get_schema;

/// <summary>
/// Callback to get the next array. If no error and the array is released, the stream has ended.
/// If successful, the ArrowArray must be released independently from the stream.
///
/// Return value: 0 if successful, an `errno`-compatible error code otherwise.
/// </summary>
internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#if NET5_0_OR_GREATER
internal delegate* unmanaged<CArrowArrayStream*, CArrowArray*, int> get_next;
#else
internal IntPtr get_next;
#endif
<CArrowArrayStream*, CArrowArray*, int> get_next;

/// <summary>
/// Callback to get optional detailed error information. This must only
Expand All @@ -62,21 +62,21 @@ internal delegate* unmanaged
/// Return value: pointer to a null-terminated character array describing the last
/// error, or NULL if no description is available.
///</summary>
internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#if NET5_0_OR_GREATER
internal delegate* unmanaged<CArrowArrayStream*, byte*> get_last_error;
#else
internal IntPtr get_last_error;
#endif
<CArrowArrayStream*, byte*> get_last_error;

/// <summary>
/// Release callback: release the stream's own resources. Note that arrays returned by
/// get_next must be individually released.
/// </summary>
internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#if NET5_0_OR_GREATER
internal delegate* unmanaged <CArrowArrayStream*, void> release;
#else
internal IntPtr release;
#endif
<CArrowArrayStream*, void> release;

public void* private_data;

Expand All @@ -103,10 +103,15 @@ internal delegate* unmanaged
/// </remarks>
public static void Free(CArrowArrayStream* arrayStream)
{
if (arrayStream->release != null)
if (arrayStream->release != default)
{
// Call release if not already called.
#if NET5_0_OR_GREATER

arrayStream->release(arrayStream);
#else
Marshal.GetDelegateForFunctionPointer<CArrowArrayStreamExporter.ReleaseArrayStream>(arrayStream->release)(arrayStream);
#endif
}
Marshal.FreeHGlobal((IntPtr)arrayStream);
}
Expand Down
24 changes: 10 additions & 14 deletions csharp/src/Apache.Arrow/C/CArrowArrayStreamExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,18 @@ public static class CArrowArrayStreamExporter
private static unsafe delegate* unmanaged<CArrowArrayStream*, byte*> GetLastErrorPtr => &GetLastError;
private static unsafe delegate* unmanaged<CArrowArrayStream*, void> ReleasePtr => &Release;
#else
private unsafe delegate int GetSchemaArrayStream(CArrowArrayStream* cArrayStream, CArrowSchema* cSchema);
internal unsafe delegate int GetSchemaArrayStream(CArrowArrayStream* cArrayStream, CArrowSchema* cSchema);
private static unsafe NativeDelegate<GetSchemaArrayStream> s_getSchemaArrayStream = new NativeDelegate<GetSchemaArrayStream>(GetSchema);
private static unsafe delegate* unmanaged[Cdecl]<CArrowArrayStream*, CArrowSchema*, int> GetSchemaPtr =>
(delegate* unmanaged[Cdecl]<CArrowArrayStream*, CArrowSchema*, int>)s_getSchemaArrayStream.Pointer;
private unsafe delegate int GetNextArrayStream(CArrowArrayStream* cArrayStream, CArrowArray* cArray);
private static unsafe IntPtr GetSchemaPtr => s_getSchemaArrayStream.Pointer;
internal unsafe delegate int GetNextArrayStream(CArrowArrayStream* cArrayStream, CArrowArray* cArray);
private static unsafe NativeDelegate<GetNextArrayStream> s_getNextArrayStream = new NativeDelegate<GetNextArrayStream>(GetNext);
private static unsafe delegate* unmanaged[Cdecl]<CArrowArrayStream*, CArrowArray*, int> GetNextPtr =>
(delegate* unmanaged[Cdecl]<CArrowArrayStream*, CArrowArray*, int>)s_getNextArrayStream.Pointer;
private unsafe delegate byte* GetLastErrorArrayStream(CArrowArrayStream* cArrayStream);
private static unsafe IntPtr GetNextPtr => s_getNextArrayStream.Pointer;
internal unsafe delegate byte* GetLastErrorArrayStream(CArrowArrayStream* cArrayStream);
private static unsafe NativeDelegate<GetLastErrorArrayStream> s_getLastErrorArrayStream = new NativeDelegate<GetLastErrorArrayStream>(GetLastError);
private static unsafe delegate* unmanaged[Cdecl]<CArrowArrayStream*, byte*> GetLastErrorPtr =>
(delegate* unmanaged[Cdecl]<CArrowArrayStream*, byte*>)s_getLastErrorArrayStream.Pointer;
private unsafe delegate void ReleaseArrayStream(CArrowArrayStream* cArrayStream);
private static unsafe IntPtr GetLastErrorPtr => s_getLastErrorArrayStream.Pointer;
internal unsafe delegate void ReleaseArrayStream(CArrowArrayStream* cArrayStream);
private static unsafe NativeDelegate<ReleaseArrayStream> s_releaseArrayStream = new NativeDelegate<ReleaseArrayStream>(Release);
private static unsafe delegate* unmanaged[Cdecl]<CArrowArrayStream*, void> ReleasePtr =>
(delegate* unmanaged[Cdecl]<CArrowArrayStream*, void>)s_releaseArrayStream.Pointer;
private static unsafe IntPtr ReleasePtr => s_releaseArrayStream.Pointer;
#endif

/// <summary>
Expand Down Expand Up @@ -103,7 +99,7 @@ private unsafe static int GetNext(CArrowArrayStream* cArrayStream, CArrowArray*
ExportedArrayStream arrayStream = null;
try
{
cArray->release = null;
cArray->release = default;
arrayStream = ExportedArrayStream.FromPointer(cArrayStream->private_data);
RecordBatch recordBatch = arrayStream.ArrowArrayStream.ReadNextRecordBatchAsync().Result;
if (recordBatch != null)
Expand Down Expand Up @@ -140,7 +136,7 @@ private unsafe static int GetNext(CArrowArrayStream* cArrayStream, CArrowArray*
private unsafe static void Release(CArrowArrayStream* cArrayStream)
{
ExportedArrayStream.Free(&cArrayStream->private_data);
cArrayStream->release = null;
cArrayStream->release = default;
}

sealed unsafe class ExportedArrayStream : IDisposable
Expand Down
25 changes: 21 additions & 4 deletions csharp/src/Apache.Arrow/C/CArrowArrayStreamImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

using System;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Ipc;
Expand Down Expand Up @@ -57,7 +58,11 @@ private sealed unsafe class ImportedArrowArrayStream : IArrowArrayStream

internal static string GetLastError(CArrowArrayStream* arrayStream, int errno)
{
#if NET5_0_OR_GREATER
byte* error = arrayStream->get_last_error(arrayStream);
#else
byte* error = Marshal.GetDelegateForFunctionPointer<CArrowArrayStreamExporter.GetLastErrorArrayStream>(arrayStream->get_last_error)(arrayStream);
#endif
if (error == null)
{
return $"Array stream operation failed with no message. Error code: {errno}";
Expand All @@ -71,21 +76,25 @@ public ImportedArrowArrayStream(CArrowArrayStream* cArrayStream)
{
throw new ArgumentNullException(nameof(cArrayStream));
}
if (cArrayStream->release == null)
if (cArrayStream->release == default)
{
throw new ArgumentException("Tried to import an array stream that has already been released.", nameof(cArrayStream));
}

CArrowSchema cSchema = new CArrowSchema();
#if NET5_0_OR_GREATER
int errno = cArrayStream->get_schema(cArrayStream, &cSchema);
#else
int errno = Marshal.GetDelegateForFunctionPointer<CArrowArrayStreamExporter.GetSchemaArrayStream>(cArrayStream->get_schema)(cArrayStream, &cSchema);
#endif
if (errno != 0)
{
throw new Exception(GetLastError(cArrayStream, errno));
}
_schema = CArrowSchemaImporter.ImportSchema(&cSchema);

_cArrayStream = *cArrayStream;
cArrayStream->release = null;
cArrayStream->release = default;
}

~ImportedArrowArrayStream()
Expand All @@ -111,12 +120,16 @@ public ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancell
CArrowArray cArray = new CArrowArray();
fixed (CArrowArrayStream* cArrayStream = &_cArrayStream)
{
#if NET5_0_OR_GREATER
int errno = cArrayStream->get_next(cArrayStream, &cArray);
#else
int errno = Marshal.GetDelegateForFunctionPointer<CArrowArrayStreamExporter.GetNextArrayStream>(cArrayStream->get_next)(cArrayStream, &cArray);
#endif
if (errno != 0)
{
return new(Task.FromException<RecordBatch>(new Exception(GetLastError(cArrayStream, errno))));
}
if (cArray.release != null)
if (cArray.release != default)
{
result = CArrowArrayImporter.ImportRecordBatch(&cArray, _schema);
}
Expand All @@ -127,12 +140,16 @@ public ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancell

public void Dispose()
{
if (!_disposed && _cArrayStream.release != null)
if (!_disposed && _cArrayStream.release != default)
{
_disposed = true;
fixed (CArrowArrayStream* cArrayStream = &_cArrayStream)
{
#if NET5_0_OR_GREATER
cArrayStream->release(cArrayStream);
#else
Marshal.GetDelegateForFunctionPointer<CArrowArrayStreamExporter.ReleaseArrayStream>(cArrayStream->release)(cArrayStream);
#endif
}
}
GC.SuppressFinalize(this);
Expand Down
14 changes: 9 additions & 5 deletions csharp/src/Apache.Arrow/C/CArrowSchema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ public unsafe struct CArrowSchema
public long n_children;
public CArrowSchema** children;
public CArrowSchema* dictionary;
internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#if NET5_0_OR_GREATER
internal delegate* unmanaged<CArrowSchema*, void> release;
#else
internal IntPtr release;
#endif
<CArrowSchema*, void> release;
public void* private_data;

/// <summary>
Expand All @@ -69,10 +69,14 @@ internal delegate* unmanaged
/// </remarks>
public static void Free(CArrowSchema* schema)
{
if (schema->release != null)
if (schema->release != default)
{
// Call release if not already called.
#if NET5_0_OR_GREATER
schema->release(schema);
#else
Marshal.GetDelegateForFunctionPointer<CArrowSchemaExporter.ReleaseArrowSchema>(schema->release)(schema);
#endif
}
Marshal.FreeHGlobal((IntPtr)schema);
}
Expand Down
8 changes: 4 additions & 4 deletions csharp/src/Apache.Arrow/C/CArrowSchemaExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public static class CArrowSchemaExporter
#if NET5_0_OR_GREATER
private static unsafe delegate* unmanaged<CArrowSchema*, void> ReleaseSchemaPtr => &ReleaseCArrowSchema;
#else
private unsafe delegate void ReleaseArrowSchema(CArrowSchema* cArray);
internal unsafe delegate void ReleaseArrowSchema(CArrowSchema* cArray);
private static unsafe readonly NativeDelegate<ReleaseArrowSchema> s_releaseSchema = new NativeDelegate<ReleaseArrowSchema>(ReleaseCArrowSchema);
private static unsafe delegate* unmanaged[Cdecl]<CArrowSchema*, void> ReleaseSchemaPtr => (delegate* unmanaged[Cdecl]<CArrowSchema*, void>)s_releaseSchema.Pointer;
private static IntPtr ReleaseSchemaPtr => s_releaseSchema.Pointer;
#endif

/// <summary>
Expand Down Expand Up @@ -297,7 +297,7 @@ private unsafe static void WriteMetadataString(ref byte* ptr, int length, string
private static unsafe void ReleaseCArrowSchema(CArrowSchema* schema)
{
if (schema == null) return;
if (schema->release == null) return;
if (schema->release == default) return;

Marshal.FreeHGlobal((IntPtr)schema->format);
Marshal.FreeHGlobal((IntPtr)schema->name);
Expand All @@ -324,7 +324,7 @@ private static unsafe void ReleaseCArrowSchema(CArrowSchema* schema)
schema->n_children = 0;
schema->dictionary = null;
schema->children = null;
schema->release = null;
schema->release = default;
}
}
}
8 changes: 6 additions & 2 deletions csharp/src/Apache.Arrow/C/CArrowSchemaImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public ImportedArrowSchema(CArrowSchema* cSchema)
throw new ArgumentException("Passed null pointer for cSchema.");
}
_cSchema = cSchema;
if (_cSchema->release == null)
if (_cSchema->release == default)
{
throw new ArgumentException("Tried to import a schema that has already been released.");
}
Expand All @@ -128,9 +128,13 @@ public ImportedArrowSchema(CArrowSchema* handle, bool isRoot) : this(handle)
public void Dispose()
{
// We only call release on a root-level schema, not child ones.
if (_isRoot && _cSchema->release != null)
if (_isRoot && _cSchema->release != default)
{
#if NET5_0_OR_GREATER
_cSchema->release(_cSchema);
#else
Marshal.GetDelegateForFunctionPointer<CArrowSchemaExporter.ReleaseArrowSchema>(_cSchema->release)(_cSchema);
#endif
}
}

Expand Down
Loading

0 comments on commit b557e85

Please sign in to comment.