1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- using EasyCaching.Core;
- using System.Threading;
- using XF.Domain.Dependency;
- using XF.Domain.Queues;
- namespace XF.EasyCaching;
- public class DistributedQueue : IQueue, IScopeDependency
- {
- private readonly IRedisCachingProvider _redisCaching;
- public DistributedQueue(IRedisCachingProvider redisCaching)
- {
- _redisCaching = redisCaching;
- }
- public void Enqueue<TItem>(string queueName, TItem item) => _redisCaching.LPush(queueName, new List<TItem> { item });
- public void Enqueue<TItem>(string queueName, IList<TItem> items) => _redisCaching.LPush(queueName, items);
- public Task EnqueueAsync<TItem>(string queueName, TItem item, CancellationToken cancellationToken = default) =>
- _redisCaching.LPushAsync(queueName, new List<TItem> { item });
- public Task EnqueueAsync<TItem>(string queueName, IList<TItem> items, CancellationToken cancellationToken = default) =>
- _redisCaching.LPushAsync(queueName, items);
- public TItem? Dequeue<TItem>(string queueName)
- {
- var count = Count(queueName);
- return count <= 0 ? default : _redisCaching.RPop<TItem>(queueName);
- }
- public async Task<TItem?> DequeueAsync<TItem>(string queueName, CancellationToken cancellationToken = default)
- {
- var count = await CountAsync(queueName, cancellationToken);
- if (count <= 0) return default;
- return await _redisCaching.RPopAsync<TItem>(queueName);
- }
- public long Count(string queueName) => _redisCaching.LLen(queueName);
- public Task<long> CountAsync(string queueName, CancellationToken cancellationToken = default) => _redisCaching.LLenAsync(queueName);
- public void Remove<TItem>(string queueName, TItem item, bool fromHead = false, long count = 0) =>
- _redisCaching.LRem(queueName, fromHead ? count : -count, item);
- public Task RemoveAsync<TItem>(string queueName, TItem item, bool fromHead = false, long count = 0,
- CancellationToken cancellationToken = default) =>
- _redisCaching.LRemAsync(queueName, fromHead ? count : -count, item);
- public void EnqueueConcurrent<TItem>(string queueName, TItem item)
- {
- if (Any(queueName, item)) return;
- _redisCaching.LPush(queueName, new List<TItem> { item });
- }
- public async Task EnqueueConcurrentAsync<TItem>(string queueName, TItem item, CancellationToken cancellationToken = default)
- {
- var exists = await AnyAsync(queueName, item, cancellationToken);
- if (exists) return;
- await _redisCaching.LPushAsync(queueName, new List<TItem> { item });
- }
- #region private
- private bool Any<TItem>(string queueName, TItem item)
- {
- return default;
- }
- private async Task<bool> AnyAsync<TItem>(string queueName, TItem item, CancellationToken cancellationToken)
- {
- return default;
- }
- #endregion
- }
|