浏览代码

重构eventbus

xfe 9 月之前
父节点
当前提交
4960c69e3c

+ 8 - 8
src/Hotline.Api/StartupExtensions.cs

@@ -27,6 +27,8 @@ using Hotline.Application.CallCenter.Calls;
 using Hotline.Application.CallCenter;
 using Hotline.CallCenter.Calls;
 using Swashbuckle.AspNetCore.SwaggerUI;
+using Hotline.EventBus;
+using MediatR.Pipeline;
 
 namespace Hotline.Api;
 
@@ -94,10 +96,8 @@ internal static class StartupExtensions
         services.RegisterMapper();
 
         //mediatr
-        services.AddMediatR(d =>
-        {
-            d.RegisterServicesFromAssembly(typeof(ApplicationStartupExtensions).Assembly);
-        });
+        //todo 
+        services.RegisterMediatR("YiBin");
 
         //callcenter
         var callCenterConfiguration = configuration
@@ -106,14 +106,14 @@ internal static class StartupExtensions
         services.AddNewRock(callCenterConfiguration.NewRock);
         switch (callCenterConfiguration.CallCenterType)
         {
-            case "XunShi":
+            case AppDefaults.CallCenterType.XunShi:
                 break;
-            case "WerErXin":
+            case AppDefaults.CallCenterType.WeiErXin:
                 services
                     .AddWex(callCenterConfiguration.Wex)
                     .AddWexDb(configuration);
                 break;
-            case "TianRun":
+            case AppDefaults.CallCenterType.TianRun:
                 services
                     .AddScoped<ICallApplication, TianRunCallApplication>()
                     .AddScoped<ITrApplication, TrApplication>()
@@ -123,7 +123,7 @@ internal static class StartupExtensions
                         callCenterConfiguration.TianRun.Username,
                         callCenterConfiguration.TianRun.Password);
                 break;
-            case "XingTang":
+            case AppDefaults.CallCenterType.XingTang:
                 services.AddXingTangDb(callCenterConfiguration.XingTang)
                     .AddScoped<ICallApplication, XingTangCallApplication>()
                     .AddScoped<CallIdManager>()

+ 20 - 0
src/Hotline.Api/StartupHelper.cs

@@ -1,14 +1,17 @@
 using System.IdentityModel.Tokens.Jwt;
 using System.Reflection;
 using System.Text;
+using Hotline.Application;
 using Hotline.Application.Jobs;
 using Hotline.Application.Orders;
 using Hotline.CallCenter.Configs;
+using Hotline.EventBus;
 using Hotline.Identity;
 using Hotline.Repository.SqlSugar;
 using Hotline.Repository.SqlSugar.Ts;
 using Mapster;
 using MapsterMapper;
+using MediatR.Pipeline;
 using Microsoft.AspNetCore.Authentication.JwtBearer;
 using Microsoft.IdentityModel.Tokens;
 using Microsoft.OpenApi.Models;
@@ -301,5 +304,22 @@ namespace Hotline.Api
 
             return services;
         }
+
+        public static IServiceCollection RegisterMediatR(this IServiceCollection services, string appScope)
+        {
+            services.AddMediatR(cfg =>
+            {
+                cfg.TypeEvaluator = d =>
+                {
+                    var attr = d.GetCustomAttribute(typeof(HandlerInjectAttribute)) as HandlerInjectAttribute;
+                    if (attr is null) return true;
+                    return attr.Enable && attr.AppScope == appScope;
+                };
+                cfg.RegisterServicesFromAssemblyContaining<Program>();
+            });
+            services.AddSingleton<Publisher>();
+
+            return services;
+        }
     }
 }

+ 7 - 0
src/Hotline/AppDefaults.cs

@@ -27,5 +27,12 @@ namespace Hotline
             public const string TianRun = "TianRun";
             public const string XingTang = "XingTang";
         }
+
+        public class AppScope
+        {
+            public const string YiBin = "YiBin";
+            public const string ZiGong = "ZiGong";
+            public const string LuZhou = "LuZhou";
+        }
     }
 }

+ 39 - 0
src/Hotline/EventBus/CustomMediator.cs

