TaskSendTaskLuZhou110Job.cs 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. using DataSharing.SendTask;
  2. using DataSharing.Share.Dtos.LuZhou;
  3. using DataSharing.Share.Enums;
  4. using MapsterMapper;
  5. using Microsoft.Extensions.Logging;
  6. using Newtonsoft.Json;
  7. using Quartz;
  8. using XF.Domain.Repository;
  9. using static Lucene.Net.Util.Fst.Util;
  namespace DataSharing.LuZhou.LuZhou110
  {
      /// <summary>
      /// Quartz job that reads rows waiting to be pushed from the
      /// <see cref="WaitSendTaskLuzhou110"/> table, claims them by switching their state to
      /// <c>Pushing</c> under an optimistic lock, sends each one through <see cref="LZ110Invoker"/>
      /// and records the outcome in the wait table, the send-task table and the detail table.
      /// </summary>
      public class TaskSendTaskLuZhou110Job : IJob, IDisposable
      {
          /// <summary>Maximum number of waiting rows read per execution.</summary>
          private const int BatchSize = 10;

          /// <summary>
          /// Number of failed attempts after which a waiting row is marked <c>PushFail</c>.
          /// The read query keeps the original inclusive bound (<c>SendTimes &lt;= 3</c>).
          /// </summary>
          private const int MaxSendTimes = 3;

          /// <summary>
          /// Response status code that triggers a registration refresh followed by one retry
          /// (per the original inline note: 1 = not registered, 4 = already registered).
          /// </summary>
          private const string UnregisteredStatusCode = "1";

          /// <summary>Error text stored in the detail row when the invoker returns no response object.</summary>
          private const string NullResponseError = "LZ110 response is null";

          private readonly IMapper _mapper;
          // NOTE(review): the logger category is TaskOtherPlatformsJob, not this job. It looks like a
          // copy-paste leftover; it is kept as-is so the constructor signature stays unchanged.
          private readonly ILogger<TaskOtherPlatformsJob> _logger;
          private readonly IRepository<SendTaskLuzhou110> _sendTaskLuZhou110Repository;
          private readonly IRepository<WaitSendTaskLuzhou110> _waitSendTaskLuZhou110Repository;
          private readonly LZ110Invoker _lz110Invoker;
          private readonly ISharingConfigurationManager _sharingConfigurationManager;
          private readonly IRepository<DsSendTaskDetailInfo> _sendTaskDetailInfoRepository;
          private readonly ILZ110RegisterManager _registerManager;

          /// <summary>
          /// Creates the job with its injected collaborators.
          /// </summary>
          /// <param name="mapper">Object mapper (stored; not used by the code in this class).</param>
          /// <param name="logger">Logger used for progress and error messages.</param>
          /// <param name="sendTaskLuZhou110Repository">Repository of the send-task rows.</param>
          /// <param name="waitSendTaskLuZhou110Repository">Repository of the rows waiting to be pushed.</param>
          /// <param name="lz110Invoker">Client used to send each request.</param>
          /// <param name="sharingConfigurationManager">Sharing configuration accessor (stored; not used by the code in this class).</param>
          /// <param name="sendTaskDetailInfoRepository">Repository of the per-attempt detail rows.</param>
          /// <param name="registerManager">Manager used to refresh the registration when the response reports status code "1".</param>
          public TaskSendTaskLuZhou110Job(IMapper mapper,
              ILogger<TaskOtherPlatformsJob> logger,
              IRepository<SendTaskLuzhou110> sendTaskLuZhou110Repository,
              IRepository<WaitSendTaskLuzhou110> waitSendTaskLuZhou110Repository,
              LZ110Invoker lz110Invoker,
              ISharingConfigurationManager sharingConfigurationManager,
              IRepository<DsSendTaskDetailInfo> sendTaskDetailInfoRepository,
              ILZ110RegisterManager registerManager)
          {
              _mapper = mapper;
              _logger = logger;
              _sendTaskLuZhou110Repository = sendTaskLuZhou110Repository;
              _waitSendTaskLuZhou110Repository = waitSendTaskLuZhou110Repository;
              _lz110Invoker = lz110Invoker;
              _sharingConfigurationManager = sharingConfigurationManager;
              _sendTaskDetailInfoRepository = sendTaskDetailInfoRepository;
              _registerManager = registerManager;
          }

          /// <summary>
          /// Reads up to <see cref="BatchSize"/> rows in state <c>WaitPush</c> (oldest first),
          /// switches each to <c>Pushing</c> with an optimistic-lock update, and pushes the rows
          /// whose update affected at least one record.
          /// </summary>
          /// <param name="context">Quartz execution context; its cancellation token is forwarded to the query and the push.</param>
          public async Task Execute(IJobExecutionContext context)
          {
              var tasks = await _waitSendTaskLuZhou110Repository.Queryable()
                  .Where(d => d.State == EWaitSendTaskState.WaitPush && d.SendTimes <= MaxSendTimes)
                  .OrderBy(d => d.CreationTime)
                  .Take(BatchSize)
                  .ToListAsync(context.CancellationToken);
              _logger.LogInformation("泸州110平台读取数据条数--------------------------------------{Count}", tasks.Count);
              if (tasks.Count == 0)
              {
                  return;
              }

              var claimed = new List<WaitSendTaskLuzhou110>(tasks.Count);
              foreach (var sendTask in tasks)
              {
                  // Mark the row as being pushed; only rows whose optimistic-lock update
                  // actually changed a record are handled by this run.
                  sendTask.State = EWaitSendTaskState.Pushing;
                  if (await _waitSendTaskLuZhou110Repository.Updateable(sendTask).ExecuteCommandWithOptLockAsync() > 0)
                  {
                      claimed.Add(sendTask);
                  }
              }
              _logger.LogInformation("泸州110平台锁定条数--------------------------------------{Count}", claimed.Count);

              if (claimed.Count > 0)
              {
                  await SendData(claimed, context.CancellationToken);
              }
          }

          /// <summary>
          /// Pushes every claimed row and persists the outcome of each attempt.
          /// A missing (null) response is handled as a failed attempt so the row never stays in <c>Pushing</c>.
          /// </summary>
          /// <param name="list">Rows already switched to <c>Pushing</c> by <see cref="Execute"/>.</param>
          /// <param name="cancellationToken">Token forwarded to the request and repository calls.</param>
          /// <returns>A task that completes when all rows have been processed.</returns>
          private async Task SendData(List<WaitSendTaskLuzhou110> list, CancellationToken cancellationToken)
          {
              foreach (var item in list)
              {
                  var (response, error) = await RequestWithReRegisterAsync(item, cancellationToken);
                  var isSuccess = response is not null && response.IsSuccess;

                  // Detail row describing this attempt.
                  DsSendTaskDetailInfo detail = new()
                  {
                      TaskId = item.Id,
                      Result = System.Text.RegularExpressions.Regex.Unescape(System.Text.Json.JsonSerializer.Serialize(response)),
                      ResultErrorData = error,
                      ProcessingServices = "",
                      IsSuccess = isSuccess
                  };

                  await UpdateWaitTaskAsync(item, isSuccess, cancellationToken);
                  await UpdateSendTaskAsync(item, isSuccess, cancellationToken);
                  await _sendTaskDetailInfoRepository.AddAsync(detail, cancellationToken);
              }
          }

          /// <summary>
          /// Sends one request; when the response is unsuccessful with status code "1",
          /// refreshes the registration and sends the request once more.
          /// </summary>
          /// <param name="item">Row whose <c>Path</c> and <c>Request</c> are sent.</param>
          /// <param name="cancellationToken">Token forwarded to the invoker and the register manager.</param>
          /// <returns>
          /// The last response obtained (a default instance when the first request threw, possibly null
          /// when the invoker returned null) and the error text ("" when no error was observed).
          /// </returns>
          private async Task<(LZ110ApiResponse<LZ110BaseResponse> Response, string Error)> RequestWithReRegisterAsync(
              WaitSendTaskLuzhou110 item, CancellationToken cancellationToken)
          {
              LZ110ApiResponse<LZ110BaseResponse> response = new();
              try
              {
                  response = await _lz110Invoker.RequestStringContentAsync<LZ110BaseResponse>(item.Path, item.Request, cancellationToken);
                  _logger.LogInformation($"发起请求LZ110返回111111111:{JsonConvert.SerializeObject(response)}");

                  // Null-safe at every level: a missing response or status object must not throw here.
                  if (response is not null
                      && !response.IsSuccess
                      && response.Data?.ResponseStatusObject?.StatusCode == UnregisteredStatusCode)
                  {
                      await _registerManager.RefreshRegisterAsync(cancellationToken);
                      _logger.LogInformation($"发起请求LZ110返回222222:{JsonConvert.SerializeObject(response)}");
                      response = await _lz110Invoker.RequestStringContentAsync<LZ110BaseResponse>(item.Path, item.Request, cancellationToken);
                      _logger.LogInformation($"发起请求LZ110返回333333:{JsonConvert.SerializeObject(response)}");
                  }

                  return (response, response is null ? NullResponseError : "");
              }
              catch (Exception ex)
              {
                  // Keep processing the row as a failed attempt, but log the full exception.
                  _logger.LogError(ex, "发起请求LZ110返回333333:{Message}", ex.Message);
                  return (response, ex.Message);
              }
          }

          /// <summary>
          /// Removes the waiting row after a successful push; otherwise records the attempt and
          /// moves the row back to <c>WaitPush</c>, or to <c>PushFail</c> once
          /// <see cref="MaxSendTimes"/> attempts have been made.
          /// </summary>
          private async Task UpdateWaitTaskAsync(WaitSendTaskLuzhou110 item, bool isSuccess, CancellationToken cancellationToken)
          {
              if (isSuccess)
              {
                  await _waitSendTaskLuZhou110Repository.RemoveAsync(item, cancellationToken: cancellationToken);
                  return;
              }

              item.LastTime = DateTime.Now;
              item.SendTimes = item.SendTimes + 1;
              if (item.FirstTime is null)
              {
                  item.FirstTime = DateTime.Now;
              }
              item.State = item.SendTimes >= MaxSendTimes
                  ? EWaitSendTaskState.PushFail
                  : EWaitSendTaskState.WaitPush;
              await _waitSendTaskLuZhou110Repository.UpdateAsync(item, cancellationToken);
          }

          /// <summary>
          /// Updates the send-task row that has the same Id as the waiting row (when one exists)
          /// with the attempt time, the attempt count and the push result.
          /// </summary>
          private async Task UpdateSendTaskAsync(WaitSendTaskLuzhou110 item, bool isSuccess, CancellationToken cancellationToken)
          {
              var sendTaskData = await _sendTaskLuZhou110Repository.GetAsync(p => p.Id == item.Id, cancellationToken);
              if (sendTaskData == null)
              {
                  return;
              }

              sendTaskData.LastTime = DateTime.Now;
              sendTaskData.SendTimes = sendTaskData.SendTimes + 1;
              if (sendTaskData.FirstTime is null)
              {
                  sendTaskData.FirstTime = DateTime.Now;
              }
              sendTaskData.IsSuccess = isSuccess ? ESendTaskState.PushSuccess : ESendTaskState.PushFail;
              await _sendTaskLuZhou110Repository.UpdateAsync(sendTaskData, cancellationToken);
          }

          /// <summary>
          /// No-op: the job holds no resources of its own to release.
          /// </summary>
          public void Dispose()
          {
          }
      }
  }