Explorar el Código

重构批量任务管理,新增任务调度与执行逻辑

此次提交对批量任务管理进行了全面重构,主要变更包括:
- 重构 `ApptaskDomainService`,完善任务的创建、查询、终止和执行逻辑。
- 修改 `IApptaskDomainService` 和 `IApptaskExecutor` 接口,支持泛型参数。
- 新增 DTO 类 `AddApptaskDto` 和 `ApptaskProgressDto`,提升数据传输的清晰度。
- 引入 `Mapster`,配置对象映射规则,优化代码可维护性。
- 新增任务调度器 `ApptaskJob` 和任务执行器示例代码。
- 删除冗余代码,移除无用的类和枚举,精简代码结构。

此次改动显著提升了批量任务模块的扩展性和可维护性。
xf hace 3 semanas
padre
commit
72906b1f8e

+ 1 - 1
src/Hotline.Api/StartupExtensions.cs

@@ -242,7 +242,7 @@ internal static class StartupExtensions
     internal static WebApplication ConfigurePipelines(this WebApplication app)
     {
         app.UseSerilogRequestLogging();
-
+        
         // Configure the HTTP request pipeline.
         var swaggerEnable = app.Configuration.GetSection("Swagger").Get<bool>();
         if (swaggerEnable)

+ 48 - 0
src/Hotline.Application/Jobs/ApptaskJob.cs

@@ -0,0 +1,48 @@
+using Hotline.BatchTask;
+using Quartz;
+using Hotline.Share.Enums.BatchTask;
+using Hotline.Share.Dtos.Order;
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Hotline.Application.Jobs
+{
+    public class ApptaskJob : IJob, IDisposable
+    {
+        private readonly IApptaskDomainService _apptaskDomainService;
+        private readonly IServiceProvider _serviceProvider;
+
+        public ApptaskJob(IApptaskDomainService apptaskDomainService,
+            IServiceProvider serviceProvider)
+        {
+            _apptaskDomainService = apptaskDomainService;
+            _serviceProvider = serviceProvider;
+        }
+
+        public async Task Execute(IJobExecutionContext context)
+        {
+            var task = await _apptaskDomainService.GetWaitingTaskAsync(context.CancellationToken);
+            if (task is null) return;
+            //create executor by task type
+            switch (task.TaskType)
+            {
+                case ETaskType.Delay:
+                     _serviceProvider.GetService<IApptaskExecutor<BatchDelayNextFlowDto>>();
+                    break;
+                case ETaskType.Screen:
+                    break;
+                default:
+                    throw new ArgumentOutOfRangeException();
+            }
+            
+            //await _apptaskDomainService.ExecuteAsync(new OrderDelayAuditTaskExecutor(), task, context.CancellationToken);
+        }
+
+        /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
+        public void Dispose()
+        {
+            GC.SuppressFinalize(this);
+        }
+
+        
+    }
+}

+ 46 - 0
src/Hotline.Application/Mappers/BatchTaskMapperConfigs.cs

@@ -0,0 +1,46 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Hotline.BatchTask;
+using Hotline.Share.Dtos.BatchTask;
+using Hotline.Share.Enums.BatchTask;
+using Mapster;
+using XF.Domain.Exceptions;
+
+namespace Hotline.Application.Mappers
+{
+    public class BatchTaskMapperConfigs : IRegister
+    {
+        public void Register(TypeAdapterConfig config)
+        {
+            config.ForType<AddApptaskDto, Apptask>()
+                .AfterMapping((s, d) =>
+                {
+                    var taskParams = s.RequestDto == null
+                        ? null
+                        : System.Text.Json.JsonSerializer.Serialize(s.RequestDto);
+                    d.ApptaskItems.ForEach(item =>
+                    {
+                        item.TaskType = s.TaskType;
+                        item.TryLimit = s.TryLimit;
+                        item.TaskParams = taskParams;
+                    });
+                });
+
+            config.ForType<Apptask, ApptaskProgressDto>()
+                .BeforeMapping((s, d) =>
+                {
+                    if (!s.ApptaskItems.Any())
+                        throw new UserFriendlyException("任务明细无数据");
+                })
+                .Map(d => d.Total, s => s.ApptaskItems.Count)
+                .Map(d => d.Waiting, s => s.ApptaskItems.Count(i => i.TaskStatus == ETaskStatus.Waiting))
+                .Map(d => d.Processing, s => s.ApptaskItems.Count(i => i.TaskStatus == ETaskStatus.Processing))
+                .Map(d => d.Succeeded, s => s.ApptaskItems.Count(i => i.TaskStatus == ETaskStatus.Succeeded))
+                .Map(d => d.Failed, s => s.ApptaskItems.Count(i => i.TaskStatus == ETaskStatus.Failed));
+
+        }
+    }
+}

+ 20 - 0
src/Hotline.Application/OrderApp/OrderDelayApp/OrderDelayAuditTaskExecutor.cs

@@ -0,0 +1,20 @@
+//using Hotline.BatchTask;
+//using Hotline.Share.Dtos.Order;
+//using XF.Domain.Dependency;
+
+//namespace Hotline.Application.OrderApp.OrderDelayApp;
+
+//public class OrderDelayAuditTaskExecutor : IApptaskExecutor<BatchDelayNextFlowDto>, IScopeDependency
+//{
+//    /// <summary>
+//    /// 执行任务
+//    /// </summary>
+//    /// <param name="request"></param>
+//    /// <param name="cancellation"></param>
+//    /// <returns>是否成功执行</returns>
+//    public async Task<ApptaskExecuteResult> ExecuteAsync(BatchDelayNextFlowDto? request, CancellationToken cancellation)
+//    {
+
+//    }
+
+//}

+ 44 - 0
src/Hotline.Share/Dtos/BatchTask/AddApptaskDto.cs

@@ -0,0 +1,44 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Hotline.Share.Enums.BatchTask;
+
+namespace Hotline.Share.Dtos.BatchTask
+{
+    public class AddApptaskDto
+    {
+        /// <summary>
+        /// 任务名称
+        /// </summary>
+        public string? Name { get; set; }
+
+        /// <summary>
+        /// 任务描述
+        /// </summary>
+        public string? Description { get; set; }
+
+        /// <summary>
+        /// 任务类型
+        /// </summary>
+        public ETaskType TaskType { get; set; }
+
+        /// <summary>
+        /// 执行次数上限
+        /// </summary>
+        public int TryLimit { get; set; } = 1;
+
+        public List<AddApptaskItemDto> ApptaskItems { get; set; }
+
+        public object? RequestDto { get; set; }
+    }
+
+    public class AddApptaskItemDto
+    {
+        /// <summary>
+        /// 业务id(冗余)
+        /// </summary>
+        public string BusinessId { get; set; }
+    }
+}

+ 13 - 0
src/Hotline.Share/Dtos/BatchTask/ApptaskProgressDto.cs

@@ -0,0 +1,13 @@
+namespace Hotline.Share.Dtos.BatchTask;
+
+/// <summary>
+/// 任务进度
+/// </summary>
+public class ApptaskProgressDto
+{
+    public int Total { get; set; }
+    public int Waiting { get; set; }
+    public int Processing { get; set; }
+    public int Succeeded { get; set; }
+    public int Failed { get; set; }
+}

+ 13 - 0
src/Hotline.Share/Enums/BatchTask/ETaskStatus.cs

@@ -0,0 +1,13 @@
+namespace Hotline.Share.Enums.BatchTask;
+
+/// <summary>
+/// 任务状态
+/// </summary>
+public enum ETaskStatus
+{
+    Waiting = 0,
+    Processing = 1,
+    Succeeded = 2,
+    Failed = 3,
+    Terminated = 9,
+}

+ 21 - 0
src/Hotline.Share/Enums/BatchTask/ETaskType.cs

@@ -0,0 +1,21 @@
+using System.ComponentModel;
+
+namespace Hotline.Share.Enums.BatchTask;
+
+/// <summary>
+/// 任务类型
+/// </summary>
+public enum ETaskType
+{
+    /// <summary>
+    /// 延期
+    /// </summary>
+    [Description("延期任务")]
+    Delay = 1,
+
+    /// <summary>
+    /// 甄别
+    /// </summary>
+    [Description("甄别任务")]
+    Screen = 2,
+}

+ 3 - 84
src/Hotline/BatchTask/Apptask.cs

@@ -1,5 +1,6 @@
-using System.ComponentModel;
-using Exam.Infrastructure.Extensions;
+using Exam.Infrastructure.Extensions;
+using Hotline.Orders;
+using Hotline.Share.Enums.BatchTask;
 using SqlSugar;
 using XF.Domain.Repository;
 
@@ -29,86 +30,4 @@ public class Apptask : CreationEntity
     public List<ApptaskItem> ApptaskItems { get; set; }
 
     public void CreateName() => Name = $"{TaskType.GetDescription()}-{DateTime.Now:yyyyMMddHHmmssfff}";
-}
-
-
-public class ApptaskItem
-{
-    /// <summary>
-    /// 任务id
-    /// </summary>
-    public string ApptaskId { get; set; }
-
-    /// <summary>
-    /// 业务id(冗余)
-    /// </summary>
-    public string BusinessId { get; set; }
-
-    /// <summary>
-    /// 任务类型(冗余)
-    /// </summary>
-    public ETaskType TaskType { get; set; }
-
-    /// <summary>
-    /// 任务状态
-    /// </summary>
-    public ETaskStatus TaskStatus { get; set; }
-
-    /// <summary>
-    /// 任务执行时间
-    /// </summary>
-    public DateTime? TaskStartTime { get; set; }
-    public DateTime? TaskEndTime { get; set; }
-
-    /// <summary>
-    /// 参数
-    /// </summary>
-    [SugarColumn(ColumnDataType = "varchar(8000)")]
-    public string TaskParams { get; set; }
-
-    /// <summary>
-    /// 执行次数
-    /// </summary>
-    public int Tries { get; set; }
-
-    /// <summary>
-    /// 执行次数上限
-    /// </summary>
-    public int Limit { get; set; } = 1;
-
-    [SugarColumn(IsEnableUpdateVersionValidation = true)]
-    public string Ver { get; set; }
-
-    [Navigate(NavigateType.OneToOne, nameof(ApptaskId))]
-    public Apptask Apptask { get; set; }
-}
-
-/// <summary>
-/// 任务状态
-/// </summary>
-public enum ETaskStatus
-{
-    Wait = 0,
-    Processing = 1,
-    Success = 2,
-    Failed = 3,
-    Terminated = 9,
-}
-
-/// <summary>
-/// 任务类型
-/// </summary>
-public enum ETaskType
-{
-    /// <summary>
-    /// 延期
-    /// </summary>
-    [Description("延期任务")]
-    Delay = 1,
-
-    /// <summary>
-    /// 甄别
-    /// </summary>
-    [Description("甄别任务")]
-    Screen = 2,
 }

