123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- using FluentValidation;
- using Hotline.Share.Dtos.BatchTask;
- using Hotline.Share.Enums.BatchTask;
- using Hotline.Validators.BatchTask;
- using MapsterMapper;
- using Microsoft.Extensions.Logging;
- using XF.Domain.Dependency;
- using XF.Domain.Exceptions;
- using XF.Domain.Repository;
- using static Lucene.Net.Util.Fst.Util;
- namespace Hotline.BatchTask;
/// <summary>
/// Domain service for batch tasks (<see cref="Apptask"/>) and their items
/// (<see cref="ApptaskItem"/>): creating tasks, reporting progress, terminating
/// pending items, claiming the next item to run and executing it.
/// </summary>
public class ApptaskDomainService : IApptaskDomainService, IScopeDependency
{
    private readonly IRepository<Apptask> _apptaskRepository;
    private readonly IRepository<ApptaskItem> _apptaskItemRepository;
    private readonly IMapper _mapper;
    private readonly ILogger<ApptaskDomainService> _logger;

    public ApptaskDomainService(
        IRepository<Apptask> apptaskRepository,
        IRepository<ApptaskItem> apptaskItemRepository,
        IMapper mapper,
        ILogger<ApptaskDomainService> logger)
    {
        _apptaskRepository = apptaskRepository;
        _apptaskItemRepository = apptaskItemRepository;
        _mapper = mapper;
        _logger = logger;
    }

    /// <summary>
    /// Creates a new task together with its items.
    /// </summary>
    /// <param name="dto">Task definition; validated with <see cref="ApptaskValidator"/>.</param>
    /// <param name="cancellation">Cancellation token (not consumed by the insert call used here).</param>
    /// <returns>The id of the created task.</returns>
    /// <exception cref="ValidationException">
    /// <paramref name="dto"/> fails validation; the message is the first validation error.
    /// </exception>
    public async Task<string> AddAsync(AddApptaskDto dto, CancellationToken cancellation)
    {
        var validation = new ApptaskValidator().Validate(dto);
        if (!validation.IsValid)
        {
            throw new ValidationException(validation.Errors.FirstOrDefault()?.ErrorMessage);
        }

        var apptask = _mapper.Map<Apptask>(dto);

        // Let the entity generate a name when the caller did not supply one.
        if (string.IsNullOrEmpty(apptask.Name))
        {
            apptask.CreateName();
        }

        // Navigation insert: the task row and its ApptaskItems are written together.
        await _apptaskRepository.AddNav(apptask)
            .Include(d => d.ApptaskItems)
            .ExecuteCommandAsync();

        return apptask.Id;
    }

    /// <summary>
    /// Loads a task with its items and maps it to a progress DTO.
    /// </summary>
    /// <param name="taskId">Id of the task to look up.</param>
    /// <param name="cancellation">Cancellation token passed to the query.</param>
    /// <returns>The progress of the task.</returns>
    /// <exception cref="UserFriendlyException">No task with <paramref name="taskId"/> exists.</exception>
    public async Task<ApptaskProgressDto> GetProgressAsync(string taskId, CancellationToken cancellation)
    {
        var apptask = await _apptaskRepository.Queryable()
            .Includes(d => d.ApptaskItems)
            .FirstAsync(d => d.Id == taskId, cancellation);

        if (apptask is null)
        {
            throw new UserFriendlyException("无效任务编号");
        }

        return _mapper.Map<ApptaskProgressDto>(apptask);
    }

    /// <summary>
    /// Terminates every item of a task that could still be picked up for execution,
    /// i.e. items in Waiting or Failed status whose Tries is below TryLimit.
    /// </summary>
    /// <remarks>
    /// Each item is saved with <c>ExecuteCommandWithOptLockAsync</c>. When at least one
    /// save reports zero affected rows, the task is queried again and the remaining
    /// matching items are retried, until a round updates every item it loaded or no
    /// matching item is left.
    /// </remarks>
    /// <param name="taskId">Id of the task to terminate.</param>
    /// <param name="cancellation">Cancellation token passed to the query.</param>
    /// <exception cref="UserFriendlyException">No task with <paramref name="taskId"/> exists.</exception>
    public async Task TerminalTaskAsync(string taskId, CancellationToken cancellation)
    {
        // Iterative retry instead of self-recursion: same query/update sequence,
        // but without building a chain of nested async calls.
        while (true)
        {
            var apptask = await _apptaskRepository.Queryable()
                .Includes(d => d.ApptaskItems.Where(x => x.Tries < x.TryLimit
                        && (x.TaskStatus == ETaskStatus.Waiting || x.TaskStatus == ETaskStatus.Failed)).ToList())
                .FirstAsync(d => d.Id == taskId, cancellation);

            if (apptask is null)
            {
                throw new UserFriendlyException("无效任务编号");
            }

            if (apptask.ApptaskItems.Count == 0)
            {
                return;
            }

            var terminated = 0;
            foreach (var item in apptask.ApptaskItems)
            {
                item.TaskStatus = ETaskStatus.Terminated;
                var affected = await _apptaskItemRepository.Updateable(item).ExecuteCommandWithOptLockAsync();
                if (affected != 0)
                {
                    terminated++;
                }
            }

            if (terminated == apptask.ApptaskItems.Count)
            {
                return;
            }
            // Some updates affected no rows: reload and try the remaining items again.
        }
    }

