Skip to content

Commit

Permalink
feat: Creating async local controlled Channel
Browse files Browse the repository at this point in the history
  • Loading branch information
Farenheith committed Oct 21, 2024
1 parent 56c1f3d commit f19cdcb
Show file tree
Hide file tree
Showing 8 changed files with 385 additions and 47 deletions.
1 change: 1 addition & 0 deletions src/Codibre.GrpcSqlProxy.Client/ISqlProxyClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

public interface ISqlProxyClient
{
ISqlProxyClientTunnel Channel { get; }
ISqlProxyClientTunnel CreateChannel();
}
3 changes: 1 addition & 2 deletions src/Codibre.GrpcSqlProxy.Client/ISqlProxyClientTunnel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ namespace Codibre.GrpcSqlProxy.Client;
public interface ISqlProxyClientTunnel : IDisposable
{
ISqlProxyBatchQuery Batch { get; }
void Start();
ValueTask BeginTransaction();
ValueTask Commit();
ValueTask Rollback();
ValueTask Execute(string sql, SqlProxyQueryOptions? options = null);
IAsyncEnumerable<T> Query<T>(string sql, SqlProxyQueryOptions? options = null) where T : class, new();
ValueTask<T?> QueryFirstOrDefault<T>(string sql, SqlProxyQueryOptions? options = null) where T : class, new();
ValueTask<T> QueryFirst<T>(string sql, SqlProxyQueryOptions? options = null) where T : class, new();
Reader QueryMultipleAsync(string sql, string[] schemas, SqlProxyQueryOptions? options);
void OnError(ErrorHandlerEvent handler);
}
6 changes: 5 additions & 1 deletion src/Codibre.GrpcSqlProxy.Client/Impl/GrpcSqlProxyClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ namespace Codibre.GrpcSqlProxy.Client.Impl;

