using FluentValidation;
using Hotline.Share.Dtos.BatchTask;
using Hotline.Share.Enums.BatchTask;
using Hotline.Validators.BatchTask;
using MapsterMapper;
using Microsoft.Extensions.Logging;
using XF.Domain.Dependency;
using XF.Domain.Exceptions;
using XF.Domain.Repository;
using static Lucene.Net.Util.Fst.Util;

namespace Hotline.BatchTask;

/// <summary>
/// Domain service for batch tasks: creates tasks, reports their progress,
/// terminates their pending items, hands out pending items and records the
/// outcome of executing an item.
/// </summary>
public class ApptaskDomainService : IApptaskDomainService, IScopeDependency
{
    private readonly IRepository _apptaskRepository;
    private readonly IRepository _apptaskItemRepository;
    private readonly IMapper _mapper;
    private readonly ILogger _logger;

    /// <summary>
    /// Creates the service with its task repository, task-item repository,
    /// object mapper and logger.
    /// </summary>
    public ApptaskDomainService(
        IRepository apptaskRepository,
        IRepository apptaskItemRepository,
        IMapper mapper,
        ILogger logger)
    {
        _apptaskRepository = apptaskRepository;
        _apptaskItemRepository = apptaskItemRepository;
        _mapper = mapper;
        _logger = logger;
    }

    /// <summary>
    /// Adds a new task.
    /// </summary>
    /// <param name="dto">Task definition; validated with <c>ApptaskValidator</c> before it is mapped.</param>
    /// <param name="cancellation">Cancellation token (not forwarded to the insert call in this method).</param>
    /// <returns>The Id of the inserted task.</returns>
    /// <exception cref="ValidationException">Validation failed; carries the first validation error message.</exception>
    public async Task AddAsync(AddApptaskDto dto, CancellationToken cancellation)
    {
        var validator = new ApptaskValidator();
        var result = validator.Validate(dto);
        if (!result.IsValid)
            throw new ValidationException(result.Errors.FirstOrDefault()?.ErrorMessage);

        var apptask = _mapper.Map(dto);

        // Let the entity produce its own name when the caller did not supply one.
        if (string.IsNullOrEmpty(apptask.Name))
            apptask.CreateName();

        // Insert the task together with its ApptaskItems navigation collection.
        await _apptaskRepository.AddNav(apptask)
            .Include(d => d.ApptaskItems)
            .ExecuteCommandAsync();

        return apptask.Id;
    }

    /// <summary>
    /// Queries the progress of a task.
    /// </summary>
    /// <param name="taskId">Id of the task to look up.</param>
    /// <param name="cancellation">Cancellation token passed to the query.</param>
    /// <returns>The task, loaded with its items, mapped to the method's result type.</returns>
    /// <exception cref="UserFriendlyException">No task with the given Id exists.</exception>
    public async Task GetProgressAsync(string taskId, CancellationToken cancellation)
    {
        var apptask = await _apptaskRepository.Queryable()
            .Includes(d => d.ApptaskItems)
            .FirstAsync(d => d.Id == taskId, cancellation);
        if (apptask is null)
            throw new UserFriendlyException("无效任务编号");

        return _mapper.Map(apptask);
    }

    /// <summary>
    /// Terminates a task by marking its still-pending items as terminated.
    /// </summary>
    /// <remarks>
    /// Only items with <c>Tries &lt; TryLimit</c> and a status of Waiting or Failed are loaded and updated.
    /// The query/update round is repeated until a round updates every loaded item or no such items remain.
    /// </remarks>
    /// <param name="taskId">Id of the task to terminate.</param>
    /// <param name="cancellation">Cancellation token passed to the query.</param>
    /// <exception cref="UserFriendlyException">No task with the given Id exists.</exception>
    public async Task TerminalTaskAsync(string taskId, CancellationToken cancellation)
    {
        // Iterative retry (instead of self-recursion) so repeated conflicts cannot grow the call chain.
        while (true)
        {
            var apptask = await _apptaskRepository.Queryable()
                .Includes(d => d.ApptaskItems
                    .Where(x => x.Tries < x.TryLimit
                                && (x.TaskStatus == ETaskStatus.Waiting || x.TaskStatus == ETaskStatus.Failed))
                    .ToList())
                .FirstAsync(d => d.Id == taskId, cancellation);
            if (apptask is null)
                throw new UserFriendlyException("无效任务编号");

            if (apptask.ApptaskItems.Count == 0)
                return;

            var succeeded = 0;
            foreach (var item in apptask.ApptaskItems)
            {
                item.TaskStatus = ETaskStatus.Terminated;

                // A zero result is treated as "not updated" — presumably an optimistic-lock
                // conflict with a concurrent writer; confirm against the ORM documentation.
                var result = await _apptaskItemRepository.Updateable(item).ExecuteCommandWithOptLockAsync();
                if (result != 0)
                    succeeded++;
            }

            if (succeeded == apptask.ApptaskItems.Count)
                return;

            // Some updates did not apply: reload the remaining candidates and try again.
        }
    }

