Skip to content

Commit

Permalink
Refactor push consumer implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tsaitsung-han.tht committed Aug 13, 2024
1 parent 62ef7fd commit 73a8716
Show file tree
Hide file tree
Showing 22 changed files with 489 additions and 350 deletions.
12 changes: 7 additions & 5 deletions csharp/examples/ProducerBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public static class ProducerBenchmark
private const int TpsLimit = 1024;
private static long _successCounter;
private static long _failureCounter;

private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");

private static readonly BlockingCollection<Task<ISendReceipt>> Tasks =
new BlockingCollection<Task<ISendReceipt>>();
Expand Down Expand Up @@ -79,14 +83,11 @@ internal static async Task QuickStart()
{
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
const string accessKey = "yourAccessKey";
const string secretKey = "yourSecretKey";

// Credential provider is optional for client configuration.
var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
const string endpoints = "foobar.com:8080";
var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
var clientConfig = new ClientConfig.Builder()
.SetEndpoints(endpoints)
.SetEndpoints(Endpoint)
.SetCredentialsProvider(credentialsProvider)
.Build();

Expand All @@ -108,6 +109,7 @@ internal static async Task QuickStart()
.SetTag(tag)
// You could set multiple keys for the single message actually.
.SetKeys("yourMessageKey-7044358f98fc")
.SetMessageGroup("fifo-group")
.Build();

DoStats();
Expand Down
11 changes: 6 additions & 5 deletions csharp/examples/ProducerDelayMessageExample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,20 @@ namespace examples
internal static class ProducerDelayMessageExample
{
private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerDelayMessageExample).FullName);

private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");

internal static async Task QuickStart()
{
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
const string accessKey = "yourAccessKey";
const string secretKey = "yourSecretKey";

// Credential provider is optional for client configuration.
var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
const string endpoints = "foobar.com:8080";
var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
var clientConfig = new ClientConfig.Builder()
.SetEndpoints(endpoints)
.SetEndpoints(Endpoint)
.SetCredentialsProvider(credentialsProvider)
.Build();

Expand Down
12 changes: 7 additions & 5 deletions csharp/examples/ProducerFifoMessageExample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand All @@ -25,19 +26,20 @@ namespace examples
internal static class ProducerFifoMessageExample
{
private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerFifoMessageExample).FullName);

private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");

internal static async Task QuickStart()
{
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
const string accessKey = "yourAccessKey";
const string secretKey = "yourSecretKey";

// Credential provider is optional for client configuration.
var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
const string endpoints = "foobar.com:8080";
var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
var clientConfig = new ClientConfig.Builder()
.SetEndpoints(endpoints)
.SetEndpoints(Endpoint)
.SetCredentialsProvider(credentialsProvider)
.Build();

Expand Down
12 changes: 7 additions & 5 deletions csharp/examples/ProducerNormalMessageExample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand All @@ -25,19 +26,20 @@ namespace examples
internal static class ProducerNormalMessageExample
{
private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerNormalMessageExample).FullName);

private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");

