Skip to content

Commit

Permalink
feat(csharp): Implement support for transactions, isolation level and…
Browse files Browse the repository at this point in the history
… read-only flag (#1784)

Implements support for transactions, isolation level and read-only flags
on imported and exported drivers.

Closes #1782
  • Loading branch information
CurtHagenlocher authored Apr 29, 2024
1 parent 7402f9a commit 9afc04c
Show file tree
Hide file tree
Showing 10 changed files with 536 additions and 11 deletions.
77 changes: 77 additions & 0 deletions csharp/src/Apache.Arrow.Adbc/AdbcOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;

namespace Apache.Arrow.Adbc
{
public static class AdbcOptions
{
public const string Enabled = "true";
public const string Disabled = "false";

public const string Autocommit = "adbc.connection.autocommit";
public const string ReadOnly = "adbc.connection.readonly";
public const string IsolationLevel = "adbc.connection.transaction.isolation_level";

public static class IsolationLevels
{
public const string Default = "adbc.connection.transaction.isolation.default";
public const string ReadUncommitted = "adbc.connection.transaction.isolation.read_uncommitted";
public const string ReadCommitted = "adbc.connection.transaction.isolation.read_committed";
public const string RepeatableRead = "adbc.connection.transaction.isolation.repeatable_read";
public const string Snapshot = "adbc.connection.transaction.isolation.snapshot";
public const string Serializable = "adbc.connection.transaction.isolation.serializable";
public const string Linearizable = "adbc.connection.transaction.isolation.linearizable";
}

public static string GetEnabled(bool value) => value ? Enabled : Disabled;
public static bool GetEnabled(string value)
{
if (StringComparer.OrdinalIgnoreCase.Equals(value, Enabled)) { return true; }
if (StringComparer.OrdinalIgnoreCase.Equals(value, Disabled)) { return false; }
throw new NotSupportedException("unknown enabled flag");
}

public static string GetIsolationLevel(IsolationLevel value)
{
return value switch
{
Adbc.IsolationLevel.Default => IsolationLevels.Default,
Adbc.IsolationLevel.ReadUncommitted => IsolationLevels.ReadUncommitted,
Adbc.IsolationLevel.ReadCommitted => IsolationLevels.ReadCommitted,
Adbc.IsolationLevel.RepeatableRead => IsolationLevels.RepeatableRead,
Adbc.IsolationLevel.Snapshot => IsolationLevels.Snapshot,
Adbc.IsolationLevel.Serializable => IsolationLevels.Serializable,
Adbc.IsolationLevel.Linearizable => IsolationLevels.Linearizable,
_ => throw new NotSupportedException("unknown isolation level"),
};
}

public static IsolationLevel GetIsolationLevel(string value)
{
if (StringComparer.OrdinalIgnoreCase.Equals(value, IsolationLevels.Default)) { return Adbc.IsolationLevel.Default; }
if (StringComparer.OrdinalIgnoreCase.Equals(value, IsolationLevels.ReadUncommitted)) { return Adbc.IsolationLevel.ReadUncommitted; }
if (StringComparer.OrdinalIgnoreCase.Equals(value, IsolationLevels.ReadCommitted)) { return Adbc.IsolationLevel.ReadCommitted; }
if (StringComparer.OrdinalIgnoreCase.Equals(value, IsolationLevels.RepeatableRead)) { return Adbc.IsolationLevel.RepeatableRead; }
if (StringComparer.OrdinalIgnoreCase.Equals(value, IsolationLevels.Snapshot)) { return Adbc.IsolationLevel.Snapshot; }
if (StringComparer.OrdinalIgnoreCase.Equals(value, IsolationLevels.Serializable)) { return Adbc.IsolationLevel.Serializable; }
if (StringComparer.OrdinalIgnoreCase.Equals(value, IsolationLevels.Linearizable)) { return Adbc.IsolationLevel.Linearizable; }
throw new NotSupportedException("unknown isolation level");
}
}
}
28 changes: 27 additions & 1 deletion csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,33 @@ public ConnectionStub(AdbcDriver driver)

public unsafe void SetOption(byte* name, byte* value)
{
options[MarshalExtensions.PtrToStringUTF8(name)] = MarshalExtensions.PtrToStringUTF8(value);
string stringName = MarshalExtensions.PtrToStringUTF8(name);
string stringValue = MarshalExtensions.PtrToStringUTF8(value);

if (connection == null)
{
options[stringName] = stringValue;
}
else
{
// TODO: how best to normalize this?
if (StringComparer.OrdinalIgnoreCase.Equals(stringName, AdbcOptions.Autocommit))
{
connection.AutoCommit = AdbcOptions.GetEnabled(stringValue);
}
else if (StringComparer.OrdinalIgnoreCase.Equals(stringName, AdbcOptions.IsolationLevel))
{
connection.IsolationLevel = AdbcOptions.GetIsolationLevel(stringValue);
}
else if (StringComparer.OrdinalIgnoreCase.Equals(stringName, AdbcOptions.ReadOnly))
{
connection.ReadOnly = AdbcOptions.GetEnabled(stringValue);
}
else
{
connection.SetOption(stringName, stringValue);
}
}
}

public void Rollback() { this.connection.Rollback(); }
Expand Down
57 changes: 57 additions & 0 deletions csharp/src/Apache.Arrow.Adbc/C/CAdbcDriverImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,46 @@ internal sealed class AdbcConnectionNative : AdbcConnection
{
private CAdbcDriver _nativeDriver;
private CAdbcConnection _nativeConnection;
private bool? _autoCommit;
private IsolationLevel? _isolationLevel;
private bool? _readOnly;

public AdbcConnectionNative(CAdbcDriver nativeDriver, CAdbcConnection nativeConnection)
{
_nativeDriver = nativeDriver;
_nativeConnection = nativeConnection;
}

public override bool AutoCommit
{
get => _autoCommit.Value;
set
{
SetOption(AdbcOptions.Autocommit, AdbcOptions.GetEnabled(value));
_autoCommit = value;
}
}

public override IsolationLevel IsolationLevel
{
get => _isolationLevel.Value;
set
{
SetOption(AdbcOptions.IsolationLevel, AdbcOptions.GetIsolationLevel(value));
_isolationLevel = value;
}
}

public override bool ReadOnly
{
get => _readOnly.Value;
set
{
SetOption(AdbcOptions.ReadOnly, AdbcOptions.GetEnabled(value));
_readOnly = value;
}
}

public unsafe override AdbcStatement CreateStatement()
{
CAdbcStatement nativeStatement = new CAdbcStatement();
Expand Down Expand Up @@ -330,6 +363,30 @@ public unsafe override Schema GetTableSchema(string catalog, string db_schema, s
}
}
}

public unsafe override void Commit()
{
using (CallHelper caller = new CallHelper())
{
caller.Call(_nativeDriver.ConnectionCommit, ref _nativeConnection);
}
}

public unsafe override void Rollback()
{
using (CallHelper caller = new CallHelper())
{
caller.Call(_nativeDriver.ConnectionRollback, ref _nativeConnection);
}
}

public unsafe override void SetOption(string key, string value)
{
using (CallHelper caller = new CallHelper())
{
caller.Call(_nativeDriver.ConnectionSetOption, ref _nativeConnection, key, value);
}
}
}