+ 114 - 17
src/Hotline/BatchTask/ApptaskDomainService.cs

@@ -1,36 +1,69 @@
 using FluentValidation;
+using Hotline.Share.Dtos.BatchTask;
+using Hotline.Share.Enums.BatchTask;
 using Hotline.Validators.BatchTask;
+using MapsterMapper;
+using Microsoft.Extensions.Logging;
+using XF.Domain.Dependency;
+using XF.Domain.Exceptions;
+using XF.Domain.Repository;
+using static Lucene.Net.Util.Fst.Util;
 
 namespace Hotline.BatchTask;
 
-public class ApptaskDomainService : IApptaskDomainService
+public class ApptaskDomainService : IApptaskDomainService, IScopeDependency
 {
+    private readonly IRepository<Apptask> _apptaskRepository;
+    private readonly IRepository<ApptaskItem> _apptaskItemRepository;
+    private readonly IMapper _mapper;
+    private readonly ILogger<ApptaskDomainService> _logger;
+
+    public ApptaskDomainService(
+        IRepository<Apptask> apptaskRepository,
+        IRepository<ApptaskItem> apptaskItemRepository,
+        IMapper mapper,
+        ILogger<ApptaskDomainService> logger)
+    {
+        _apptaskRepository = apptaskRepository;
+        _apptaskItemRepository = apptaskItemRepository;
+        _mapper = mapper;
+        _logger = logger;
+    }
+
     /// <summary>
     /// 新增任务
     /// </summary>
-    public Task<string> AddAsync(Apptask apptask, CancellationToken cancellation)
+    public async Task<string> AddAsync(AddApptaskDto dto, CancellationToken cancellation)
     {
         var validator = new ApptaskValidator();
-        var result = validator.Validate(apptask);
+        var result = validator.Validate(dto);
         if (!result.IsValid)
             throw new ValidationException(result.Errors.FirstOrDefault()?.ErrorMessage);
 
-        if(string.IsNullOrEmpty(apptask.Name))
+        var apptask = _mapper.Map<Apptask>(dto);
+
+        if (string.IsNullOrEmpty(apptask.Name))
             apptask.CreateName();
 
+        await _apptaskRepository.AddNav(apptask)
+            .Include(d => d.ApptaskItems)
+            .ExecuteCommandAsync();
 
-        //todo 1.参数校验,
-        //2. 任务名称可选:依据任务类型生成 3.类型、次数上限由主表冗余到明细
-        throw new NotImplementedException();
+        return apptask.Id;
     }
 
     /// <summary>
     /// 查询任务进度
     /// </summary>
     /// <returns></returns>
-    public Task<ApptaskProgress> GetProgressAsync(string taskId, CancellationToken cancellation)
+    public async Task<ApptaskProgressDto> GetProgressAsync(string taskId, CancellationToken cancellation)
     {
-        throw new NotImplementedException();
+        var apptask = await _apptaskRepository.Queryable()
+            .Includes(d => d.ApptaskItems)
+            .FirstAsync(d => d.Id == taskId, cancellation);
+        if (apptask is null)
+            throw new UserFriendlyException("无效任务编号");
+        return _mapper.Map<ApptaskProgressDto>(apptask);
     }
 
     /// <summary>
@@ -39,9 +72,25 @@ public class ApptaskDomainService : IApptaskDomainService
     /// <param name="taskId"></param>
     /// <param name="cancellation"></param>
     /// <returns></returns>
-    public Task TerminalTaskAsync(string taskId, CancellationToken cancellation)
+    public async Task TerminalTaskAsync(string taskId, CancellationToken cancellation)
     {
-        throw new NotImplementedException();
+        var apptask = await _apptaskRepository.Queryable()
+            .Includes(d => d.ApptaskItems.Where(x => x.Tries < x.TryLimit
+                                                   && (x.TaskStatus == ETaskStatus.Waiting || x.TaskStatus == ETaskStatus.Failed)).ToList())
+            .FirstAsync(d => d.Id == taskId, cancellation);
+        if (apptask is null)
+            throw new UserFriendlyException("无效任务编号");
+        if (apptask.ApptaskItems.Count == 0) return;
+        var Succeed = 0;
+        foreach (var item in apptask.ApptaskItems)
+        {
+            item.TaskStatus = ETaskStatus.Terminated;
+            var result = await _apptaskItemRepository.Updateable(item).ExecuteCommandWithOptLockAsync();
+            if (result != 0) Succeed++;
+        }
+
+        if (Succeed != apptask.ApptaskItems.Count)
+            await TerminalTaskAsync(taskId, cancellation);
     }
 
     /// <summary>
@@ -49,21 +98,69 @@ public class ApptaskDomainService : IApptaskDomainService
     /// </summary>
     /// <param name="cancellation"></param>
     /// <returns></returns>
-    public Task<ApptaskItem> GetWaitingTaskAsync(CancellationToken cancellation)
+    public async Task<ApptaskItem?> GetWaitingTaskAsync(CancellationToken cancellation)
     {
-        // 1. 乐观锁 2.待执行 或 失败并且次数未到上限的
-        throw new NotImplementedException();
+        var taskItems = await _apptaskItemRepository.Queryable()
+            .Where(d => d.Tries < d.TryLimit
+                        && (d.TaskStatus == ETaskStatus.Waiting || d.TaskStatus == ETaskStatus.Failed))
+            .OrderBy(d => d.CreationTime)
+            .Take(10)
+            .ToListAsync(cancellation);
+
+        if (taskItems.Count == 0) return null;
+
+        foreach (var item in taskItems)
+        {
+            item.SetProcessing();
+            item.Tries++;
+            var result = await _apptaskItemRepository.Updateable(item).ExecuteCommandWithOptLockAsync();
+            if (result != 0)
+                return item;
+        }
+
+        return await GetWaitingTaskAsync(cancellation);
     }
 
     /// <summary>
     /// 执行任务
     /// </summary>
     /// <param name="executor"></param>
+    /// <param name="apptaskItem"></param>
     /// <param name="cancellation"></param>
     /// <returns></returns>
-    public Task ExecuteAsync(IApptaskExecutor executor, CancellationToken cancellation)
+    public async Task ExecuteAsync<TRequest>(IApptaskExecutor<TRequest> executor, ApptaskItem apptaskItem, CancellationToken cancellation)
     {
-        // try catch到ex或返回fail的执行失败,返回成功的执行成功
-        throw new NotImplementedException();
+        try
+        {
+            TRequest request = default;
+            if (apptaskItem.TaskParams is not null)
+            {
+                request = System.Text.Json.JsonSerializer.Deserialize<TRequest>(apptaskItem.TaskParams);
+                if (request is null)
+                    throw new UserFriendlyException("任务参数反序列化异常");
+            }
+            var result = await executor.ExecuteAsync(request, cancellation);
+            apptaskItem.TaskStatus = result.IsSuccess ? ETaskStatus.Succeeded : ETaskStatus.Failed;
+            apptaskItem.Message = result.Message;
+            apptaskItem.TaskEndTime = DateTime.Now;
+        }
+        catch (Exception e)
+        {
+            _logger.LogError("批量任务执行异常:{err}", e.Message);
+            apptaskItem.TaskStatus = ETaskStatus.Failed;
+            apptaskItem.Message = "批量任务执行异常";
+            apptaskItem.TaskEndTime = DateTime.Now;
+        }
+        finally
+        {
+            await _apptaskItemRepository.Updateable(apptaskItem)
+                .UpdateColumns(d => new
+                {
+                    d.TaskStatus,
+                    d.Message,
+                    d.TaskEndTime
+                })
+                .ExecuteCommandAsync(cancellation);
+        }
     }
 }

