DistributedQueue.cs 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. using EasyCaching.Core;
  2. using System.Threading;
  3. using XF.Domain.Dependency;
  4. using XF.Domain.Queues;
  5. namespace XF.EasyCaching;
  /// <summary>
  /// <see cref="IQueue"/> implementation backed by an <see cref="IRedisCachingProvider"/>.
  /// Items are added with <c>LPush</c> and taken with <c>RPop</c>, so each queue name
  /// is used as the key of a provider-side list.
  /// </summary>
  /// <remarks>
  /// The provider calls used here are invoked without a <see cref="CancellationToken"/>,
  /// so the async members can only observe cancellation <em>before</em> a command is issued;
  /// a command that has already been sent is not interrupted.
  /// </remarks>
  public class DistributedQueue : IQueue, IScopeDependency
  {
      private readonly IRedisCachingProvider _redisCaching;

      /// <summary>Creates a queue that issues all list commands through <paramref name="redisCaching"/>.</summary>
      /// <param name="redisCaching">Provider used for every list operation.</param>
      public DistributedQueue(IRedisCachingProvider redisCaching)
      {
          _redisCaching = redisCaching;
      }

      /// <summary>Pushes a single item onto the list stored under <paramref name="queueName"/>.</summary>
      /// <param name="queueName">Key of the list.</param>
      /// <param name="item">Item to push.</param>
      public void Enqueue<TItem>(string queueName, TItem item) =>
          _redisCaching.LPush(queueName, new List<TItem> { item });

      /// <summary>Pushes all <paramref name="items"/> onto the list stored under <paramref name="queueName"/>.</summary>
      /// <param name="queueName">Key of the list.</param>
      /// <param name="items">Items to push in one provider call.</param>
      public void Enqueue<TItem>(string queueName, IList<TItem> items) =>
          _redisCaching.LPush(queueName, items);

      /// <summary>Asynchronously pushes a single item onto the list stored under <paramref name="queueName"/>.</summary>
      /// <param name="queueName">Key of the list.</param>
      /// <param name="item">Item to push.</param>
      /// <param name="cancellationToken">Checked before the push is issued.</param>
      /// <exception cref="OperationCanceledException">The token was already cancelled.</exception>
      public async Task EnqueueAsync<TItem>(string queueName, TItem item, CancellationToken cancellationToken = default)
      {
          cancellationToken.ThrowIfCancellationRequested();
          await _redisCaching.LPushAsync(queueName, new List<TItem> { item });
      }

      /// <summary>Asynchronously pushes all <paramref name="items"/> onto the list stored under <paramref name="queueName"/>.</summary>
      /// <param name="queueName">Key of the list.</param>
      /// <param name="items">Items to push in one provider call.</param>
      /// <param name="cancellationToken">Checked before the push is issued.</param>
      /// <exception cref="OperationCanceledException">The token was already cancelled.</exception>
      public async Task EnqueueAsync<TItem>(string queueName, IList<TItem> items, CancellationToken cancellationToken = default)
      {
          cancellationToken.ThrowIfCancellationRequested();
          await _redisCaching.LPushAsync(queueName, items);
      }

      /// <summary>Pops one item from the list, or returns <c>default</c> when the list length is not positive.</summary>
      /// <param name="queueName">Key of the list.</param>
      /// <returns>The popped item, or <c>default</c> if the list was reported empty.</returns>
      public TItem? Dequeue<TItem>(string queueName)
      {
          // NOTE(review): the length check and the pop are two separate provider calls, so
          // another consumer may empty the list in between. What RPop yields for an empty
          // list is not visible here - confirm before relying on it or removing the check.
          if (Count(queueName) <= 0)
          {
              return default;
          }

          return _redisCaching.RPop<TItem>(queueName);
      }

      /// <summary>Asynchronous counterpart of <see cref="Dequeue{TItem}(string)"/>.</summary>
      /// <param name="queueName">Key of the list.</param>
      /// <param name="cancellationToken">Checked before the length query and again before the pop.</param>
      /// <returns>The popped item, or <c>default</c> if the list was reported empty.</returns>
      /// <exception cref="OperationCanceledException">The token was cancelled before the pop was issued.</exception>
      public async Task<TItem?> DequeueAsync<TItem>(string queueName, CancellationToken cancellationToken = default)
      {
          var length = await CountAsync(queueName, cancellationToken);
          if (length <= 0)
          {
              return default;
          }

          // Re-check so a cancellation that arrived during the length query does not consume an item.
          cancellationToken.ThrowIfCancellationRequested();
          return await _redisCaching.RPopAsync<TItem>(queueName);
      }

      /// <summary>Returns the length the provider reports for the list stored under <paramref name="queueName"/>.</summary>
      /// <param name="queueName">Key of the list.</param>
      public long Count(string queueName) => _redisCaching.LLen(queueName);

      /// <summary>Asynchronous counterpart of <see cref="Count(string)"/>.</summary>
      /// <param name="queueName">Key of the list.</param>
      /// <param name="cancellationToken">Checked before the length query is issued.</param>
      /// <exception cref="OperationCanceledException">The token was already cancelled.</exception>
      public async Task<long> CountAsync(string queueName, CancellationToken cancellationToken = default)
      {
          cancellationToken.ThrowIfCancellationRequested();
          return await _redisCaching.LLenAsync(queueName);
      }

      /// <summary>Removes occurrences of <paramref name="item"/> from the list via the provider's <c>LRem</c>.</summary>
      /// <param name="queueName">Key of the list.</param>
      /// <param name="item">Value to remove.</param>
      /// <param name="fromHead">
      /// When <c>true</c> <paramref name="count"/> is passed as-is; otherwise it is negated.
      /// Presumably this maps to the Redis LREM direction convention - confirm against the provider.
      /// </param>
      /// <param name="count">Count forwarded to <c>LRem</c>; the default 0 is unaffected by negation.</param>
      public void Remove<TItem>(string queueName, TItem item, bool fromHead = false, long count = 0) =>
          _redisCaching.LRem(queueName, fromHead ? count : -count, item);

      /// <summary>Asynchronous counterpart of <see cref="Remove{TItem}(string, TItem, bool, long)"/>.</summary>
      /// <param name="queueName">Key of the list.</param>
      /// <param name="item">Value to remove.</param>
      /// <param name="fromHead">When <c>true</c> <paramref name="count"/> is passed as-is; otherwise it is negated.</param>
      /// <param name="count">Count forwarded to <c>LRemAsync</c>.</param>
      /// <param name="cancellationToken">Checked before the removal is issued.</param>
      /// <exception cref="OperationCanceledException">The token was already cancelled.</exception>
      public async Task RemoveAsync<TItem>(string queueName, TItem item, bool fromHead = false, long count = 0,
          CancellationToken cancellationToken = default)
      {
          cancellationToken.ThrowIfCancellationRequested();
          await _redisCaching.LRemAsync(queueName, fromHead ? count : -count, item);
      }

      /// <summary>
      /// Pushes <paramref name="item"/> unless the private existence check reports it is already queued.
      /// </summary>
      /// <remarks>
      /// NOTE(review): the existence check is currently a stub that always returns <c>false</c>,
      /// so this method behaves exactly like <see cref="Enqueue{TItem}(string, TItem)"/>.
      /// </remarks>
      /// <param name="queueName">Key of the list.</param>
      /// <param name="item">Item to push.</param>
      public void EnqueueConcurrent<TItem>(string queueName, TItem item)
      {
          if (Any(queueName, item))
          {
              return;
          }

          _redisCaching.LPush(queueName, new List<TItem> { item });
      }

      /// <summary>Asynchronous counterpart of <see cref="EnqueueConcurrent{TItem}(string, TItem)"/>.</summary>
      /// <remarks>
      /// NOTE(review): the existence check is currently a stub that always returns <c>false</c>,
      /// so the item is always pushed.
      /// </remarks>
      /// <param name="queueName">Key of the list.</param>
      /// <param name="item">Item to push.</param>
      /// <param name="cancellationToken">Checked before the push is issued.</param>
      /// <exception cref="OperationCanceledException">The token was cancelled before the push was issued.</exception>
      public async Task EnqueueConcurrentAsync<TItem>(string queueName, TItem item, CancellationToken cancellationToken = default)
      {
          if (await AnyAsync(queueName, item, cancellationToken))
          {
              return;
          }

          cancellationToken.ThrowIfCancellationRequested();
          await _redisCaching.LPushAsync(queueName, new List<TItem> { item });
      }

      #region private

      /// <summary>
      /// Placeholder existence check: always returns <c>false</c> and does not query the provider.
      /// </summary>
      private bool Any<TItem>(string queueName, TItem item)
      {
          // TODO(review): not implemented - no lookup is performed for the item.
          return false;
      }

      /// <summary>
      /// Placeholder asynchronous existence check: always completes with <c>false</c>
      /// and does not query the provider.
      /// </summary>
      private Task<bool> AnyAsync<TItem>(string queueName, TItem item, CancellationToken cancellationToken)
      {
          // TODO(review): not implemented. Returns a completed task directly rather than
          // using an async method with no await (compiler warning CS1998).
          return Task.FromResult(false);
      }

      #endregion
  }