Jelajahi Sumber

1. 批量延期审批发起后批量更新延期状态
2. 结束发送通知

xf 2 minggu lalu
induk
melakukan
e8c8ece57e

+ 32 - 6
src/Hotline.Application/Jobs/ApptaskJob.cs

@@ -1,9 +1,9 @@
 using Hotline.BatchTask;
+using Hotline.BatchTask.Notifications;
 using Quartz;
 using Hotline.Share.Enums.BatchTask;
-using Hotline.Share.Dtos.Order;
 using Microsoft.Extensions.DependencyInjection;
-using Hotline.Application.OrderApp.OrderVisitApp;
+using Hotline.EventBus;
 using Hotline.Share.Dtos.Order.OrderVisit;
 using Hotline.Share.Dtos.Order.OrderDelay;
 
@@ -13,12 +13,16 @@ namespace Hotline.Application.Jobs
     {
         private readonly IApptaskDomainService _apptaskDomainService;
         private readonly IServiceProvider _serviceProvider;
+        private readonly Publisher _publisher;
 
-        public ApptaskJob(IApptaskDomainService apptaskDomainService,
-            IServiceProvider serviceProvider)
+        public ApptaskJob(
+            IApptaskDomainService apptaskDomainService,
+            IServiceProvider serviceProvider,
+            Publisher publisher)
         {
             _apptaskDomainService = apptaskDomainService;
             _serviceProvider = serviceProvider;
+            _publisher = publisher;
         }
 
         public async Task Execute(IJobExecutionContext context)
@@ -26,21 +30,43 @@ namespace Hotline.Application.Jobs
             //Console.WriteLine($"执行ApptaskJob: {DateTime.Now}");
             var task = await _apptaskDomainService.GetWaitingTaskAsync(context.CancellationToken);
             if (task is null) return;
+            ApptaskExecuteResult? result = null;
             switch (task.TaskType)
             {
                 case ETaskType.OrderDelay:
                     var delayExecutor = _serviceProvider.GetService<IApptaskExecutor<OrderDelayReviewRequest>>();
-                    await _apptaskDomainService.ExecuteAsync(delayExecutor, task, context.CancellationToken);
+                    result = await _apptaskDomainService.ExecuteAsync(delayExecutor, task, context.CancellationToken);
                     break;
                 case ETaskType.OrderScreen:
                     break;
                 case ETaskType.VoiceVisit:
                     var vvExecutor = _serviceProvider.GetService<IApptaskExecutor<VoiceVisitRequest>>();
-                    await _apptaskDomainService.ExecuteAsync(vvExecutor, task, context.CancellationToken);
+                    result = await _apptaskDomainService.ExecuteAsync(vvExecutor, task, context.CancellationToken);
                     break;
                 default:
                     throw new ArgumentOutOfRangeException();
             }
+
+            if (result is not null)
+            {
+                if (result.IsSuccess)
+                {
+                    //todo pub single complete suc
+                    await _publisher.PublishAsync(new ApptaskSuccessNotify(task), PublishStrategy.Async, context.CancellationToken);
+
+                    //todo check if task is all complete
+                    //pub all complete
+                    var isCompleted = await _apptaskDomainService.IsCompletedAsync(task.ApptaskId, context.CancellationToken);
+                    if (isCompleted) 
+                        await _publisher.PublishAsync(new ApptaskCompletedNotify(task), PublishStrategy.Async, context.CancellationToken);
+                }
+                else
+                {
+                    //todo pub single complete fail
+                    await _publisher.PublishAsync(new ApptaskFailNotify(task), PublishStrategy.Async, context.CancellationToken);
+                }
+
+            }
         }
 
         /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>

+ 62 - 0
src/Hotline.Application/OrderApp/Handlers/OrderDelayHandler/OrderDelayBatchReviewTaskCompetedHandler.cs