    /// <summary>
    /// Claims one task item that is ready to be executed.
    /// </summary>
    /// <remarks>
    /// Loads up to 10 candidates (Waiting or Failed, Tries below TryLimit) ordered by
    /// creation time. Each candidate is marked via <c>SetProcessing()</c>, its Tries is
    /// incremented and it is saved with <c>ExecuteCommandWithOptLockAsync</c>; the first
    /// candidate whose save affects a row is returned. When none of the loaded candidates
    /// could be saved, a fresh batch is queried.
    /// </remarks>
    /// <param name="cancellation">Cancellation token passed to the query.</param>
    /// <returns>The claimed item, or <c>null</c> when no candidate exists.</returns>
    public async Task<ApptaskItem?> GetWaitingTaskAsync(CancellationToken cancellation)
    {
        // Iterative retry instead of self-recursion; behaviour per round is unchanged.
        while (true)
        {
            var candidates = await _apptaskItemRepository.Queryable()
                .Where(d => d.Tries < d.TryLimit
                            && (d.TaskStatus == ETaskStatus.Waiting || d.TaskStatus == ETaskStatus.Failed))
                .OrderBy(d => d.CreationTime)
                .Take(10)
                .ToListAsync(cancellation);

            if (candidates.Count == 0)
            {
                return null;
            }

            foreach (var candidate in candidates)
            {
                candidate.SetProcessing();
                candidate.Tries++;
                var affected = await _apptaskItemRepository.Updateable(candidate).ExecuteCommandWithOptLockAsync();
                if (affected != 0)
                {
                    return candidate;
                }
            }
        }
    }

    /// <summary>
    /// Runs a single task item through the given executor and stores the outcome
    /// (status, message, end time) on the item.
    /// </summary>
    /// <remarks>
    /// Any exception raised while deserializing the parameters or running the executor
    /// is logged and recorded as a failed item instead of being propagated.
    /// </remarks>
    /// <typeparam name="TRequest">Request type the item's JSON parameters are deserialized into.</typeparam>
    /// <param name="executor">Executor that performs the actual work.</param>
    /// <param name="apptaskItem">The item to execute; its TaskParams may be <c>null</c>.</param>
    /// <param name="cancellation">Cancellation token passed to the executor.</param>
    public async Task ExecuteAsync<TRequest>(IApptaskExecutor<TRequest> executor, ApptaskItem apptaskItem, CancellationToken cancellation)
    {
        try
        {
            TRequest request = default;
            if (apptaskItem.TaskParams is not null)
            {
                request = System.Text.Json.JsonSerializer.Deserialize<TRequest>(apptaskItem.TaskParams);
                if (request is null)
                {
                    throw new UserFriendlyException("任务参数反序列化异常");
                }
            }

            var result = await executor.ExecuteAsync(request, cancellation);
            apptaskItem.TaskStatus = result.IsSuccess ? ETaskStatus.Succeeded : ETaskStatus.Failed;
            apptaskItem.Message = result.Message;
        }
        catch (Exception e)
        {
            // Pass the exception itself so the stack trace and inner exceptions are logged,
            // not just the message text.
            _logger.LogError(e, "批量任务执行异常:{err}", e.Message);
            apptaskItem.TaskStatus = ETaskStatus.Failed;
            apptaskItem.Message = "批量任务执行异常";
        }
        finally
        {
            apptaskItem.TaskEndTime = DateTime.Now;

            // Persist the outcome even when the caller's token has been cancelled;
            // otherwise the item would keep the in-progress state it was claimed with.
            await _apptaskItemRepository.Updateable(apptaskItem)
                .UpdateColumns(d => new
                {
                    d.TaskStatus,
                    d.Message,
                    d.TaskEndTime
                })
                .ExecuteCommandAsync(CancellationToken.None);
        }
    }
}
|