+ 65 - 0
src/Hotline/BatchTask/ApptaskItem.cs

@@ -0,0 +1,65 @@
+using Hotline.Share.Enums.BatchTask;
+using SqlSugar;
+using XF.Domain.Repository;
+
+namespace Hotline.BatchTask;
+
+[SugarIndex("index_apptaskitem_creationtime", nameof(ApptaskItem.CreationTime), OrderByType.Asc)]
+public class ApptaskItem : CreationEntity
+{
+    /// <summary>
+    /// 任务id
+    /// </summary>
+    public string ApptaskId { get; set; }
+
+    /// <summary>
+    /// 业务id(冗余)
+    /// </summary>
+    public string BusinessId { get; set; }
+
+    /// <summary>
+    /// 任务类型(冗余)
+    /// </summary>
+    public ETaskType TaskType { get; set; }
+
+    /// <summary>
+    /// 任务状态
+    /// </summary>
+    public ETaskStatus TaskStatus { get; set; }
+
+    /// <summary>
+    /// 任务执行时间
+    /// </summary>
+    public DateTime? TaskStartTime { get; set; }
+    public DateTime? TaskEndTime { get; set; }
+
+    /// <summary>
+    /// 参数
+    /// </summary>
+    [SugarColumn(ColumnDataType = "varchar(8000)")]
+    public string? TaskParams { get; set; }
+
+    /// <summary>
+    /// 执行次数
+    /// </summary>
+    public int Tries { get; set; }
+
+    /// <summary>
+    /// 执行次数上限
+    /// </summary>
+    public int TryLimit { get; set; } = 1;
+
+    public string? Message { get; set; }
+
+    [SugarColumn(IsEnableUpdateVersionValidation = true)]
+    public string Ver { get; set; }
+
+    [Navigate(NavigateType.OneToOne, nameof(ApptaskId))]
+    public Apptask Apptask { get; set; }
+
+    public void SetProcessing()
+    {
+        TaskStatus = ETaskStatus.Processing;
+        TaskStartTime = DateTime.Now;
+    }
+}