/// <summary>
Expand Down
5 changes: 3 additions & 2 deletions csharp/src/Client/AdbcCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,13 @@ protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior)
/// <returns><see cref="AdbcDataReader"/></returns>
public new AdbcDataReader ExecuteReader(CommandBehavior behavior)
{
switch (behavior)
bool closeConnection = (behavior & CommandBehavior.CloseConnection) != 0;
switch (behavior & ~CommandBehavior.CloseConnection)
{
case CommandBehavior.SchemaOnly: // The schema is not known until a read happens
case CommandBehavior.Default:
QueryResult result = this.ExecuteQuery();
return new AdbcDataReader(this, result, this.DecimalBehavior);
return new AdbcDataReader(this, result, this.DecimalBehavior, closeConnection);

default:
throw new InvalidOperationException($"{behavior} is not supported with this provider");
Expand Down
90 changes: 86 additions & 4 deletions csharp/src/Client/AdbcConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public sealed class AdbcConnection : DbConnection
private readonly Dictionary<string, string> adbcConnectionOptions;

private AdbcStatement adbcStatement;
private AdbcTransaction currentTransaction;

/// <summary>
/// Overloaded. Intializes an <see cref="AdbcConnection"/>.
Expand Down Expand Up @@ -92,6 +93,14 @@ public AdbcConnection(AdbcDriver adbcDriver, Dictionary<string, string> paramete
this.adbcConnectionOptions = options;
}

// For testing
internal AdbcConnection(AdbcDriver driver, AdbcDatabase database, Adbc.AdbcConnection connection)
{
this.AdbcDriver = driver;
this.adbcDatabase = database;
this.adbcConnectionInternal = connection;
}

/// <summary>
/// Creates a new <see cref="AdbcCommand"/>.
/// </summary>
Expand Down Expand Up @@ -258,6 +267,65 @@ public override DataTable GetSchema(string collectionName, string[] restrictionV
return collection.GetSchema(this.Connection, restrictionValues);
}

protected override DbTransaction BeginDbTransaction(System.Data.IsolationLevel isolationLevel)
{
if (this.currentTransaction != null) throw new InvalidOperationException("connection is already enlisted in a transaction");

this.Connection.AutoCommit = false;

if (isolationLevel != System.Data.IsolationLevel.Unspecified)
{
this.Connection.IsolationLevel = GetIsolationLevel(isolationLevel);
}

this.currentTransaction = new AdbcTransaction(this, isolationLevel);
return this.currentTransaction;
}

private static Adbc.IsolationLevel GetIsolationLevel(System.Data.IsolationLevel isolationLevel)
{
return isolationLevel switch
{
System.Data.IsolationLevel.Unspecified => Adbc.IsolationLevel.Default,
System.Data.IsolationLevel.ReadUncommitted => Adbc.IsolationLevel.ReadUncommitted,
System.Data.IsolationLevel.ReadCommitted => Adbc.IsolationLevel.ReadCommitted,
System.Data.IsolationLevel.RepeatableRead => Adbc.IsolationLevel.RepeatableRead,
System.Data.IsolationLevel.Snapshot => Adbc.IsolationLevel.Snapshot,
System.Data.IsolationLevel.Serializable => Adbc.IsolationLevel.Serializable,
_ => throw new NotSupportedException("unknown isolation level"),
};
}

private void Commit()
{
if (this.currentTransaction == null) throw new InvalidOperationException("connection is not enlisted in a transaction");
System.Data.IsolationLevel isolationLevel = this.currentTransaction.IsolationLevel;

this.Connection.Commit();

this.currentTransaction = null;
this.Connection.AutoCommit = true;
if (isolationLevel != System.Data.IsolationLevel.Unspecified)
{
this.adbcConnectionInternal.IsolationLevel = IsolationLevel.Default;
}
}

private void Rollback()
{
if (this.currentTransaction == null) throw new InvalidOperationException("connection is not enlisted in a transaction");
System.Data.IsolationLevel isolationLevel = this.currentTransaction.IsolationLevel;

this.Connection.Rollback();

this.currentTransaction = null;
this.Connection.AutoCommit = true;
if (isolationLevel != System.Data.IsolationLevel.Unspecified)
{
this.adbcConnectionInternal.IsolationLevel = IsolationLevel.Default;
}
}

#region NOT_IMPLEMENTED

public override string Database => throw new NotImplementedException();
Expand All @@ -271,12 +339,26 @@ public override void ChangeDatabase(string databaseName)
throw new NotImplementedException();
}

protected override DbTransaction BeginDbTransaction(System.Data.IsolationLevel isolationLevel)
#endregion

sealed class AdbcTransaction : DbTransaction
{
throw new NotImplementedException();
}
readonly AdbcConnection connection;
readonly System.Data.IsolationLevel isolationLevel;

#endregion
public AdbcTransaction(AdbcConnection connection, System.Data.IsolationLevel isolationLevel)
{
this.connection = connection;
this.isolationLevel = isolationLevel;
}

public override System.Data.IsolationLevel IsolationLevel => this.isolationLevel;

protected override DbConnection DbConnection => this.connection;

public override void Commit() => this.connection.Commit();
public override void Rollback() => this.connection.Rollback();
}