@@ -0,0 +1,62 @@
+using Hotline.BatchTask.Notifications;
+using Hotline.FlowEngine.Notifications;
+using MediatR;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Hotline.BatchTask;
+using Hotline.Share.Enums.BatchTask;
+using Hotline.Article;
+using Hotline.Share.Dtos.Article;
+using Hotline.Share.Enums.Article;
+using XF.Domain.Authentications;
+
+namespace Hotline.Application.OrderApp.Handlers.OrderDelayHandler;
+
+/// <summary>
+/// 延期批量审批任务执行完成
+/// </summary>
+public class OrderDelayBatchReviewTaskCompetedHandler : INotificationHandler<ApptaskCompletedNotify>
+{
+    private readonly IApptaskDomainService _apptaskDomainService;
+    private readonly ICircularRecordDomainService _circularRecordDomainService;
+
+    public OrderDelayBatchReviewTaskCompetedHandler(
+        IApptaskDomainService apptaskDomainService,
+        ICircularRecordDomainService circularRecordDomainService)
+    {
+        _apptaskDomainService = apptaskDomainService;
+        _circularRecordDomainService = circularRecordDomainService;
+    }
+
+    /// <summary>Handles a notification</summary>
+    /// <param name="notification">The notification</param>
+    /// <param name="cancellationToken">Cancellation token</param>
+    public async Task Handle(ApptaskCompletedNotify notification, CancellationToken cancellationToken)
+    {
+        if(notification.ApptaskItem.TaskType is not ETaskType.OrderDelay) return;
+
+        var progress = await _apptaskDomainService.GetProgressAsync(notification.ApptaskItem.ApptaskId, cancellationToken);
+        await _circularRecordDomainService.AddCircularMessage(new AddCircularDto
+        {
+            Title = $"{progress.Name} 执行完成",
+            Content = $"总共:{progress.Total}条数据,成功:{progress.Succeeded}条,失败:{progress.Failed}条",
+            CircularTypeId = "5",
+            CircularTypeName = "系统消息",
+            CircularType = ECircularType.Person,
+            IsMustRead = false,
+            SourceOrgId = progress.CreatorId,
+            SourceOrgName = progress.CreatorName,
+            CircularReadGroups = new List<CircularReadGroupDto>
+            {
+                new()
+                {
+                    UserId = progress.CreatorId,
+                    UserName = progress.CreatorName
+                }
+            }
+        }, cancellationToken);
+    }
+}

+ 12 - 1
src/Hotline.Application/OrderApp/OrderDelayApp/OrderDelayApplication.cs

@@ -201,11 +201,22 @@ public class OrderDelayApplication : IOrderDelayApplication, IScopeDependency
             });
         }
 
-        await _apptaskDomainService.AddAsync(new AddApptaskRequest
+        var taskId = await _apptaskDomainService.AddAsync(new AddApptaskRequest
         {
             TaskType = ETaskType.OrderDelay,
             Priority = 0,
             ApptaskItems = apptaskItems
         }, cancellation);
+
+        if (!string.IsNullOrEmpty(taskId))
+        {
+            foreach (var orderDelay in delays)
+            {
+                orderDelay.DelayState = EDelayState.BatchProcessing;
+            }
+            await _orderDelayRepository.Updateable(delays)
+                .UpdateColumns(d=>new {d.DelayState})
+                .ExecuteCommandAsync(cancellation);
+        }
     }
 }

+ 2 - 2
src/Hotline.Application/OrderApp/OrderDelayApp/OrderDelayAuditTaskExecutor.cs → src/Hotline.Application/OrderApp/OrderDelayApp/OrderDelayReviewTaskExecutor.cs

@@ -5,11 +5,11 @@ using XF.Domain.Dependency;
 
 namespace Hotline.Application.OrderApp.OrderDelayApp;
 
