Skip to content

Commit

Permalink
Add unit test for csharp sdk and fix some corresponding errors
Browse files Browse the repository at this point in the history
  • Loading branch information
tsaitsung-han.tht committed Jul 31, 2024
1 parent e655b6b commit aa690b0
Show file tree
Hide file tree
Showing 33 changed files with 3,259 additions and 27 deletions.
4 changes: 2 additions & 2 deletions csharp/examples/ProducerTransactionMessageExample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ internal static async Task QuickStart()
var sendReceipt = await producer.Send(message, transaction);
Logger.LogInformation("Send transaction message successfully, messageId={}", sendReceipt.MessageId);
// Commit the transaction.
transaction.Commit();
await transaction.Commit();
// Or rollback the transaction.
// transaction.Rollback();
// await transaction.Rollback();

// Close the producer if you don't need it anymore.
await producer.DisposeAsync();
Expand Down
17 changes: 13 additions & 4 deletions csharp/rocketmq-client-csharp/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
using System.Threading;
using System;
using System.Linq;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using Proto = Apache.Rocketmq.V2;
using grpc = Grpc.Core;

[assembly:InternalsVisibleTo("tests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
namespace Org.Apache.Rocketmq
{
public abstract class Client
Expand All @@ -49,7 +52,7 @@ public abstract class Client

protected readonly ClientConfig ClientConfig;
protected readonly Endpoints Endpoints;
protected readonly IClientManager ClientManager;
protected IClientManager ClientManager;
protected readonly string ClientId;
protected readonly ClientMeterManager ClientMeterManager;

Expand Down Expand Up @@ -151,11 +154,11 @@ private protected (bool, Session) GetSession(Endpoints endpoints)

protected abstract IEnumerable<string> GetTopics();

protected abstract Proto::HeartbeatRequest WrapHeartbeatRequest();
internal abstract Proto::HeartbeatRequest WrapHeartbeatRequest();

protected abstract void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData);

private async Task OnTopicRouteDataFetched(string topic, TopicRouteData topicRouteData)
internal async Task OnTopicRouteDataFetched(string topic, TopicRouteData topicRouteData)
{
var routeEndpoints = new HashSet<Endpoints>();
foreach (var mq in topicRouteData.MessageQueues)
Expand Down Expand Up @@ -398,7 +401,7 @@ internal grpc.Metadata Sign()
return metadata;
}

protected abstract Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest();
internal abstract Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest();

private async void NotifyClientTermination()
{
Expand Down Expand Up @@ -438,6 +441,12 @@ internal IClientManager GetClientManager()
return ClientManager;
}

// Only for testing
internal void SetClientManager(IClientManager clientManager)
{
ClientManager = clientManager;
}

internal virtual void OnRecoverOrphanedTransactionCommand(Endpoints endpoints,
Proto.RecoverOrphanedTransactionCommand command)
{
Expand Down
5 changes: 4 additions & 1 deletion csharp/rocketmq-client-csharp/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using Proto = Apache.Rocketmq.V2;

[assembly:InternalsVisibleTo("tests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
namespace Org.Apache.Rocketmq
{
public abstract class Consumer : Client
Expand Down Expand Up @@ -85,7 +88,7 @@ private static Proto.FilterExpression WrapFilterExpression(FilterExpression filt
};
}

protected Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq,
internal Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq,
FilterExpression filterExpression, TimeSpan awaitDuration, TimeSpan invisibleDuration)
{
var group = new Proto.Resource
Expand Down
6 changes: 4 additions & 2 deletions csharp/rocketmq-client-csharp/ITransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
* limitations under the License.
*/

using System.Threading.Tasks;

namespace Org.Apache.Rocketmq
{
public interface ITransaction
{
void Commit();
Task Commit();

void Rollback();
Task Rollback();
}
}
6 changes: 3 additions & 3 deletions csharp/rocketmq-client-csharp/ProcessQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ public class ProcessQueue
{
private static readonly ILogger Logger = MqLogManager.CreateLogger<ProcessQueue>();

private static readonly TimeSpan AckMessageFailureBackoffDelay = TimeSpan.FromSeconds(1);
private static readonly TimeSpan ChangeInvisibleDurationFailureBackoffDelay = TimeSpan.FromSeconds(1);
private static readonly TimeSpan ForwardMessageToDeadLetterQueueFailureBackoffDelay = TimeSpan.FromSeconds(1);
internal static readonly TimeSpan AckMessageFailureBackoffDelay = TimeSpan.FromSeconds(1);
internal static readonly TimeSpan ChangeInvisibleDurationFailureBackoffDelay = TimeSpan.FromSeconds(1);
internal static readonly TimeSpan ForwardMessageToDeadLetterQueueFailureBackoffDelay = TimeSpan.FromSeconds(1);

private static readonly TimeSpan ReceivingFlowControlBackoffDelay = TimeSpan.FromMilliseconds(20);
private static readonly TimeSpan ReceivingFailureBackoffDelay = TimeSpan.FromSeconds(1);
Expand Down
11 changes: 7 additions & 4 deletions csharp/rocketmq-client-csharp/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,27 @@
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Proto = Apache.Rocketmq.V2;
using Org.Apache.Rocketmq.Error;

[assembly:InternalsVisibleTo("tests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
namespace Org.Apache.Rocketmq
{
public class Producer : Client, IAsyncDisposable, IDisposable
{
private static readonly ILogger Logger = MqLogManager.CreateLogger<Producer>();
private readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache;
internal readonly ConcurrentDictionary<string /* topic */, PublishingLoadBalancer> _publishingRouteDataCache;
internal readonly PublishingSettings PublishingSettings;
private readonly ConcurrentDictionary<string, bool> _publishingTopics;
private readonly ITransactionChecker _checker;

private readonly Histogram<double> _sendCostTimeHistogram;

private Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> publishingTopics,
internal Producer(ClientConfig clientConfig, ConcurrentDictionary<string, bool> publishingTopics,
int maxAttempts, ITransactionChecker checker) : base(clientConfig)
{
var retryPolicy = ExponentialBackoffRetryPolicy.ImmediatelyRetryPolicy(maxAttempts);
Expand Down Expand Up @@ -102,15 +105,15 @@ protected override async Task Shutdown()
}
}

protected override Proto::HeartbeatRequest WrapHeartbeatRequest()
internal override Proto::HeartbeatRequest WrapHeartbeatRequest()
{
return new Proto::HeartbeatRequest
{
ClientType = Proto.ClientType.Producer
};
}

protected override Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
internal override Proto::NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
{
return new Proto::NotifyClientTerminationRequest();
}
Expand Down
11 changes: 7 additions & 4 deletions csharp/rocketmq-client-csharp/PushConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Schedulers;
Expand All @@ -27,6 +28,8 @@
using Proto = Apache.Rocketmq.V2;
using Microsoft.Extensions.Logging;

[assembly:InternalsVisibleTo("tests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
namespace Org.Apache.Rocketmq
{
public class PushConsumer : Consumer, IAsyncDisposable, IDisposable
Expand Down Expand Up @@ -195,7 +198,7 @@ public void Unsubscribe(string topic)
_subscriptionExpressions.TryRemove(topic, out _);
}

private async void ScanAssignments()
internal async void ScanAssignments()
{
try
{
Expand Down Expand Up @@ -296,7 +299,7 @@ private async Task SyncProcessQueue(string topic, Assignments assignments,
}
}

private async Task<Assignments> QueryAssignment(string topic)
internal async Task<Assignments> QueryAssignment(string topic)
{
var endpoints = await PickEndpointsToQueryAssignments(topic);
var request = WrapQueryAssignmentRequest(topic);
Expand Down Expand Up @@ -386,7 +389,7 @@ protected override IEnumerable<string> GetTopics()
return _subscriptionExpressions.Keys;
}

protected override Proto.HeartbeatRequest WrapHeartbeatRequest()
internal override Proto.HeartbeatRequest WrapHeartbeatRequest()
{
return new Proto::HeartbeatRequest
{
Expand Down Expand Up @@ -490,7 +493,7 @@ internal override async void OnVerifyMessageCommand(Endpoints endpoints, VerifyM
}
}

protected override NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
internal override NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
{
return new NotifyClientTerminationRequest()
{
Expand Down
6 changes: 3 additions & 3 deletions csharp/rocketmq-client-csharp/SimpleConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ protected override IEnumerable<string> GetTopics()
return _subscriptionExpressions.Keys;
}

protected override Proto.NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
internal override Proto.NotifyClientTerminationRequest WrapNotifyClientTerminationRequest()
{
return new Proto.NotifyClientTerminationRequest()
{
Group = GetProtobufGroup()
};
}

protected override Proto.HeartbeatRequest WrapHeartbeatRequest()
internal override Proto.HeartbeatRequest WrapHeartbeatRequest()
{
return new Proto::HeartbeatRequest
{
Expand Down Expand Up @@ -212,7 +212,7 @@ public async Task<List<MessageView>> Receive(int maxMessageNum, TimeSpan invisib
return receiveMessageResult.Messages;
}

public async void ChangeInvisibleDuration(MessageView messageView, TimeSpan invisibleDuration)
public async Task ChangeInvisibleDuration(MessageView messageView, TimeSpan invisibleDuration)
{
if (State.Running != State)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public override Proto.Settings ToProtobuf()

filterExpression.Expression = value.Expression;
subscriptionEntry.Topic = topic;
subscriptionEntry.Expression = filterExpression;
subscriptionEntries.Add(subscriptionEntry);
}

Expand Down
9 changes: 5 additions & 4 deletions csharp/rocketmq-client-csharp/Transaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Org.Apache.Rocketmq
{
Expand All @@ -44,7 +45,7 @@ public PublishingMessage TryAddMessage(Message message)
_messagesLock.EnterReadLock();
try
{
if (_messages.Count > MaxMessageNum)
if (_messages.Count >= MaxMessageNum)
{
throw new ArgumentException($"Message in transaction has exceed the threshold: {MaxMessageNum}");
}
Expand All @@ -57,7 +58,7 @@ public PublishingMessage TryAddMessage(Message message)
_messagesLock.EnterWriteLock();
try
{
if (_messages.Count > MaxMessageNum)
if (_messages.Count >= MaxMessageNum)
{
throw new ArgumentException($"Message in transaction has exceed the threshold: {MaxMessageNum}");
}
Expand Down Expand Up @@ -90,7 +91,7 @@ public void TryAddReceipt(PublishingMessage publishingMessage, SendReceipt sendR
}
}

public async void Commit()
public async Task Commit()
{
if (State.Running != _producer.State)
{
Expand All @@ -109,7 +110,7 @@ await _producer.EndTransaction(sendReceipt.Endpoints, publishingMessage.Topic, s
}
}

public async void Rollback()
public async Task Rollback()
{
if (State.Running != _producer.State)
{
Expand Down
Loading

0 comments on commit aa690b0

Please sign in to comment.