Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Creating async local controlled Channel #8

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Codibre.GrpcSqlProxy.Client/ISqlProxyClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

public interface ISqlProxyClient
{
ISqlProxyClientTunnel Channel { get; }
ISqlProxyClientTunnel CreateChannel();
}
3 changes: 1 addition & 2 deletions src/Codibre.GrpcSqlProxy.Client/ISqlProxyClientTunnel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ namespace Codibre.GrpcSqlProxy.Client;
public interface ISqlProxyClientTunnel : IDisposable
{
ISqlProxyBatchQuery Batch { get; }
void Start();
ValueTask BeginTransaction();
ValueTask Commit();
ValueTask Rollback();
ValueTask Execute(string sql, SqlProxyQueryOptions? options = null);
IAsyncEnumerable<T> Query<T>(string sql, SqlProxyQueryOptions? options = null) where T : class, new();
ValueTask<T?> QueryFirstOrDefault<T>(string sql, SqlProxyQueryOptions? options = null) where T : class, new();
ValueTask<T> QueryFirst<T>(string sql, SqlProxyQueryOptions? options = null) where T : class, new();
Reader QueryMultipleAsync(string sql, string[] schemas, SqlProxyQueryOptions? options);
void OnError(ErrorHandlerEvent handler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ namespace Codibre.GrpcSqlProxy.Client.Impl;

public class GrpcSqlProxyClient(SqlProxyClientOptions options) : ISqlProxyClient
{
private readonly AsyncLocal<ISqlProxyClientTunnel> _asyncLocal = new();
private readonly SqlProxy.SqlProxyClient _client = new(GrpcChannel.ForAddress(options.Url));
public ISqlProxyClientTunnel CreateChannel() => new SqlProxyClientTunnel(_client.Run(), options);

public ISqlProxyClientTunnel Channel => _asyncLocal.Value ??= CreateChannel();

public ISqlProxyClientTunnel CreateChannel() => new SqlProxyClientTunnel(() => _client.Run(), options);
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public async Task<T> RunInTransaction<T>(Func<ISqlProxyBatchQuery, ValueTask<T>>
if (_transaction is not null) throw new InvalidOperationException("RunInTransaction Already called");
if (_builder.QueryCount > 0) throw new InvalidOperationException("Query buffer not empty");
_transaction = new(options);
_tunnel.Start();
try
{
var result = await query(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using Codibre.GrpcSqlProxy.Api;
using Grpc.Core;

namespace Codibre.GrpcSqlProxy.Client.Impl;

public sealed partial class SqlProxyClientTunnel
{
internal class ContextInfo : IDisposable
{
private readonly Action _clear;
private readonly ExecutionContext? _executionContext;
public bool Disposed { get; private set; } = false;
public bool Transaction { get; set; } = false;
public AsyncDuplexStreamingCall<SqlRequest, SqlResponse> Stream { get; }
public SqlProxyClientResponseMonitor Monitor { get; }
public ContextInfo(
Func<AsyncDuplexStreamingCall<SqlRequest, SqlResponse>> getStream,
Action clear
)
{
Stream = getStream();
Monitor = new(Stream);
_clear = clear;
_executionContext = ExecutionContext.Capture();
}

public void Dispose()
{
Monitor.Dispose();
Stream.Dispose();
Disposed = true;
if (_executionContext is not null) ExecutionContext.Run(_executionContext, (_) => _clear(), null);
}
}
}
155 changes: 111 additions & 44 deletions src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyClientTunnel.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading.Channels;
using System.Transactions;
using Avro;
using Avro.Generic;
using Avro.IO;
Expand All @@ -12,38 +13,48 @@

namespace Codibre.GrpcSqlProxy.Client.Impl;

public sealed class SqlProxyClientTunnel : ISqlProxyClientTunnel
public sealed partial class SqlProxyClientTunnel : ISqlProxyClientTunnel
{
private readonly AsyncLocal<ContextInfo?> _context = new();
private ContextInfo Context
{
get
{
var result = _context.Value;
if (result is null || result.Disposed) result = _context.Value = new(_getStream, () => _context.Value = null);
return result;
}
}

private readonly SqlProxyClientOptions _clientOptions;
private readonly AsyncDuplexStreamingCall<SqlRequest, SqlResponse> _stream;
private readonly SqlProxyClientResponseMonitor _monitor;
private readonly string _connString;
private readonly Func<AsyncDuplexStreamingCall<SqlRequest, SqlResponse>> _getStream;

public ISqlProxyBatchQuery Batch { get; }

internal SqlProxyClientTunnel(
AsyncDuplexStreamingCall<SqlRequest, SqlResponse> stream,
Func<AsyncDuplexStreamingCall<SqlRequest, SqlResponse>> getStream,
SqlProxyClientOptions clientOptions
)
{
Batch = new SqlProxyBatchQuery(this);
_monitor = new(stream);
_stream = stream;
_getStream = getStream;
_clientOptions = clientOptions;
_connString = clientOptions.SqlConnectionString;
}

public IAsyncEnumerable<T> Query<T>(string sql, SqlProxyQueryOptions? options = null)
private (ContextInfo, IAsyncEnumerable<T>) QueryInternal<T>(string sql, SqlProxyQueryOptions? options)
where T : class, new()
{
if (!_monitor.Running) throw new InvalidOperationException("Tunnel closed");
var type = typeof(T);
var schema = type.GetCachedSchema();

var results = InternalRun(sql, [schema.Item2], options);
return ConvertResult<T>(type, schema, results);
var (context, results) = InternalRun(sql, [schema.Item2], options);
return (context, ConvertResult<T>(type, schema, results));
}

public IAsyncEnumerable<T> Query<T>(string sql, SqlProxyQueryOptions? options = null)
where T : class, new()
=> QueryInternal<T>(sql, options).Item2;

internal static async IAsyncEnumerable<T> ConvertResult<T>(Type type, (RecordSchema, string) schema, IAsyncEnumerable<SqlResponse> results) where T : class, new()
{
await foreach (var result in results)
Expand All @@ -65,74 +76,130 @@ public IAsyncEnumerable<T> Query<T>(string sql, SqlProxyQueryOptions? options =
}
}

public async ValueTask Execute(string sql, SqlProxyQueryOptions? options = null)
public ValueTask Execute(
string sql,
SqlProxyQueryOptions? options = null
) => InternalRun(sql, null, options).Item2.Complete();

private (ContextInfo, IAsyncEnumerable<SqlResponse>) InternalRun(
string sql,
string[]? schemas,
SqlProxyQueryOptions? options
)
{
await InternalRun(sql, null, options).LastAsync();
var context = Context;
return (context, InternalRun(sql, schemas, options, context));
}

private async IAsyncEnumerable<SqlResponse> InternalRun(string sql, string[]? schemas, SqlProxyQueryOptions? options)
private async IAsyncEnumerable<SqlResponse> InternalRun(
string sql,
string[]? schemas,
SqlProxyQueryOptions? options,
ContextInfo context
)
{
var id = GuidEx.NewBase64Guid();
var message = GetRequest(_clientOptions, sql, schemas, options, id);
await _stream.RequestStream.WriteAsync(message);
_monitor.Start();
var message = GetRequest(
_clientOptions,
sql,
schemas,
options,
id,
context
);
await context.Stream.RequestStream.WriteAsync(message);
context.Monitor.Start();
var channel = Channel.CreateUnbounded<SqlResponse>();
_monitor.AddHook(id, channel.Writer);
context.Monitor.AddHook(id, channel.Writer);
var reader = channel.Reader;

while (await reader.WaitToReadAsync(_monitor.CancellationToken))
while (await reader.WaitToReadAsync(context.Monitor.CancellationToken))
{
if (reader.TryRead(out var current))
{
if (!string.IsNullOrEmpty(current.Error)) throw new SqlProxyException(current.Error);
yield return current;
if (current.Last == LastEnum.Last) break;
}
if (!reader.TryRead(out var current)) continue;
if (!string.IsNullOrEmpty(current.Error)) throw new SqlProxyException(current.Error);
yield return current;
if (current.Last == LastEnum.Last) break;
}
_monitor.RemoveHook(id);
context.Monitor.RemoveHook(id);
ClearWhenNotInTransaction(context);
}

private static void ClearWhenNotInTransaction(ContextInfo context)
{
if (!context.Transaction) context.Dispose();
}

private SqlRequest GetRequest(SqlProxyClientOptions clientOptions, string sql, string[]? schemas, SqlProxyQueryOptions? options, string id)
private SqlRequest GetRequest(
SqlProxyClientOptions clientOptions,
string sql, string[]? schemas,
SqlProxyQueryOptions? options,
string id,
ContextInfo context
)
{
SqlRequest message = new()
{
Id = id,
ConnString = _monitor.Started ? "" : _connString,
ConnString = context.Monitor.Started ? "" : _clientOptions.SqlConnectionString,
Query = sql,
Schema = { },
Compress = options?.Compress ?? clientOptions.Compress,
PacketSize = options?.PacketSize ?? clientOptions.PacketSize,
Params = JsonSerializer.Serialize(options?.Params)
};
foreach (var schema in schemas ?? [])
{
message.Schema.Add(schema);
}
foreach (var schema in schemas ?? []) message.Schema.Add(schema);

return message;
}

public ValueTask<T?> QueryFirstOrDefault<T>(string sql, SqlProxyQueryOptions? options = null) where T : class, new()
=> Query<T>(sql, options).FirstOrDefaultAsync();
public async ValueTask<T?> QueryFirstOrDefault<T>(string sql, SqlProxyQueryOptions? options = null) where T : class, new()
{
var (contextInfo, results) = QueryInternal<T>(sql, options);
var result = await results.FirstOrDefaultAsync();
ClearWhenNotInTransaction(contextInfo);
return result;
}

public ValueTask<T> QueryFirst<T>(string sql, SqlProxyQueryOptions? options = null) where T : class, new()
=> Query<T>(sql, options).FirstAsync();
public async ValueTask<T> QueryFirst<T>(string sql, SqlProxyQueryOptions? options = null) where T : class, new()
{
var (contextInfo, results) = QueryInternal<T>(sql, options);
var result = await results.FirstAsync();
ClearWhenNotInTransaction(contextInfo);
return result;
}

public void Dispose()
{
_monitor.Dispose();
_stream.Dispose();
_context.Value?.Dispose();
_context.Value = null;
}

public ValueTask BeginTransaction() => Execute("BEGIN TRANSACTION");
public ValueTask BeginTransaction()
{
Context.Transaction = true;
return Execute("BEGIN TRANSACTION");
}

public ValueTask Commit() => Execute("COMMIT");
public ValueTask Commit()
{
Context.Transaction = false;
return Execute("COMMIT");
}

public ValueTask Rollback() => Execute("ROLLBACK");
public ValueTask Rollback()
{
Context.Transaction = false;
return Execute("ROLLBACK");
}

public Reader QueryMultipleAsync(string sql, string[] schemas, SqlProxyQueryOptions? options)
=> new(InternalRun(sql, schemas, options));
=> new(InternalRun(sql, schemas, options).Item2);

public void OnError(ErrorHandlerEvent handler)
=> _monitor.ErrorHandler += handler;
{
var monitor = _context.Value?.Monitor;
if (monitor is not null) monitor.ErrorHandler += handler;
}

public void Start() => _ = Context;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading.Channels;
using System.Transactions;
using Avro;
using Avro.Generic;
using Avro.IO;
using Codibre.GrpcSqlProxy.Api;
using Codibre.GrpcSqlProxy.Client.Impl.Utils;
using Codibre.GrpcSqlProxy.Common;
using Google.Protobuf.Collections;
using Grpc.Core;

namespace Codibre.GrpcSqlProxy.Client.Impl;

public static class SqlProxyClientTunnelExtension
{
public static ValueTask<T?> QueryFirstOrDefault<T>(
this ISqlProxyClientTunnel tunnel,
string sql,
SqlProxyQueryOptions? options = null
) where T : class, new()
=> tunnel.Query<T>(sql, options).FirstOrDefaultAsync();

public static ValueTask<T> QueryFirst<T>(
this ISqlProxyClientTunnel tunnel,
string sql,
SqlProxyQueryOptions? options = null
) where T : class, new()
=> tunnel.Query<T>(sql, options).FirstAsync();


internal static async ValueTask Complete<T>(this IAsyncEnumerable<T> stream)
{
await foreach (var _ in stream)
{
// Dummy
}
}
}
Loading
Loading