using EasyCaching.Core;
using System.Threading;
using XF.Domain.Dependency;
using XF.Domain.Queues;

namespace XF.EasyCaching;

/// <summary>
/// Queue implementation that stores its items in a list held by an
/// <see cref="IRedisCachingProvider"/>: items are pushed on the left of the
/// list and popped from the right.
/// </summary>
/// <remarks>
/// The asynchronous members accept a <see cref="CancellationToken"/> for
/// signature compatibility, but the token is not passed on to the provider
/// calls made here.
/// </remarks>
public class DistributedQueue : IQueue, IScopeDependency
{
    private readonly IRedisCachingProvider _redisCaching;

    /// <summary>Creates a queue on top of the given caching provider.</summary>
    /// <param name="redisCaching">Provider used for every list operation.</param>
    public DistributedQueue(IRedisCachingProvider redisCaching)
    {
        _redisCaching = redisCaching;
    }

    /// <summary>Pushes a single item onto the queue.</summary>
    /// <param name="queueName">Key of the list backing the queue.</param>
    /// <param name="item">Item to push.</param>
    public void Enqueue<TItem>(string queueName, TItem item)
        => _redisCaching.LPush(queueName, new List<TItem> { item });

    /// <summary>Pushes a batch of items onto the queue in one provider call.</summary>
    /// <param name="queueName">Key of the list backing the queue.</param>
    /// <param name="items">Items to push.</param>
    public void Enqueue<TItem>(string queueName, IList<TItem> items)
        => _redisCaching.LPush(queueName, items);

    /// <summary>Asynchronously pushes a single item onto the queue.</summary>
    /// <param name="queueName">Key of the list backing the queue.</param>
    /// <param name="item">Item to push.</param>
    /// <param name="cancellationToken">Accepted but not forwarded to the provider.</param>
    public Task EnqueueAsync<TItem>(string queueName, TItem item, CancellationToken cancellationToken = default)
        => _redisCaching.LPushAsync(queueName, new List<TItem> { item });

    /// <summary>Asynchronously pushes a batch of items onto the queue.</summary>
    /// <param name="queueName">Key of the list backing the queue.</param>
    /// <param name="items">Items to push.</param>
    /// <param name="cancellationToken">Accepted but not forwarded to the provider.</param>
    public Task EnqueueAsync<TItem>(string queueName, IList<TItem> items, CancellationToken cancellationToken = default)
        => _redisCaching.LPushAsync(queueName, items);

    /// <summary>Pops the next item from the queue.</summary>
    /// <param name="queueName">Key of the list backing the queue.</param>
    /// <returns>The popped item, or <c>default</c> when the queue reports no items.</returns>
    public TItem? Dequeue<TItem>(string queueName)
    {
        // The length is checked first so that an empty queue yields default
        // without issuing a pop. The check and the pop are two separate calls,
        // so another consumer may still drain the list in between.
        if (Count(queueName) <= 0)
        {
            return default;
        }

        return _redisCaching.RPop<TItem>(queueName);
    }

    /// <summary>Asynchronously pops the next item from the queue.</summary>
    /// <param name="queueName">Key of the list backing the queue.</param>
    /// <param name="cancellationToken">Accepted but not forwarded to the provider.</param>
    /// <returns>The popped item, or <c>default</c> when the queue reports no items.</returns>
    public async Task<TItem?> DequeueAsync<TItem>(string queueName, CancellationToken cancellationToken = default)
    {
        // Same check-then-pop sequence as the synchronous overload.
        var count = await CountAsync(queueName, cancellationToken);
        if (count <= 0)
        {
            return default;
        }

        return await _redisCaching.RPopAsync<TItem>(queueName);
    }

    /// <summary>Returns the number of items currently in the queue.</summary>
    /// <param name="queueName">Key of the list backing the queue.</param>
    public long Count(string queueName) => _redisCaching.LLen(queueName);

    /// <summary>Asynchronously returns the number of items currently in the queue.</summary>
    /// <param name="queueName">Key of the list backing the queue.</param>
    /// <param name="cancellationToken">Accepted but not forwarded to the provider.</param>
    public Task<long> CountAsync(string queueName, CancellationToken cancellationToken = default)
        => _redisCaching.LLenAsync(queueName);