+ 7 - 17
src/Hotline/BatchTask/IApptaskDomainService.cs

@@ -1,4 +1,5 @@
-using System;
+using Hotline.Share.Dtos.BatchTask;
+using System;
 using System.Collections.Generic;
 using System.Linq;
 using System.Text;
@@ -11,13 +12,13 @@ namespace Hotline.BatchTask
         /// <summary>
         /// 新增任务
         /// </summary>
-        Task<string> AddAsync(Apptask apptask, CancellationToken cancellation);
+        Task<string> AddAsync(AddApptaskDto dto, CancellationToken cancellation);
 
         /// <summary>
         /// 查询任务进度
         /// </summary>
         /// <returns></returns>
-        Task<ApptaskProgress> GetProgressAsync(string taskId, CancellationToken cancellation);
+        Task<ApptaskProgressDto> GetProgressAsync(string taskId, CancellationToken cancellation);
 
         /// <summary>
         /// 终止任务
@@ -33,26 +34,15 @@ namespace Hotline.BatchTask
         /// </summary>
         /// <param name="cancellation"></param>
         /// <returns></returns>
-        Task<ApptaskItem> GetWaitingTaskAsync(CancellationToken cancellation);
+        Task<ApptaskItem?> GetWaitingTaskAsync(CancellationToken cancellation);
 
         /// <summary>
         /// 执行任务
         /// </summary>
         /// <param name="executor"></param>
