Skip to content

Commit

Permalink
Add integration test for csharp sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
tsaitsung-han.tht committed Jul 30, 2024
1 parent 7b65925 commit c363fe1
Show file tree
Hide file tree
Showing 5 changed files with 330 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<PackageReference Include="ParallelExtensionsExtras" Version="1.2.0" />

<Protobuf Include="..\..\protos\apache\rocketmq\v2\definition.proto" ProtoRoot="..\..\protos" GrpcServices="Client" />
<Protobuf Include="..\..\protos\apache\rocketmq\v2\service.proto" ProtoRoot="..\..\protos" GrpcServices="Client" />
<Protobuf Include="..\..\protos\apache\rocketmq\v2\service.proto" ProtoRoot="..\..\protos" GrpcServices="Both" />
<None Include="logo.png" Pack="true" PackagePath="" />
</ItemGroup>
</Project>
117 changes: 117 additions & 0 deletions csharp/tests/AttemptIdIntegrationTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Org.Apache.Rocketmq;
using FilterExpression = Org.Apache.Rocketmq.FilterExpression;

namespace tests
{
[TestClass]
public class AttemptIdIntegrationTest : GrpcServerIntegrationTest
{
private const string Topic = "topic";
private const string Broker = "broker";

private Server _server;
private readonly List<string> _attemptIdList = new ConcurrentBag<string>().ToList();

[TestInitialize]
public void SetUp()
{
var mockServer = new MockServer(Topic, Broker, _attemptIdList);
_server = SetUpServer(mockServer, Port);
mockServer.Port = Port;
}

[TestCleanup]
public async Task TearDown()
{
await _server.ShutdownAsync();
}

[TestMethod]
public async Task Test()
{
var endpoint = "127.0.0.1" + ":" + Port;
var credentialsProvider = new StaticSessionCredentialsProvider("yourAccessKey", "yourSecretKey");
var clientConfig = new ClientConfig.Builder()
.SetEndpoints(endpoint)
.SetCredentialsProvider(credentialsProvider)
.EnableSsl(false)
.Build();

const string consumerGroup = "yourConsumerGroup";
const string topic = "yourTopic";
var subscription = new Dictionary<string, FilterExpression>
{ { topic, new FilterExpression("*") } };

var pushConsumer = await new PushConsumer.Builder()
.SetClientConfig(clientConfig)
.SetConsumerGroup(consumerGroup)
.SetSubscriptionExpression(subscription)
.SetMessageListener(new CustomMessageListener())
.Build();

await Task.Run(async () =>
{
await WaitForConditionAsync(() =>
{
Assert.IsTrue(_attemptIdList.Count >= 3);
Assert.AreEqual(_attemptIdList[0], _attemptIdList[1]);
Assert.AreNotEqual(_attemptIdList[0], _attemptIdList[2]);
}, TimeSpan.FromSeconds(5));
});
}

private async Task WaitForConditionAsync(Action assertCondition, TimeSpan timeout)
{
var startTime = DateTime.UtcNow;
while (DateTime.UtcNow - startTime < timeout)
{
try
{
assertCondition();
return; // Condition met, exit the method
}
catch
{
// Condition not met, ignore exception and try again after a delay
}

await Task.Delay(100); // Small delay to avoid tight loop
}

// Perform last check to throw the exception
assertCondition();
}

private class CustomMessageListener : IMessageListener
{
public ConsumeResult Consume(MessageView messageView)
{
return ConsumeResult.SUCCESS;
}
}
}
}
40 changes: 40 additions & 0 deletions csharp/tests/GrpcServerIntegrationTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Linq;
using Apache.Rocketmq.V2;
using Grpc.Core;

namespace tests
{
public abstract class GrpcServerIntegrationTest
{
protected int Port;

protected Server SetUpServer(MessagingService.MessagingServiceBase mockServer, int port)
{
var server = new Server
{
Ports = { new ServerPort("127.0.0.1", Port, ServerCredentials.Insecure) },
Services = { MessagingService.BindService(mockServer) }
};
server.Start();
Port = server.Ports.First().BoundPort;
return server;
}
}
}
171 changes: 171 additions & 0 deletions csharp/tests/MockServer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Proto = Apache.Rocketmq.V2;