-public class OrderDelayAuditTaskExecutor : IApptaskExecutor<OrderDelayReviewRequest>, IScopeDependency
+public class OrderDelayReviewTaskExecutor : IApptaskExecutor<OrderDelayReviewRequest>, IScopeDependency
 {
     private readonly IOrderDelayApplication _orderDelayApplication;
 
-    public OrderDelayAuditTaskExecutor(IOrderDelayApplication orderDelayApplication)
+    public OrderDelayReviewTaskExecutor(IOrderDelayApplication orderDelayApplication)
     {
         _orderDelayApplication = orderDelayApplication;
     }

+ 6 - 0
src/Hotline.Share/Dtos/BatchTask/ApptaskProgressDto.cs

@@ -5,9 +5,15 @@
 /// </summary>
 public class ApptaskProgressDto
 {
+    /// <summary>
+    /// 任务名称
+    /// </summary>
+    public string Name { get; set; }
     public int Total { get; set; }
     public int Waiting { get; set; }
     public int Processing { get; set; }
     public int Succeeded { get; set; }
     public int Failed { get; set; }
+    public string? CreatorId { get; set; }
+    public string? CreatorName { get; set; }
 }

+ 6 - 0
src/Hotline.Share/Enums/Order/EDelayState.cs

@@ -27,6 +27,12 @@ namespace Hotline.Share.Enums.Order
         /// </summary>
         [Description("撤销")]
         Withdraw = 3,
+
+        /// <summary>
+        /// 批量审批处理中
+        /// </summary>
+        [Description("批量处理中")]
+        BatchProcessing = 9,
     }
 
     public enum EDelayApplyType

+ 21 - 4
src/Hotline/BatchTask/ApptaskDomainService.cs

@@ -4,10 +4,10 @@ using Hotline.Share.Enums.BatchTask;
 using Hotline.Validators.BatchTask;
 using MapsterMapper;
 using Microsoft.Extensions.Logging;
+using XF.Domain.Authentications;
 using XF.Domain.Dependency;
 using XF.Domain.Exceptions;
 using XF.Domain.Repository;
-using static Lucene.Net.Util.Fst.Util;
 
 namespace Hotline.BatchTask;
 
@@ -55,7 +55,7 @@ public class ApptaskDomainService : IApptaskDomainService, IScopeDependency
                 TaskStatus = ETaskStatus.Waiting,
                 TaskParams = d.TaskParams is null ? null : System.Text.Json.JsonSerializer.Serialize(d.TaskParams),
                 TryLimit = request.TryLimit,
-                Priority = priority,
+                Priority = priority
             }).ToList()
         };
 
@@ -83,6 +83,21 @@ public class ApptaskDomainService : IApptaskDomainService, IScopeDependency
         return _mapper.Map<ApptaskProgressDto>(apptask);
     }
 
+    /// <summary>
+    /// 查询当前任务是否全部执行完成
+    /// </summary>
+    /// <param name="taskId"></param>
+    /// <param name="cancellation"></param>
+    /// <returns></returns>
+    public async Task<bool> IsCompletedAsync(string taskId, CancellationToken cancellation)
+    {
+        var anyUnCompleted = await _apptaskItemRepository.Queryable()
+            .AnyAsync(d => d.Tries < d.TryLimit
+                           && (d.TaskStatus == ETaskStatus.Waiting || d.TaskStatus == ETaskStatus.Processing)
+                           && d.ApptaskId == taskId, cancellation);
+        return !anyUnCompleted;
+    }
+
     /// <summary>
     /// 终止任务
     /// </summary>
@@ -146,7 +161,7 @@ public class ApptaskDomainService : IApptaskDomainService, IScopeDependency
     /// <param name="apptaskItem"></param>
     /// <param name="cancellation"></param>
     /// <returns></returns>
