Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bettering channel flow #9

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Codibre.GrpcSqlProxy.Api/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public static WebApplication GetApp(string[] args)
// Configure the HTTP request pipeline.
app.MapGrpcService<SqlProxyService>();
app.MapGet("/", () => "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
app.Urls.Add("http://localhost:3000");
app.Urls.Add($"http://localhost:{args.FirstOrDefault() ?? app.Configuration.GetSection("PORT").Value ?? "3000"}");
return app;
}
}
32 changes: 32 additions & 0 deletions src/Codibre.GrpcSqlProxy.Client/Impl/ContextInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using Codibre.GrpcSqlProxy.Api;
using Grpc.Core;

namespace Codibre.GrpcSqlProxy.Client.Impl;

internal class ContextInfo : IDisposable
{
private readonly Action _clear;
private readonly ExecutionContext? _executionContext;
public bool Disposed { get; private set; } = false;
public bool Transaction { get; set; } = false;
public AsyncDuplexStreamingCall<SqlRequest, SqlResponse> Stream { get; }
public SqlProxyClientResponseMonitor Monitor { get; }
public ContextInfo(
Func<AsyncDuplexStreamingCall<SqlRequest, SqlResponse>> getStream,
Action clear
)
{
Stream = getStream();
Monitor = new(Stream);
_clear = clear;
_executionContext = ExecutionContext.Capture();
}

public void Dispose()
{
Monitor.Dispose();
Stream.Dispose();
Disposed = true;
if (_executionContext is not null) ExecutionContext.Run(_executionContext, (_) => _clear(), null);
}
}
4 changes: 2 additions & 2 deletions src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyBatchQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,10 @@ public async Task<T> RunInTransaction<T>(Func<ISqlProxyBatchQuery, ValueTask<T>>
else await SendTransaction();
return result;
}
catch (Exception)
catch (Exception ex)
{
if (_transaction.TransactionOpen) await _tunnel.Rollback();
throw;
throw new SqlProxyException(ex);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better stackTrace

}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@ internal sealed class SqlProxyClientResponseMonitor : IDisposable
{
private readonly AsyncDuplexStreamingCall<SqlRequest, SqlResponse> _stream;
internal readonly ConcurrentDictionary<string, ChannelWriter<SqlResponse>> _responseHooks = new();
private readonly CancellationTokenSource _cancellationTokenSource = new();
private CancellationTokenSource _cancellationTokenSource = new();

internal CancellationToken CancellationToken => _cancellationTokenSource.Token;

internal event ErrorHandlerEvent? ErrorHandler;

internal bool Running { get; private set; } = true;
internal bool Started { get; private set; } = false;

internal SqlProxyClientResponseMonitor(
Expand All @@ -34,34 +32,27 @@ AsyncDuplexStreamingCall<SqlRequest, SqlResponse> stream

internal async void Start()
{
if (Started) return;
Started = true;
try
{
await ReadStream();
}
catch (Exception ex)
{
TreatException(ex);
}
finally
{
await CompleteStream();
}
}

private void TreatException(Exception ex)
{
if (
ex is not OperationCanceledException
&& ErrorHandler is not null
) ErrorHandler(this, ex);
ex is not OperationCanceledException
&& ErrorHandler is not null
) ErrorHandler(this, ex);
}

private async Task CompleteStream()
{
_cancellationTokenSource.Cancel();
Running = false;
_responseHooks.Clear();
try
{
Expand All @@ -72,25 +63,30 @@ private async Task CompleteStream()
{
// Ignoring errors due to already closed stream
}
_cancellationTokenSource.Cancel();
_cancellationTokenSource = new();
}

private async Task ReadStream()
{
while (Running && await _stream.ResponseStream.MoveNext(_cancellationTokenSource.Token))
if (Started) return;
Started = true;
while (await _stream.ResponseStream.MoveNext(_cancellationTokenSource.Token))
{
var response = _stream.ResponseStream.Current;
if (response is not null && _responseHooks.TryGetValue(response.Id, out var hook))
{
await hook.WriteAsync(response);
if (response.Last == LastEnum.Last)
{
_responseHooks.TryRemove(response.Id, out _);
hook.Complete();
}
}
}
Started = false;
}

internal void AddHook(string id, ChannelWriter<SqlResponse> writer) => _responseHooks.TryAdd(id, writer);
internal void RemoveHook(string id) => _responseHooks.TryRemove(id, out _);
public void Dispose()
{
Running = false;
_cancellationTokenSource.Cancel();
}
public void Dispose() => _ = CompleteStream();
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace Codibre.GrpcSqlProxy.Client.Impl;

public sealed partial class SqlProxyClientTunnel : ISqlProxyClientTunnel
public sealed class SqlProxyClientTunnel : ISqlProxyClientTunnel
{
private readonly AsyncLocal<ContextInfo?> _context = new();
private ContextInfo Context
Expand Down Expand Up @@ -120,7 +120,6 @@ ContextInfo context
yield return current;
if (current.Last == LastEnum.Last) break;
}
context.Monitor.RemoveHook(id);
ClearWhenNotInTransaction(context);
}

Expand Down
15 changes: 14 additions & 1 deletion src/Codibre.GrpcSqlProxy.Client/SqlProxyException.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
namespace Codibre.GrpcSqlProxy.Client;

public class SqlProxyException(string message) : Exception(message) { }
public class SqlProxyException : Exception
{
private readonly string? _stack;
public SqlProxyException(string message) : base(message)
{
_stack = null;
}
public SqlProxyException(Exception ex) : base(ex.Message)
{
_stack = ex.StackTrace;
}

public override string? StackTrace => _stack ?? base.StackTrace;
}
10 changes: 8 additions & 2 deletions src/Codibre.GrpcSqlProxy.Common/DictionaryExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@ public static V GetOrSet<K, V>(this Dictionary<K, V> dictionary, K key, Func<V>
{
if (!dictionary.TryGetValue(key, out var result))
{
result = create();
dictionary[key] = result;
lock (dictionary)
{
if (!dictionary.TryGetValue(key, out result))
{
result = create();
dictionary[key] = result;
}
}
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

namespace Codibre.GrpcSqlProxy.Test;

[Collection("Sequential")]
public class GrpcSqlProxyClientAsyncLocalTest
{
[Fact]
Expand All @@ -27,17 +26,17 @@ public async Task Should_Keep_Transaction_Opened()
);

// Act
await client.Channel.Execute("DELETE FROM TB_PEDIDO");
await client.Channel.Execute("DELETE FROM TB_PEDIDO WHERE CD_PEDIDO = 400001");
await client.Channel.BeginTransaction();
await client.Channel.Execute("INSERT INTO TB_PEDIDO (CD_PEDIDO) VALUES (1)");
var result1 = await client.Channel.QueryFirstOrDefault<TB_PEDIDO>("SELECT * FROM TB_PEDIDO");
await client.Channel.Execute("INSERT INTO TB_PEDIDO (CD_PEDIDO) VALUES (400001)");
var result1 = await client.Channel.QueryFirstOrDefault<TB_PEDIDO>("SELECT * FROM TB_PEDIDO WHERE CD_PEDIDO = 400001");
await client.Channel.Rollback();
var result2 = await client.Channel.Query<TB_PEDIDO>("SELECT * FROM TB_PEDIDO").ToArrayAsync();
var result2 = await client.Channel.Query<TB_PEDIDO>("SELECT * FROM TB_PEDIDO WHERE CD_PEDIDO = 400001").ToArrayAsync();

// Assert
result1.Should().BeOfType<TB_PEDIDO>();
result2.Should().BeOfType<TB_PEDIDO[]>();
result1.Should().BeEquivalentTo(new TB_PEDIDO { CD_PEDIDO = 1 });
result1.Should().BeEquivalentTo(new TB_PEDIDO { CD_PEDIDO = 400001 });
result2.Should().BeEquivalentTo(Array.Empty<TB_PEDIDO>());
}

Expand All @@ -55,17 +54,17 @@ public async Task Should_Inject_SqlProxy_Properly()
var client = app.Services.GetRequiredService<ISqlProxyClient>();

// Act
await client.Channel.Execute("DELETE FROM TB_PEDIDO");
await client.Channel.Execute("DELETE FROM TB_PEDIDO WHERE CD_PEDIDO = 500001");
await client.Channel.BeginTransaction();
await client.Channel.Execute("INSERT INTO TB_PEDIDO (CD_PEDIDO) VALUES (1)");
var result1 = await client.Channel.QueryFirstOrDefault<TB_PEDIDO>("SELECT * FROM TB_PEDIDO");
await client.Channel.Execute("INSERT INTO TB_PEDIDO (CD_PEDIDO) VALUES (500001)");
var result1 = await client.Channel.QueryFirstOrDefault<TB_PEDIDO>("SELECT * FROM TB_PEDIDO WHERE CD_PEDIDO = 500001");
await client.Channel.Rollback();
var result2 = await client.Channel.Query<TB_PEDIDO>("SELECT * FROM TB_PEDIDO").ToArrayAsync();
var result2 = await client.Channel.Query<TB_PEDIDO>("SELECT * FROM TB_PEDIDO WHERE CD_PEDIDO = 500001").ToArrayAsync();

// Assert
result1.Should().BeOfType<TB_PEDIDO>();
result2.Should().BeOfType<TB_PEDIDO[]>();
result1.Should().BeEquivalentTo(new TB_PEDIDO { CD_PEDIDO = 1 });
result1.Should().BeEquivalentTo(new TB_PEDIDO { CD_PEDIDO = 500001 });
result2.Should().BeEquivalentTo(Array.Empty<TB_PEDIDO>());
}

Expand All @@ -86,16 +85,16 @@ public async Task Should_Use_Compression()

// Act
await client.Channel.BeginTransaction();
await client.Channel.Execute("DELETE FROM TB_PRODUTO");
await client.Channel.Execute("INSERT INTO TB_PRODUTO (CD_PRODUTO) VALUES (1)");
var result1 = await client.Channel.QueryFirstOrDefault<TB_PRODUTO>("SELECT * FROM TB_PRODUTO");
await client.Channel.Execute("DELETE FROM TB_PRODUTO WHERE CD_PRODUTO = 600001");
await client.Channel.Execute("INSERT INTO TB_PRODUTO (CD_PRODUTO) VALUES (600001)");
var result1 = await client.Channel.QueryFirstOrDefault<TB_PRODUTO>("SELECT * FROM TB_PRODUTO WHERE CD_PRODUTO = 600001");
await client.Channel.Rollback();
var result2 = await client.Channel.Query<TB_PRODUTO>("SELECT * FROM TB_PRODUTO").ToArrayAsync();
var result2 = await client.Channel.Query<TB_PRODUTO>("SELECT * FROM TB_PRODUTO WHERE CD_PRODUTO = 600001").ToArrayAsync();

// Assert
result1.Should().BeOfType<TB_PRODUTO>();
result2.Should().BeOfType<TB_PRODUTO[]>();
result1.Should().BeEquivalentTo(new TB_PRODUTO { CD_PRODUTO = 1 });
result1.Should().BeEquivalentTo(new TB_PRODUTO { CD_PRODUTO = 600001 });
}

[Fact]
Expand Down Expand Up @@ -146,30 +145,30 @@ public async Task Should_Keep_Parallel_Transaction_Opened()

// Act
using var channel2 = client.CreateChannel();
await client.Channel.Execute("DELETE FROM TB_PESSOA");
await client.Channel.Execute("INSERT INTO TB_PESSOA (CD_PESSOA) VALUES (1)");
await client.Channel.Execute("INSERT INTO TB_PESSOA (CD_PESSOA) VALUES (2)");
await client.Channel.Execute("DELETE FROM TB_PESSOA WHERE CD_PESSOA IN (10, 20, 30, 50)");
await client.Channel.Execute("INSERT INTO TB_PESSOA (CD_PESSOA) VALUES (10)");
await client.Channel.Execute("INSERT INTO TB_PESSOA (CD_PESSOA) VALUES (20)");
await client.Channel.BeginTransaction();
await channel2.BeginTransaction();
await client.Channel.Execute("UPDATE TB_PESSOA SET CD_PESSOA = 3 WHERE CD_PESSOA = @Id", new()
await client.Channel.Execute("UPDATE TB_PESSOA SET CD_PESSOA = 30 WHERE CD_PESSOA = @Id", new()
{
Params = new
{
Id = 1
Id = 10
}
});
var result1 = await client.Channel.QueryFirst<TB_PESSOA>("SELECT * FROM TB_PESSOA WHERE CD_PESSOA = @Id", new()
{
Params = new
{
Id = 3
Id = 30
}
});
await client.Channel.Rollback();
await channel2.Execute("UPDATE TB_PESSOA SET CD_PESSOA = 5 WHERE CD_PESSOA = 2");
var result2 = await channel2.Query<TB_PESSOA>("SELECT * FROM TB_PESSOA").ToArrayAsync();
await channel2.Execute("UPDATE TB_PESSOA SET CD_PESSOA = 50 WHERE CD_PESSOA = 20");
var result2 = await channel2.Query<TB_PESSOA>("SELECT * FROM TB_PESSOA WHERE CD_PESSOA IN (10, 20, 30, 50)").ToArrayAsync();
await channel2.Rollback();
var result3 = await client.Channel.Query<TB_PESSOA>("SELECT * FROM TB_PESSOA", new()
var result3 = await client.Channel.Query<TB_PESSOA>("SELECT * FROM TB_PESSOA WHERE CD_PESSOA IN (10, 20, 30, 50)", new()
{
PacketSize = 1
}).ToArrayAsync();
Expand All @@ -178,14 +177,14 @@ public async Task Should_Keep_Parallel_Transaction_Opened()
result1.Should().BeOfType<TB_PESSOA>();
result2.Should().BeOfType<TB_PESSOA[]>();
result3.Should().BeOfType<TB_PESSOA[]>();
result1.Should().BeEquivalentTo(new TB_PESSOA { CD_PESSOA = 3 });
result1.Should().BeEquivalentTo(new TB_PESSOA { CD_PESSOA = 30 });
result2.OrderBy(x => x.CD_PESSOA).ToArray().Should().BeEquivalentTo(new TB_PESSOA[] {
new () { CD_PESSOA = 1 },
new () { CD_PESSOA = 5 }
new () { CD_PESSOA = 10 },
new () { CD_PESSOA = 50 }
});
result3.OrderBy(x => x.CD_PESSOA).ToArray().Should().BeEquivalentTo(new TB_PESSOA[] {
new () { CD_PESSOA = 1 },
new () { CD_PESSOA = 2 }
new () { CD_PESSOA = 10 },
new () { CD_PESSOA = 20 }
});
}
}
Loading
Loading