internal static async Task QuickStart()
{
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
const string accessKey = "yourAccessKey";
const string secretKey = "yourSecretKey";

// Credential provider is optional for client configuration.
var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
const string endpoints = "foobar.com:8080";
var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
var clientConfig = new ClientConfig.Builder()
.SetEndpoints(endpoints)
.SetEndpoints(Endpoint)
.SetCredentialsProvider(credentialsProvider)
.Build();

Expand Down
12 changes: 7 additions & 5 deletions csharp/examples/ProducerTransactionMessageExample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand All @@ -25,6 +26,10 @@ namespace examples
internal static class ProducerTransactionMessageExample
{
private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(ProducerTransactionMessageExample).FullName);

private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");

private class TransactionChecker : ITransactionChecker
{
Expand All @@ -39,14 +44,11 @@ internal static async Task QuickStart()
{
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
const string accessKey = "yourAccessKey";
const string secretKey = "yourSecretKey";

// Credential provider is optional for client configuration.
var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);
const string endpoints = "foobar.com:8080";
var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
var clientConfig = new ClientConfig.Builder()
.SetEndpoints(endpoints)
.SetEndpoints(Endpoint)
.SetCredentialsProvider(credentialsProvider)
.Build();

Expand Down
4 changes: 2 additions & 2 deletions csharp/examples/examples.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
<ProjectReference Include="..\rocketmq-client-csharp\rocketmq-client-csharp.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.32" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="6.0.0" />
</ItemGroup>

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFrameworks>net5.0;netcoreapp3.1</TargetFrameworks>
<TargetFrameworks>net8.0;net5.0;netcoreapp3.1</TargetFrameworks>
<ServerGarbageCollection>true</ServerGarbageCollection>
</PropertyGroup>
</Project>
2 changes: 1 addition & 1 deletion csharp/rocketmq-client-csharp/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ internal async void OnPrintThreadStackTraceCommand(Endpoints endpoints,

internal void OnSettingsCommand(Endpoints endpoints, Proto.Settings settings)
{
var metric = new Metric(settings.Metric);
var metric = new Metric(settings.Metric ?? new Proto.Metric());
ClientMeterManager.Reset(metric);
GetSettings().Sync(settings);
}
Expand Down
1 change: 1 addition & 0 deletions csharp/rocketmq-client-csharp/ClientManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public async Task Shutdown()
{
var metadata = _client.Sign();
var response = await GetRpcClient(endpoints).ReceiveMessage(metadata, request, timeout);

return new RpcInvocation<Proto.ReceiveMessageRequest, List<Proto.ReceiveMessageResponse>>(
request, response, metadata);
}
Expand Down
21 changes: 11 additions & 10 deletions csharp/rocketmq-client-csharp/ConsumeService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,30 +41,31 @@ public ConsumeService(string clientId, IMessageListener messageListener, TaskSch
_consumptionCtsToken = consumptionCtsToken;
}

public abstract Task Consume(ProcessQueue pq, List<MessageView> messageViews);
public abstract void Consume(ProcessQueue pq, List<MessageView> messageViews);

public async Task<ConsumeResult> Consume(MessageView messageView)
public Task<ConsumeResult> Consume(MessageView messageView)
{
return await Consume(messageView, TimeSpan.Zero);
return Consume(messageView, TimeSpan.Zero);
}

public async Task<ConsumeResult> Consume(MessageView messageView, TimeSpan delay)
public Task<ConsumeResult> Consume(MessageView messageView, TimeSpan delay)
{
var task = new ConsumeTask(ClientId, _messageListener, messageView);
var delayMilliseconds = (int) delay.TotalMilliseconds;

if (delayMilliseconds <= 0)
{
return await Task.Factory.StartNew(() => task.Call(), _consumptionCtsToken, TaskCreationOptions.None,
return Task.Factory.StartNew(() => task.Call(), _consumptionCtsToken, TaskCreationOptions.None,
_consumptionTaskScheduler);
}

var tcs = new TaskCompletionSource<ConsumeResult>();

await Task.Delay(delay, _consumptionCtsToken).ContinueWith(async _ =>
Task.Run(async () =>
{
try
{
await Task.Delay(delay, _consumptionCtsToken);
var result = await Task.Factory.StartNew(() => task.Call(), _consumptionCtsToken,
TaskCreationOptions.None, _consumptionTaskScheduler);
tcs.SetResult(result);
Expand All @@ -74,9 +75,9 @@ await Task.Delay(delay, _consumptionCtsToken).ContinueWith(async _ =>
Logger.LogError(e, $"Error while consuming message, clientId={ClientId}");
tcs.SetException(e);
}
}, TaskScheduler.Default);

return await tcs.Task;
}, _consumptionCtsToken);
return tcs.Task;
}
}
}
39 changes: 25 additions & 14 deletions csharp/rocketmq-client-csharp/FifoConsumeService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,37 @@ public FifoConsumeService(string clientId, IMessageListener messageListener,
{
}

public override async Task Consume(ProcessQueue pq, List<MessageView> messageViews)
public override void Consume(ProcessQueue pq, List<MessageView> messageViews)
{
await ConsumeIteratively(pq, messageViews.GetEnumerator());
ConsumeIteratively(pq, messageViews.GetEnumerator());
}

public async Task ConsumeIteratively(ProcessQueue pq, IEnumerator<MessageView> iterator)
public void ConsumeIteratively(ProcessQueue pq, IEnumerator<MessageView> iterator)
{
while (iterator.MoveNext())
if (!iterator.MoveNext())
{
var messageView = iterator.Current;
if (messageView != null && messageView.IsCorrupted())
{
Logger.LogError($"Message is corrupted for FIFO consumption, prepare to discard it," +
$" mq={pq.GetMessageQueue()}, messageId={messageView.MessageId}, clientId={ClientId}");
await pq.DiscardFifoMessage(messageView);
continue;
}
var consumeResult = await Consume(messageView);
await pq.EraseFifoMessage(messageView, consumeResult);
return;
}

var messageView = iterator.Current;

if (messageView != null && messageView.IsCorrupted())
{
// Discard corrupted message.
Logger.LogError($"Message is corrupted for FIFO consumption, prepare to discard it," +
$" mq={pq.GetMessageQueue()}, messageId={messageView.MessageId}, clientId={ClientId}");
pq.DiscardFifoMessage(messageView);
ConsumeIteratively(pq, iterator); // Recursively consume the next message
return;
}

var consumeTask = Consume(messageView);
consumeTask.ContinueWith(async t =>
{
var result = await t;
await pq.EraseFifoMessage(messageView, result);
}, TaskContinuationOptions.ExecuteSynchronously).ContinueWith(_ => ConsumeIteratively(pq, iterator),
TaskContinuationOptions.ExecuteSynchronously);
}
}
}
Loading

0 comments on commit 73a8716

Please sign in to comment.