Skip to content

Commit

Permalink
VCST-1230: Add UnregisterEventHandler() (#2797)
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-dudarev authored May 23, 2024
1 parent 94aa186 commit f87c135
Show file tree
Hide file tree
Showing 13 changed files with 335 additions and 39 deletions.
2 changes: 2 additions & 0 deletions src/VirtoCommerce.Platform.Core/Bus/HandlerWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ public sealed class HandlerWrapper
{
public Type EventType { get; set; }

public Type HandlerType { get; set; }

public string HandlerModuleName { get; set; }

public Func<IMessage, CancellationToken, Task> Handler { get; set; }
Expand Down
2 changes: 1 addition & 1 deletion src/VirtoCommerce.Platform.Core/Bus/IHandlerRegistrar.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace VirtoCommerce.Platform.Core.Bus
{
public interface IHandlerRegistrar
{
[Obsolete("Use IApplicationBuilder.RegisterEventHandler<>()", DiagnosticId = "VC0008", UrlFormat = "https://docs.virtocommerce.org/products/products-virto3-versions")]
[Obsolete("Use IApplicationBuilder.RegisterEventHandler<TEvent, THandler>()", DiagnosticId = "VC0008", UrlFormat = "https://docs.virtocommerce.org/products/products-virto3-versions")]
void RegisterHandler<T>(Func<T, CancellationToken, Task> handler) where T : IMessage;
}
}
61 changes: 58 additions & 3 deletions src/VirtoCommerce.Platform.Core/Bus/InProcessBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public InProcessBus(ILogger<InProcessBus> logger)
_logger = logger;
}

[Obsolete("Use IApplicationBuilder.RegisterEventHandler<TEvent, THandler>()", DiagnosticId = "VC0008", UrlFormat = "https://docs.virtocommerce.org/products/products-virto3-versions")]
public void RegisterEventHandler<T>(Func<T, Task> handler)
where T : IEvent
{
Expand All @@ -35,15 +36,14 @@ public void RegisterEventHandler<T>(Func<T, Task> handler)
_handlers.Add(handlerWrapper);
}

[Obsolete("Use IApplicationBuilder.RegisterEventHandler<TEvent, THandler>()", DiagnosticId = "VC0008", UrlFormat = "https://docs.virtocommerce.org/products/products-virto3-versions")]
public void RegisterEventHandler<T>(Func<T, CancellationToken, Task> handler)
where T : IEvent
{
#pragma warning disable VC0008 // Type or member is obsolete
RegisterHandler(handler);
#pragma warning restore VC0008 // Type or member is obsolete
}

[Obsolete("Use IApplicationBuilder.RegisterEventHandler<>()", DiagnosticId = "VC0008", UrlFormat = "https://docs.virtocommerce.org/products/products-virto3-versions")]
[Obsolete("Use IApplicationBuilder.RegisterEventHandler<TEvent, THandler>()", DiagnosticId = "VC0008", UrlFormat = "https://docs.virtocommerce.org/products/products-virto3-versions")]
public void RegisterHandler<T>(Func<T, CancellationToken, Task> handler)
where T : IMessage
{
Expand All @@ -60,6 +60,61 @@ public void RegisterHandler<T>(Func<T, CancellationToken, Task> handler)
_handlers.Add(handlerWrapper);
}

public void RegisterEventHandler<T>(IEventHandler<T> handler)
where T : IEvent
{
var eventType = typeof(T);
var handlerType = handler.GetType();

var handlerWrapper = new HandlerWrapper
{
EventType = eventType,
HandlerType = handlerType,
HandlerModuleName = handlerType.Module.Assembly.GetName().Name,
Handler = (message, _) => handler.Handle((T)message),
Logger = _logger
};

_handlers.Add(handlerWrapper);
}

public void RegisterEventHandler<T>(ICancellableEventHandler<T> handler)
where T : IEvent
{
var eventType = typeof(T);
var handlerType = handler.GetType();

var handlerWrapper = new HandlerWrapper
{
EventType = eventType,
HandlerType = handlerType,
HandlerModuleName = handlerType.Module.Assembly.GetName().Name,
Handler = (message, cancellationToken) => handler.Handle((T)message, cancellationToken),
Logger = _logger
};

_handlers.Add(handlerWrapper);
}

public void UnregisterEventHandler<T>(Type handlerType = null)
where T : IEvent
{
var eventType = typeof(T);

var handlersToRemove = _handlers
.Where(x =>
x.EventType.IsAssignableFrom(eventType) &&
(handlerType is null || x.HandlerType == handlerType))
.ToList();

handlersToRemove.ForEach(x => _handlers.Remove(x));
}

public void UnregisterAllEventHandlers()
{
_handlers.Clear();
}

public async Task Publish<T>(T @event, CancellationToken cancellationToken = default)
where T : IEvent
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ public static IApplicationBuilder RegisterEventHandler<TEvent, THandler>(this IA
where TEvent : IEvent
where THandler : IEventHandler<TEvent>
{
var registrar = applicationBuilder.ApplicationServices.GetRequiredService<IEventHandlerRegistrar>();
var handler = applicationBuilder.ApplicationServices.GetRequiredService<THandler>();
return applicationBuilder.RegisterEventHandler<TEvent>(handler.Handle);
registrar.RegisterEventHandler(handler);
return applicationBuilder;
}

[Obsolete("Use IApplicationBuilder.RegisterEventHandler<TEvent, THandler>()", DiagnosticId = "VC0008", UrlFormat = "https://docs.virtocommerce.org/products/products-virto3-versions")]
public static IApplicationBuilder RegisterEventHandler<TEvent>(this IApplicationBuilder applicationBuilder, Func<TEvent, Task> handler)
where TEvent : IEvent
{
Expand All @@ -28,15 +31,51 @@ public static IApplicationBuilder RegisterCancellableEventHandler<TEvent, THandl
where TEvent : IEvent
where THandler : ICancellableEventHandler<TEvent>
{
var registrar = applicationBuilder.ApplicationServices.GetRequiredService<IEventHandlerRegistrar>();
var handler = applicationBuilder.ApplicationServices.GetRequiredService<THandler>();
return applicationBuilder.RegisterCancellableEventHandler<TEvent>(handler.Handle);
registrar.RegisterEventHandler(handler);
return applicationBuilder;
}

[Obsolete("Use IApplicationBuilder.RegisterEventHandler<TEvent, THandler>()", DiagnosticId = "VC0008", UrlFormat = "https://docs.virtocommerce.org/products/products-virto3-versions")]
public static IApplicationBuilder RegisterCancellableEventHandler<TEvent>(this IApplicationBuilder applicationBuilder, Func<TEvent, CancellationToken, Task> handler)
where TEvent : IEvent
{
var registrar = applicationBuilder.ApplicationServices.GetRequiredService<IEventHandlerRegistrar>();
registrar.RegisterEventHandler(handler);
return applicationBuilder;
}

public static IApplicationBuilder UnregisterEventHandler<TEvent, THandler>(this IApplicationBuilder applicationBuilder)
where TEvent : IEvent
where THandler : IEventHandler<TEvent>
{
var registrar = applicationBuilder.ApplicationServices.GetRequiredService<IEventHandlerRegistrar>();
registrar.UnregisterEventHandler<TEvent>(typeof(THandler));
return applicationBuilder;
}

public static IApplicationBuilder UnregisterCancellableEventHandler<TEvent, THandler>(this IApplicationBuilder applicationBuilder)
where TEvent : IEvent
where THandler : ICancellableEventHandler<TEvent>
{
var registrar = applicationBuilder.ApplicationServices.GetRequiredService<IEventHandlerRegistrar>();
registrar.UnregisterEventHandler<TEvent>(typeof(THandler));
return applicationBuilder;
}

public static IApplicationBuilder UnregisterEventHandlers<TEvent>(this IApplicationBuilder applicationBuilder)
where TEvent : IEvent
{
var registrar = applicationBuilder.ApplicationServices.GetRequiredService<IEventHandlerRegistrar>();
registrar.UnregisterEventHandler<TEvent>();
return applicationBuilder;
}

public static IApplicationBuilder UnregisterAllEventHandlers(this IApplicationBuilder applicationBuilder)
{
var registrar = applicationBuilder.ApplicationServices.GetRequiredService<IEventHandlerRegistrar>();
registrar.UnregisterAllEventHandlers();
return applicationBuilder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ namespace VirtoCommerce.Platform.Core.Events;

public interface IEventHandlerRegistrar
{
[Obsolete("Use IApplicationBuilder.RegisterEventHandler<TEvent, THandler>()", DiagnosticId = "VC0008", UrlFormat = "https://docs.virtocommerce.org/products/products-virto3-versions")]
void RegisterEventHandler<T>(Func<T, Task> handler) where T : IEvent;

[Obsolete("Use IApplicationBuilder.RegisterEventHandler<TEvent, THandler>()", DiagnosticId = "VC0008", UrlFormat = "https://docs.virtocommerce.org/products/products-virto3-versions")]
void RegisterEventHandler<T>(Func<T, CancellationToken, Task> handler) where T : IEvent;

void RegisterEventHandler<T>(IEventHandler<T> handler) where T : IEvent;
void RegisterEventHandler<T>(ICancellableEventHandler<T> handler) where T : IEvent;

void UnregisterEventHandler<T>(Type handlerType = null) where T : IEvent;
void UnregisterAllEventHandlers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using Microsoft.Extensions.Options;
using VirtoCommerce.Platform.Core.Events;
using VirtoCommerce.Platform.Core.Security;
using VirtoCommerce.Platform.Core.Settings;
using VirtoCommerce.Platform.Core.Settings.Events;
using VirtoCommerce.Platform.Hangfire.Middleware;

Expand Down Expand Up @@ -61,9 +60,7 @@ public static IApplicationBuilder UseHangfire(this IApplicationBuilder appBuilde
var mvcJsonOptions = appBuilder.ApplicationServices.GetService<IOptions<MvcNewtonsoftJsonOptions>>();
GlobalConfiguration.Configuration.UseSerializerSettings(mvcJsonOptions.Value.SerializerSettings);

var recurringJobManager = appBuilder.ApplicationServices.GetService<IRecurringJobManager>();
var settingsManager = appBuilder.ApplicationServices.GetService<ISettingsManager>();
appBuilder.RegisterEventHandler<ObjectSettingChangedEvent>(message => recurringJobManager.HandleSettingChangeAsync(settingsManager, message));
appBuilder.RegisterEventHandler<ObjectSettingChangedEvent, RecurringJobService>();

// Add Hangfire filters/middlewares
var userNameResolver = appBuilder.ApplicationServices.CreateScope().ServiceProvider.GetRequiredService<IUserNameResolver>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public static class RecurringJobExtensions
/// <param name="settingsManager"></param>
/// <param name="settingCronJob"></param>
/// <returns></returns>
[Obsolete("Use IRecurringJobService.WatchJobSetting()", DiagnosticId = "VC0008", UrlFormat = "https://docs.virtocommerce.org/products/products-virto3-versions")]
public static void WatchJobSetting(this IRecurringJobManager recurringJobManager,
ISettingsManager settingsManager,
SettingCronJob settingCronJob)
Expand All @@ -41,6 +42,7 @@ public static void WatchJobSetting(this IRecurringJobManager recurringJobManager
/// <param name="jobId"></param>
/// <param name="timeZoneInfo"></param>
/// <param name="queue"></param>
[Obsolete("Use IRecurringJobService.WatchJobSetting()", DiagnosticId = "VC0008", UrlFormat = "https://docs.virtocommerce.org/products/products-virto3-versions")]
public static void WatchJobSetting<T>(this IRecurringJobManager recurringJobManager,
ISettingsManager settingsManager,
SettingDescriptor enablerSetting,
Expand Down Expand Up @@ -78,6 +80,7 @@ public static void WatchJobSetting<T>(this IRecurringJobManager recurringJobMana
/// <param name="settingsManager"></param>
/// <param name="settingCronJob"></param>
/// <returns></returns>
[Obsolete("Use IRecurringJobService.WatchJobSettingAsync()", DiagnosticId = "VC0008", UrlFormat = "https://docs.virtocommerce.org/products/products-virto3-versions")]
public static Task WatchJobSettingAsync(this IRecurringJobManager recurringJobManager,
ISettingsManager settingsManager,
SettingCronJob settingCronJob)
Expand All @@ -104,6 +107,7 @@ public static Task WatchJobSettingAsync(this IRecurringJobManager recurringJobMa
/// <param name="settingsManager"></param>
/// <param name="message"></param>
/// <returns></returns>
[Obsolete("Use RecurringJobService.Handle()", DiagnosticId = "VC0008", UrlFormat = "https://docs.virtocommerce.org/products/products-virto3-versions")]
public static Task HandleSettingChangeAsync(this IRecurringJobManager recurringJobManager, ISettingsManager settingsManager, ObjectSettingChangedEvent message)
{
if (recurringJobManager == null)
Expand All @@ -117,6 +121,7 @@ public static Task HandleSettingChangeAsync(this IRecurringJobManager recurringJ
return recurringJobManager.HandleSettingChangeAsyncIntnl(settingsManager, message);
}

[Obsolete("Use RecurringJobService.Handle()", DiagnosticId = "VC0008", UrlFormat = "https://docs.virtocommerce.org/products/products-virto3-versions")]
private static async Task HandleSettingChangeAsyncIntnl(this IRecurringJobManager recurringJobManager, ISettingsManager settingsManager, ObjectSettingChangedEvent message)
{
foreach (var changedEntry in message.ChangedEntries.Where(x => x.EntryState == EntryState.Modified
Expand All @@ -129,6 +134,7 @@ private static async Task HandleSettingChangeAsyncIntnl(this IRecurringJobManage
}
}

[Obsolete("Use RecurringJobService.RunOrRemoveJobAsync()", DiagnosticId = "VC0008", UrlFormat = "https://docs.virtocommerce.org/products/products-virto3-versions")]
private static async Task RunOrRemoveJobAsync(this IRecurringJobManager recurringJobManager, ISettingsManager settingsManager, SettingCronJob settingCronJob)
{
var processJobEnableSettingValue = await settingsManager.GetValueAsync<object>(settingCronJob.EnableSetting);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public static IGlobalConfiguration AddHangfireStorage(this IGlobalConfiguration

public static object AddHangfire(this IServiceCollection services, IConfiguration configuration)
{
services.AddSingleton<RecurringJobService>();
services.AddSingleton<IRecurringJobService, RecurringJobService>();

var section = configuration.GetSection("VirtoCommerce:Hangfire");
var hangfireOptions = new HangfireOptions();
section.Bind(hangfireOptions);
Expand Down
21 changes: 21 additions & 0 deletions src/VirtoCommerce.Platform.Hangfire/IRecurringJobService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using System.Linq.Expressions;
using System.Threading.Tasks;
using VirtoCommerce.Platform.Core.Settings;

namespace VirtoCommerce.Platform.Hangfire;

public interface IRecurringJobService
{
void WatchJobSetting<T>(
SettingDescriptor enablerSetting,
SettingDescriptor cronSetting,
Expression<Func<T, Task>> methodCall,
string jobId,
TimeZoneInfo timeZoneInfo,
string queue);

void WatchJobSetting(SettingCronJob settingCronJob);

Task WatchJobSettingAsync(SettingCronJob settingCronJob);
}
108 changes: 108 additions & 0 deletions src/VirtoCommerce.Platform.Hangfire/RecurringJobService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// This service is functionally identical to the obsolete RecurringJobExtensions
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Linq.Expressions;
using System.Threading.Tasks;
using Hangfire;
using VirtoCommerce.Platform.Core.Common;
using VirtoCommerce.Platform.Core.Events;
using VirtoCommerce.Platform.Core.Settings;
using VirtoCommerce.Platform.Core.Settings.Events;
using VirtoCommerce.Platform.Hangfire.Extensions;

namespace VirtoCommerce.Platform.Hangfire;

public class RecurringJobService : IRecurringJobService, IEventHandler<ObjectSettingChangedEvent>
{
// Key is the observed setting name, value is the object of setting job
private static readonly ConcurrentDictionary<string, SettingCronJob> _observedSettingsDict = new();

private readonly IRecurringJobManager _recurringJobManager;
private readonly ISettingsManager _settingsManager;

public RecurringJobService(IRecurringJobManager recurringJobManager, ISettingsManager settingsManager)
{
_recurringJobManager = recurringJobManager;
_settingsManager = settingsManager;
}

public void WatchJobSetting<T>(
SettingDescriptor enablerSetting,
SettingDescriptor cronSetting,
Expression<Func<T, Task>> methodCall,
string jobId,
TimeZoneInfo timeZoneInfo,
string queue)
{
var settingCronJob = new SettingCronJobBuilder(new SettingCronJob())
.SetEnablerSetting(enablerSetting)
.SetCronSetting(cronSetting)
.SetJobId(jobId)
.SetQueueName(queue)
.SetTimeZoneInfo(timeZoneInfo)
.ToJob(methodCall)
.Build();

WatchJobSetting(settingCronJob);
}

/// <summary>
/// Use SettingCronJobBuilder for creating SettingCronJob
/// </summary>
public void WatchJobSetting(SettingCronJob settingCronJob)
{
WatchJobSettingAsync(settingCronJob).GetAwaiter().GetResult();
}

/// <summary>
/// Use SettingCronJobBuilder for creating SettingCronJob
/// </summary>
public Task WatchJobSettingAsync(SettingCronJob settingCronJob)
{
_observedSettingsDict.AddOrUpdate(settingCronJob.EnableSetting.Name, settingCronJob, (_, _) => settingCronJob);
_observedSettingsDict.AddOrUpdate(settingCronJob.CronSetting.Name, settingCronJob, (_, _) => settingCronJob);

return RunOrRemoveJobAsync(settingCronJob);
}

public async Task Handle(ObjectSettingChangedEvent message)
{
foreach (var settingName in message.ChangedEntries
.Where(x => x.EntryState is EntryState.Modified or EntryState.Added)
.Select(x => x.NewEntry.Name))
{
if (_observedSettingsDict.TryGetValue(settingName, out var settingCronJob))
{
await RunOrRemoveJobAsync(settingCronJob);
}
}

// Temporary solution for backward compatibility
#pragma warning disable VC0008 // Type or member is obsolete
await _recurringJobManager.HandleSettingChangeAsync(_settingsManager, message);
#pragma warning restore VC0008 // Type or member is obsolete
}

private async Task RunOrRemoveJobAsync(SettingCronJob settingCronJob)
{
var processJobEnableSettingValue = await _settingsManager.GetValueAsync<object>(settingCronJob.EnableSetting);
var processJobEnable = settingCronJob.EnabledEvaluator(processJobEnableSettingValue);

if (processJobEnable)
{
var cronExpression = await _settingsManager.GetValueAsync<string>(settingCronJob.CronSetting);

_recurringJobManager.AddOrUpdate(
settingCronJob.RecurringJobId,
settingCronJob.Job,
cronExpression,
settingCronJob.TimeZone,
settingCronJob.Queue);
}
else
{
_recurringJobManager.RemoveIfExists(settingCronJob.RecurringJobId);
}
}
}
Loading

0 comments on commit f87c135

Please sign in to comment.