XingTangCallsSyncJob.cs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. using Hotline.Application.CallCenter;
  2. using Quartz;
  3. using SqlSugar;
  4. using Hotline.CallCenter.Calls;
  5. using Hotline.Repository.SqlSugar;
  6. using Hotline.Users;
  7. using MapsterMapper;
  8. using Microsoft.Extensions.Logging;
  9. using XF.Domain.Repository;
  10. using XingTang.Sdk;
  11. using DotNetCore.CAP;
  12. using Hotline.Share.Dtos.Order;
  13. using Hotline.Share.Dtos.TrCallCenter;
  14. using Hotline.Share.Enums.CallCenter;
  15. using System.Dynamic;
  16. using Hotline.Share.Dtos.CallCenter;
  17. namespace Hotline.Application.Jobs
  18. {
  /// <summary>
  /// Quartz job that synchronizes XingTang call records into the local call store.
  /// Each run claims a small batch of unsynchronized <see cref="XingtangCall"/> rows,
  /// maps them to <see cref="CallNative"/>, enriches them with user information,
  /// persists them and publishes a <c>HotlineCallAdd</c> event through CAP.
  /// </summary>
  public class XingTangCallsSyncJob : IJob, IDisposable
  {
    /// <summary>Maximum number of XingTang records claimed per run.</summary>
    private const int BatchSize = 10;

    /// <summary>Records whose <c>Tries</c> value exceeds this limit are no longer picked up.</summary>
    private const int MaxTries = 50;

    private readonly IRepository<CallNative> _callRepository;
    private readonly IRepository<User> _userRepository;
    private readonly ICallApplication _callApplication;
    private readonly ICapPublisher _capPublisher;
    private readonly IMapper _mapper;
    private readonly ILogger<XingTangCallsSyncJob> _logger;
    private readonly ISqlSugarClient _db;

    /// <summary>
    /// Creates the job. The SqlSugar client used for the XingTang tables is taken from <paramref name="uow"/>.
    /// </summary>
    public XingTangCallsSyncJob(
      ISugarUnitOfWork<XingTangDbContext> uow,
      IRepository<CallNative> callRepository,
      IRepository<User> userRepository,
      ICallApplication callApplication,
      ICapPublisher capPublisher,
      IMapper mapper,
      ILogger<XingTangCallsSyncJob> logger)
    {
      _callRepository = callRepository;
      _userRepository = userRepository;
      _callApplication = callApplication;
      _capPublisher = capPublisher;
      _mapper = mapper;
      _logger = logger;
      _db = uow.Db;
    }

    /// <summary>
    /// Claims up to <see cref="BatchSize"/> unsynchronized records, converts and stores them,
    /// then publishes them. If anything fails after records were claimed, the failure is logged
    /// and the claimed records are flagged as unsynchronized again so a later run can retry them.
    /// </summary>
    /// <param name="context">Quartz execution context; its cancellation token is flowed to the queries.</param>
    public async Task Execute(IJobExecutionContext context)
    {
      var cancellation = context.CancellationToken;

      var xingtangCalls = await _db.Queryable<XingtangCall>()
        .Where(d => (d.IsSync == null || !d.IsSync) && (d.Tries == null || d.Tries <= MaxTries))
        .OrderBy(d => d.Id)
        .Take(BatchSize)
        .ToListAsync(cancellation);

      // Records this run successfully claimed via the optimistic-lock update.
      var occupyCalls = new List<XingtangCall>();
      try
      {
        // Claiming happens inside the try block so that a failure half-way through
        // still releases the records that were already claimed.
        foreach (var call in xingtangCalls)
        {
          call.IsSync = true;
          // NOTE(review): if Tries is a nullable column (the filter above checks for null),
          // "+= 1" leaves a null value null and the retry limit never applies - confirm against the entity.
          call.Tries += 1;
          var rows = await _db.Updateable(call)
            .ExecuteCommandWithOptLockAsync();
          if (rows > 0)
          {
            occupyCalls.Add(call);
          }
        }

        // Nothing was claimed (no pending rows, or every claim lost the race): nothing to do.
        if (occupyCalls.Count == 0)
        {
          return;
        }

        var calls = _mapper.Map<List<CallNative>>(occupyCalls);

        // Fill in user information by staff number.
        var staffNos = calls.Select(d => d.StaffNo).Distinct().ToList();
        var users = await _userRepository.Queryable()
          .Where(d => staffNos.Contains(d.StaffNo))
          .ToListAsync(cancellation);

        foreach (var call in calls)
        {
          call.Id = await GetCallIdAsync(call.CallNo, cancellation);
          var user = users.FirstOrDefault(d => d.StaffNo == call.StaffNo);
          if (user is not null)
          {
            call.UserId = user.Id;
            call.UserName = user.Name;
          }
        }

        await _callRepository.AddRangeAsync(calls, cancellation);

        // Push the new calls upstream (original comment: push to the provincial platform).
        if (calls.Any())
        {
          var callIns = _mapper.Map<List<CallNativeDto>>(calls);
          await _capPublisher.PublishAsync(Hotline.Share.Mq.EventNames.HotlineCallAdd, callIns);
        }

        // TODO: the original author left a note to possibly publish only inbound calls
        // (Direction == ECallDirection.In) instead of the whole batch.
      }
      catch (Exception e)
      {
        // Pass the exception object so the logger records the full stack trace and inner exceptions.
        _logger.LogError(e, "获取通话记录异常:{Message}", e.Message);

        if (occupyCalls.Count == 0)
        {
          return;
        }

        // NOTE(review): if the failure happened after AddRangeAsync succeeded, releasing the
        // records makes them eligible again and may insert duplicates on the next run - confirm.
        foreach (var occupyCall in occupyCalls)
        {
          occupyCall.IsSync = false;
        }

        // Deliberately not tied to the job's cancellation token: when the failure was caused by
        // cancellation, the release must still run or the records would stay flagged as synced.
        await _db.Updateable(occupyCalls)
          .UpdateColumns(d => new { d.IsSync })
          .ExecuteCommandAsync(CancellationToken.None);
      }
    }

    /// <summary>
    /// Resolves the call id to use for the given call number.
    /// When no relation exists, one is created (already marked as used) and its new id is returned.
    /// When the relation is already used, a fresh ULID is returned.
    /// Otherwise the relation is marked as used under an optimistic lock and its id is returned;
    /// if that update loses the race, the relation is re-read and evaluated again.
    /// </summary>
    /// <param name="callNo">Call number identifying the relation.</param>
    /// <param name="cancellation">Token used to cancel the underlying calls.</param>
    /// <returns>The call id to assign to the call record.</returns>
    private async Task<string> GetCallIdAsync(string callNo, CancellationToken cancellation)
    {
      // Loop instead of recursing so repeated lock conflicts cannot grow the call chain.
      while (true)
      {
        var relation = await _callApplication.GetRelationAsync(callNo, cancellation);
        if (relation is null)
        {
          relation = new CallidRelation
          {
            Id = callNo,
            CallId = Ulid.NewUlid().ToString(),
            IsUsed = true
          };
          await _callApplication.AddRelationAsync(relation, cancellation);
          return relation.CallId;
        }

        if (relation.IsUsed)
        {
          return Ulid.NewUlid().ToString();
        }

        relation.IsUsed = true;
        var rows = await _callApplication.UpdateRelationOptLockAsync(relation, cancellation);
        if (rows > 0)
        {
          return relation.CallId;
        }

        // Update affected no rows: fetch the relation again and re-check IsUsed.
      }
    }

    /// <summary>
    /// Required by <see cref="IDisposable"/>; this body is intentionally empty.
    /// </summary>
    public void Dispose()
    {
    }
  }
  139. }