public class GrpcSqlProxyClient(SqlProxyClientOptions options) : ISqlProxyClient
{
private readonly AsyncLocal<ISqlProxyClientTunnel> _asyncLocal = new();
private readonly SqlProxy.SqlProxyClient _client = new(GrpcChannel.ForAddress(options.Url));
public ISqlProxyClientTunnel CreateChannel() => new SqlProxyClientTunnel(_client.Run(), options);

public ISqlProxyClientTunnel Channel => _asyncLocal.Value ??= CreateChannel();

public ISqlProxyClientTunnel CreateChannel() => new SqlProxyClientTunnel(() => _client.Run(), options);
}
1 change: 1 addition & 0 deletions src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyBatchQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public async Task<T> RunInTransaction<T>(Func<ISqlProxyBatchQuery, ValueTask<T>>
if (_transaction is not null) throw new InvalidOperationException("RunInTransaction Already called");
if (_builder.QueryCount > 0) throw new InvalidOperationException("Query buffer not empty");
_transaction = new(options);
_tunnel.Start();
try
{
var result = await query(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using Codibre.GrpcSqlProxy.Api;
using Grpc.Core;

namespace Codibre.GrpcSqlProxy.Client.Impl;

public sealed partial class SqlProxyClientTunnel
{
internal class ContextInfo : IDisposable
{
private readonly Action _clear;
private readonly ExecutionContext? _executionContext;
public bool Disposed { get; private set; } = false;
public bool Transaction { get; set; } = false;
public AsyncDuplexStreamingCall<SqlRequest, SqlResponse> Stream { get; }
public SqlProxyClientResponseMonitor Monitor { get; }
public ContextInfo(
Func<AsyncDuplexStreamingCall<SqlRequest, SqlResponse>> getStream,
Action clear
)
{
Stream = getStream();
Monitor = new(Stream);
_clear = clear;
_executionContext = ExecutionContext.Capture();
}

public void Dispose()
{
Monitor.Dispose();
Stream.Dispose();
Disposed = true;
if (_executionContext is not null) ExecutionContext.Run(_executionContext, (_) => _clear(), null);
}
}
}
155 changes: 111 additions & 44 deletions src/Codibre.GrpcSqlProxy.Client/Impl/SqlProxyClientTunnel.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading.Channels;
using System.Transactions;
using Avro;
using Avro.Generic;
using Avro.IO;
Expand All @@ -12,38 +13,48 @@

namespace Codibre.GrpcSqlProxy.Client.Impl;

public sealed class SqlProxyClientTunnel : ISqlProxyClientTunnel
public sealed partial class SqlProxyClientTunnel : ISqlProxyClientTunnel
{
private readonly AsyncLocal<ContextInfo?> _context = new();
private ContextInfo Context
{
get
{
var result = _context.Value;
if (result is null || result.Disposed) result = _context.Value = new(_getStream, () => _context.Value = null);
return result;
}
}

private readonly SqlProxyClientOptions _clientOptions;
private readonly AsyncDuplexStreamingCall<SqlRequest, SqlResponse> _stream;
private readonly SqlProxyClientResponseMonitor _monitor;
private readonly string _connString;
private readonly Func<AsyncDuplexStreamingCall<SqlRequest, SqlResponse>> _getStream;

public ISqlProxyBatchQuery Batch { get; }

internal SqlProxyClientTunnel(
AsyncDuplexStreamingCall<SqlRequest, SqlResponse> stream,
Func<AsyncDuplexStreamingCall<SqlRequest, SqlResponse>> getStream,
SqlProxyClientOptions clientOptions
)
{
Batch = new SqlProxyBatchQuery(this);
_monitor = new(stream);
_stream = stream;
_getStream = getStream;
_clientOptions = clientOptions;
_connString = clientOptions.SqlConnectionString;
}

public IAsyncEnumerable<T> Query<T>(string sql, SqlProxyQueryOptions? options = null)
private (ContextInfo, IAsyncEnumerable<T>) QueryInternal<T>(string sql, SqlProxyQueryOptions? options)
where T : class, new()
{
if (!_monitor.Running) throw new InvalidOperationException("Tunnel closed");
var type = typeof(T);
var schema = type.GetCachedSchema();

var results = InternalRun(sql, [schema.Item2], options);
return ConvertResult<T>(type, schema, results);
var (context, results) = InternalRun(sql, [schema.Item2], options);
return (context, ConvertResult<T>(type, schema, results));
}

public IAsyncEnumerable<T> Query<T>(string sql, SqlProxyQueryOptions? options = null)
where T : class, new()
=> QueryInternal<T>(sql, options).Item2;

internal static async IAsyncEnumerable<T> ConvertResult<T>(Type type, (RecordSchema, string) schema, IAsyncEnumerable<SqlResponse> results) where T : class, new()
{
await foreach (var result in results)
Expand All @@ -65,74 +76,130 @@ public IAsyncEnumerable<T> Query<T>(string sql, SqlProxyQueryOptions? options =
}
}

public async ValueTask Execute(string sql, SqlProxyQueryOptions? options = null)
public ValueTask Execute(
string sql,
SqlProxyQueryOptions? options = null
) => InternalRun(sql, null, options).Item2.Complete();

private (ContextInfo, IAsyncEnumerable<SqlResponse>) InternalRun(
string sql,
string[]? schemas,
SqlProxyQueryOptions? options
)
{
await InternalRun(sql, null, options).LastAsync();
var context = Context;
return (context, InternalRun(sql, schemas, options, context));
}

private async IAsyncEnumerable<SqlResponse> InternalRun(string sql, string[]? schemas, SqlProxyQueryOptions? options)
private async IAsyncEnumerable<SqlResponse> InternalRun(
string sql,
string[]? schemas,
SqlProxyQueryOptions? options,
ContextInfo context
)
{
var id = GuidEx.NewBase64Guid();
var message = GetRequest(_clientOptions, sql, schemas, options, id);
await _stream.RequestStream.WriteAsync(message);
_monitor.Start();
var message = GetRequest(
_clientOptions,
sql,
schemas,
options,
id,
context
);
await context.Stream.RequestStream.WriteAsync(message);
context.Monitor.Start();
var channel = Channel.CreateUnbounded<SqlResponse>();
_monitor.AddHook(id, channel.Writer);
context.Monitor.AddHook(id, channel.Writer);
var reader = channel.Reader;

while (await reader.WaitToReadAsync(_monitor.CancellationToken))
while (await reader.WaitToReadAsync(context.Monitor.CancellationToken))
{
if (reader.TryRead(out var current))
{
if (!string.IsNullOrEmpty(current.Error)) throw new SqlProxyException(current.Error);
yield return current;
if (current.Last == LastEnum.Last) break;
}
if (!reader.TryRead(out var current)) continue;
if (!string.IsNullOrEmpty(current.Error)) throw new SqlProxyException(current.Error);
yield return current;
if (current.Last == LastEnum.Last) break;
}
_monitor.RemoveHook(id);
context.Monitor.RemoveHook(id);
ClearWhenNotInTransaction(context);
}

private static void ClearWhenNotInTransaction(ContextInfo context)
{
if (!context.Transaction) context.Dispose();
}

private SqlRequest GetRequest(SqlProxyClientOptions clientOptions, string sql, string[]? schemas, SqlProxyQueryOptions? options, string id)
private SqlRequest GetRequest(
SqlProxyClientOptions clientOptions,
string sql, string[]? schemas,
SqlProxyQueryOptions? options,
string id,
ContextInfo context
)
{
SqlRequest message = new()
{
Id = id,
ConnString = _monitor.Started ? "" : _connString,
ConnString = context.Monitor.Started ? "" : _clientOptions.SqlConnectionString,
Query = sql,
Schema = { },
Compress = options?.Compress ?? clientOptions.Compress,
PacketSize = options?.PacketSize ?? clientOptions.PacketSize,
Params = JsonSerializer.Serialize(options?.Params)
};
foreach (var schema in schemas ?? [])
{
message.Schema.Add(schema);
}
foreach (var schema in schemas ?? []) message.Schema.Add(schema);

return message;
}

public ValueTask<T?> QueryFirstOrDefault<T>(string sql, SqlProxyQueryOptions? options = null) where T : class, new()
=> Query<T>(sql, options).FirstOrDefaultAsync();
public async ValueTask<T?> QueryFirstOrDefault<T>(string sql, SqlProxyQueryOptions? options = null) where T : class, new()
{
var (contextInfo, results) = QueryInternal<T>(sql, options);
var result = await results.FirstOrDefaultAsync();
ClearWhenNotInTransaction(contextInfo);
return result;
}

public ValueTask<T> QueryFirst<T>(string sql, SqlProxyQueryOptions? options = null) where T : class, new()
=> Query<T>(sql, options).FirstAsync();
public async ValueTask<T> QueryFirst<T>(string sql, SqlProxyQueryOptions? options = null) where T : class, new()
{
var (contextInfo, results) = QueryInternal<T>(sql, options);
var result = await results.FirstAsync();
ClearWhenNotInTransaction(contextInfo);
return result;
}

public void Dispose()
{
_monitor.Dispose();
_stream.Dispose();
_context.Value?.Dispose();
_context.Value = null;
}

public ValueTask BeginTransaction() => Execute("BEGIN TRANSACTION");
public ValueTask BeginTransaction()
{
Context.Transaction = true;
return Execute("BEGIN TRANSACTION");
}

public ValueTask Commit() => Execute("COMMIT");
public ValueTask Commit()
{
Context.Transaction = false;
return Execute("COMMIT");
}

public ValueTask Rollback() => Execute("ROLLBACK");
public ValueTask Rollback()
{
Context.Transaction = false;
return Execute("ROLLBACK");
}

public Reader QueryMultipleAsync(string sql, string[] schemas, SqlProxyQueryOptions? options)
=> new(InternalRun(sql, schemas, options));
=> new(InternalRun(sql, schemas, options).Item2);

public void OnError(ErrorHandlerEvent handler)
=> _monitor.ErrorHandler += handler;
{
var monitor = _context.Value?.Monitor;
if (monitor is not null) monitor.ErrorHandler += handler;
}

public void Start() => _ = Context;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading.Channels;
using System.Transactions;
using Avro;
using Avro.Generic;
using Avro.IO;
using Codibre.GrpcSqlProxy.Api;
using Codibre.GrpcSqlProxy.Client.Impl.Utils;
using Codibre.GrpcSqlProxy.Common;
using Google.Protobuf.Collections;
using Grpc.Core;

namespace Codibre.GrpcSqlProxy.Client.Impl;

public static class SqlProxyClientTunnelExtension
{
public static ValueTask<T?> QueryFirstOrDefault<T>(
this ISqlProxyClientTunnel tunnel,
string sql,
SqlProxyQueryOptions? options = null
) where T : class, new()
=> tunnel.Query<T>(sql, options).FirstOrDefaultAsync();

public static ValueTask<T> QueryFirst<T>(
this ISqlProxyClientTunnel tunnel,
string sql,
SqlProxyQueryOptions? options = null
) where T : class, new()
=> tunnel.Query<T>(sql, options).FirstAsync();


internal static async ValueTask Complete<T>(this IAsyncEnumerable<T> stream)
{
await foreach (var _ in stream)
{
// Dummy
}
}
}
Loading

0 comments on commit f19cdcb

Please sign in to comment.