Skip to content

Commit

Permalink
Add push consumer for normal message in c# sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
tsaitsung-han.tht committed Jul 3, 2024
1 parent cc15eaf commit 79f8dd4
Show file tree
Hide file tree
Showing 19 changed files with 1,881 additions and 4 deletions.
75 changes: 75 additions & 0 deletions csharp/examples/PushConsumerExample.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Org.Apache.Rocketmq;

namespace examples
{
public class PushConsumerExample
{
private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(PushConsumerExample).FullName);

private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY");
private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY");
private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");

internal static async Task QuickStart()
{
// Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
// AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);

// Credential provider is optional for client configuration.
var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey);
var clientConfig = new ClientConfig.Builder()
.SetEndpoints(Endpoint)
.SetCredentialsProvider(credentialsProvider)
.Build();

// Add your subscriptions.
const string consumerGroup = "yourConsumerGroup";
const string topic = "yourTopic";
var subscription = new Dictionary<string, FilterExpression>
{ { topic, new FilterExpression("*") } };

var pushConsumer = await new PushConsumer.Builder()
.SetClientConfig(clientConfig)
.SetConsumerGroup(consumerGroup)
.SetSubscriptionExpression(subscription)
.SetMessageListener(new CustomMessageListener())
.Build();

Thread.Sleep(Timeout.Infinite);

// Close the push consumer if you don't need it anymore.
// await pushConsumer.DisposeAsync();
}

private class CustomMessageListener : IMessageListener
{
public ConsumeResult Consume(MessageView messageView)
{
// Handle the received message and return consume result.
Logger.LogInformation($"Consume message={messageView}");
return ConsumeResult.SUCCESS;
}
}
}
}
1 change: 1 addition & 0 deletions csharp/examples/QuickStart.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public static void Main()
// ProducerFifoMessageExample.QuickStart().Wait();
// ProducerDelayMessageExample.QuickStart().Wait();
// ProducerTransactionMessageExample.QuickStart().Wait();
// PushConsumerExample.QuickStart().Wait();
// SimpleConsumerExample.QuickStart().Wait();
// ProducerBenchmark.QuickStart().Wait();
}
Expand Down
51 changes: 51 additions & 0 deletions csharp/rocketmq-client-csharp/Assignment.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Generic;

namespace Org.Apache.Rocketmq
{
public class Assignment
{
public Assignment(MessageQueue messageQueue)
{
MessageQueue = messageQueue ?? throw new ArgumentNullException(nameof(messageQueue));
}

public MessageQueue MessageQueue { get; }

public override bool Equals(object obj)
{
if (this == obj) return true;
if (obj == null || GetType() != obj.GetType()) return false;

var other = (Assignment)obj;
return EqualityComparer<MessageQueue>.Default.Equals(MessageQueue, other.MessageQueue);
}

public override int GetHashCode()
{
return EqualityComparer<MessageQueue>.Default.GetHashCode(MessageQueue);
}

public override string ToString()
{
return $"Assignment{{messageQueue={MessageQueue}}}";
}
}
}
65 changes: 65 additions & 0 deletions csharp/rocketmq-client-csharp/Assignments.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Linq;
using Apache.Rocketmq.V2;

namespace Org.Apache.Rocketmq
{
public class Assignments
{
private readonly List<Assignment> _assignmentList;

public Assignments(List<Assignment> assignmentList)
{
_assignmentList = assignmentList;
}

public override bool Equals(object obj)
{
if (this == obj)
{
return true;
}

if (obj == null || GetType() != obj.GetType())
{
return false;
}

var other = (Assignments)obj;
return _assignmentList.SequenceEqual(other._assignmentList);
}

public override int GetHashCode()
{
return HashCode.Combine(_assignmentList);
}

public override string ToString()
{
return $"{nameof(Assignments)} {{ {nameof(_assignmentList)} = {_assignmentList} }}";
}

public List<Assignment> GetAssignmentList()
{
return _assignmentList;
}
}
}
11 changes: 8 additions & 3 deletions csharp/rocketmq-client-csharp/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ protected virtual async Task Shutdown()
Logger.LogDebug($"Shutdown the rocketmq client successfully, clientId={ClientId}");
}

