Skip to content

Commit

Permalink
fix: bettering channel flow
Browse files Browse the repository at this point in the history
  • Loading branch information
Farenheith committed Oct 21, 2024
1 parent 47f39cc commit 358a3d5
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 91 deletions.
32 changes: 32 additions & 0 deletions src/Codibre.GrpcSqlProxy.Client/Impl/ContextInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using Codibre.GrpcSqlProxy.Api;
using Grpc.Core;

namespace Codibre.GrpcSqlProxy.Client.Impl;

internal class ContextInfo : IDisposable
{
private readonly Action _clear;
private readonly ExecutionContext? _executionContext;
public bool Disposed { get; private set; } = false;
public bool Transaction { get; set; } = false;
public AsyncDuplexStreamingCall<SqlRequest, SqlResponse> Stream { get; }
public SqlProxyClientResponseMonitor Monitor { get; }
public ContextInfo(
Func<AsyncDuplexStreamingCall<SqlRequest, SqlResponse>> getStream,
Action clear
)
{
Stream = getStream();
Monitor = new(Stream);
_clear = clear;
_executionContext = ExecutionContext.Capture();
}

public void Dispose()
{
Monitor.Dispose();
Stream.Dispose();
Disposed = true;
if (_executionContext is not null) ExecutionContext.Run(_executionContext, (_) => _clear(), null);
}
}
4 changes: 2 additions & 2 deletions src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyBatchQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,10 @@ public async Task<T> RunInTransaction<T>(Func<ISqlProxyBatchQuery, ValueTask<T>>
else await SendTransaction();
return result;
}
catch (Exception)
catch (Exception ex)
{
if (_transaction.TransactionOpen) await _tunnel.Rollback();
throw;
throw new SqlProxyException(ex);
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@ internal sealed class SqlProxyClientResponseMonitor : IDisposable
{
private readonly AsyncDuplexStreamingCall<SqlRequest, SqlResponse> _stream;
internal readonly ConcurrentDictionary<string, ChannelWriter<SqlResponse>> _responseHooks = new();
private readonly CancellationTokenSource _cancellationTokenSource = new();
private CancellationTokenSource _cancellationTokenSource = new();

internal CancellationToken CancellationToken => _cancellationTokenSource.Token;

internal event ErrorHandlerEvent? ErrorHandler;

internal bool Running { get; private set; } = true;
internal bool Started { get; private set; } = false;

internal SqlProxyClientResponseMonitor(
Expand All @@ -34,34 +32,27 @@ AsyncDuplexStreamingCall<SqlRequest, SqlResponse> stream

internal async void Start()
{
if (Started) return;
Started = true;
try
{
await ReadStream();
}
catch (Exception ex)
{
TreatException(ex);
}
finally
{
await CompleteStream();
}
}

private void TreatException(Exception ex)
{
if (
ex is not OperationCanceledException
&& ErrorHandler is not null
) ErrorHandler(this, ex);
ex is not OperationCanceledException
&& ErrorHandler is not null
) ErrorHandler(this, ex);
}

private async Task CompleteStream()
{
_cancellationTokenSource.Cancel();
Running = false;
_responseHooks.Clear();
try
{
Expand All @@ -72,25 +63,30 @@ private async Task CompleteStream()
{
// Ignoring errors due to already closed stream
}
_cancellationTokenSource.Cancel();
_cancellationTokenSource = new();
}

private async Task ReadStream()
{
while (Running && await _stream.ResponseStream.MoveNext(_cancellationTokenSource.Token))
if (Started) return;
Started = true;
while (await _stream.ResponseStream.MoveNext(_cancellationTokenSource.Token))
{
var response = _stream.ResponseStream.Current;
if (response is not null && _responseHooks.TryGetValue(response.Id, out var hook))
{
await hook.WriteAsync(response);
if (response.Last == LastEnum.Last)
{
_responseHooks.TryRemove(response.Id, out _);
hook.Complete();
}
}
}
Started = false;
}

internal void AddHook(string id, ChannelWriter<SqlResponse> writer) => _responseHooks.TryAdd(id, writer);
internal void RemoveHook(string id) => _responseHooks.TryRemove(id, out _);
public void Dispose()
{
Running = false;
_cancellationTokenSource.Cancel();
}
public void Dispose() => _ = CompleteStream();
}

This file was deleted.

3 changes: 1 addition & 2 deletions src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyClientTunnel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace Codibre.GrpcSqlProxy.Client.Impl;

public sealed partial class SqlProxyClientTunnel : ISqlProxyClientTunnel
public sealed class SqlProxyClientTunnel : ISqlProxyClientTunnel
{
private readonly AsyncLocal<ContextInfo?> _context = new();
private ContextInfo Context
Expand Down Expand Up @@ -120,7 +120,6 @@ ContextInfo context
yield return current;
if (current.Last == LastEnum.Last) break;
}
context.Monitor.RemoveHook(id);
ClearWhenNotInTransaction(context);
}

Expand Down
15 changes: 14 additions & 1 deletion src/Codibre.GrpcSqlProxy.Client/SqlProxyException.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
namespace Codibre.GrpcSqlProxy.Client;

public class SqlProxyException(string message) : Exception(message) { }
public class SqlProxyException : Exception
{
private readonly string? _stack;
public SqlProxyException(string message) : base(message)
{
_stack = null;
}
public SqlProxyException(Exception ex) : base(ex.Message)
{
_stack = ex.StackTrace;
}

public override string? StackTrace => _stack ?? base.StackTrace;
}
29 changes: 14 additions & 15 deletions test/Codibre.GrpcSqlProxy.Test/GrpcSqlProxyClientAsyncLocalTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

namespace Codibre.GrpcSqlProxy.Test;

[Collection("Sequential")]
public class GrpcSqlProxyClientAsyncLocalTest
{
[Fact]
Expand Down Expand Up @@ -146,30 +145,30 @@ public async Task Should_Keep_Parallel_Transaction_Opened()

// Act
using var channel2 = client.CreateChannel();
await client.Channel.Execute("DELETE FROM TB_PESSOA");
await client.Channel.Execute("INSERT INTO TB_PESSOA (CD_PESSOA) VALUES (1)");
await client.Channel.Execute("INSERT INTO TB_PESSOA (CD_PESSOA) VALUES (2)");
await client.Channel.Execute("DELETE FROM TB_PESSOA WHERE CD_PESSOA IN (10, 20, 30, 50)");
await client.Channel.Execute("INSERT INTO TB_PESSOA (CD_PESSOA) VALUES (10)");
await client.Channel.Execute("INSERT INTO TB_PESSOA (CD_PESSOA) VALUES (20)");
await client.Channel.BeginTransaction();
await channel2.BeginTransaction();
await client.Channel.Execute("UPDATE TB_PESSOA SET CD_PESSOA = 3 WHERE CD_PESSOA = @Id", new()
await client.Channel.Execute("UPDATE TB_PESSOA SET CD_PESSOA = 30 WHERE CD_PESSOA = @Id", new()
{
Params = new
{
Id = 1
Id = 10
}
});
var result1 = await client.Channel.QueryFirst<TB_PESSOA>("SELECT * FROM TB_PESSOA WHERE CD_PESSOA = @Id", new()
{
Params = new
{
Id = 3
Id = 30
}
});
await client.Channel.Rollback();
await channel2.Execute("UPDATE TB_PESSOA SET CD_PESSOA = 5 WHERE CD_PESSOA = 2");
var result2 = await channel2.Query<TB_PESSOA>("SELECT * FROM TB_PESSOA").ToArrayAsync();
await channel2.Execute("UPDATE TB_PESSOA SET CD_PESSOA = 50 WHERE CD_PESSOA = 20");
var result2 = await channel2.Query<TB_PESSOA>("SELECT * FROM TB_PESSOA WHERE CD_PESSOA IN (10, 20, 30, 50)").ToArrayAsync();
await channel2.Rollback();
var result3 = await client.Channel.Query<TB_PESSOA>("SELECT * FROM TB_PESSOA", new()
var result3 = await client.Channel.Query<TB_PESSOA>("SELECT * FROM TB_PESSOA WHERE CD_PESSOA IN (10, 20, 30, 50)", new()
{
PacketSize = 1
}).ToArrayAsync();
Expand All @@ -178,14 +177,14 @@ public async Task Should_Keep_Parallel_Transaction_Opened()
result1.Should().BeOfType<TB_PESSOA>();
result2.Should().BeOfType<TB_PESSOA[]>();
result3.Should().BeOfType<TB_PESSOA[]>();
result1.Should().BeEquivalentTo(new TB_PESSOA { CD_PESSOA = 3 });
result1.Should().BeEquivalentTo(new TB_PESSOA { CD_PESSOA = 30 });
result2.OrderBy(x => x.CD_PESSOA).ToArray().Should().BeEquivalentTo(new TB_PESSOA[] {
new () { CD_PESSOA = 1 },
new () { CD_PESSOA = 5 }
new () { CD_PESSOA = 10 },
new () { CD_PESSOA = 50 }
});
result3.OrderBy(x => x.CD_PESSOA).ToArray().Should().BeEquivalentTo(new TB_PESSOA[] {
new () { CD_PESSOA = 1 },
new () { CD_PESSOA = 2 }
new () { CD_PESSOA = 10 },
new () { CD_PESSOA = 20 }
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

namespace Codibre.GrpcSqlProxy.Test;

[Collection("Sequential")]
public class GrpcSqlProxyClientBatchTest
{
[Fact]
Expand Down
29 changes: 14 additions & 15 deletions test/Codibre.GrpcSqlProxy.Test/GrpcSqlProxyClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

namespace Codibre.GrpcSqlProxy.Test;

[Collection("Sequential")]
public class GrpcSqlProxyClientTest
{
[Fact]
Expand Down Expand Up @@ -150,30 +149,30 @@ public async Task Should_Keep_Parallel_Transaction_Opened()
// Act
using var channel1 = client.CreateChannel();
using var channel2 = client.CreateChannel();
await channel1.Execute("DELETE FROM TB_PESSOA");
await channel1.Execute("INSERT INTO TB_PESSOA (CD_PESSOA) VALUES (1)");
await channel1.Execute("INSERT INTO TB_PESSOA (CD_PESSOA) VALUES (2)");
await channel1.Execute("DELETE FROM TB_PESSOA WHERE CD_PESSOA IN (100, 200, 300, 500)");
await channel1.Execute("INSERT INTO TB_PESSOA (CD_PESSOA) VALUES (100)");
await channel1.Execute("INSERT INTO TB_PESSOA (CD_PESSOA) VALUES (200)");
await channel1.BeginTransaction();
await channel2.BeginTransaction();
await channel1.Execute("UPDATE TB_PESSOA SET CD_PESSOA = 3 WHERE CD_PESSOA = @Id", new()
await channel1.Execute("UPDATE TB_PESSOA SET CD_PESSOA = 300 WHERE CD_PESSOA = @Id", new()
{
Params = new
{
Id = 1
Id = 100
}
});
var result1 = await channel1.QueryFirst<TB_PESSOA>("SELECT * FROM TB_PESSOA WHERE CD_PESSOA = @Id", new()
{
Params = new
{
Id = 3
Id = 300
}
});
await channel1.Rollback();
await channel2.Execute("UPDATE TB_PESSOA SET CD_PESSOA = 5 WHERE CD_PESSOA = 2");
var result2 = await channel2.Query<TB_PESSOA>("SELECT * FROM TB_PESSOA").ToArrayAsync();
await channel2.Execute("UPDATE TB_PESSOA SET CD_PESSOA = 500 WHERE CD_PESSOA = 200");
var result2 = await channel2.Query<TB_PESSOA>("SELECT * FROM TB_PESSOA WHERE CD_PESSOA IN (100, 200, 300, 500)").ToArrayAsync();
await channel2.Rollback();
var result3 = await channel1.Query<TB_PESSOA>("SELECT * FROM TB_PESSOA", new()
var result3 = await channel1.Query<TB_PESSOA>("SELECT * FROM TB_PESSOA WHERE CD_PESSOA IN (100, 200, 300, 500)", new()
{
PacketSize = 1
}).ToArrayAsync();
Expand All @@ -182,14 +181,14 @@ public async Task Should_Keep_Parallel_Transaction_Opened()
result1.Should().BeOfType<TB_PESSOA>();
result2.Should().BeOfType<TB_PESSOA[]>();
result3.Should().BeOfType<TB_PESSOA[]>();
result1.Should().BeEquivalentTo(new TB_PESSOA { CD_PESSOA = 3 });
result1.Should().BeEquivalentTo(new TB_PESSOA { CD_PESSOA = 300 });
result2.OrderBy(x => x.CD_PESSOA).ToArray().Should().BeEquivalentTo(new TB_PESSOA[] {
new () { CD_PESSOA = 1 },
new () { CD_PESSOA = 5 }
new () { CD_PESSOA = 100 },
new () { CD_PESSOA = 500 }
});
result3.OrderBy(x => x.CD_PESSOA).ToArray().Should().BeEquivalentTo(new TB_PESSOA[] {
new () { CD_PESSOA = 1 },
new () { CD_PESSOA = 2 }
new () { CD_PESSOA = 100 },
new () { CD_PESSOA = 200 }
});
}
}

0 comments on commit 358a3d5

Please sign in to comment.