abstract class SchemaCollection
{
Expand Down
13 changes: 9 additions & 4 deletions csharp/src/Client/AdbcDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ namespace Apache.Arrow.Adbc.Client
public sealed class AdbcDataReader : DbDataReader, IDbColumnSchemaGenerator
{
private readonly AdbcCommand adbcCommand;
private readonly bool closeConnection;
private QueryResult adbcQueryResult;
private RecordBatch recordBatch;
private int currentRowInRecordBatch;
private Schema schema = null;
private bool isClosed;
private int recordsEffected = -1;
private int recordsAffected = -1;

internal AdbcDataReader(AdbcCommand adbcCommand, QueryResult adbcQueryResult, DecimalBehavior decimalBehavior)
internal AdbcDataReader(AdbcCommand adbcCommand, QueryResult adbcQueryResult, DecimalBehavior decimalBehavior, bool closeConnection)
{
if (adbcCommand == null)
throw new ArgumentNullException(nameof(adbcCommand));
Expand All @@ -58,6 +59,7 @@ internal AdbcDataReader(AdbcCommand adbcCommand, QueryResult adbcQueryResult, De
if (this.schema == null)
throw new ArgumentException("A Schema must be set for the AdbcQueryResult.Stream property");

this.closeConnection = closeConnection;
this.isClosed = false;
this.DecimalBehavior = decimalBehavior;
}
Expand All @@ -81,7 +83,7 @@ internal AdbcDataReader(AdbcCommand adbcCommand, QueryResult adbcQueryResult, De

public DecimalBehavior DecimalBehavior { get; set; }

public override int RecordsAffected => this.recordsEffected;
public override int RecordsAffected => this.recordsAffected;

/// <summary>
/// The total number of record batches in the result.
Expand All @@ -90,7 +92,10 @@ internal AdbcDataReader(AdbcCommand adbcCommand, QueryResult adbcQueryResult, De

public override void Close()
{
this.adbcCommand?.Connection?.Close();
if (this.closeConnection)
{
this.adbcCommand?.Connection?.Close();
}
this.adbcQueryResult = null;
this.isClosed = true;
}
Expand Down
20 changes: 20 additions & 0 deletions csharp/src/Client/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")]
Loading

0 comments on commit 9afc04c

Please sign in to comment.