namespace tests
{
public class MockServer : Proto.MessagingService.MessagingServiceBase
{
private readonly List<string> _attemptIdList;
private int _serverDeadlineFlag = 1;

private readonly Proto.Status _mockStatus = new Proto.Status
{
Code = Proto.Code.Ok,
Message = "mock test"
};

private readonly string _topic;
private readonly string _broker;

public MockServer(string topic, string broker, List<string> attemptIdList)
{
_topic = topic;
_broker = broker;
_attemptIdList = attemptIdList;
}

public int Port { get; set; }

public override Task<Proto.QueryRouteResponse> QueryRoute(Proto.QueryRouteRequest request,
ServerCallContext context)
{
var response = new Proto.QueryRouteResponse
{
Status = _mockStatus,
MessageQueues =
{
new Proto.MessageQueue
{
Topic = new Proto.Resource { Name = _topic },
Id = 0,
Permission = Proto.Permission.ReadWrite,
Broker = new Proto.Broker
{
Name = _broker,
Id = 0,
Endpoints = new Proto.Endpoints
{
Addresses =
{
new Proto.Address { Host = "127.0.0.1", Port = Port }
}
}
},
AcceptMessageTypes = { Proto.MessageType.Normal }
}
}
};
return Task.FromResult(response);
}

public override Task<Proto.HeartbeatResponse> Heartbeat(Proto.HeartbeatRequest request,
ServerCallContext context)
{
var response = new Proto.HeartbeatResponse { Status = _mockStatus };
return Task.FromResult(response);
}

public override Task<Proto.QueryAssignmentResponse> QueryAssignment(Proto.QueryAssignmentRequest request,
ServerCallContext context)
{
var response = new Proto.QueryAssignmentResponse
{
Status = _mockStatus,
Assignments =
{
new Proto.Assignment
{
MessageQueue = new Proto.MessageQueue
{
Topic = new Proto.Resource { Name = _topic },
Id = 0,
Permission = Proto.Permission.ReadWrite,
Broker = new Proto.Broker
{
Name = _broker,
Id = 0,
Endpoints = new Proto.Endpoints
{
Addresses =
{
new Proto.Address { Host = "127.0.0.1", Port = Port }
}
}
},
AcceptMessageTypes = { Proto.MessageType.Normal }
}
}
}
};
return Task.FromResult(response);
}

public override async Task ReceiveMessage(Proto.ReceiveMessageRequest request,
IServerStreamWriter<Proto.ReceiveMessageResponse> responseStream, ServerCallContext context)
{
if (_attemptIdList.Count >= 3)
{
await Task.Delay(100);
}

_attemptIdList.Add(request.AttemptId);

if (CompareAndSetServerDeadlineFlag(true, false))
{
// timeout
}
else
{
var response = new Proto.ReceiveMessageResponse { Status = _mockStatus };
await responseStream.WriteAsync(response);
}
}

public override async Task Telemetry(IAsyncStreamReader<Proto.TelemetryCommand> requestStream,
IServerStreamWriter<Proto.TelemetryCommand> responseStream, ServerCallContext context)
{
await foreach (var command in requestStream.ReadAllAsync())
{
var response = command.Clone();
response.Status = _mockStatus;
response.Settings.BackoffPolicy.MaxAttempts = 16;
response.Settings.BackoffPolicy.ExponentialBackoff = new Proto.ExponentialBackoff
{
Initial = new Duration { Seconds = 1 },
Max = new Duration { Seconds = 10 },
Multiplier = 1.5f
};

await responseStream.WriteAsync(response);
}
}

private bool CompareAndSetServerDeadlineFlag(bool expectedValue, bool newValue)
{
var expected = expectedValue ? 1 : 0;
var newVal = newValue ? 1 : 0;
return Interlocked.CompareExchange(ref _serverDeadlineFlag, newVal, expected) == expected;
}
}
}
1 change: 1 addition & 0 deletions csharp/tests/tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Grpc.Core" Version="2.46.6" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.32" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.4" />
<PackageReference Include="Moq" Version="4.16.1" />
Expand Down

0 comments on commit c363fe1

Please sign in to comment.