+        /// <param name="apptaskItem"></param>
         /// <param name="cancellation"></param>
         /// <returns></returns>
-        Task ExecuteAsync(IApptaskExecutor executor, CancellationToken cancellation);
-    }
-
-    /// <summary>
-    /// 任务进度
-    /// </summary>
-    public class ApptaskProgress
-    {
-        public int Total { get; set; }
-        public int Waiting { get; set; }
-        public int Processing { get; set; }
-        public int Success { get; set; }
-        public int Fail { get; set; }
+        Task ExecuteAsync<TRequest>(IApptaskExecutor<TRequest> executor, ApptaskItem apptaskItem, CancellationToken cancellation);
     }
 }

+ 9 - 2
src/Hotline/BatchTask/IApptaskExecutor.cs

@@ -1,6 +1,6 @@
 namespace Hotline.BatchTask;
 
-public interface IApptaskExecutor
+public interface IApptaskExecutor<TRequest>
 {
     /// <summary>
     /// 执行任务
@@ -8,5 +8,12 @@ public interface IApptaskExecutor
     /// <param name="request"></param>
     /// <param name="cancellation"></param>
     /// <returns>是否成功执行</returns>
-    Task<bool> ExecuteAsync<TRequest>(TRequest request, CancellationToken cancellation);
+    Task<ApptaskExecuteResult> ExecuteAsync(TRequest? request, CancellationToken cancellation);
+}
+
+
+public class ApptaskExecuteResult
+{
+    public bool IsSuccess { get; set; }
+    public string? Message { get; set; }
 }

+ 3 - 2
src/Hotline/Validators/BatchTask/ApptaskValidator.cs

@@ -5,10 +5,11 @@ using System.Text;
 using System.Threading.Tasks;
 using FluentValidation;
 using Hotline.BatchTask;
+using Hotline.Share.Dtos.BatchTask;
 
 namespace Hotline.Validators.BatchTask;
 
-public class ApptaskValidator : AbstractValidator<Apptask>
+public class ApptaskValidator : AbstractValidator<AddApptaskDto>
 {
     public ApptaskValidator()
     {
@@ -27,7 +28,7 @@ public class ApptaskValidator : AbstractValidator<Apptask>
     }
 }
 
-public class ApptaskItemValidator : AbstractValidator<ApptaskItem>
+public class ApptaskItemValidator : AbstractValidator<AddApptaskItemDto>
 {
     public ApptaskItemValidator()
     {