private (bool, Session) GetSession(Endpoints endpoints)
private protected (bool, Session) GetSession(Endpoints endpoints)
{
_sessionLock.EnterReadLock();
try
Expand Down Expand Up @@ -261,7 +261,7 @@ private void Stats()
$"AvailableCompletionPortThreads={availableIo}");
}

private void ScheduleWithFixedDelay(Action action, TimeSpan delay, TimeSpan period, CancellationToken token)
private protected void ScheduleWithFixedDelay(Action action, TimeSpan delay, TimeSpan period, CancellationToken token)
{
Task.Run(async () =>
{
Expand Down Expand Up @@ -432,14 +432,19 @@ internal ClientConfig GetClientConfig()
return ClientConfig;
}

internal IClientManager GetClientManager()
{
return ClientManager;
}

internal virtual void OnRecoverOrphanedTransactionCommand(Endpoints endpoints,
Proto.RecoverOrphanedTransactionCommand command)
{
Logger.LogWarning($"Ignore orphaned transaction recovery command from remote, which is not expected, " +
$"clientId={ClientId}, endpoints={endpoints}");
}

internal async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command)
internal virtual async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command)
{
// Only push consumer support message consumption verification.
Logger.LogWarning($"Ignore verify message command from remote, which is not expected, clientId={ClientId}, " +
Expand Down
31 changes: 31 additions & 0 deletions csharp/rocketmq-client-csharp/ConsumeResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Org.Apache.Rocketmq
{
public enum ConsumeResult
{
/// <summary>
/// Consume message successfully.
/// </summary>
SUCCESS,
/// <summary>
/// Failed to consume message.
/// </summary>
FAILURE
}
}
83 changes: 83 additions & 0 deletions csharp/rocketmq-client-csharp/ConsumeService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Org.Apache.Rocketmq
{
public abstract class ConsumeService
{
private static readonly ILogger Logger = MqLogManager.CreateLogger<ConsumeService>();

protected readonly string ClientId;
private readonly IMessageListener _messageListener;
private readonly TaskScheduler _consumptionExecutor;
private readonly CancellationToken _consumptionCtsToken;

public ConsumeService(string clientId, IMessageListener messageListener, TaskScheduler consumptionExecutor,
CancellationToken consumptionCtsToken)
{
ClientId = clientId;
_messageListener = messageListener;
_consumptionExecutor = consumptionExecutor;
_consumptionCtsToken = consumptionCtsToken;
}

public abstract Task Consume(ProcessQueue pq, List<MessageView> messageViews);

public Task<ConsumeResult> Consume(MessageView messageView)
{
return Consume(messageView, TimeSpan.Zero);
}

public Task<ConsumeResult> Consume(MessageView messageView, TimeSpan delay)
{
var task = new ConsumeTask(ClientId, _messageListener, messageView);
var delayMilliseconds = (int) delay.TotalMilliseconds;

if (delayMilliseconds <= 0)
{
return Task.Factory.StartNew(() => task.Call(), _consumptionCtsToken, TaskCreationOptions.None, _consumptionExecutor);
}

var tcs = new TaskCompletionSource<ConsumeResult>();

Task.Delay(delay, _consumptionCtsToken).ContinueWith(_ =>
{
Task.Factory.StartNew(() =>
{
try
{
var result = task.Call();
tcs.SetResult(result);
}
catch (Exception e)
{
Logger.LogError(e, $"Error while consuming message, clientId={ClientId}");
tcs.SetException(e);
}
}, _consumptionCtsToken, TaskCreationOptions.None, _consumptionExecutor);
}, TaskScheduler.Default);

return tcs.Task;
}
}
}
Loading

0 comments on commit 79f8dd4

Please sign in to comment.