From c363fe1f674e822cee30a4e65018c9fccbc07b17 Mon Sep 17 00:00:00 2001 From: "tsaitsung-han.tht" Date: Mon, 29 Jul 2024 13:41:25 +0800 Subject: [PATCH] Add integration test for csharp sdk --- .../rocketmq-client-csharp.csproj | 2 +- csharp/tests/AttemptIdIntegrationTest.cs | 117 ++++++++++++ csharp/tests/GrpcServerIntegrationTest.cs | 40 ++++ csharp/tests/MockServer.cs | 171 ++++++++++++++++++ csharp/tests/tests.csproj | 1 + 5 files changed, 330 insertions(+), 1 deletion(-) create mode 100644 csharp/tests/AttemptIdIntegrationTest.cs create mode 100644 csharp/tests/GrpcServerIntegrationTest.cs create mode 100644 csharp/tests/MockServer.cs diff --git a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj index a51d09b69..673831904 100644 --- a/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj +++ b/csharp/rocketmq-client-csharp/rocketmq-client-csharp.csproj @@ -37,7 +37,7 @@ - + diff --git a/csharp/tests/AttemptIdIntegrationTest.cs b/csharp/tests/AttemptIdIntegrationTest.cs new file mode 100644 index 000000000..e51f1bfc5 --- /dev/null +++ b/csharp/tests/AttemptIdIntegrationTest.cs @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Grpc.Core; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.Rocketmq; +using FilterExpression = Org.Apache.Rocketmq.FilterExpression; + +namespace tests +{ + [TestClass] + public class AttemptIdIntegrationTest : GrpcServerIntegrationTest + { + private const string Topic = "topic"; + private const string Broker = "broker"; + + private Server _server; + private readonly List _attemptIdList = new ConcurrentBag().ToList(); + + [TestInitialize] + public void SetUp() + { + var mockServer = new MockServer(Topic, Broker, _attemptIdList); + _server = SetUpServer(mockServer, Port); + mockServer.Port = Port; + } + + [TestCleanup] + public async Task TearDown() + { + await _server.ShutdownAsync(); + } + + [TestMethod] + public async Task Test() + { + var endpoint = "127.0.0.1" + ":" + Port; + var credentialsProvider = new StaticSessionCredentialsProvider("yourAccessKey", "yourSecretKey"); + var clientConfig = new ClientConfig.Builder() + .SetEndpoints(endpoint) + .SetCredentialsProvider(credentialsProvider) + .EnableSsl(false) + .Build(); + + const string consumerGroup = "yourConsumerGroup"; + const string topic = "yourTopic"; + var subscription = new Dictionary + { { topic, new FilterExpression("*") } }; + + var pushConsumer = await new PushConsumer.Builder() + .SetClientConfig(clientConfig) + .SetConsumerGroup(consumerGroup) + .SetSubscriptionExpression(subscription) + .SetMessageListener(new CustomMessageListener()) + .Build(); + + await Task.Run(async () => + { + await WaitForConditionAsync(() => + { + Assert.IsTrue(_attemptIdList.Count >= 3); + Assert.AreEqual(_attemptIdList[0], _attemptIdList[1]); + Assert.AreNotEqual(_attemptIdList[0], _attemptIdList[2]); + }, TimeSpan.FromSeconds(5)); + }); + } + + private async Task WaitForConditionAsync(Action assertCondition, TimeSpan timeout) + { + var startTime = DateTime.UtcNow; + while (DateTime.UtcNow - startTime < timeout) + { + try + { + assertCondition(); + return; // Condition met, exit the method + } + catch + { + // Condition not met, ignore exception and try again after a delay + } + + await Task.Delay(100); // Small delay to avoid tight loop + } + + // Perform last check to throw the exception + assertCondition(); + } + + private class CustomMessageListener : IMessageListener + { + public ConsumeResult Consume(MessageView messageView) + { + return ConsumeResult.SUCCESS; + } + } + } +} \ No newline at end of file diff --git a/csharp/tests/GrpcServerIntegrationTest.cs b/csharp/tests/GrpcServerIntegrationTest.cs new file mode 100644 index 000000000..3e207ce59 --- /dev/null +++ b/csharp/tests/GrpcServerIntegrationTest.cs @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System.Linq; +using Apache.Rocketmq.V2; +using Grpc.Core; + +namespace tests +{ + public abstract class GrpcServerIntegrationTest + { + protected int Port; + + protected Server SetUpServer(MessagingService.MessagingServiceBase mockServer, int port) + { + var server = new Server + { + Ports = { new ServerPort("127.0.0.1", Port, ServerCredentials.Insecure) }, + Services = { MessagingService.BindService(mockServer) } + }; + server.Start(); + Port = server.Ports.First().BoundPort; + return server; + } + } +} \ No newline at end of file diff --git a/csharp/tests/MockServer.cs b/csharp/tests/MockServer.cs new file mode 100644 index 000000000..ff7ae3fc5 --- /dev/null +++ b/csharp/tests/MockServer.cs @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using Proto = Apache.Rocketmq.V2; + +namespace tests +{ + public class MockServer : Proto.MessagingService.MessagingServiceBase + { + private readonly List _attemptIdList; + private int _serverDeadlineFlag = 1; + + private readonly Proto.Status _mockStatus = new Proto.Status + { + Code = Proto.Code.Ok, + Message = "mock test" + }; + + private readonly string _topic; + private readonly string _broker; + + public MockServer(string topic, string broker, List attemptIdList) + { + _topic = topic; + _broker = broker; + _attemptIdList = attemptIdList; + } + + public int Port { get; set; } + + public override Task QueryRoute(Proto.QueryRouteRequest request, + ServerCallContext context) + { + var response = new Proto.QueryRouteResponse + { + Status = _mockStatus, + MessageQueues = + { + new Proto.MessageQueue + { + Topic = new Proto.Resource { Name = _topic }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Broker = new Proto.Broker + { + Name = _broker, + Id = 0, + Endpoints = new Proto.Endpoints + { + Addresses = + { + new Proto.Address { Host = "127.0.0.1", Port = Port } + } + } + }, + AcceptMessageTypes = { Proto.MessageType.Normal } + } + } + }; + return Task.FromResult(response); + } + + public override Task Heartbeat(Proto.HeartbeatRequest request, + ServerCallContext context) + { + var response = new Proto.HeartbeatResponse { Status = _mockStatus }; + return Task.FromResult(response); + } + + public override Task QueryAssignment(Proto.QueryAssignmentRequest request, + ServerCallContext context) + { + var response = new Proto.QueryAssignmentResponse + { + Status = _mockStatus, + Assignments = + { + new Proto.Assignment + { + MessageQueue = new Proto.MessageQueue + { + Topic = new Proto.Resource { Name = _topic }, + Id = 0, + Permission = Proto.Permission.ReadWrite, + Broker = new Proto.Broker + { + Name = _broker, + Id = 0, + Endpoints = new Proto.Endpoints + { + Addresses = + { + new Proto.Address { Host = "127.0.0.1", Port = Port } + } + } + }, + AcceptMessageTypes = { Proto.MessageType.Normal } + } + } + } + }; + return Task.FromResult(response); + } + + public override async Task ReceiveMessage(Proto.ReceiveMessageRequest request, + IServerStreamWriter responseStream, ServerCallContext context) + { + if (_attemptIdList.Count >= 3) + { + await Task.Delay(100); + } + + _attemptIdList.Add(request.AttemptId); + + if (CompareAndSetServerDeadlineFlag(true, false)) + { + // timeout + } + else + { + var response = new Proto.ReceiveMessageResponse { Status = _mockStatus }; + await responseStream.WriteAsync(response); + } + } + + public override async Task Telemetry(IAsyncStreamReader requestStream, + IServerStreamWriter responseStream, ServerCallContext context) + { + await foreach (var command in requestStream.ReadAllAsync()) + { + var response = command.Clone(); + response.Status = _mockStatus; + response.Settings.BackoffPolicy.MaxAttempts = 16; + response.Settings.BackoffPolicy.ExponentialBackoff = new Proto.ExponentialBackoff + { + Initial = new Duration { Seconds = 1 }, + Max = new Duration { Seconds = 10 }, + Multiplier = 1.5f + }; + + await responseStream.WriteAsync(response); + } + } + + private bool CompareAndSetServerDeadlineFlag(bool expectedValue, bool newValue) + { + var expected = expectedValue ? 1 : 0; + var newVal = newValue ? 1 : 0; + return Interlocked.CompareExchange(ref _serverDeadlineFlag, newVal, expected) == expected; + } + } +} \ No newline at end of file diff --git a/csharp/tests/tests.csproj b/csharp/tests/tests.csproj index 1e3b9feef..30f14bb32 100644 --- a/csharp/tests/tests.csproj +++ b/csharp/tests/tests.csproj @@ -7,6 +7,7 @@ +