From f3348aabe02ca82b9e300ea83262c638c62ca4c5 Mon Sep 17 00:00:00 2001 From: Thiago Oliveira Santos Date: Sun, 20 Oct 2024 21:52:36 -0300 Subject: [PATCH] feat: Creating async local controlled Channel --- .../ISqlProxyClient.cs | 1 + .../ISqlProxyClientTunnel.cs | 3 +- .../Impl/GrpcSqlProxyClient.cs | 6 +- .../Impl/SqlProxyBatchQuery.cs | 1 + .../Impl/SqlProxyClientTunnel.ContextInfo.cs | 35 ++++ .../Impl/SqlProxyClientTunnel.cs | 155 ++++++++++---- .../Impl/SqlProxyClientTunnelExtension.cs | 40 ++++ .../GrpcSqlProxyClientAsyncLocalTest.cs | 191 ++++++++++++++++++ 8 files changed, 385 insertions(+), 47 deletions(-) create mode 100644 src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyClientTunnel.ContextInfo.cs create mode 100644 src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyClientTunnelExtension.cs create mode 100644 test/Codibre.GrpcSqlProxy.Test/GrpcSqlProxyClientAsyncLocalTest.cs diff --git a/src/Codibre.GrpcSqlProxy.Client/ISqlProxyClient.cs b/src/Codibre.GrpcSqlProxy.Client/ISqlProxyClient.cs index ca77ef2..3b4c080 100644 --- a/src/Codibre.GrpcSqlProxy.Client/ISqlProxyClient.cs +++ b/src/Codibre.GrpcSqlProxy.Client/ISqlProxyClient.cs @@ -2,5 +2,6 @@ public interface ISqlProxyClient { + ISqlProxyClientTunnel Channel { get; } ISqlProxyClientTunnel CreateChannel(); } \ No newline at end of file diff --git a/src/Codibre.GrpcSqlProxy.Client/ISqlProxyClientTunnel.cs b/src/Codibre.GrpcSqlProxy.Client/ISqlProxyClientTunnel.cs index 9c65abd..ecf5947 100644 --- a/src/Codibre.GrpcSqlProxy.Client/ISqlProxyClientTunnel.cs +++ b/src/Codibre.GrpcSqlProxy.Client/ISqlProxyClientTunnel.cs @@ -5,13 +5,12 @@ namespace Codibre.GrpcSqlProxy.Client; public interface ISqlProxyClientTunnel : IDisposable { ISqlProxyBatchQuery Batch { get; } + void Start(); ValueTask BeginTransaction(); ValueTask Commit(); ValueTask Rollback(); ValueTask Execute(string sql, SqlProxyQueryOptions? options = null); IAsyncEnumerable Query(string sql, SqlProxyQueryOptions? options = null) where T : class, new(); - ValueTask QueryFirstOrDefault(string sql, SqlProxyQueryOptions? options = null) where T : class, new(); - ValueTask QueryFirst(string sql, SqlProxyQueryOptions? options = null) where T : class, new(); Reader QueryMultipleAsync(string sql, string[] schemas, SqlProxyQueryOptions? options); void OnError(ErrorHandlerEvent handler); } \ No newline at end of file diff --git a/src/Codibre.GrpcSqlProxy.Client/Impl/GrpcSqlProxyClient.cs b/src/Codibre.GrpcSqlProxy.Client/Impl/GrpcSqlProxyClient.cs index 85a8e33..62b9072 100644 --- a/src/Codibre.GrpcSqlProxy.Client/Impl/GrpcSqlProxyClient.cs +++ b/src/Codibre.GrpcSqlProxy.Client/Impl/GrpcSqlProxyClient.cs @@ -5,6 +5,10 @@ namespace Codibre.GrpcSqlProxy.Client.Impl; public class GrpcSqlProxyClient(SqlProxyClientOptions options) : ISqlProxyClient { + private readonly AsyncLocal _asyncLocal = new(); private readonly SqlProxy.SqlProxyClient _client = new(GrpcChannel.ForAddress(options.Url)); - public ISqlProxyClientTunnel CreateChannel() => new SqlProxyClientTunnel(_client.Run(), options); + + public ISqlProxyClientTunnel Channel => _asyncLocal.Value ??= CreateChannel(); + + public ISqlProxyClientTunnel CreateChannel() => new SqlProxyClientTunnel(() => _client.Run(), options); } \ No newline at end of file diff --git a/src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyBatchQuery.cs b/src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyBatchQuery.cs index 73e5425..52aad87 100644 --- a/src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyBatchQuery.cs +++ b/src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyBatchQuery.cs @@ -195,6 +195,7 @@ public async Task RunInTransaction(Func> if (_transaction is not null) throw new InvalidOperationException("RunInTransaction Already called"); if (_builder.QueryCount > 0) throw new InvalidOperationException("Query buffer not empty"); _transaction = new(options); + _tunnel.Start(); try { var result = await query(this); diff --git a/src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyClientTunnel.ContextInfo.cs b/src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyClientTunnel.ContextInfo.cs new file mode 100644 index 0000000..d0ecab5 --- /dev/null +++ b/src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyClientTunnel.ContextInfo.cs @@ -0,0 +1,35 @@ +using Codibre.GrpcSqlProxy.Api; +using Grpc.Core; + +namespace Codibre.GrpcSqlProxy.Client.Impl; + +public sealed partial class SqlProxyClientTunnel +{ + internal class ContextInfo : IDisposable + { + private readonly Action _clear; + private readonly ExecutionContext? _executionContext; + public bool Disposed { get; private set; } = false; + public bool Transaction { get; set; } = false; + public AsyncDuplexStreamingCall Stream { get; } + public SqlProxyClientResponseMonitor Monitor { get; } + public ContextInfo( + Func> getStream, + Action clear + ) + { + Stream = getStream(); + Monitor = new(Stream); + _clear = clear; + _executionContext = ExecutionContext.Capture(); + } + + public void Dispose() + { + Monitor.Dispose(); + Stream.Dispose(); + Disposed = true; + if (_executionContext is not null) ExecutionContext.Run(_executionContext, (_) => _clear(), null); + } + } +} \ No newline at end of file diff --git a/src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyClientTunnel.cs b/src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyClientTunnel.cs index 9549106..2374ea9 100644 --- a/src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyClientTunnel.cs +++ b/src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyClientTunnel.cs @@ -1,6 +1,7 @@ using System.Collections.Concurrent; using System.Text.Json; using System.Threading.Channels; +using System.Transactions; using Avro; using Avro.Generic; using Avro.IO; @@ -12,38 +13,48 @@ namespace Codibre.GrpcSqlProxy.Client.Impl; -public sealed class SqlProxyClientTunnel : ISqlProxyClientTunnel +public sealed partial class SqlProxyClientTunnel : ISqlProxyClientTunnel { + private readonly AsyncLocal _context = new(); + private ContextInfo Context + { + get + { + var result = _context.Value; + if (result is null || result.Disposed) result = _context.Value = new(_getStream, () => _context.Value = null); + return result; + } + } + private readonly SqlProxyClientOptions _clientOptions; - private readonly AsyncDuplexStreamingCall _stream; - private readonly SqlProxyClientResponseMonitor _monitor; - private readonly string _connString; + private readonly Func> _getStream; public ISqlProxyBatchQuery Batch { get; } internal SqlProxyClientTunnel( - AsyncDuplexStreamingCall stream, + Func> getStream, SqlProxyClientOptions clientOptions ) { Batch = new SqlProxyBatchQuery(this); - _monitor = new(stream); - _stream = stream; + _getStream = getStream; _clientOptions = clientOptions; - _connString = clientOptions.SqlConnectionString; } - public IAsyncEnumerable Query(string sql, SqlProxyQueryOptions? options = null) + private (ContextInfo, IAsyncEnumerable) QueryInternal(string sql, SqlProxyQueryOptions? options) where T : class, new() { - if (!_monitor.Running) throw new InvalidOperationException("Tunnel closed"); var type = typeof(T); var schema = type.GetCachedSchema(); - var results = InternalRun(sql, [schema.Item2], options); - return ConvertResult(type, schema, results); + var (context, results) = InternalRun(sql, [schema.Item2], options); + return (context, ConvertResult(type, schema, results)); } + public IAsyncEnumerable Query(string sql, SqlProxyQueryOptions? options = null) + where T : class, new() + => QueryInternal(sql, options).Item2; + internal static async IAsyncEnumerable ConvertResult(Type type, (RecordSchema, string) schema, IAsyncEnumerable results) where T : class, new() { await foreach (var result in results) @@ -65,74 +76,130 @@ public IAsyncEnumerable Query(string sql, SqlProxyQueryOptions? options = } } - public async ValueTask Execute(string sql, SqlProxyQueryOptions? options = null) + public ValueTask Execute( + string sql, + SqlProxyQueryOptions? options = null + ) => InternalRun(sql, null, options).Item2.Complete(); + + private (ContextInfo, IAsyncEnumerable) InternalRun( + string sql, + string[]? schemas, + SqlProxyQueryOptions? options + ) { - await InternalRun(sql, null, options).LastAsync(); + var context = Context; + return (context, InternalRun(sql, schemas, options, context)); } - private async IAsyncEnumerable InternalRun(string sql, string[]? schemas, SqlProxyQueryOptions? options) + private async IAsyncEnumerable InternalRun( + string sql, + string[]? schemas, + SqlProxyQueryOptions? options, + ContextInfo context + ) { var id = GuidEx.NewBase64Guid(); - var message = GetRequest(_clientOptions, sql, schemas, options, id); - await _stream.RequestStream.WriteAsync(message); - _monitor.Start(); + var message = GetRequest( + _clientOptions, + sql, + schemas, + options, + id, + context + ); + await context.Stream.RequestStream.WriteAsync(message); + context.Monitor.Start(); var channel = Channel.CreateUnbounded(); - _monitor.AddHook(id, channel.Writer); + context.Monitor.AddHook(id, channel.Writer); var reader = channel.Reader; - while (await reader.WaitToReadAsync(_monitor.CancellationToken)) + while (await reader.WaitToReadAsync(context.Monitor.CancellationToken)) { - if (reader.TryRead(out var current)) - { - if (!string.IsNullOrEmpty(current.Error)) throw new SqlProxyException(current.Error); - yield return current; - if (current.Last == LastEnum.Last) break; - } + if (!reader.TryRead(out var current)) continue; + if (!string.IsNullOrEmpty(current.Error)) throw new SqlProxyException(current.Error); + yield return current; + if (current.Last == LastEnum.Last) break; } - _monitor.RemoveHook(id); + context.Monitor.RemoveHook(id); + ClearWhenNotInTransaction(context); + } + + private static void ClearWhenNotInTransaction(ContextInfo context) + { + if (!context.Transaction) context.Dispose(); } - private SqlRequest GetRequest(SqlProxyClientOptions clientOptions, string sql, string[]? schemas, SqlProxyQueryOptions? options, string id) + private SqlRequest GetRequest( + SqlProxyClientOptions clientOptions, + string sql, string[]? schemas, + SqlProxyQueryOptions? options, + string id, + ContextInfo context + ) { SqlRequest message = new() { Id = id, - ConnString = _monitor.Started ? "" : _connString, + ConnString = context.Monitor.Started is true ? "" : _clientOptions.SqlConnectionString, Query = sql, Schema = { }, Compress = options?.Compress ?? clientOptions.Compress, PacketSize = options?.PacketSize ?? clientOptions.PacketSize, Params = JsonSerializer.Serialize(options?.Params) }; - foreach (var schema in schemas ?? []) - { - message.Schema.Add(schema); - } + foreach (var schema in schemas ?? []) message.Schema.Add(schema); return message; } - public ValueTask QueryFirstOrDefault(string sql, SqlProxyQueryOptions? options = null) where T : class, new() - => Query(sql, options).FirstOrDefaultAsync(); + public async ValueTask QueryFirstOrDefault(string sql, SqlProxyQueryOptions? options = null) where T : class, new() + { + var (contextInfo, results) = QueryInternal(sql, options); + var result = await results.FirstOrDefaultAsync(); + ClearWhenNotInTransaction(contextInfo); + return result; + } - public ValueTask QueryFirst(string sql, SqlProxyQueryOptions? options = null) where T : class, new() - => Query(sql, options).FirstAsync(); + public async ValueTask QueryFirst(string sql, SqlProxyQueryOptions? options = null) where T : class, new() + { + var (contextInfo, results) = QueryInternal(sql, options); + var result = await results.FirstAsync(); + ClearWhenNotInTransaction(contextInfo); + return result; + } public void Dispose() { - _monitor.Dispose(); - _stream.Dispose(); + _context.Value?.Dispose(); + _context.Value = null; } - public ValueTask BeginTransaction() => Execute("BEGIN TRANSACTION"); + public ValueTask BeginTransaction() + { + Context.Transaction = true; + return Execute("BEGIN TRANSACTION"); + } - public ValueTask Commit() => Execute("COMMIT"); + public ValueTask Commit() + { + Context.Transaction = false; + return Execute("COMMIT"); + } - public ValueTask Rollback() => Execute("ROLLBACK"); + public ValueTask Rollback() + { + Context.Transaction = false; + return Execute("ROLLBACK"); + } public Reader QueryMultipleAsync(string sql, string[] schemas, SqlProxyQueryOptions? options) - => new(InternalRun(sql, schemas, options)); + => new(InternalRun(sql, schemas, options).Item2); public void OnError(ErrorHandlerEvent handler) - => _monitor.ErrorHandler += handler; + { + var monitor = _context.Value?.Monitor; + if (monitor is not null) monitor.ErrorHandler += handler; + } + + public void Start() => _ = Context; } \ No newline at end of file diff --git a/src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyClientTunnelExtension.cs b/src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyClientTunnelExtension.cs new file mode 100644 index 0000000..02198c6 --- /dev/null +++ b/src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyClientTunnelExtension.cs @@ -0,0 +1,40 @@ +using System.Collections.Concurrent; +using System.Text.Json; +using System.Threading.Channels; +using System.Transactions; +using Avro; +using Avro.Generic; +using Avro.IO; +using Codibre.GrpcSqlProxy.Api; +using Codibre.GrpcSqlProxy.Client.Impl.Utils; +using Codibre.GrpcSqlProxy.Common; +using Google.Protobuf.Collections; +using Grpc.Core; + +namespace Codibre.GrpcSqlProxy.Client.Impl; + +public static class SqlProxyClientTunnelExtension +{ + public static ValueTask QueryFirstOrDefault( + this ISqlProxyClientTunnel tunnel, + string sql, + SqlProxyQueryOptions? options = null + ) where T : class, new() + => tunnel.Query(sql, options).FirstOrDefaultAsync(); + + public static ValueTask QueryFirst( + this ISqlProxyClientTunnel tunnel, + string sql, + SqlProxyQueryOptions? options = null + ) where T : class, new() + => tunnel.Query(sql, options).FirstAsync(); + + + internal static async ValueTask Complete(this IAsyncEnumerable stream) + { + await foreach (var _ in stream) + { + // Dummy + } + } +} \ No newline at end of file diff --git a/test/Codibre.GrpcSqlProxy.Test/GrpcSqlProxyClientAsyncLocalTest.cs b/test/Codibre.GrpcSqlProxy.Test/GrpcSqlProxyClientAsyncLocalTest.cs new file mode 100644 index 0000000..9e9c50a --- /dev/null +++ b/test/Codibre.GrpcSqlProxy.Test/GrpcSqlProxyClientAsyncLocalTest.cs @@ -0,0 +1,191 @@ +using Codibre.GrpcSqlProxy.Api; +using Codibre.GrpcSqlProxy.Client; +using Codibre.GrpcSqlProxy.Client.Impl; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace Codibre.GrpcSqlProxy.Test; + +[Collection("Sequential")] +public class GrpcSqlProxyClientAsyncLocalTest +{ + [Fact] + public async Task Should_Keep_Transaction_Opened() + { + // Arrange + var server = await TestServer.Get(); + var client = new GrpcSqlProxyClient( + new SqlProxyClientOptions( + server.Url, + server.Config.GetConnectionString("SqlConnection") ?? throw new Exception("No connection string") + ) + { + Compress = false + } + ); + + // Act + await client.Channel.Execute("DELETE FROM TB_PEDIDO"); + await client.Channel.BeginTransaction(); + await client.Channel.Execute("INSERT INTO TB_PEDIDO (CD_PEDIDO) VALUES (1)"); + var result1 = await client.Channel.QueryFirstOrDefault("SELECT * FROM TB_PEDIDO"); + await client.Channel.Rollback(); + var result2 = await client.Channel.Query("SELECT * FROM TB_PEDIDO").ToArrayAsync(); + + // Assert + result1.Should().BeOfType(); + result2.Should().BeOfType(); + result1.Should().BeEquivalentTo(new TB_PEDIDO { CD_PEDIDO = 1 }); + result2.Should().BeEquivalentTo(Array.Empty()); + } + + [Fact] + public async Task Should_Inject_SqlProxy_Properly() + { + // Arrange + var server = await TestServer.Get(); + var builder = Host.CreateApplicationBuilder([]); + builder.Configuration.GetSection("GrpcSqlProxy").GetSection("Url").Value = server.Url; + builder.Configuration.GetSection("GrpcSqlProxy").GetSection("Compress").Value = "False"; + builder.Configuration.GetSection("GrpcSqlProxy").GetSection("PacketSize").Value = "2000"; + builder.Services.AddGrpcSqlProxy(); + var app = builder.Build(); + var client = app.Services.GetRequiredService(); + + // Act + await client.Channel.Execute("DELETE FROM TB_PEDIDO"); + await client.Channel.BeginTransaction(); + await client.Channel.Execute("INSERT INTO TB_PEDIDO (CD_PEDIDO) VALUES (1)"); + var result1 = await client.Channel.QueryFirstOrDefault("SELECT * FROM TB_PEDIDO"); + await client.Channel.Rollback(); + var result2 = await client.Channel.Query("SELECT * FROM TB_PEDIDO").ToArrayAsync(); + + // Assert + result1.Should().BeOfType(); + result2.Should().BeOfType(); + result1.Should().BeEquivalentTo(new TB_PEDIDO { CD_PEDIDO = 1 }); + result2.Should().BeEquivalentTo(Array.Empty()); + } + + [Fact] + public async Task Should_Use_Compression() + { + // Arrange + var server = await TestServer.Get(); + var client = new GrpcSqlProxyClient( + new SqlProxyClientOptions( + server.Url, + server.Config.GetConnectionString("SqlConnection") ?? throw new Exception("No connection string") + ) + { + Compress = true + } + ); + + // Act + await client.Channel.BeginTransaction(); + await client.Channel.Execute("DELETE FROM TB_PRODUTO"); + await client.Channel.Execute("INSERT INTO TB_PRODUTO (CD_PRODUTO) VALUES (1)"); + var result1 = await client.Channel.QueryFirstOrDefault("SELECT * FROM TB_PRODUTO"); + await client.Channel.Rollback(); + var result2 = await client.Channel.Query("SELECT * FROM TB_PRODUTO").ToArrayAsync(); + + // Assert + result1.Should().BeOfType(); + result2.Should().BeOfType(); + result1.Should().BeEquivalentTo(new TB_PRODUTO { CD_PRODUTO = 1 }); + } + + [Fact] + public async Task Should_Throw_Error_For_Invalid_Queries() + { + // Arrange + var server = await TestServer.Get(); + var client = new GrpcSqlProxyClient( + new SqlProxyClientOptions( + server.Url, + server.Config.GetConnectionString("SqlConnection") ?? throw new Exception("No connection string") + ) + { + Compress = false + } + ); + Exception? thrownException = null; + + // Act + try + { + using var channel = client.CreateChannel(); + await client.Channel.Execute("SELECT * FROM INVALID_TABLE"); + } + catch (Exception ex) + { + thrownException = ex; + } + + // Assert + thrownException.Should().BeOfType(); + } + + [Fact] + public async Task Should_Keep_Parallel_Transaction_Opened() + { + // Arrange + var server = await TestServer.Get(); + var client = new GrpcSqlProxyClient( + new SqlProxyClientOptions( + server.Url, + server.Config.GetConnectionString("SqlConnection") ?? throw new Exception("No connection string") + ) + { + Compress = false + } + ); + + // Act + using var channel2 = client.CreateChannel(); + await client.Channel.Execute("DELETE FROM TB_PESSOA"); + await client.Channel.Execute("INSERT INTO TB_PESSOA (CD_PESSOA) VALUES (1)"); + await client.Channel.Execute("INSERT INTO TB_PESSOA (CD_PESSOA) VALUES (2)"); + await client.Channel.BeginTransaction(); + await channel2.BeginTransaction(); + await client.Channel.Execute("UPDATE TB_PESSOA SET CD_PESSOA = 3 WHERE CD_PESSOA = @Id", new() + { + Params = new + { + Id = 1 + } + }); + var result1 = await client.Channel.QueryFirst("SELECT * FROM TB_PESSOA WHERE CD_PESSOA = @Id", new() + { + Params = new + { + Id = 3 + } + }); + await client.Channel.Rollback(); + await channel2.Execute("UPDATE TB_PESSOA SET CD_PESSOA = 5 WHERE CD_PESSOA = 2"); + var result2 = await channel2.Query("SELECT * FROM TB_PESSOA").ToArrayAsync(); + await channel2.Rollback(); + var result3 = await client.Channel.Query("SELECT * FROM TB_PESSOA", new() + { + PacketSize = 1 + }).ToArrayAsync(); + + // Assert + result1.Should().BeOfType(); + result2.Should().BeOfType(); + result3.Should().BeOfType(); + result1.Should().BeEquivalentTo(new TB_PESSOA { CD_PESSOA = 3 }); + result2.OrderBy(x => x.CD_PESSOA).ToArray().Should().BeEquivalentTo(new TB_PESSOA[] { + new () { CD_PESSOA = 1 }, + new () { CD_PESSOA = 5 } + }); + result3.OrderBy(x => x.CD_PESSOA).ToArray().Should().BeEquivalentTo(new TB_PESSOA[] { + new () { CD_PESSOA = 1 }, + new () { CD_PESSOA = 2 } + }); + } +} \ No newline at end of file