    /// <summary>Removes occurrences of <paramref name="item"/> from the queue.</summary>
    /// <param name="queueName">Key of the list backing the queue.</param>
    /// <param name="item">Item whose occurrences are removed.</param>
    /// <param name="fromHead">
    /// When <c>true</c>, <paramref name="count"/> is passed to the provider as-is;
    /// otherwise it is negated, which reverses the search direction
    /// (Redis LREM semantics: positive scans head to tail, negative tail to head).
    /// </param>
    /// <param name="count">
    /// Maximum number of occurrences to remove; with Redis LREM semantics 0 removes all of them.
    /// </param>
    public void Remove<TItem>(string queueName, TItem item, bool fromHead = false, long count = 0)
        => _redisCaching.LRem(queueName, fromHead ? count : -count, item);

    /// <summary>Asynchronously removes occurrences of <paramref name="item"/> from the queue.</summary>
    /// <param name="queueName">Key of the list backing the queue.</param>
    /// <param name="item">Item whose occurrences are removed.</param>
    /// <param name="fromHead">
    /// When <c>true</c>, <paramref name="count"/> is passed to the provider as-is; otherwise it is negated.
    /// </param>
    /// <param name="count">
    /// Maximum number of occurrences to remove; with Redis LREM semantics 0 removes all of them.
    /// </param>
    /// <param name="cancellationToken">Accepted but not forwarded to the provider.</param>
    public Task RemoveAsync<TItem>(string queueName, TItem item, bool fromHead = false, long count = 0, CancellationToken cancellationToken = default)
        => _redisCaching.LRemAsync(queueName, fromHead ? count : -count, item);

    /// <summary>
    /// Pushes <paramref name="item"/> onto the queue unless an equal item is already queued.
    /// </summary>
    /// <remarks>
    /// The existence check and the push are two separate provider calls, so the
    /// operation is not atomic: two callers racing on the same item can both enqueue it.
    /// </remarks>
    /// <param name="queueName">Key of the list backing the queue.</param>
    /// <param name="item">Item to push when it is not already present.</param>
    public void EnqueueConcurrent<TItem>(string queueName, TItem item)
    {
        if (Any(queueName, item))
        {
            return;
        }

        _redisCaching.LPush(queueName, new List<TItem> { item });
    }

    /// <summary>
    /// Asynchronously pushes <paramref name="item"/> onto the queue unless an equal item is already queued.
    /// </summary>
    /// <remarks>
    /// The existence check and the push are two separate provider calls, so the
    /// operation is not atomic: two callers racing on the same item can both enqueue it.
    /// </remarks>
    /// <param name="queueName">Key of the list backing the queue.</param>
    /// <param name="item">Item to push when it is not already present.</param>
    /// <param name="cancellationToken">Accepted but not forwarded to the provider.</param>
    public async Task EnqueueConcurrentAsync<TItem>(string queueName, TItem item, CancellationToken cancellationToken = default)
    {
        var exists = await AnyAsync(queueName, item, cancellationToken);
        if (exists)
        {
            return;
        }

        await _redisCaching.LPushAsync(queueName, new List<TItem> { item });
    }

    #region private

    /// <summary>
    /// Reports whether the queue already holds an item equal to <paramref name="item"/>.
    /// </summary>
    /// <remarks>
    /// Reads the whole list (range 0..-1) and compares with the default equality of
    /// <typeparamref name="TItem"/>; reference types that do not override
    /// <c>Equals</c> will therefore never match a deserialized copy.
    /// </remarks>
    private bool Any<TItem>(string queueName, TItem item)
    {
        var queued = _redisCaching.LRange<TItem>(queueName, 0, -1);
        return queued is not null && queued.Contains(item);
    }

    /// <summary>Asynchronous counterpart of <see cref="Any{TItem}"/>.</summary>
    private async Task<bool> AnyAsync<TItem>(string queueName, TItem item, CancellationToken cancellationToken)
    {
        var queued = await _redisCaching.LRangeAsync<TItem>(queueName, 0, -1);
        return queued is not null && queued.Contains(item);
    }

    #endregion
}