-    public async Task ExecuteAsync<TRequest>(IApptaskExecutor<TRequest> executor, ApptaskItem apptaskItem, CancellationToken cancellation)
+    public async Task<ApptaskExecuteResult> ExecuteAsync<TRequest>(IApptaskExecutor<TRequest> executor, ApptaskItem apptaskItem, CancellationToken cancellation)
     {
         try
         {
@@ -161,13 +176,15 @@ public class ApptaskDomainService : IApptaskDomainService, IScopeDependency
             apptaskItem.TaskStatus = result.IsSuccess ? ETaskStatus.Succeeded : ETaskStatus.Failed;
             apptaskItem.Message = result.Message;
             apptaskItem.TaskEndTime = DateTime.Now;
+            return ApptaskExecuteResult.Success();
         }
         catch (Exception e)
         {
-            _logger.LogError("批量任务执行异常:{err}", e.Message);
+            _logger.LogError("批量任务执行异常:taskId: {taskItemId}, {err}", apptaskItem.Id, e.Message);
             apptaskItem.TaskStatus = ETaskStatus.Failed;
             apptaskItem.Message = "批量任务执行异常";
             apptaskItem.TaskEndTime = DateTime.Now;
+            return ApptaskExecuteResult.Fail("批量任务执行异常");
         }
         finally
         {

+ 9 - 1
src/Hotline/BatchTask/IApptaskDomainService.cs

@@ -19,6 +19,14 @@ namespace Hotline.BatchTask
         /// </summary>
         /// <returns></returns>
         Task<ApptaskProgressDto> GetProgressAsync(string taskId, CancellationToken cancellation);
+        
+        /// <summary>
+        /// 查询当前任务是否全部完成
+        /// </summary>
+        /// <param name="taskId"></param>
+        /// <param name="cancellation"></param>
+        /// <returns></returns>
+        Task<bool> IsCompletedAsync(string taskId, CancellationToken cancellation);
 
         /// <summary>
         /// 终止任务
@@ -43,6 +51,6 @@ namespace Hotline.BatchTask
         /// <param name="apptaskItem"></param>
         /// <param name="cancellation"></param>
         /// <returns></returns>
-        Task ExecuteAsync<TRequest>(IApptaskExecutor<TRequest> executor, ApptaskItem apptaskItem, CancellationToken cancellation);
+        Task<ApptaskExecuteResult> ExecuteAsync<TRequest>(IApptaskExecutor<TRequest> executor, ApptaskItem apptaskItem, CancellationToken cancellation);
     }
 }

+ 10 - 0
src/Hotline/BatchTask/Notifications/ApptaskCompletedNotify.cs

@@ -0,0 +1,10 @@
+using MediatR;
+
+namespace Hotline.BatchTask.Notifications;
+
+public class ApptaskCompletedNotify : ApptaskExecutedNotify, INotification
+{
+    public ApptaskCompletedNotify(ApptaskItem apptaskItem) : base(apptaskItem)
+    {
+    }
+}

+ 20 - 0
src/Hotline/BatchTask/Notifications/ApptaskFailNotify.cs

@@ -0,0 +1,20 @@
+using MediatR;
+
+namespace Hotline.BatchTask.Notifications;
+
+public class ApptaskFailNotify : ApptaskExecutedNotify, INotification
+{
+    public ApptaskFailNotify(ApptaskItem apptaskItem) : base(apptaskItem)
+    {
+    }
+}
+
+public class ApptaskExecutedNotify
+{
+    public ApptaskItem ApptaskItem { get; set; }
+
+    public ApptaskExecutedNotify(ApptaskItem apptaskItem)
+    {
+        ApptaskItem = apptaskItem;
+    }
+}

+ 15 - 0
src/Hotline/BatchTask/Notifications/ApptaskSuccessNotify.cs

@@ -0,0 +1,15 @@
+using MediatR;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Hotline.BatchTask.Notifications;
+
+public class ApptaskSuccessNotify : ApptaskExecutedNotify, INotification
+{
+    public ApptaskSuccessNotify(ApptaskItem apptaskItem) : base(apptaskItem)
+    {
+    }
+}