@@ -0,0 +1,39 @@
+using MediatR;
+using MediatR.Pipeline;
+using Microsoft.Extensions.Logging;
+
+namespace Hotline.EventBus;
+
+public class CustomMediator : Mediator
+{
+    private readonly Func<IEnumerable<NotificationHandlerExecutor>, INotification, CancellationToken, Task> _publish;
+
+    public CustomMediator(IServiceProvider serviceFactory, Func<IEnumerable<NotificationHandlerExecutor>, INotification, CancellationToken, Task> publish) : base(serviceFactory) 
+        => _publish = publish;
+
+    protected override Task PublishCore(IEnumerable<NotificationHandlerExecutor> handlerExecutors, INotification notification, CancellationToken cancellationToken) 
+        => _publish(handlerExecutors, notification, cancellationToken);
+}
+
+public class ExceptionLoggingHandler<TRequest, TResponse, TException> : IRequestExceptionHandler<TRequest, TResponse, TException>
+    where TRequest : IRequest<TResponse>
+    where TException : Exception
+{
+    private readonly ILogger<ExceptionLoggingHandler<TRequest, TResponse, TException>> _logger;
+
+    public ExceptionLoggingHandler(ILogger<ExceptionLoggingHandler<TRequest, TResponse, TException>> logger)
+    {
+        _logger = logger;
+    }
+
+    public Task Handle(TRequest request, TException exception, RequestExceptionHandlerState<TResponse> state, CancellationToken cancellationToken)
+    {
+        _logger.LogError(exception, "Something went wrong while handling request of type {@requestType}", typeof(TRequest));
+
+        // TODO: when we want to show the user somethig went wrong, we need to expand this with something like
+        // a ResponseBase where we wrap the actual response and return an indication whether the call was successful or not.
+        state.SetHandled(default!);
+
+        return Task.CompletedTask;
+    }
+}

+ 22 - 0
src/Hotline/EventBus/HandlerInjectAttribute.cs

@@ -0,0 +1,22 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Hotline.EventBus
+{
+    [AttributeUsage(AttributeTargets.Class)]
+    public class HandlerInjectAttribute : Attribute
+    {
+        /// <summary>
+        /// 适用范围
+        /// </summary>
+        public string AppScope { get; set; }
+
+        /// <summary>
+        /// 是否启用
+        /// </summary>
+        public bool Enable { get; set; } = true;
+    }
+}

+ 37 - 0
src/Hotline/EventBus/PublishStrategy.cs

@@ -0,0 +1,37 @@
+namespace Hotline.EventBus;
+
+/// <summary>
+/// Strategy to use when publishing notifications
+/// </summary>
+public enum PublishStrategy
+{
+    /// <summary>
+    /// Run each notification handler after one another. Returns when all handlers are finished. In case of any exception(s), they will be captured in an AggregateException.
+    /// </summary>
+    SyncContinueOnException = 0,
+
+    /// <summary>
+    /// Run each notification handler after one another. Returns when all handlers are finished or an exception has been thrown. In case of an exception, any handlers after that will not be run.
+    /// </summary>
+    SyncStopOnException = 1,
+
+    /// <summary>
+    /// Run all notification handlers asynchronously. Returns when all handlers are finished. In case of any exception(s), they will be captured in an AggregateException.
+    /// </summary>
+    Async = 2,
+
+    /// <summary>
+    /// Run each notification handler on its own thread using Task.Run(). Returns immediately and does not wait for any handlers to finish. Note that you cannot capture any exceptions, even if you await the call to Publish.
+    /// </summary>
+    ParallelNoWait = 3,
+
+    /// <summary>
+    /// Run each notification handler on its own thread using Task.Run(). Returns when all threads (handlers) are finished. In case of any exception(s), they are captured in an AggregateException by Task.WhenAll.
+    /// </summary>
+    ParallelWhenAll = 4,
+
+    /// <summary>
+    /// Run each notification handler on its own thread using Task.Run(). Returns when any thread (handler) is finished. Note that you cannot capture any exceptions (See msdn documentation of Task.WhenAny)
+    /// </summary>
+    ParallelWhenAny = 5,
+}

+ 146 - 0
src/Hotline/EventBus/Publisher.cs

