Skip to content

Commit

Permalink
Add basic functions from Rabbit.Client: ExchangeDeclare, QueueBind.
Browse files Browse the repository at this point in the history
  • Loading branch information
ifgris committed Sep 9, 2024
1 parent 5ba118b commit d745d64
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 6 deletions.
18 changes: 18 additions & 0 deletions Src/NanoRabbit/IRabbitHelper.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using RabbitMQ.Client;
using System.Threading.Channels;

namespace NanoRabbit;

Expand Down Expand Up @@ -44,6 +45,15 @@ public interface IRabbitHelper

#region utils

/// <summary>
/// Declare an exchange.
/// </summary>
/// <param name="exchangeName"></param>
/// <param name="exchangeType"></param>
/// <param name="durable"></param>
/// <param name="autoDelete"></param>
/// <param name="arguments"></param>
public void ExchangeDeclare(string exchangeName, string exchangeType, bool durable = false, bool autoDelete = false, IDictionary<string, object>? arguments = null);
/// <summary>
/// Declare a queue.
/// </summary>
Expand All @@ -54,6 +64,14 @@ public interface IRabbitHelper
/// <param name="arguments"></param>
public void DeclareQueue(string queueName, bool durable = true, bool exclusive = false, bool autoDelete = false, IDictionary<string, object>? arguments = null);
/// <summary>
/// Bind a queue to an exchange.
/// </summary>
/// <param name="queueName"></param>
/// <param name="exchangeName"></param>
/// <param name="routingKey"></param>
/// <param name="arguments"></param>
public void QueueBind(string queueName, string exchangeName, string routingKey, IDictionary<string, object> arguments);
/// <summary>
/// Create a custom BasicProperties.
/// </summary>
/// <returns></returns>
Expand Down
36 changes: 30 additions & 6 deletions Src/NanoRabbit/RabbitHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public RabbitHelper(RabbitConfiguration rabbitConfig, ILogger logger)
};

// TODO needs testing.
if (_rabbitConfig.TLSConfig != null )
if (_rabbitConfig.TLSConfig != null)
{
factory.Ssl.Enabled = _rabbitConfig.TLSConfig.Enabled;
factory.Ssl.ServerName = _rabbitConfig.TLSConfig.ServerName;
Expand Down Expand Up @@ -124,7 +124,7 @@ public void Publish<T>(string producerName, T message, IBasicProperties? propert
var option = GetProducerOption(producerName);

var messageStr = typeof(T) == typeof(string) ? message.ToString() : JsonConvert.SerializeObject(message);

Check warning on line 126 in Src/NanoRabbit/RabbitHelper.cs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, net6.0)

Dereference of a possibly null reference.

Check warning on line 126 in Src/NanoRabbit/RabbitHelper.cs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, net7.0)

Dereference of a possibly null reference.

Check warning on line 126 in Src/NanoRabbit/RabbitHelper.cs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, net8.0)

Dereference of a possibly null reference.

var body = Encoding.UTF8.GetBytes(messageStr);

Check warning on line 128 in Src/NanoRabbit/RabbitHelper.cs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, net6.0)

Possible null reference argument for parameter 's' in 'byte[] Encoding.GetBytes(string s)'.

Check warning on line 128 in Src/NanoRabbit/RabbitHelper.cs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, net7.0)

Possible null reference argument for parameter 's' in 'byte[] Encoding.GetBytes(string s)'.

Check warning on line 128 in Src/NanoRabbit/RabbitHelper.cs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, net8.0)

Possible null reference argument for parameter 's' in 'byte[] Encoding.GetBytes(string s)'.

_pipeline.Execute(token =>
Expand All @@ -134,9 +134,9 @@ public void Publish<T>(string producerName, T message, IBasicProperties? propert
properties ??= _channel.CreateBasicProperties();
properties.Persistent = true;
_channel.BasicPublish(
exchange: option.ExchangeName,
routingKey: option.RoutingKey,
basicProperties: properties,
exchange: option.ExchangeName,
routingKey: option.RoutingKey,
basicProperties: properties,
body: body);
_logger?.LogInformation($"{producerName}|Published|{messageStr}");
Expand Down Expand Up @@ -264,6 +264,19 @@ public void AddAsyncConsumer(string consumerName, Func<string, Task> onMessageRe

#region utils

/// <summary>
/// Declare an exchange.
/// </summary>
/// <param name="exchangeName"></param>
/// <param name="exchangeType"></param>
/// <param name="durable"></param>
/// <param name="autoDelete"></param>
/// <param name="arguments"></param>
public void ExchangeDeclare(string exchangeName, string exchangeType, bool durable = false, bool autoDelete = false, IDictionary<string, object>? arguments = null)
{
_channel.ExchangeDeclare(exchangeName, exchangeType, durable, autoDelete, arguments);
}

/// <summary>
/// Declare a queue based on RabbitMQ.Client.
/// </summary>
Expand All @@ -275,7 +288,18 @@ public void AddAsyncConsumer(string consumerName, Func<string, Task> onMessageRe
public void DeclareQueue(string queueName, bool durable = true, bool exclusive = false, bool autoDelete = false, IDictionary<string, object>? arguments = null)
{
_channel.QueueDeclare(queue: queueName, durable, exclusive, autoDelete, arguments);
_channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
}

/// <summary>
/// Bind a queue to an exchange.
/// </summary>
/// <param name="queueName"></param>
/// <param name="exchangeName"></param>
/// <param name="routingKey"></param>
/// <param name="arguments"></param>
public void QueueBind(string queueName, string exchangeName, string routingKey, IDictionary<string, object> arguments)
{
_channel.QueueBind(queueName, exchangeName, routingKey, arguments);
}

/// <summary>
Expand Down

0 comments on commit d745d64

Please sign in to comment.