diff --git a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
index a51d09b6..67383190 100644
--- a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
+++ b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj
@@ -37,7 +37,7 @@
-
+
diff --git a/csharp/tests/AttemptIdIntegrationTest.cs b/csharp/tests/AttemptIdIntegrationTest.cs
new file mode 100644
index 00000000..e51f1bfc
--- /dev/null
+++ b/csharp/tests/AttemptIdIntegrationTest.cs
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using FilterExpression = Org.Apache.Rocketmq.FilterExpression;
+
+namespace tests
+{
+ [TestClass]
+ public class AttemptIdIntegrationTest : GrpcServerIntegrationTest
+ {
+ private const string Topic = "topic";
+ private const string Broker = "broker";
+
+ private Server _server;
+ private readonly List _attemptIdList = new ConcurrentBag().ToList();
+
+ [TestInitialize]
+ public void SetUp()
+ {
+ var mockServer = new MockServer(Topic, Broker, _attemptIdList);
+ _server = SetUpServer(mockServer, Port);
+ mockServer.Port = Port;
+ }
+
+ [TestCleanup]
+ public async Task TearDown()
+ {
+ await _server.ShutdownAsync();
+ }
+
+ [TestMethod]
+ public async Task Test()
+ {
+ var endpoint = "127.0.0.1" + ":" + Port;
+ var credentialsProvider = new StaticSessionCredentialsProvider("yourAccessKey", "yourSecretKey");
+ var clientConfig = new ClientConfig.Builder()
+ .SetEndpoints(endpoint)
+ .SetCredentialsProvider(credentialsProvider)
+ .EnableSsl(false)
+ .Build();
+
+ const string consumerGroup = "yourConsumerGroup";
+ const string topic = "yourTopic";
+ var subscription = new Dictionary
+ { { topic, new FilterExpression("*") } };
+
+ var pushConsumer = await new PushConsumer.Builder()
+ .SetClientConfig(clientConfig)
+ .SetConsumerGroup(consumerGroup)
+ .SetSubscriptionExpression(subscription)
+ .SetMessageListener(new CustomMessageListener())
+ .Build();
+
+ await Task.Run(async () =>
+ {
+ await WaitForConditionAsync(() =>
+ {
+ Assert.IsTrue(_attemptIdList.Count >= 3);
+ Assert.AreEqual(_attemptIdList[0], _attemptIdList[1]);
+ Assert.AreNotEqual(_attemptIdList[0], _attemptIdList[2]);
+ }, TimeSpan.FromSeconds(5));
+ });
+ }
+
+ private async Task WaitForConditionAsync(Action assertCondition, TimeSpan timeout)
+ {
+ var startTime = DateTime.UtcNow;
+ while (DateTime.UtcNow - startTime < timeout)
+ {
+ try
+ {
+ assertCondition();
+ return; // Condition met, exit the method
+ }
+ catch
+ {
+ // Condition not met, ignore exception and try again after a delay
+ }
+
+ await Task.Delay(100); // Small delay to avoid tight loop
+ }
+
+ // Perform last check to throw the exception
+ assertCondition();
+ }
+
+ private class CustomMessageListener : IMessageListener
+ {
+ public ConsumeResult Consume(MessageView messageView)
+ {
+ return ConsumeResult.SUCCESS;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/GrpcServerIntegrationTest.cs b/csharp/tests/GrpcServerIntegrationTest.cs
new file mode 100644
index 00000000..3e207ce5
--- /dev/null
+++ b/csharp/tests/GrpcServerIntegrationTest.cs
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Linq;
+using Apache.Rocketmq.V2;
+using Grpc.Core;
+
+namespace tests
+{
+ public abstract class GrpcServerIntegrationTest
+ {
+ protected int Port;
+
+ protected Server SetUpServer(MessagingService.MessagingServiceBase mockServer, int port)
+ {
+ var server = new Server
+ {
+ Ports = { new ServerPort("127.0.0.1", Port, ServerCredentials.Insecure) },
+ Services = { MessagingService.BindService(mockServer) }
+ };
+ server.Start();
+ Port = server.Ports.First().BoundPort;
+ return server;
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/MockServer.cs b/csharp/tests/MockServer.cs
new file mode 100644
index 00000000..ff7ae3fc
--- /dev/null
+++ b/csharp/tests/MockServer.cs
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Google.Protobuf.WellKnownTypes;
+using Grpc.Core;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+ public class MockServer : Proto.MessagingService.MessagingServiceBase
+ {
+ private readonly List _attemptIdList;
+ private int _serverDeadlineFlag = 1;
+
+ private readonly Proto.Status _mockStatus = new Proto.Status
+ {
+ Code = Proto.Code.Ok,
+ Message = "mock test"
+ };
+
+ private readonly string _topic;
+ private readonly string _broker;
+
+ public MockServer(string topic, string broker, List attemptIdList)
+ {
+ _topic = topic;
+ _broker = broker;
+ _attemptIdList = attemptIdList;
+ }
+
+ public int Port { get; set; }
+
+ public override Task QueryRoute(Proto.QueryRouteRequest request,
+ ServerCallContext context)
+ {
+ var response = new Proto.QueryRouteResponse
+ {
+ Status = _mockStatus,
+ MessageQueues =
+ {
+ new Proto.MessageQueue
+ {
+ Topic = new Proto.Resource { Name = _topic },
+ Id = 0,
+ Permission = Proto.Permission.ReadWrite,
+ Broker = new Proto.Broker
+ {
+ Name = _broker,
+ Id = 0,
+ Endpoints = new Proto.Endpoints
+ {
+ Addresses =
+ {
+ new Proto.Address { Host = "127.0.0.1", Port = Port }
+ }
+ }
+ },
+ AcceptMessageTypes = { Proto.MessageType.Normal }
+ }
+ }
+ };
+ return Task.FromResult(response);
+ }
+
+ public override Task Heartbeat(Proto.HeartbeatRequest request,
+ ServerCallContext context)
+ {
+ var response = new Proto.HeartbeatResponse { Status = _mockStatus };
+ return Task.FromResult(response);
+ }
+
+ public override Task QueryAssignment(Proto.QueryAssignmentRequest request,
+ ServerCallContext context)
+ {
+ var response = new Proto.QueryAssignmentResponse
+ {
+ Status = _mockStatus,
+ Assignments =
+ {
+ new Proto.Assignment
+ {
+ MessageQueue = new Proto.MessageQueue
+ {
+ Topic = new Proto.Resource { Name = _topic },
+ Id = 0,
+ Permission = Proto.Permission.ReadWrite,
+ Broker = new Proto.Broker
+ {
+ Name = _broker,
+ Id = 0,
+ Endpoints = new Proto.Endpoints
+ {
+ Addresses =
+ {
+ new Proto.Address { Host = "127.0.0.1", Port = Port }
+ }
+ }
+ },
+ AcceptMessageTypes = { Proto.MessageType.Normal }
+ }
+ }
+ }
+ };
+ return Task.FromResult(response);
+ }
+
+ public override async Task ReceiveMessage(Proto.ReceiveMessageRequest request,
+ IServerStreamWriter responseStream, ServerCallContext context)
+ {
+ if (_attemptIdList.Count >= 3)
+ {
+ await Task.Delay(100);
+ }
+
+ _attemptIdList.Add(request.AttemptId);
+
+ if (CompareAndSetServerDeadlineFlag(true, false))
+ {
+ // timeout
+ }
+ else
+ {
+ var response = new Proto.ReceiveMessageResponse { Status = _mockStatus };
+ await responseStream.WriteAsync(response);
+ }
+ }
+
+ public override async Task Telemetry(IAsyncStreamReader requestStream,
+ IServerStreamWriter responseStream, ServerCallContext context)
+ {
+ await foreach (var command in requestStream.ReadAllAsync())
+ {
+ var response = command.Clone();
+ response.Status = _mockStatus;
+ response.Settings.BackoffPolicy.MaxAttempts = 16;
+ response.Settings.BackoffPolicy.ExponentialBackoff = new Proto.ExponentialBackoff
+ {
+ Initial = new Duration { Seconds = 1 },
+ Max = new Duration { Seconds = 10 },
+ Multiplier = 1.5f
+ };
+
+ await responseStream.WriteAsync(response);
+ }
+ }
+
+ private bool CompareAndSetServerDeadlineFlag(bool expectedValue, bool newValue)
+ {
+ var expected = expectedValue ? 1 : 0;
+ var newVal = newValue ? 1 : 0;
+ return Interlocked.CompareExchange(ref _serverDeadlineFlag, newVal, expected) == expected;
+ }
+ }
+}
\ No newline at end of file
diff --git a/csharp/tests/tests.csproj b/csharp/tests/tests.csproj
index 1e3b9fee..30f14bb3 100644
--- a/csharp/tests/tests.csproj
+++ b/csharp/tests/tests.csproj
@@ -7,6 +7,7 @@
+