From 98585a31240ede820e842605bb6cbc001de515cf Mon Sep 17 00:00:00 2001 From: "tsaitsung-han.tht" Date: Tue, 25 Jun 2024 15:16:50 +0800 Subject: [PATCH 1/2] Add push consumer for normal message in c# sdk --- csharp/examples/PushConsumerExample.cs | 75 +++ csharp/examples/QuickStart.cs | 1 + csharp/rocketmq-client-csharp/Assignment.cs | 51 ++ csharp/rocketmq-client-csharp/Assignments.cs | 65 ++ csharp/rocketmq-client-csharp/Client.cs | 11 +- .../rocketmq-client-csharp/ConsumeResult.cs | 34 + .../rocketmq-client-csharp/ConsumeService.cs | 82 +++ csharp/rocketmq-client-csharp/ConsumeTask.cs | 57 ++ csharp/rocketmq-client-csharp/Consumer.cs | 20 +- .../CustomizedBackoffRetryPolicy.cs | 101 +++ .../ExponentialBackoffRetryPolicy.cs | 25 +- .../IMessageListener.cs | 32 + csharp/rocketmq-client-csharp/MessageView.cs | 5 + csharp/rocketmq-client-csharp/ProcessQueue.cs | 520 +++++++++++++++ csharp/rocketmq-client-csharp/PushConsumer.cs | 623 ++++++++++++++++++ .../PushSubscriptionSettings.cs | 137 ++++ .../StandardConsumeService.cs | 61 ++ .../rocketmq-client-csharp/TopicRouteData.cs | 34 + .../rocketmq-client-csharp.csproj | 1 + 19 files changed, 1927 insertions(+), 8 deletions(-) create mode 100644 csharp/examples/PushConsumerExample.cs create mode 100644 csharp/rocketmq-client-csharp/Assignment.cs create mode 100644 csharp/rocketmq-client-csharp/Assignments.cs create mode 100644 csharp/rocketmq-client-csharp/ConsumeResult.cs create mode 100644 csharp/rocketmq-client-csharp/ConsumeService.cs create mode 100644 csharp/rocketmq-client-csharp/ConsumeTask.cs create mode 100644 csharp/rocketmq-client-csharp/CustomizedBackoffRetryPolicy.cs create mode 100644 csharp/rocketmq-client-csharp/IMessageListener.cs create mode 100644 csharp/rocketmq-client-csharp/ProcessQueue.cs create mode 100644 csharp/rocketmq-client-csharp/PushConsumer.cs create mode 100644 csharp/rocketmq-client-csharp/PushSubscriptionSettings.cs create mode 100644 csharp/rocketmq-client-csharp/StandardConsumeService.cs diff --git a/csharp/examples/PushConsumerExample.cs b/csharp/examples/PushConsumerExample.cs new file mode 100644 index 000000000..66291e284 --- /dev/null +++ b/csharp/examples/PushConsumerExample.cs @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Org.Apache.Rocketmq; + +namespace examples +{ + public class PushConsumerExample + { + private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(PushConsumerExample).FullName); + + private static readonly string AccessKey = Environment.GetEnvironmentVariable("ROCKETMQ_ACCESS_KEY"); + private static readonly string SecretKey = Environment.GetEnvironmentVariable("ROCKETMQ_SECRET_KEY"); + private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT"); + + internal static async Task QuickStart() + { + // Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL. + // AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); + + // Credential provider is optional for client configuration. + var credentialsProvider = new StaticSessionCredentialsProvider(AccessKey, SecretKey); + var clientConfig = new ClientConfig.Builder() + .SetEndpoints(Endpoint) + .SetCredentialsProvider(credentialsProvider) + .Build(); + + // Add your subscriptions. + const string consumerGroup = "yourConsumerGroup"; + const string topic = "yourTopic"; + var subscription = new Dictionary + { { topic, new FilterExpression("*") } }; + + var pushConsumer = await new PushConsumer.Builder() + .SetClientConfig(clientConfig) + .SetConsumerGroup(consumerGroup) + .SetSubscriptionExpression(subscription) + .SetMessageListener(new CustomMessageListener()) + .Build(); + + Thread.Sleep(Timeout.Infinite); + + // Close the push consumer if you don't need it anymore. + // await pushConsumer.DisposeAsync(); + } + + private class CustomMessageListener : IMessageListener + { + public ConsumeResult Consume(MessageView messageView) + { + // Handle the received message and return consume result. + Logger.LogInformation($"Consume message={messageView}"); + return ConsumeResult.SUCCESS; + } + } + } +} \ No newline at end of file diff --git a/csharp/examples/QuickStart.cs b/csharp/examples/QuickStart.cs index 63d57e85e..ec5992d23 100644 --- a/csharp/examples/QuickStart.cs +++ b/csharp/examples/QuickStart.cs @@ -34,6 +34,7 @@ public static void Main() // ProducerFifoMessageExample.QuickStart().Wait(); // ProducerDelayMessageExample.QuickStart().Wait(); // ProducerTransactionMessageExample.QuickStart().Wait(); + // PushConsumerExample.QuickStart().Wait(); // SimpleConsumerExample.QuickStart().Wait(); // ProducerBenchmark.QuickStart().Wait(); } diff --git a/csharp/rocketmq-client-csharp/Assignment.cs b/csharp/rocketmq-client-csharp/Assignment.cs new file mode 100644 index 000000000..100567614 --- /dev/null +++ b/csharp/rocketmq-client-csharp/Assignment.cs @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; + +namespace Org.Apache.Rocketmq +{ + public class Assignment + { + public Assignment(MessageQueue messageQueue) + { + MessageQueue = messageQueue ?? throw new ArgumentNullException(nameof(messageQueue)); + } + + public MessageQueue MessageQueue { get; } + + public override bool Equals(object obj) + { + if (this == obj) return true; + if (obj == null || GetType() != obj.GetType()) return false; + + var other = (Assignment)obj; + return EqualityComparer.Default.Equals(MessageQueue, other.MessageQueue); + } + + public override int GetHashCode() + { + return EqualityComparer.Default.GetHashCode(MessageQueue); + } + + public override string ToString() + { + return $"Assignment{{messageQueue={MessageQueue}}}"; + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Assignments.cs b/csharp/rocketmq-client-csharp/Assignments.cs new file mode 100644 index 000000000..a25f5ea98 --- /dev/null +++ b/csharp/rocketmq-client-csharp/Assignments.cs @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using Apache.Rocketmq.V2; + +namespace Org.Apache.Rocketmq +{ + public class Assignments + { + private readonly List _assignmentList; + + public Assignments(List assignmentList) + { + _assignmentList = assignmentList; + } + + public override bool Equals(object obj) + { + if (this == obj) + { + return true; + } + + if (obj == null || GetType() != obj.GetType()) + { + return false; + } + + var other = (Assignments)obj; + return _assignmentList.SequenceEqual(other._assignmentList); + } + + public override int GetHashCode() + { + return HashCode.Combine(_assignmentList); + } + + public override string ToString() + { + return $"{nameof(Assignments)} {{ {nameof(_assignmentList)} = {_assignmentList} }}"; + } + + public List GetAssignmentList() + { + return _assignmentList; + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs index f3ea4a87d..af9f5d32a 100644 --- a/csharp/rocketmq-client-csharp/Client.cs +++ b/csharp/rocketmq-client-csharp/Client.cs @@ -113,7 +113,7 @@ protected virtual async Task Shutdown() Logger.LogDebug($"Shutdown the rocketmq client successfully, clientId={ClientId}"); } - private (bool, Session) GetSession(Endpoints endpoints) + private protected (bool, Session) GetSession(Endpoints endpoints) { _sessionLock.EnterReadLock(); try @@ -261,7 +261,7 @@ private void Stats() $"AvailableCompletionPortThreads={availableIo}"); } - private void ScheduleWithFixedDelay(Action action, TimeSpan delay, TimeSpan period, CancellationToken token) + private protected void ScheduleWithFixedDelay(Action action, TimeSpan delay, TimeSpan period, CancellationToken token) { Task.Run(async () => { @@ -432,6 +432,11 @@ internal ClientConfig GetClientConfig() return ClientConfig; } + internal IClientManager GetClientManager() + { + return ClientManager; + } + internal virtual void OnRecoverOrphanedTransactionCommand(Endpoints endpoints, Proto.RecoverOrphanedTransactionCommand command) { @@ -439,7 +444,7 @@ internal virtual void OnRecoverOrphanedTransactionCommand(Endpoints endpoints, $"clientId={ClientId}, endpoints={endpoints}"); } - internal async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command) + internal virtual async void OnVerifyMessageCommand(Endpoints endpoints, Proto.VerifyMessageCommand command) { // Only push consumer support message consumption verification. Logger.LogWarning($"Ignore verify message command from remote, which is not expected, clientId={ClientId}, " + diff --git a/csharp/rocketmq-client-csharp/ConsumeResult.cs b/csharp/rocketmq-client-csharp/ConsumeResult.cs new file mode 100644 index 000000000..6cd212402 --- /dev/null +++ b/csharp/rocketmq-client-csharp/ConsumeResult.cs @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Org.Apache.Rocketmq +{ + /// + /// Designed for push consumer specifically. + /// + public enum ConsumeResult + { + /// + /// Consume message successfully. + /// + SUCCESS, + /// + /// Failed to consume message. + /// + FAILURE + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/ConsumeService.cs b/csharp/rocketmq-client-csharp/ConsumeService.cs new file mode 100644 index 000000000..447ecd87a --- /dev/null +++ b/csharp/rocketmq-client-csharp/ConsumeService.cs @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace Org.Apache.Rocketmq +{ + public abstract class ConsumeService + { + private static readonly ILogger Logger = MqLogManager.CreateLogger(); + + protected readonly string ClientId; + private readonly IMessageListener _messageListener; + private readonly TaskScheduler _consumptionTaskScheduler; + private readonly CancellationToken _consumptionCtsToken; + + public ConsumeService(string clientId, IMessageListener messageListener, TaskScheduler consumptionTaskScheduler, + CancellationToken consumptionCtsToken) + { + ClientId = clientId; + _messageListener = messageListener; + _consumptionTaskScheduler = consumptionTaskScheduler; + _consumptionCtsToken = consumptionCtsToken; + } + + public abstract Task Consume(ProcessQueue pq, List messageViews); + + public async Task Consume(MessageView messageView) + { + return await Consume(messageView, TimeSpan.Zero); + } + + public async Task Consume(MessageView messageView, TimeSpan delay) + { + var task = new ConsumeTask(ClientId, _messageListener, messageView); + var delayMilliseconds = (int) delay.TotalMilliseconds; + + if (delayMilliseconds <= 0) + { + return await Task.Factory.StartNew(() => task.Call(), _consumptionCtsToken, TaskCreationOptions.None, + _consumptionTaskScheduler); + } + + var tcs = new TaskCompletionSource(); + + await Task.Delay(delay, _consumptionCtsToken).ContinueWith(async _ => + { + try + { + var result = await Task.Factory.StartNew(() => task.Call(), _consumptionCtsToken, + TaskCreationOptions.None, _consumptionTaskScheduler); + tcs.SetResult(result); + } + catch (Exception e) + { + Logger.LogError(e, $"Error while consuming message, clientId={ClientId}"); + tcs.SetException(e); + } + }, TaskScheduler.Default); + + return await tcs.Task; + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/ConsumeTask.cs b/csharp/rocketmq-client-csharp/ConsumeTask.cs new file mode 100644 index 000000000..3c7625064 --- /dev/null +++ b/csharp/rocketmq-client-csharp/ConsumeTask.cs @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using Microsoft.Extensions.Logging; + +namespace Org.Apache.Rocketmq +{ + public class ConsumeTask + { + private static readonly ILogger Logger = MqLogManager.CreateLogger(); + + private readonly string _clientId; + private readonly IMessageListener _messageListener; + private readonly MessageView _messageView; + + public ConsumeTask(string clientId, IMessageListener messageListener, MessageView messageView) + { + _clientId = clientId; + _messageListener = messageListener; + _messageView = messageView; + } + + /// + /// Invoke IMessageListener to consume the message. + /// + /// Message(s) which are consumed successfully. + public ConsumeResult Call() + { + try + { + var consumeResult = _messageListener.Consume(_messageView); + return consumeResult; + } + catch (Exception e) + { + Logger.LogError(e, $"Message listener raised an exception while consuming messages, clientId={_clientId}," + + $" mq={_messageView.MessageQueue}, messageId={_messageView.MessageId}"); + return ConsumeResult.FAILURE; + } + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Consumer.cs b/csharp/rocketmq-client-csharp/Consumer.cs index 0bf7a45aa..196220a53 100644 --- a/csharp/rocketmq-client-csharp/Consumer.cs +++ b/csharp/rocketmq-client-csharp/Consumer.cs @@ -36,7 +36,7 @@ protected Consumer(ClientConfig clientConfig, string consumerGroup) : base( ConsumerGroup = consumerGroup; } - protected async Task ReceiveMessage(Proto.ReceiveMessageRequest request, MessageQueue mq, + internal async Task ReceiveMessage(Proto.ReceiveMessageRequest request, MessageQueue mq, TimeSpan awaitDuration) { var tolerance = ClientConfig.RequestTimeout; @@ -103,5 +103,23 @@ protected Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, M InvisibleDuration = Duration.FromTimeSpan(invisibleDuration) }; } + + internal Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq, + FilterExpression filterExpression, TimeSpan awaitDuration) + { + var group = new Proto.Resource + { + Name = ConsumerGroup + }; + return new Proto.ReceiveMessageRequest + { + Group = group, + MessageQueue = mq.ToProtobuf(), + FilterExpression = WrapFilterExpression(filterExpression), + LongPollingTimeout = Duration.FromTimeSpan(awaitDuration), + BatchSize = batchSize, + AutoRenew = true + }; + } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/CustomizedBackoffRetryPolicy.cs b/csharp/rocketmq-client-csharp/CustomizedBackoffRetryPolicy.cs new file mode 100644 index 000000000..83f1bffce --- /dev/null +++ b/csharp/rocketmq-client-csharp/CustomizedBackoffRetryPolicy.cs @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using Apache.Rocketmq.V2; +using Google.Protobuf.WellKnownTypes; +using Proto = Apache.Rocketmq.V2; + +namespace Org.Apache.Rocketmq +{ + public class CustomizedBackoffRetryPolicy : IRetryPolicy + { + private readonly int _maxAttempts; + private readonly List _durations; + + public CustomizedBackoffRetryPolicy(List durations, int maxAttempts) + { + if (durations == null || !durations.Any()) + { + throw new ArgumentException("durations must not be empty", nameof(durations)); + } + _durations = durations; + _maxAttempts = maxAttempts; + } + + public int GetMaxAttempts() + { + return _maxAttempts; + } + + public List GetDurations() + { + return _durations; + } + + public TimeSpan GetNextAttemptDelay(int attempt) + { + if (attempt <= 0) + { + throw new ArgumentException("attempt must be positive", nameof(attempt)); + } + return attempt > _durations.Count ? _durations.Last() : _durations[attempt - 1]; + } + + public static CustomizedBackoffRetryPolicy FromProtobuf(RetryPolicy retryPolicy) + { + if (!retryPolicy.StrategyCase.Equals(RetryPolicy.StrategyOneofCase.CustomizedBackoff)) + { + throw new ArgumentException("Illegal retry policy"); + } + var customizedBackoff = retryPolicy.CustomizedBackoff; + var durations = customizedBackoff.Next.Select(duration => duration.ToTimeSpan()).ToList(); + return new CustomizedBackoffRetryPolicy(durations, retryPolicy.MaxAttempts); + } + + public RetryPolicy ToProtobuf() + { + var customizedBackoff = new CustomizedBackoff + { + Next = { _durations.Select(Duration.FromTimeSpan) } + }; + return new RetryPolicy + { + MaxAttempts = _maxAttempts, + CustomizedBackoff = customizedBackoff + }; + } + + public IRetryPolicy InheritBackoff(Proto.RetryPolicy retryPolicy) + { + if (!retryPolicy.StrategyCase.Equals(RetryPolicy.StrategyOneofCase.CustomizedBackoff)) + { + throw new InvalidOperationException("Strategy must be customized backoff"); + } + + return InheritBackoff(retryPolicy.CustomizedBackoff); + } + + private IRetryPolicy InheritBackoff(CustomizedBackoff retryPolicy) + { + var durations = retryPolicy.Next.Select(duration => duration.ToTimeSpan()).ToList(); + return new CustomizedBackoffRetryPolicy(durations, _maxAttempts); + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs b/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs index d4826d855..1ee7a28df 100644 --- a/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs +++ b/csharp/rocketmq-client-csharp/ExponentialBackoffRetryPolicy.cs @@ -25,7 +25,7 @@ public class ExponentialBackoffRetryPolicy : IRetryPolicy { private readonly int _maxAttempts; - private ExponentialBackoffRetryPolicy(int maxAttempts, TimeSpan initialBackoff, TimeSpan maxBackoff, + public ExponentialBackoffRetryPolicy(int maxAttempts, TimeSpan initialBackoff, TimeSpan maxBackoff, double backoffMultiplier) { _maxAttempts = maxAttempts; @@ -39,11 +39,11 @@ public int GetMaxAttempts() return _maxAttempts; } - private TimeSpan InitialBackoff { get; } + public TimeSpan InitialBackoff { get; } - private TimeSpan MaxBackoff { get; } + public TimeSpan MaxBackoff { get; } - private double BackoffMultiplier { get; } + public double BackoffMultiplier { get; } public IRetryPolicy InheritBackoff(Proto.RetryPolicy retryPolicy) { @@ -63,6 +63,10 @@ private IRetryPolicy InheritBackoff(Proto.ExponentialBackoff retryPolicy) public TimeSpan GetNextAttemptDelay(int attempt) { + if (attempt <= 0) + { + throw new ArgumentException("attempt must be positive", nameof(attempt)); + } var delayMillis = Math.Min( InitialBackoff.TotalMilliseconds * Math.Pow(BackoffMultiplier, 1.0 * (attempt - 1)), MaxBackoff.TotalMilliseconds); @@ -88,5 +92,18 @@ public static ExponentialBackoffRetryPolicy ImmediatelyRetryPolicy(int maxAttemp ExponentialBackoff = exponentialBackoff }; } + + public static ExponentialBackoffRetryPolicy FromProtobuf(Proto.RetryPolicy retryPolicy) + { + if (!retryPolicy.StrategyCase.Equals(Proto.RetryPolicy.StrategyOneofCase.ExponentialBackoff)) + { + throw new ArgumentException("Illegal retry policy"); + } + var exponentialBackoff = retryPolicy.ExponentialBackoff; + return new ExponentialBackoffRetryPolicy(retryPolicy.MaxAttempts, + exponentialBackoff.Initial.ToTimeSpan(), + exponentialBackoff.Max.ToTimeSpan(), + exponentialBackoff.Multiplier); + } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/IMessageListener.cs b/csharp/rocketmq-client-csharp/IMessageListener.cs new file mode 100644 index 000000000..d011fc8cd --- /dev/null +++ b/csharp/rocketmq-client-csharp/IMessageListener.cs @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Org.Apache.Rocketmq +{ + public interface IMessageListener + { + /// + /// The callback interface to consume the message. + /// + /// + /// You should process the and return the corresponding . + /// The consumption is successful only when is returned, null pointer is returned + /// or exception is thrown would cause message consumption failure too. + /// + ConsumeResult Consume(MessageView messageView); + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/MessageView.cs b/csharp/rocketmq-client-csharp/MessageView.cs index 52b821aca..0766e8a86 100644 --- a/csharp/rocketmq-client-csharp/MessageView.cs +++ b/csharp/rocketmq-client-csharp/MessageView.cs @@ -79,6 +79,11 @@ private MessageView(string messageId, string topic, byte[] body, string tag, str public int DeliveryAttempt { get; } + public bool IsCorrupted() + { + return _corrupted; + } + public static MessageView FromProtobuf(Proto.Message message, MessageQueue messageQueue = null) { var topic = message.Topic.Name; diff --git a/csharp/rocketmq-client-csharp/ProcessQueue.cs b/csharp/rocketmq-client-csharp/ProcessQueue.cs new file mode 100644 index 000000000..56aa78742 --- /dev/null +++ b/csharp/rocketmq-client-csharp/ProcessQueue.cs @@ -0,0 +1,520 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Apache.Rocketmq.V2; +using Microsoft.Extensions.Logging; +using Org.Apache.Rocketmq.Error; + +namespace Org.Apache.Rocketmq +{ + /// + /// Process queue is a cache to store fetched messages from remote for PushConsumer. + /// + /// PushConsumer queries assignments periodically and converts them into message queues, each message queue is + /// mapped into one process queue to fetch message from remote. If the message queue is removed from the newest + /// assignment, the corresponding process queue is marked as expired soon, which means its lifecycle is over. + /// + public class ProcessQueue + { + private static readonly ILogger Logger = MqLogManager.CreateLogger(); + + private static readonly TimeSpan AckMessageFailureBackoffDelay = TimeSpan.FromSeconds(1); + private static readonly TimeSpan ChangeInvisibleDurationFailureBackoffDelay = TimeSpan.FromSeconds(1); + + private static readonly TimeSpan ReceivingFlowControlBackoffDelay = TimeSpan.FromMilliseconds(20); + private static readonly TimeSpan ReceivingFailureBackoffDelay = TimeSpan.FromSeconds(1); + private static readonly TimeSpan ReceivingBackoffDelayWhenCacheIsFull = TimeSpan.FromSeconds(1); + + private readonly PushConsumer _consumer; + + /// + /// Dropped means ProcessQueue is deprecated, which means no message would be fetched from remote anymore. + /// + private volatile bool _dropped; + private readonly MessageQueue _mq; + private readonly FilterExpression _filterExpression; + + /// + /// Messages which is pending means have been cached, but are not taken by consumer dispatcher yet. + /// + private readonly List _cachedMessages; + private readonly ReaderWriterLockSlim _cachedMessageLock; + private long _cachedMessagesBytes; + + private long _activityTime = DateTime.UtcNow.Ticks; + private long _cacheFullTime = long.MinValue; + + private readonly CancellationTokenSource _receiveMsgCts; + private readonly CancellationTokenSource _ackMsgCts; + private readonly CancellationTokenSource _changeInvisibleDurationCts; + + public ProcessQueue(PushConsumer consumer, MessageQueue mq, FilterExpression filterExpression, + CancellationTokenSource receiveMsgCts, CancellationTokenSource ackMsgCts, CancellationTokenSource changeInvisibleDurationCts) + { + _consumer = consumer; + _dropped = false; + _mq = mq; + _filterExpression = filterExpression; + _cachedMessages = new List(); + _cachedMessageLock = new ReaderWriterLockSlim(); + _cachedMessagesBytes = 0; + _receiveMsgCts = receiveMsgCts; + _ackMsgCts = ackMsgCts; + _changeInvisibleDurationCts = changeInvisibleDurationCts; + } + + /// + /// Get the mapped message queue. + /// + /// mapped message queue. + public MessageQueue GetMessageQueue() + { + return _mq; + } + + /// + /// Drop the current process queue, which means the process queue's lifecycle is over, + /// thus it would not fetch messages from the remote anymore if dropped. + /// + public void Drop() + { + _dropped = true; + } + + /// + /// ProcessQueue would be regarded as expired if no fetch message for a long time. + /// + /// if it is expired. + public bool Expired() + { + var longPollingTimeout = _consumer.GetPushConsumerSettings().GetLongPollingTimeout(); + var requestTimeout = _consumer.GetClientConfig().RequestTimeout; + var maxIdleDuration = longPollingTimeout.Add(requestTimeout).Multiply(3); + var idleDuration = DateTime.UtcNow.Ticks - Interlocked.Read(ref _activityTime); + if (idleDuration < maxIdleDuration.Ticks) + { + return false; + } + var afterCacheFullDuration = DateTime.UtcNow.Ticks - Interlocked.Read(ref _cacheFullTime); + if (afterCacheFullDuration < maxIdleDuration.Ticks) + { + return false; + } + Logger.LogWarning( + $"Process queue is idle, idleDuration={idleDuration}, maxIdleDuration={maxIdleDuration}," + + $" afterCacheFullDuration={afterCacheFullDuration}, mq={_mq}, clientId={_consumer.GetClientId()}"); + return true; + } + + internal void CacheMessages(List messageList) + { + _cachedMessageLock.EnterWriteLock(); + try + { + foreach (var messageView in messageList) + { + _cachedMessages.Add(messageView); + Interlocked.Add(ref _cachedMessagesBytes, messageView.Body.Length); + } + } + finally + { + _cachedMessageLock.ExitWriteLock(); + } + } + + private int GetReceptionBatchSize() + { + var bufferSize = _consumer.CacheMessageCountThresholdPerQueue() - CachedMessagesCount(); + bufferSize = Math.Max(bufferSize, 1); + return Math.Min(bufferSize, _consumer.GetPushConsumerSettings().GetReceiveBatchSize()); + } + + /// + /// Start to fetch messages from remote immediately. + /// + public async Task FetchMessageImmediately() + { + await ReceiveMessageImmediately(); + } + + /// + /// Receive message later by message queue. + /// + /// + /// Make sure that no exception will be thrown. + /// + public async Task OnReceiveMessageException(Exception t) + { + var delay = t is TooManyRequestsException ? ReceivingFlowControlBackoffDelay : ReceivingFailureBackoffDelay; + await ReceiveMessageLater(delay); + } + + private async Task ReceiveMessageLater(TimeSpan delay) + { + var clientId = _consumer.GetClientId(); + try + { + Logger.LogInformation($"Try to receive message later, mq={_mq}, delay={delay}, clientId={clientId}"); + await Task.Delay(delay, _receiveMsgCts.Token); + await ReceiveMessage(); + } + catch (Exception ex) + { + if (_receiveMsgCts.IsCancellationRequested) + { + return; + } + Logger.LogError(ex, $"[Bug] Failed to schedule message receiving request, mq={_mq}, clientId={clientId}"); + await OnReceiveMessageException(ex); + } + } + + public async Task ReceiveMessage() + { + var clientId = _consumer.GetClientId(); + if (_dropped) + { + Logger.LogInformation($"Process queue has been dropped, no longer receive message, mq={_mq}, clientId={clientId}"); + return; + } + if (IsCacheFull()) + { + Logger.LogWarning($"Process queue cache is full, would receive message later, mq={_mq}, clientId={clientId}"); + await ReceiveMessageLater(ReceivingBackoffDelayWhenCacheIsFull); + return; + } + await ReceiveMessageImmediately(); + } + + private async Task ReceiveMessageImmediately() + { + var clientId = _consumer.GetClientId(); + if (_consumer.State != State.Running) + { + Logger.LogInformation($"Stop to receive message because consumer is not running, mq={_mq}, clientId={clientId}"); + return; + } + try + { + var endpoints = _mq.Broker.Endpoints; + var batchSize = GetReceptionBatchSize(); + var longPollingTimeout = _consumer.GetPushConsumerSettings().GetLongPollingTimeout(); + var request = _consumer.WrapReceiveMessageRequest(batchSize, _mq, _filterExpression, longPollingTimeout); + + Interlocked.Exchange(ref _activityTime, DateTime.UtcNow.Ticks); + + var result = await _consumer.ReceiveMessage(request, _mq, longPollingTimeout); + await OnReceiveMessageResult(result); + } + catch (Exception ex) + { + Logger.LogError(ex, $"Exception raised during message reception, mq={_mq}, clientId={clientId}"); + await OnReceiveMessageException(ex); + } + } + + private async Task OnReceiveMessageResult(ReceiveMessageResult result) + { + var messages = result.Messages; + if (messages.Count > 0) + { + CacheMessages(messages); + await _consumer.GetConsumeService().Consume(this, messages); + } + await ReceiveMessage(); + } + + private bool IsCacheFull() + { + var cacheMessageCountThresholdPerQueue = _consumer.CacheMessageCountThresholdPerQueue(); + var actualMessagesQuantity = CachedMessagesCount(); + var clientId = _consumer.GetClientId(); + if (cacheMessageCountThresholdPerQueue <= actualMessagesQuantity) + { + Logger.LogWarning($"Process queue total cached messages quantity exceeds the threshold," + + $" threshold={cacheMessageCountThresholdPerQueue}, actual={actualMessagesQuantity}," + + $" mq={_mq}, clientId={clientId}"); + Interlocked.Exchange(ref _cacheFullTime, DateTime.UtcNow.Ticks); + return true; + } + + var cacheMessageBytesThresholdPerQueue = _consumer.CacheMessageBytesThresholdPerQueue(); + var actualCachedMessagesBytes = CachedMessageBytes(); + if (cacheMessageBytesThresholdPerQueue <= actualCachedMessagesBytes) + { + Logger.LogWarning($"Process queue total cached messages memory exceeds the threshold," + + $" threshold={cacheMessageBytesThresholdPerQueue} bytes," + + $" actual={actualCachedMessagesBytes} bytes, mq={_mq}, clientId={clientId}"); + Interlocked.Exchange(ref _cacheFullTime, DateTime.UtcNow.Ticks); + return true; + } + + return false; + } + + /// + /// Erase messages(Non-FIFO-consume-mode) which have been consumed properly. + /// + /// the message to erase. + /// consume result. + public async Task EraseMessage(MessageView messageView, ConsumeResult consumeResult) + { + await (ConsumeResult.SUCCESS.Equals(consumeResult) ? AckMessage(messageView) : NackMessage(messageView)); + EvictCache(messageView); + } + + private async Task AckMessage(MessageView messageView) + { + await AckMessage(messageView, 1); + } + + private async Task AckMessage(MessageView messageView, int attempt) + { + var clientId = _consumer.GetClientId(); + var consumerGroup = _consumer.GetConsumerGroup(); + var messageId = messageView.MessageId; + var endpoints = messageView.MessageQueue.Broker.Endpoints; + + try + { + var request = _consumer.WrapAckMessageRequest(messageView); + var response = await _consumer.GetClientManager().AckMessage(messageView.MessageQueue.Broker.Endpoints, request, + _consumer.GetClientConfig().RequestTimeout); + var requestId = response.RequestId; + var status = response.Response.Status; + var statusCode = status.Code; + + if (statusCode == Code.InvalidReceiptHandle) + { + Logger.LogError($"Failed to ack message due to the invalid receipt handle, forgive to retry," + + $" clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}," + + $" attempt={attempt}, mq={_mq}, endpoints={endpoints}, requestId={requestId}," + + $" status message={status.Message}"); + throw new BadRequestException((int) statusCode, requestId, status.Message); + } + + if (statusCode != Code.Ok) + { + Logger.LogError($"Failed to change invisible duration, would retry later, clientId={clientId}," + + $" consumerGroup={consumerGroup}, messageId={messageId}, attempt={attempt}, mq={_mq}," + + $" endpoints={endpoints}, requestId={requestId}, status message={status.Message}"); + await AckMessageLater(messageView, attempt + 1); + return; + } + + if (attempt > 1) + { + Logger.LogInformation($"Successfully acked message finally, clientId={clientId}," + + $" consumerGroup={consumerGroup}, messageId={messageId}," + + $" attempt={attempt}, mq={_mq}, endpoints={endpoints}," + + $" requestId={requestId}"); + } + else + { + Logger.LogDebug($"Successfully acked message, clientId={clientId}," + + $" consumerGroup={consumerGroup}, messageId={messageId}, mq={_mq}," + + $" endpoints={endpoints}, requestId={requestId}"); + } + } + catch (Exception ex) + { + Logger.LogError(ex, $"Exception raised while acknowledging message, would retry later," + + $" clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}," + + $" mq={_mq}, endpoints={endpoints}"); + await AckMessageLater(messageView, attempt + 1); + } + } + + private async Task AckMessageLater(MessageView messageView, int attempt) + { + try + { + await Task.Delay(AckMessageFailureBackoffDelay, _ackMsgCts.Token); + await AckMessage(messageView, attempt + 1); + } + catch (Exception ex) + { + if (_ackMsgCts.IsCancellationRequested) + { + return; + } + Logger.LogError(ex, $"[Bug] Failed to schedule message ack request, mq={_mq}," + + $" messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}"); + await AckMessageLater(messageView, attempt + 1); + } + } + + private async Task NackMessage(MessageView messageView) + { + var deliveryAttempt = messageView.DeliveryAttempt; + var duration = _consumer.GetRetryPolicy().GetNextAttemptDelay(deliveryAttempt); + await ChangeInvisibleDuration(messageView, duration, 1); + } + + private async Task ChangeInvisibleDuration(MessageView messageView, TimeSpan duration, int attempt) + { + var clientId = _consumer.GetClientId(); + var consumerGroup = _consumer.GetConsumerGroup(); + var messageId = messageView.MessageId; + var endpoints = messageView.MessageQueue.Broker.Endpoints; + + try + { + var request = _consumer.WrapChangeInvisibleDuration(messageView, duration); + var response = await _consumer.GetClientManager().ChangeInvisibleDuration(endpoints, + request, _consumer.GetClientConfig().RequestTimeout); + var requestId = response.RequestId; + var status = response.Response.Status; + var statusCode = status.Code; + + if (statusCode == Code.InvalidReceiptHandle) + { + Logger.LogError($"Failed to change invisible duration due to the invalid receipt handle," + + $" forgive to retry, clientId={clientId}, consumerGroup={consumerGroup}," + + $" messageId={messageId}, attempt={attempt}, mq={_mq}, endpoints={endpoints}," + + $" requestId={requestId}, status message={status.Message}"); + throw new BadRequestException((int) statusCode, requestId, status.Message); + } + + if (statusCode != Code.Ok) + { + Logger.LogError($"Failed to change invisible duration, would retry later," + + $" clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}," + + $" attempt={attempt}, mq={_mq}, endpoints={endpoints}, requestId={requestId}," + + $" status message={status.Message}"); + await ChangeInvisibleDurationLater(messageView, duration, attempt + 1); + return; + } + + if (attempt > 1) + { + Logger.LogInformation($"Finally, changed invisible duration successfully," + + $" clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}," + + $" attempt={attempt}, mq={_mq}, endpoints={endpoints}, requestId={requestId}"); + } + else + { + Logger.LogDebug($"Changed invisible duration successfully, clientId={clientId}," + + $" consumerGroup={consumerGroup}, messageId={messageId}, mq={_mq}," + + $" endpoints={endpoints}, requestId={requestId}"); + } + } + catch (Exception ex) + { + Logger.LogError(ex, $"Exception raised while changing invisible duration, would retry later," + + $" clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}," + + $" mq={_mq}, endpoints={endpoints}"); + await ChangeInvisibleDurationLater(messageView, duration, attempt + 1); + } + } + + private async Task ChangeInvisibleDurationLater(MessageView messageView, TimeSpan duration, int attempt) + { + try + { + await Task.Delay(ChangeInvisibleDurationFailureBackoffDelay, _changeInvisibleDurationCts.Token); + await ChangeInvisibleDuration(messageView, duration, attempt); + } + catch (Exception ex) + { + if (_changeInvisibleDurationCts.IsCancellationRequested) + { + return; + } + Logger.LogError(ex, $"[Bug] Failed to schedule message change invisible duration request," + + $" mq={_mq}, messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}"); + await ChangeInvisibleDurationLater(messageView, duration, attempt + 1); + } + } + + /// + /// Discard the message(Non-FIFO-consume-mode) which could not be consumed properly. + /// + /// the message to discard. + public async Task DiscardMessage(MessageView messageView) + { + Logger.LogInformation($"Discard message, mq={_mq}, messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}"); + await NackMessage(messageView); + EvictCache(messageView); + } + + private void EvictCache(MessageView messageView) + { + _cachedMessageLock.EnterWriteLock(); + try + { + if (_cachedMessages.Remove(messageView)) + { + Interlocked.Add(ref _cachedMessagesBytes, -messageView.Body.Length); + } + } + finally + { + _cachedMessageLock.ExitWriteLock(); + } + } + + public int CachedMessagesCount() + { + _cachedMessageLock.EnterReadLock(); + try + { + return _cachedMessages.Count; + } + finally + { + _cachedMessageLock.ExitReadLock(); + } + } + + public long CachedMessageBytes() + { + return Interlocked.Read(ref _cachedMessagesBytes); + } + + /// + /// Get the count of cached messages. + /// + /// count of pending messages. + public long GetCachedMessageCount() + { + _cachedMessageLock.EnterReadLock(); + try + { + return _cachedMessages.Count; + } + finally + { + _cachedMessageLock.ExitReadLock(); + } + } + + /// + /// Get the bytes of cached message memory footprint. + /// + /// bytes of cached message memory footprint. + public long GetCachedMessageBytes() + { + return _cachedMessagesBytes; + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/PushConsumer.cs b/csharp/rocketmq-client-csharp/PushConsumer.cs new file mode 100644 index 000000000..badce3cce --- /dev/null +++ b/csharp/rocketmq-client-csharp/PushConsumer.cs @@ -0,0 +1,623 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Schedulers; +using Apache.Rocketmq.V2; +using Google.Protobuf.WellKnownTypes; +using Proto = Apache.Rocketmq.V2; +using Microsoft.Extensions.Logging; + +namespace Org.Apache.Rocketmq +{ + public class PushConsumer : Consumer, IAsyncDisposable, IDisposable + { + private static readonly ILogger Logger = MqLogManager.CreateLogger(); + + private static readonly TimeSpan AssignmentScanScheduleDelay = TimeSpan.FromSeconds(1); + private static readonly TimeSpan AssignmentScanSchedulePeriod = TimeSpan.FromSeconds(5); + + private readonly ClientConfig _clientConfig; + private readonly PushSubscriptionSettings _pushSubscriptionSettings; + private readonly string _consumerGroup; + private readonly ConcurrentDictionary _subscriptionExpressions; + private readonly ConcurrentDictionary _cacheAssignments; + private readonly IMessageListener _messageListener; + private readonly int _maxCacheMessageCount; + private readonly int _maxCacheMessageSizeInBytes; + + private readonly ConcurrentDictionary _processQueueTable; + private ConsumeService _consumeService; + private readonly TaskScheduler _consumptionTaskScheduler; + private readonly CancellationTokenSource _consumptionCts; + + private readonly CancellationTokenSource _scanAssignmentCts; + + private readonly CancellationTokenSource _receiveMsgCts; + private readonly CancellationTokenSource _ackMsgCts; + private readonly CancellationTokenSource _changeInvisibleDurationCts; + + /// + /// The caller is supposed to have validated the arguments and handled throwing exception or + /// logging warnings already, so we avoid repeating args check here. + /// + public PushConsumer(ClientConfig clientConfig, string consumerGroup, + ConcurrentDictionary subscriptionExpressions, IMessageListener messageListener, + int maxCacheMessageCount, int maxCacheMessageSizeInBytes, int consumptionThreadCount) + : base(clientConfig, consumerGroup) + { + _clientConfig = clientConfig; + _consumerGroup = consumerGroup; + _subscriptionExpressions = subscriptionExpressions; + _pushSubscriptionSettings = new PushSubscriptionSettings(ClientId, Endpoints, consumerGroup, + clientConfig.RequestTimeout, subscriptionExpressions); + _cacheAssignments = new ConcurrentDictionary(); + _messageListener = messageListener; + _maxCacheMessageCount = maxCacheMessageCount; + _maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes; + + _scanAssignmentCts = new CancellationTokenSource(); + + _processQueueTable = new ConcurrentDictionary(); + _consumptionTaskScheduler = new LimitedConcurrencyLevelTaskScheduler(consumptionThreadCount); + _consumptionCts = new CancellationTokenSource(); + + _receiveMsgCts = new CancellationTokenSource(); + _ackMsgCts = new CancellationTokenSource(); + _changeInvisibleDurationCts = new CancellationTokenSource(); + } + + protected override async Task Start() + { + try + { + State = State.Starting; + Logger.LogInformation($"Begin to start the rocketmq push consumer, clientId={ClientId}"); + await base.Start(); + _consumeService = CreateConsumerService(); + ScheduleWithFixedDelay(ScanAssignments, AssignmentScanScheduleDelay, AssignmentScanSchedulePeriod, + _scanAssignmentCts.Token); + Logger.LogInformation($"The rocketmq push consumer starts successfully, clientId={ClientId}"); + State = State.Running; + } + catch (Exception) + { + State = State.Failed; + throw; + } + } + + public async ValueTask DisposeAsync() + { + await Shutdown().ConfigureAwait(false); + GC.SuppressFinalize(this); + } + + public void Dispose() + { + Shutdown().Wait(); + GC.SuppressFinalize(this); + } + + protected override async Task Shutdown() + { + try + { + State = State.Stopping; + Logger.LogInformation($"Begin to shutdown the rocketmq push consumer, clientId={ClientId}"); + _receiveMsgCts.Cancel(); + _ackMsgCts.Cancel(); + _changeInvisibleDurationCts.Cancel(); + _scanAssignmentCts.Cancel(); + await base.Shutdown(); + _consumptionCts.Cancel(); + Logger.LogInformation($"Shutdown the rocketmq push consumer successfully, clientId={ClientId}"); + State = State.Terminated; + } + catch (Exception) + { + State = State.Failed; + throw; + } + } + + private ConsumeService CreateConsumerService() + { + Logger.LogInformation( + $"Create standard consume service, consumerGroup={_consumerGroup}, clientId={ClientId}"); + return new StandardConsumeService(ClientId, _messageListener, _consumptionTaskScheduler, _consumptionCts.Token); + } + + /// + /// Adds a subscription expression dynamically. + /// + /// The new filter expression to add. + /// The push consumer instance. + public async Task Subscribe(string topic, FilterExpression filterExpression) + { + if (State.Running != State) + { + throw new InvalidOperationException("Push consumer is not running"); + } + + await GetRouteData(topic); + _subscriptionExpressions[topic] = filterExpression; + } + + /// + /// Removes a subscription expression dynamically by topic. + /// + /// + /// It stops the backend task to fetch messages from the server. + /// The locally cached messages whose topic was removed before would not be delivered + /// to the anymore. + /// + /// Nothing occurs if the specified topic does not exist in subscription expressions + /// of the push consumer. + /// + /// The topic to remove the subscription. + /// The push consumer instance. + public void Unsubscribe(string topic) + { + if (State.Running != State) + { + throw new InvalidOperationException("Push consumer is not running"); + } + + _subscriptionExpressions.TryRemove(topic, out _); + } + + private async void ScanAssignments() + { + try + { + Logger.LogDebug($"Start to scan assignments periodically, clientId={ClientId}"); + foreach (var (topic, filterExpression) in _subscriptionExpressions) + { + var existed = _cacheAssignments.GetValueOrDefault(topic); + + try + { + var latest = await QueryAssignment(topic); + if (!latest.GetAssignmentList().Any()) + { + if (existed == null || !existed.GetAssignmentList().Any()) + { + Logger.LogInformation($"Acquired empty assignments from remote, would scan later," + + $" topic={topic}, clientId={ClientId}"); + return; + } + Logger.LogInformation($"Attention!!! acquired empty assignments from remote, but" + + $" existed assignments are not empty, topic={topic}, clientId={ClientId}"); + } + + if (!latest.Equals(existed)) + { + Logger.LogInformation($"Assignments of topic={topic} has changed, {existed} =>" + + $" {latest}, clientId={ClientId}"); + await SyncProcessQueue(topic, latest, filterExpression); + _cacheAssignments[topic] = latest; + return; + } + Logger.LogDebug($"Assignments of topic={topic} remain the same," + + $" assignments={existed}, clientId={ClientId}"); + // Process queue may be dropped, need to be synchronized anyway. + await SyncProcessQueue(topic, latest, filterExpression); + } + catch (Exception ex) + { + Logger.LogError(ex, $"Exception raised while querying the assignment," + + $" topic={topic}, clientId={ClientId}"); + } + } + } + catch (Exception ex) + { + Logger.LogError(ex, $"Exception raised while scanning the assignments for all topics, clientId={ClientId}"); + } + } + + private async Task SyncProcessQueue(string topic, Assignments assignments, + FilterExpression filterExpression) + { + var latest = new HashSet(); + var assignmentList = assignments.GetAssignmentList(); + foreach (var assignment in assignmentList) + { + latest.Add(assignment.MessageQueue); + } + + var activeMqs = new HashSet(); + foreach (var (mq, pq) in _processQueueTable) + { + if (!topic.Equals(mq.Topic)) + { + continue; + } + + if (!latest.Contains(mq)) + { + Logger.LogInformation($"Drop message queue according to the latest assignmentList," + + $" mq={mq}, clientId={ClientId}"); + DropProcessQueue(mq); + continue; + } + + if (pq.Expired()) + { + Logger.LogWarning($"Drop message queue because it is expired," + + $" mq={mq}, clientId={ClientId}"); + DropProcessQueue(mq); + continue; + } + activeMqs.Add(mq); + } + + foreach (var mq in latest) + { + if (activeMqs.Contains(mq)) + { + continue; + } + var processQueue = CreateProcessQueue(mq, filterExpression); + if (processQueue != null) + { + Logger.LogInformation($"Start to fetch message from remote, mq={mq}, clientId={ClientId}"); + await processQueue.FetchMessageImmediately(); + } + } + } + + private async Task QueryAssignment(string topic) + { + var endpoints = await PickEndpointsToQueryAssignments(topic); + var request = WrapQueryAssignmentRequest(topic); + var requestTimeout = _clientConfig.RequestTimeout; + var invocation = await ClientManager.QueryAssignment(endpoints, request, requestTimeout); + StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId); + var assignmentList = invocation.Response.Assignments.Select(assignment => + new Assignment(new MessageQueue(assignment.MessageQueue))).ToList(); + var assignments = new Assignments(assignmentList); + return assignments; + } + + private async Task PickEndpointsToQueryAssignments(string topic) + { + var topicRouteData = await GetRouteData(topic); + var endpoints = topicRouteData.PickEndpointsToQueryAssignments(); + return endpoints; + } + + private QueryAssignmentRequest WrapQueryAssignmentRequest(string topic) + { + var topicResource = new Proto.Resource + { + Name = topic + }; + return new QueryAssignmentRequest + { + Topic = topicResource, + Group = GetProtobufGroup(), + Endpoints = Endpoints.ToProtobuf() + }; + } + + /// + /// Drops the by . + /// must be removed before it is dropped. + /// + /// The message queue. + internal void DropProcessQueue(MessageQueue mq) + { + if (_processQueueTable.TryRemove(mq, out var pq)) + { + pq.Drop(); + } + } + + /// + /// Creates a process queue and adds it into the . + /// Returns if the mapped process queue already exists. + /// + /// + /// This function and ensure that a process queue is not dropped if + /// it is contained in . Once a process queue is dropped, it must have been + /// removed from . + /// + /// The message queue. + /// The filter expression of the topic. + /// A process queue. + protected ProcessQueue CreateProcessQueue(MessageQueue mq, FilterExpression filterExpression) + { + var processQueue = new ProcessQueue(this, mq, filterExpression, _receiveMsgCts, _ackMsgCts, + _changeInvisibleDurationCts); + if (_processQueueTable.TryGetValue(mq, out var previous)) + { + return null; + } + _processQueueTable.TryAdd(mq, processQueue); + return processQueue; + } + + public async Task AckMessage(MessageView messageView) + { + if (State.Running != State) + { + throw new InvalidOperationException("Push consumer is not running"); + } + + var request = WrapAckMessageRequest(messageView); + var invocation = await ClientManager.AckMessage(messageView.MessageQueue.Broker.Endpoints, request, + ClientConfig.RequestTimeout); + StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId); + } + + protected override IEnumerable GetTopics() + { + return _subscriptionExpressions.Keys; + } + + protected override Proto.HeartbeatRequest WrapHeartbeatRequest() + { + return new Proto::HeartbeatRequest + { + ClientType = Proto.ClientType.PushConsumer, + Group = GetProtobufGroup() + }; + } + + protected internal ChangeInvisibleDurationRequest WrapChangeInvisibleDuration(MessageView messageView, + TimeSpan invisibleDuration) + { + var topicResource = new Proto.Resource + { + Name = messageView.Topic + }; + return new Proto.ChangeInvisibleDurationRequest + { + Topic = topicResource, + Group = GetProtobufGroup(), + ReceiptHandle = messageView.ReceiptHandle, + InvisibleDuration = Duration.FromTimeSpan(invisibleDuration), + MessageId = messageView.MessageId + }; + } + + protected internal AckMessageRequest WrapAckMessageRequest(MessageView messageView) + { + var topicResource = new Proto.Resource + { + Name = messageView.Topic + }; + var entry = new Proto.AckMessageEntry + { + MessageId = messageView.MessageId, + ReceiptHandle = messageView.ReceiptHandle, + }; + return new Proto.AckMessageRequest + { + Group = GetProtobufGroup(), + Topic = topicResource, + Entries = { entry } + }; + } + + protected override void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData) + { + } + + internal override async void OnVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand command) + { + var nonce = command.Nonce; + var messageView = MessageView.FromProtobuf(command.Message); + var messageId = messageView.MessageId; + Proto.TelemetryCommand telemetryCommand = null; + + try + { + var consumeResult = await _consumeService.Consume(messageView); + var code = consumeResult == ConsumeResult.SUCCESS ? Code.Ok : Code.FailedToConsumeMessage; + var status = new Status + { + Code = code + }; + var verifyMessageResult = new VerifyMessageResult + { + Nonce = nonce + }; + telemetryCommand = new TelemetryCommand + { + VerifyMessageResult = verifyMessageResult, + Status = status + }; + var (_, session) = GetSession(endpoints); + await session.WriteAsync(telemetryCommand); + } + catch (Exception e) + { + Logger.LogError(e, + $"Failed to send message verification result command, endpoints={Endpoints}, command={telemetryCommand}, messageId={messageId}, clientId={ClientId}"); + } + } + + protected override NotifyClientTerminationRequest WrapNotifyClientTerminationRequest() + { + return new NotifyClientTerminationRequest() + { + Group = GetProtobufGroup() + }; + } + + internal int GetQueueSize() + { + return _processQueueTable.Count; + } + + internal int CacheMessageBytesThresholdPerQueue() + { + var size = this.GetQueueSize(); + // All process queues are removed, no need to cache messages. + return size <= 0 ? 0 : Math.Max(1, _maxCacheMessageSizeInBytes / size); + } + + internal int CacheMessageCountThresholdPerQueue() + { + var size = this.GetQueueSize(); + // All process queues are removed, no need to cache messages. + if (size <= 0) + { + return 0; + } + + return Math.Max(1, _maxCacheMessageCount / size); + } + + internal override Settings GetSettings() + { + return _pushSubscriptionSettings; + } + + /// + /// Gets the load balancing group for the consumer. + /// + /// The consumer load balancing group. + public string GetConsumerGroup() + { + return _consumerGroup; + } + + public PushSubscriptionSettings GetPushConsumerSettings() + { + return _pushSubscriptionSettings; + } + + /// + /// Lists the existing subscription expressions in the push consumer. + /// + /// Collections of the subscription expressions. + public ConcurrentDictionary GetSubscriptionExpressions() + { + return _subscriptionExpressions; + } + + public IRetryPolicy GetRetryPolicy() + { + return _pushSubscriptionSettings.GetRetryPolicy(); + } + + public ConsumeService GetConsumeService() + { + return _consumeService; + } + + private Proto.Resource GetProtobufGroup() + { + return new Proto.Resource() + { + Name = ConsumerGroup + }; + } + + public class Builder + { + private ClientConfig _clientConfig; + private string _consumerGroup; + private ConcurrentDictionary _subscriptionExpressions; + private IMessageListener _messageListener; + private int _maxCacheMessageCount = 1024; + private int _maxCacheMessageSizeInBytes = 64 * 1024 * 1024; + private int _consumptionThreadCount = 20; + + public Builder SetClientConfig(ClientConfig clientConfig) + { + Preconditions.CheckArgument(null != clientConfig, "clientConfig should not be null"); + _clientConfig = clientConfig; + return this; + } + + public Builder SetConsumerGroup(string consumerGroup) + { + Preconditions.CheckArgument(null != consumerGroup, "consumerGroup should not be null"); + Preconditions.CheckArgument(consumerGroup != null && ConsumerGroupRegex.Match(consumerGroup).Success, + $"topic does not match the regex {ConsumerGroupRegex}"); + _consumerGroup = consumerGroup; + return this; + } + + public Builder SetSubscriptionExpression(Dictionary subscriptionExpressions) + { + Preconditions.CheckArgument(null != subscriptionExpressions, + "subscriptionExpressions should not be null"); + Preconditions.CheckArgument(subscriptionExpressions!.Count != 0, + "subscriptionExpressions should not be empty"); + _subscriptionExpressions = new ConcurrentDictionary(subscriptionExpressions!); + return this; + } + + public Builder SetMessageListener(IMessageListener messageListener) + { + Preconditions.CheckArgument(null != messageListener, + "messageListener should not be null"); + _messageListener = messageListener; + return this; + } + + public Builder SetMaxCacheMessageCount(int maxCacheMessageCount) + { + Preconditions.CheckArgument(maxCacheMessageCount > 0, + "maxCacheMessageCount should be positive"); + _maxCacheMessageCount = maxCacheMessageCount; + return this; + } + + public Builder SetMaxCacheMessageSizeInBytes(int maxCacheMessageSizeInBytes) + { + Preconditions.CheckArgument(maxCacheMessageSizeInBytes > 0, + "maxCacheMessageSizeInBytes should be positive"); + _maxCacheMessageSizeInBytes = maxCacheMessageSizeInBytes; + return this; + } + + public Builder SetConsumptionThreadCount(int consumptionThreadCount) + { + Preconditions.CheckArgument(consumptionThreadCount > 0, + "consumptionThreadCount should be positive"); + _consumptionThreadCount = consumptionThreadCount; + return this; + } + + public async Task Build() + { + Preconditions.CheckArgument(null != _clientConfig, "clientConfig has not been set yet"); + Preconditions.CheckArgument(null != _consumerGroup, "consumerGroup has not been set yet"); + Preconditions.CheckArgument(!_subscriptionExpressions!.IsEmpty, + "subscriptionExpressions has not been set yet"); + Preconditions.CheckArgument(null != _messageListener, "messageListener has not been set yet"); + var pushConsumer = new PushConsumer(_clientConfig, _consumerGroup, _subscriptionExpressions, + _messageListener, _maxCacheMessageCount, + _maxCacheMessageSizeInBytes, _consumptionThreadCount); + await pushConsumer.Start(); + return pushConsumer; + } + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/PushSubscriptionSettings.cs b/csharp/rocketmq-client-csharp/PushSubscriptionSettings.cs new file mode 100644 index 000000000..92eb21f0f --- /dev/null +++ b/csharp/rocketmq-client-csharp/PushSubscriptionSettings.cs @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.Logging; +using Proto = Apache.Rocketmq.V2; + +namespace Org.Apache.Rocketmq +{ + public class PushSubscriptionSettings : Settings + { + private static readonly ILogger Logger = MqLogManager.CreateLogger(); + + private readonly Resource _group; + private readonly ConcurrentDictionary _subscriptionExpressions; + private volatile bool _fifo = false; + private volatile int _receiveBatchSize = 32; + private TimeSpan _longPollingTimeout = TimeSpan.FromSeconds(30); + + public PushSubscriptionSettings(string clientId, Endpoints endpoints, string consumerGroup, + TimeSpan requestTimeout, ConcurrentDictionary subscriptionExpressions) + : base(clientId, ClientType.PushConsumer, endpoints, requestTimeout) + { + _group = new Resource(consumerGroup); + _subscriptionExpressions = subscriptionExpressions; + } + + public bool IsFifo() + { + return _fifo; + } + + public int GetReceiveBatchSize() + { + return _receiveBatchSize; + } + + public TimeSpan GetLongPollingTimeout() + { + return _longPollingTimeout; + } + + public override Proto.Settings ToProtobuf() + { + var subscriptionEntries = new List(); + foreach (var (key, value) in _subscriptionExpressions) + { + var topic = new Proto.Resource() + { + Name = key + }; + var filterExpression = new Proto.FilterExpression() + { + Expression = value.Expression + }; + switch (value.Type) + { + case ExpressionType.Tag: + filterExpression.Type = Proto.FilterType.Tag; + break; + case ExpressionType.Sql92: + filterExpression.Type = Proto.FilterType.Sql; + break; + default: + Logger.LogWarning($"[Bug] Unrecognized filter type={value.Type} for push consumer"); + break; + } + + var subscriptionEntry = new Proto.SubscriptionEntry + { + Topic = topic, + Expression = filterExpression + }; + + subscriptionEntries.Add(subscriptionEntry); + } + + var subscription = new Proto.Subscription + { + Group = _group.ToProtobuf(), + Subscriptions = { subscriptionEntries } + }; + + return new Proto.Settings + { + AccessPoint = Endpoints.ToProtobuf(), + ClientType = ClientTypeHelper.ToProtobuf(ClientType), + RequestTimeout = Duration.FromTimeSpan(RequestTimeout), + Subscription = subscription, + UserAgent = UserAgent.Instance.ToProtobuf() + }; + } + + public override void Sync(Proto.Settings settings) + { + if (Proto.Settings.PubSubOneofCase.Subscription != settings.PubSubCase) + { + Logger.LogError($"[Bug] Issued settings doesn't match with the client type, clientId={ClientId}, " + + $"pubSubCase={settings.PubSubCase}, clientType={ClientType}"); + } + + var subscription = settings.Subscription; + _fifo = subscription.Fifo; + _receiveBatchSize = subscription.ReceiveBatchSize; + _longPollingTimeout = subscription.LongPollingTimeout.ToTimeSpan(); + var backoffPolicy = settings.BackoffPolicy; + switch (backoffPolicy.StrategyCase) + { + case Proto.RetryPolicy.StrategyOneofCase.ExponentialBackoff: + RetryPolicy = ExponentialBackoffRetryPolicy.FromProtobuf(backoffPolicy); + break; + case Proto.RetryPolicy.StrategyOneofCase.CustomizedBackoff: + RetryPolicy = CustomizedBackoffRetryPolicy.FromProtobuf(backoffPolicy); + break; + default: + throw new ArgumentException("Unrecognized backoff policy strategy."); + } + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/StandardConsumeService.cs b/csharp/rocketmq-client-csharp/StandardConsumeService.cs new file mode 100644 index 000000000..96b379a2d --- /dev/null +++ b/csharp/rocketmq-client-csharp/StandardConsumeService.cs @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace Org.Apache.Rocketmq +{ + public class StandardConsumeService : ConsumeService + { + private static readonly ILogger Logger = MqLogManager.CreateLogger(); + + public StandardConsumeService(string clientId, IMessageListener messageListener, + TaskScheduler consumptionTaskScheduler, CancellationToken consumptionCtsToken) : + base(clientId, messageListener, consumptionTaskScheduler, consumptionCtsToken) + { + } + + public override async Task Consume(ProcessQueue pq, List messageViews) + { + foreach (var messageView in messageViews) + { + // Discard corrupted message. + if (messageView.IsCorrupted()) + { + Logger.LogError("Message is corrupted for standard consumption, prepare to discard it," + + $" mq={pq.GetMessageQueue()}, messageId={messageView.MessageId}, clientId={ClientId}"); + await pq.DiscardMessage(messageView); + continue; + } + + try + { + var consumeResult = await Consume(messageView); + await pq.EraseMessage(messageView, consumeResult); + } + catch (Exception ex) + { + Logger.LogError(ex, $"[Bug] Exception raised in consumption callback, clientId={ClientId}"); + } + } + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/TopicRouteData.cs b/csharp/rocketmq-client-csharp/TopicRouteData.cs index 885db5f62..b56b40bc1 100644 --- a/csharp/rocketmq-client-csharp/TopicRouteData.cs +++ b/csharp/rocketmq-client-csharp/TopicRouteData.cs @@ -18,12 +18,16 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; +using Org.Apache.Rocketmq.Error; using Proto = Apache.Rocketmq.V2; namespace Org.Apache.Rocketmq { public class TopicRouteData : IEquatable { + private int _index = 0; + public TopicRouteData(IEnumerable messageQueues) { var messageQueuesList = messageQueues.Select(mq => new MessageQueue(mq)).ToList(); @@ -33,6 +37,36 @@ public TopicRouteData(IEnumerable messageQueues) public List MessageQueues { get; } + public Endpoints PickEndpointsToQueryAssignments() + { + var nextIndex = Interlocked.Increment(ref _index) - 1; + foreach (var mq in MessageQueues) + { + var modIndex = Mod(nextIndex++, MessageQueues.Count); + var curMessageQueue = MessageQueues[modIndex]; + + if (Utilities.MasterBrokerId != curMessageQueue.Broker.Id) + { + continue; + } + if (Permission.None.Equals(curMessageQueue.Permission)) + { + continue; + } + return curMessageQueue.Broker.Endpoints; + } + throw new NotFoundException("Failed to pick endpoints to query assignment"); + } + + private int Mod(int x, int m) + { + if (m <= 0) + { + throw new ArgumentException("Modulus must be positive", nameof(m)); + } + var result = x % m; + return result >= 0 ? result : result + m; + } public bool Equals(TopicRouteData other) { diff --git a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj index b13ea1d1e..a51d09b69 100644 --- a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj +++ b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj @@ -34,6 +34,7 @@ + From 6292e81b094546d595bf9adef1498b22854eeac1 Mon Sep 17 00:00:00 2001 From: "tsaitsung-han.tht" Date: Tue, 2 Jul 2024 14:03:31 +0800 Subject: [PATCH 2/2] Add push consumer for fifo message in c# sdk --- .../rocketmq-client-csharp/ClientManager.cs | 10 ++ .../FifoConsumeService.cs | 57 ++++++++ .../rocketmq-client-csharp/IClientManager.cs | 10 ++ csharp/rocketmq-client-csharp/MessageView.cs | 7 +- csharp/rocketmq-client-csharp/ProcessQueue.cs | 124 +++++++++++++++++- csharp/rocketmq-client-csharp/PushConsumer.cs | 31 ++++- 6 files changed, 235 insertions(+), 4 deletions(-) create mode 100644 csharp/rocketmq-client-csharp/FifoConsumeService.cs diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs index 90fdde7f8..0542dea02 100644 --- a/csharp/rocketmq-client-csharp/ClientManager.cs +++ b/csharp/rocketmq-client-csharp/ClientManager.cs @@ -165,6 +165,16 @@ public async Task Shutdown() return new RpcInvocation( request, response, metadata); } + + public async Task> + ForwardMessageToDeadLetterQueue(Endpoints endpoints, + Proto.ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout) + { + var metadata = _client.Sign(); + var response = await GetRpcClient(endpoints).ForwardMessageToDeadLetterQueue(metadata, request, timeout); + return new RpcInvocation( + request, response, metadata); + } public async Task> EndTransaction( Endpoints endpoints, Proto.EndTransactionRequest request, TimeSpan timeout) diff --git a/csharp/rocketmq-client-csharp/FifoConsumeService.cs b/csharp/rocketmq-client-csharp/FifoConsumeService.cs new file mode 100644 index 000000000..4c4312bde --- /dev/null +++ b/csharp/rocketmq-client-csharp/FifoConsumeService.cs @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace Org.Apache.Rocketmq +{ + public class FifoConsumeService : ConsumeService + { + private static readonly ILogger Logger = MqLogManager.CreateLogger(); + + public FifoConsumeService(string clientId, IMessageListener messageListener, + TaskScheduler consumptionExecutor, CancellationToken consumptionCtsToken) : + base(clientId, messageListener, consumptionExecutor, consumptionCtsToken) + { + } + + public override async Task Consume(ProcessQueue pq, List messageViews) + { + await ConsumeIteratively(pq, messageViews.GetEnumerator()); + } + + public async Task ConsumeIteratively(ProcessQueue pq, IEnumerator iterator) + { + while (iterator.MoveNext()) + { + var messageView = iterator.Current; + if (messageView != null && messageView.IsCorrupted()) + { + Logger.LogError($"Message is corrupted for FIFO consumption, prepare to discard it," + + $" mq={pq.GetMessageQueue()}, messageId={messageView.MessageId}, clientId={ClientId}"); + await pq.DiscardFifoMessage(messageView); + continue; + } + var consumeResult = await Consume(messageView); + await pq.EraseFifoMessage(messageView, consumeResult); + } + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs index 19f9459f7..50f033558 100644 --- a/csharp/rocketmq-client-csharp/IClientManager.cs +++ b/csharp/rocketmq-client-csharp/IClientManager.cs @@ -111,6 +111,16 @@ Task> AckMessage(Endpoints /// Task> ChangeInvisibleDuration( Endpoints endpoints, ChangeInvisibleDurationRequest request, TimeSpan timeout); + + /// + /// Send a message to the dead letter queue asynchronously, the method ensures no throwable. + /// + /// Requested endpoints. + /// Request of sending a message to DLQ. + /// Request max duration. + /// + Task> ForwardMessageToDeadLetterQueue( + Endpoints endpoints, ForwardMessageToDeadLetterQueueRequest request, TimeSpan timeout); /// /// Transaction ending request. diff --git a/csharp/rocketmq-client-csharp/MessageView.cs b/csharp/rocketmq-client-csharp/MessageView.cs index 0766e8a86..aaa5ebb42 100644 --- a/csharp/rocketmq-client-csharp/MessageView.cs +++ b/csharp/rocketmq-client-csharp/MessageView.cs @@ -77,7 +77,12 @@ private MessageView(string messageId, string topic, byte[] body, string tag, str public DateTime BornTime { get; } - public int DeliveryAttempt { get; } + public int DeliveryAttempt { get; set; } + + public int IncrementAndGetDeliveryAttempt() + { + return ++DeliveryAttempt; + } public bool IsCorrupted() { diff --git a/csharp/rocketmq-client-csharp/ProcessQueue.cs b/csharp/rocketmq-client-csharp/ProcessQueue.cs index 56aa78742..be9c0be6e 100644 --- a/csharp/rocketmq-client-csharp/ProcessQueue.cs +++ b/csharp/rocketmq-client-csharp/ProcessQueue.cs @@ -38,6 +38,7 @@ public class ProcessQueue private static readonly TimeSpan AckMessageFailureBackoffDelay = TimeSpan.FromSeconds(1); private static readonly TimeSpan ChangeInvisibleDurationFailureBackoffDelay = TimeSpan.FromSeconds(1); + private static readonly TimeSpan ForwardMessageToDeadLetterQueueFailureBackoffDelay = TimeSpan.FromSeconds(1); private static readonly TimeSpan ReceivingFlowControlBackoffDelay = TimeSpan.FromMilliseconds(20); private static readonly TimeSpan ReceivingFailureBackoffDelay = TimeSpan.FromSeconds(1); @@ -65,9 +66,11 @@ public class ProcessQueue private readonly CancellationTokenSource _receiveMsgCts; private readonly CancellationTokenSource _ackMsgCts; private readonly CancellationTokenSource _changeInvisibleDurationCts; + private readonly CancellationTokenSource _forwardMessageToDeadLetterQueueCts; public ProcessQueue(PushConsumer consumer, MessageQueue mq, FilterExpression filterExpression, - CancellationTokenSource receiveMsgCts, CancellationTokenSource ackMsgCts, CancellationTokenSource changeInvisibleDurationCts) + CancellationTokenSource receiveMsgCts, CancellationTokenSource ackMsgCts, + CancellationTokenSource changeInvisibleDurationCts, CancellationTokenSource forwardMessageToDeadLetterQueueCts) { _consumer = consumer; _dropped = false; @@ -79,6 +82,7 @@ public ProcessQueue(PushConsumer consumer, MessageQueue mq, FilterExpression fil _receiveMsgCts = receiveMsgCts; _ackMsgCts = ackMsgCts; _changeInvisibleDurationCts = changeInvisibleDurationCts; + _forwardMessageToDeadLetterQueueCts = forwardMessageToDeadLetterQueueCts; } /// @@ -445,6 +449,113 @@ private async Task ChangeInvisibleDurationLater(MessageView messageView, TimeSpa await ChangeInvisibleDurationLater(messageView, duration, attempt + 1); } } + + public async Task EraseFifoMessage(MessageView messageView, ConsumeResult consumeResult) + { + var retryPolicy = _consumer.GetRetryPolicy(); + var maxAttempts = retryPolicy.GetMaxAttempts(); + var attempt = messageView.DeliveryAttempt; + var messageId = messageView.MessageId; + var service = _consumer.GetConsumeService(); + var clientId = _consumer.GetClientId(); + + if (consumeResult == ConsumeResult.FAILURE && attempt < maxAttempts) + { + var nextAttemptDelay = retryPolicy.GetNextAttemptDelay(attempt); + attempt = messageView.IncrementAndGetDeliveryAttempt(); + Logger.LogDebug($"Prepare to redeliver the fifo message because of the consumption failure," + + $" maxAttempt={maxAttempts}, attempt={attempt}, mq={messageView.MessageQueue}," + + $" messageId={messageId}, nextAttemptDelay={nextAttemptDelay}, clientId={clientId}"); + var redeliverResult = await service.Consume(messageView, nextAttemptDelay); + await EraseFifoMessage(messageView, redeliverResult); + return; + } + + var success = consumeResult == ConsumeResult.SUCCESS; + if (!success) + { + Logger.LogInformation($"Failed to consume fifo message finally, run out of attempt times," + + $" maxAttempts={maxAttempts}, attempt={attempt}, mq={messageView.MessageQueue}," + + $" messageId={messageId}, clientId={clientId}"); + } + + await (ConsumeResult.SUCCESS.Equals(consumeResult) ? AckMessage(messageView) : ForwardToDeadLetterQueue(messageView)); + + EvictCache(messageView); + } + + private async Task ForwardToDeadLetterQueue(MessageView messageView) + { + await ForwardToDeadLetterQueue(messageView, 1); + } + + private async Task ForwardToDeadLetterQueue(MessageView messageView, int attempt) + { + try + { + var clientId = _consumer.GetClientId(); + var consumerGroup = _consumer.GetConsumerGroup(); + var messageId = messageView.MessageId; + var endpoints = messageView.MessageQueue.Broker.Endpoints; + + var request = _consumer.WrapForwardMessageToDeadLetterQueueRequest(messageView); + var invocation = await _consumer.GetClientManager().ForwardMessageToDeadLetterQueue(endpoints, request, + _consumer.GetClientConfig().RequestTimeout); + var requestId = invocation.RequestId; + var status = invocation.Response.Status; + var statusCode = status.Code; + + // Log failure and retry later. + if (statusCode != Code.Ok) + { + Logger.LogError($"Failed to forward message to dead letter queue, would attempt to re-forward later, " + + $"clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}, attempt={attempt}, " + + $"mq={_mq}, endpoints={endpoints}, requestId={requestId}, code={statusCode}, status message={status.Message}"); + await ForwardToDeadLetterQueueLater(messageView, attempt); + return; + } + + // Log success. + if (attempt > 1) + { + Logger.LogInformation($"Re-forward message to dead letter queue successfully, " + + $"clientId={clientId}, consumerGroup={consumerGroup}, attempt={attempt}, messageId={messageId}, " + + $"mq={_mq}, endpoints={endpoints}, requestId={requestId}"); + } + else + { + Logger.LogInformation($"Forward message to dead letter queue successfully, " + + $"clientId={clientId}, consumerGroup={consumerGroup}, messageId={messageId}, mq={_mq}, " + + $"endpoints={endpoints}, requestId={requestId}"); + } + } + catch (Exception ex) + { + // Log failure and retry later. + Logger.LogError($"Exception raised while forward message to DLQ, would attempt to re-forward later, " + + $"clientId={_consumer.GetClientId()}, consumerGroup={_consumer.GetConsumerGroup()}," + + $" messageId={messageView.MessageId}, mq={_mq}", ex); + + await ForwardToDeadLetterQueueLater(messageView, attempt); + } + } + + private async Task ForwardToDeadLetterQueueLater(MessageView messageView, int attempt) + { + try + { + await Task.Delay(ForwardMessageToDeadLetterQueueFailureBackoffDelay, _forwardMessageToDeadLetterQueueCts.Token); + await ForwardToDeadLetterQueue(messageView, attempt); + } + catch (Exception ex) + { + // Should never reach here. + Logger.LogError($"[Bug] Failed to schedule DLQ message request, " + + $"mq={_mq}, messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}", ex); + + await ForwardToDeadLetterQueueLater(messageView, attempt + 1); + } + } /// /// Discard the message(Non-FIFO-consume-mode) which could not be consumed properly. @@ -456,6 +567,17 @@ public async Task DiscardMessage(MessageView messageView) await NackMessage(messageView); EvictCache(messageView); } + + /// + /// Discard the message(FIFO-consume-mode) which could not consumed properly. + /// + /// the FIFO message to discard. + public async Task DiscardFifoMessage(MessageView messageView) + { + Logger.LogInformation($"Discard fifo message, mq={_mq}, messageId={messageView.MessageId}, clientId={_consumer.GetClientId()}"); + await ForwardToDeadLetterQueue(messageView); + EvictCache(messageView); + } private void EvictCache(MessageView messageView) { diff --git a/csharp/rocketmq-client-csharp/PushConsumer.cs b/csharp/rocketmq-client-csharp/PushConsumer.cs index badce3cce..71f6433fe 100644 --- a/csharp/rocketmq-client-csharp/PushConsumer.cs +++ b/csharp/rocketmq-client-csharp/PushConsumer.cs @@ -55,6 +55,7 @@ public class PushConsumer : Consumer, IAsyncDisposable, IDisposable private readonly CancellationTokenSource _receiveMsgCts; private readonly CancellationTokenSource _ackMsgCts; private readonly CancellationTokenSource _changeInvisibleDurationCts; + private readonly CancellationTokenSource _forwardMsgToDeadLetterQueueCts; /// /// The caller is supposed to have validated the arguments and handled throwing exception or @@ -84,6 +85,7 @@ public PushConsumer(ClientConfig clientConfig, string consumerGroup, _receiveMsgCts = new CancellationTokenSource(); _ackMsgCts = new CancellationTokenSource(); _changeInvisibleDurationCts = new CancellationTokenSource(); + _forwardMsgToDeadLetterQueueCts = new CancellationTokenSource(); } protected override async Task Start() @@ -127,6 +129,7 @@ protected override async Task Shutdown() _receiveMsgCts.Cancel(); _ackMsgCts.Cancel(); _changeInvisibleDurationCts.Cancel(); + _forwardMsgToDeadLetterQueueCts.Cancel(); _scanAssignmentCts.Cancel(); await base.Shutdown(); _consumptionCts.Cancel(); @@ -142,6 +145,12 @@ protected override async Task Shutdown() private ConsumeService CreateConsumerService() { + if (_pushSubscriptionSettings.IsFifo()) + { + Logger.LogInformation( + $"Create FIFO consume service, consumerGroup={_consumerGroup}, clientId={ClientId}"); + return new FifoConsumeService(ClientId, _messageListener, _consumptionTaskScheduler, _consumptionCts.Token); + } Logger.LogInformation( $"Create standard consume service, consumerGroup={_consumerGroup}, clientId={ClientId}"); return new StandardConsumeService(ClientId, _messageListener, _consumptionTaskScheduler, _consumptionCts.Token); @@ -349,7 +358,7 @@ internal void DropProcessQueue(MessageQueue mq) protected ProcessQueue CreateProcessQueue(MessageQueue mq, FilterExpression filterExpression) { var processQueue = new ProcessQueue(this, mq, filterExpression, _receiveMsgCts, _ackMsgCts, - _changeInvisibleDurationCts); + _changeInvisibleDurationCts, _forwardMsgToDeadLetterQueueCts); if (_processQueueTable.TryGetValue(mq, out var previous)) { return null; @@ -357,7 +366,7 @@ protected ProcessQueue CreateProcessQueue(MessageQueue mq, FilterExpression filt _processQueueTable.TryAdd(mq, processQueue); return processQueue; } - + public async Task AckMessage(MessageView messageView) { if (State.Running != State) @@ -420,6 +429,24 @@ protected internal AckMessageRequest WrapAckMessageRequest(MessageView messageVi Entries = { entry } }; } + + protected internal ForwardMessageToDeadLetterQueueRequest WrapForwardMessageToDeadLetterQueueRequest(MessageView messageView) + { + var topicResource = new Proto.Resource + { + Name = messageView.Topic + }; + + return new ForwardMessageToDeadLetterQueueRequest + { + Group = GetProtobufGroup(), + Topic = topicResource, + ReceiptHandle = messageView.ReceiptHandle, + MessageId = messageView.MessageId, + DeliveryAttempt = messageView.DeliveryAttempt, + MaxDeliveryAttempts = GetRetryPolicy().GetMaxAttempts() + }; + } protected override void OnTopicRouteDataUpdated0(string topic, TopicRouteData topicRouteData) {