@@ -0,0 +1,146 @@
+using MediatR;
+
+namespace Hotline.EventBus;
+
+public class Publisher
+{
+    private readonly IServiceProvider _serviceFactory;
+
+    public Publisher(IServiceProvider serviceFactory)
+    {
+        _serviceFactory = serviceFactory;
+
+        PublishStrategies[PublishStrategy.Async] = new CustomMediator(_serviceFactory, AsyncContinueOnException);
+        PublishStrategies[PublishStrategy.ParallelNoWait] = new CustomMediator(_serviceFactory, ParallelNoWait);
+        PublishStrategies[PublishStrategy.ParallelWhenAll] = new CustomMediator(_serviceFactory, ParallelWhenAll);
+        PublishStrategies[PublishStrategy.ParallelWhenAny] = new CustomMediator(_serviceFactory, ParallelWhenAny);
+        PublishStrategies[PublishStrategy.SyncContinueOnException] = new CustomMediator(_serviceFactory, SyncContinueOnException);
+        PublishStrategies[PublishStrategy.SyncStopOnException] = new CustomMediator(_serviceFactory, SyncStopOnException);
+    }
+
+    public IDictionary<PublishStrategy, IMediator> PublishStrategies = new Dictionary<PublishStrategy, IMediator>();
+    public PublishStrategy DefaultStrategy { get; set; } = PublishStrategy.SyncContinueOnException;
+
+    public Task PublishAsync<TNotification>(TNotification notification, CancellationToken cancellationToken)
+    {
+        return PublishAsync(notification, DefaultStrategy, cancellationToken);
+    }
+
+    public Task PublishAsync<TNotification>(TNotification notification, PublishStrategy strategy, CancellationToken cancellationToken)
+    {
+        if (!PublishStrategies.TryGetValue(strategy, out var mediator))
+        {
+            throw new ArgumentException($"Unknown strategy: {strategy}");
+        }
+        
+        return mediator.Publish(notification, cancellationToken);
+    }
+
+    #region private
+
+    private Task ParallelWhenAll(IEnumerable<NotificationHandlerExecutor> handlers, INotification notification, CancellationToken cancellationToken)
+    {
+        var tasks = new List<Task>();
+
+        foreach (var handler in handlers)
+        {
+            tasks.Add(Task.Run(() => handler.HandlerCallback(notification, cancellationToken)));
+        }
+
+        return Task.WhenAll(tasks);
+    }
+
+    private Task ParallelWhenAny(IEnumerable<NotificationHandlerExecutor> handlers, INotification notification, CancellationToken cancellationToken)
+    {
+        var tasks = new List<Task>();
+
+        foreach (var handler in handlers)
+        {
+            tasks.Add(Task.Run(() => handler.HandlerCallback(notification, cancellationToken)));
+        }
+
+        return Task.WhenAny(tasks);
+    }
+
+    private Task ParallelNoWait(IEnumerable<NotificationHandlerExecutor> handlers, INotification notification, CancellationToken cancellationToken)
+    {
+        foreach (var handler in handlers)
+        {
+            Task.Run(() => handler.HandlerCallback(notification, cancellationToken));
+        }
+
+        return Task.CompletedTask;
+    }
+
+    private async Task AsyncContinueOnException(IEnumerable<NotificationHandlerExecutor> handlers, INotification notification, CancellationToken cancellationToken)
+    {
+        var tasks = new List<Task>();
+        var exceptions = new List<Exception>();
+
+        foreach (var handler in handlers)
+        {
+            try
+            {
+                tasks.Add(handler.HandlerCallback(notification, cancellationToken));
+            }
+            catch (Exception ex) when (!(ex is OutOfMemoryException || ex is StackOverflowException))
+            {
+                exceptions.Add(ex);
+            }
+        }
+
+        try
+        {
+            await Task.WhenAll(tasks).ConfigureAwait(false);
+        }
+        catch (AggregateException ex)
+        {
+            exceptions.AddRange(ex.Flatten().InnerExceptions);
+        }
+        catch (Exception ex) when (!(ex is OutOfMemoryException || ex is StackOverflowException))
+        {
+            exceptions.Add(ex);
+        }
+
+        if (exceptions.Any())
+        {
+            throw new AggregateException(exceptions);
+        }
+    }
+
+    private async Task SyncStopOnException(IEnumerable<NotificationHandlerExecutor> handlers, INotification notification, CancellationToken cancellationToken)
+    {
+        foreach (var handler in handlers)
+        {
+            await handler.HandlerCallback(notification, cancellationToken).ConfigureAwait(false);
+        }
+    }
+
+    private async Task SyncContinueOnException(IEnumerable<NotificationHandlerExecutor> handlers, INotification notification, CancellationToken cancellationToken)
+    {
+        var exceptions = new List<Exception>();
+
+        foreach (var handler in handlers)
+        {
+            try
+            {
+                await handler.HandlerCallback(notification, cancellationToken).ConfigureAwait(false);
+            }
+            catch (AggregateException ex)
+            {
+                exceptions.AddRange(ex.Flatten().InnerExceptions);
+            }
+            catch (Exception ex) when (!(ex is OutOfMemoryException || ex is StackOverflowException))
+            {
+                exceptions.Add(ex);
+            }
+        }
+
+        if (exceptions.Any())
+        {
+            throw new AggregateException(exceptions);
+        }
+    }
+
+    #endregion
+}