    /// <summary>
    /// Gets one task item that is waiting to be executed and marks it as processing.
    /// </summary>
    /// <remarks>
    /// Candidates are the 10 earliest items (by <c>CreationTime</c>) with <c>Tries &lt; TryLimit</c>
    /// and a status of Waiting or Failed. The first candidate whose update reports a non-zero result
    /// is returned; if none does, a fresh batch is queried.
    /// </remarks>
    /// <param name="cancellation">Cancellation token passed to the query.</param>
    /// <returns>The claimed item, or <c>null</c> when no candidate exists.</returns>
    public async Task GetWaitingTaskAsync(CancellationToken cancellation)
    {
        // Iterative retry (instead of self-recursion) so repeated conflicts cannot grow the call chain.
        while (true)
        {
            var taskItems = await _apptaskItemRepository.Queryable()
                .Where(d => d.Tries < d.TryLimit
                            && (d.TaskStatus == ETaskStatus.Waiting || d.TaskStatus == ETaskStatus.Failed))
                .OrderBy(d => d.CreationTime)
                .Take(10)
                .ToListAsync(cancellation);
            if (taskItems.Count == 0)
                return null;

            foreach (var item in taskItems)
            {
                item.SetProcessing();
                item.Tries++;

                // A zero result is treated as "not claimed" — presumably another worker
                // updated the row first; confirm against the ORM documentation.
                var result = await _apptaskItemRepository.Updateable(item).ExecuteCommandWithOptLockAsync();
                if (result != 0)
                    return item;
            }

            // Every candidate in this batch failed to update: query a fresh batch.
        }
    }

    /// <summary>
    /// Executes a task item through the given executor and stores the outcome on the item.
    /// </summary>
    /// <remarks>
    /// When <c>TaskParams</c> is not null it is deserialized with <c>System.Text.Json</c> into the
    /// request passed to the executor. Any exception is logged and recorded as a failed item rather
    /// than rethrown. <c>TaskStatus</c>, <c>Message</c> and <c>TaskEndTime</c> are always written back.
    /// </remarks>
    /// <param name="executor">Executor that performs the work for the request.</param>
    /// <param name="apptaskItem">Item being executed; its status, message and end time are updated.</param>
    /// <param name="cancellation">Cancellation token passed to the executor and to the final update.</param>
    public async Task ExecuteAsync(IApptaskExecutor executor, ApptaskItem apptaskItem, CancellationToken cancellation)
    {
        try
        {
            TRequest request = default;
            if (apptaskItem.TaskParams is not null)
            {
                request = System.Text.Json.JsonSerializer.Deserialize(apptaskItem.TaskParams);
                if (request is null)
                    throw new UserFriendlyException("任务参数反序列化异常");
            }

            var result = await executor.ExecuteAsync(request, cancellation);

            apptaskItem.TaskStatus = result.IsSuccess ? ETaskStatus.Succeeded : ETaskStatus.Failed;
            apptaskItem.Message = result.Message;
            apptaskItem.TaskEndTime = DateTime.Now;
        }
        catch (Exception e)
        {
            // Pass the exception itself so its type and stack trace reach the log, not just the message.
            _logger.LogError(e, "批量任务执行异常:{err}", e.Message);

            apptaskItem.TaskStatus = ETaskStatus.Failed;
            apptaskItem.Message = "批量任务执行异常";
            apptaskItem.TaskEndTime = DateTime.Now;
        }
        finally
        {
            // NOTE(review): if `cancellation` is already cancelled, this write may be cancelled too,
            // leaving the item without a final status — confirm whether that is intended.
            await _apptaskItemRepository.Updateable(apptaskItem)
                .UpdateColumns(d => new { d.TaskStatus, d.Message, d.TaskEndTime })
                .ExecuteCommandAsync(cancellation);
        }
    }
}