浏览代码

SendTaskLoadService

xf 1 年之前
父节点
当前提交
606dfe62d4

+ 1 - 0
src/DataSharing.Application/DataSharing.Application.csproj

@@ -9,6 +9,7 @@
   <ItemGroup>
     <PackageReference Include="Hotline.Share" Version="1.0.37" />
     <PackageReference Include="Polly.Core" Version="8.2.0" />
+    <PackageReference Include="XF.EasyCaching" Version="1.0.1" />
   </ItemGroup>
 
   <ItemGroup>

+ 128 - 76
src/DataSharing.Application/Services/SendTaskHandler.cs

@@ -1,76 +1,128 @@
-//using System;
-//using System.Collections.Generic;
-//using System.Linq;
-//using System.Text;
-//using System.Threading.Tasks;
-//using Consul;
-//using Microsoft.Extensions.DependencyInjection;
-//using Microsoft.Extensions.Hosting;
-//using Microsoft.Extensions.Logging;
-//using Polly.Registry;
-//using XF.Domain.Dependency;
-//using XF.Domain.Exceptions;
-
-//namespace DataSharing.Application.Services
-//{
-//    public class SendTaskHandler : ISendTaskHandler, IScopeDependency
-//    {
-//        private readonly ILogger<SendTaskHandler> _logger;
-
-//        public SendTaskHandler(ILogger<SendTaskHandler> logger)
-//        {
-//            _logger = logger;
-//        }
-
-//        public async Task SendAsync(int count, CancellationToken cancellationToken)
-//        {
-//            _logger.LogInformation($"==>执行send开始, count:{count}");
-//            var rd = Random.Shared.Next(1000, 10000);
-//            if (rd <= 5000)
-//            {
-//                throw new UserFriendlyException("执行失败");
-//            }
-
-//            await Task.Delay(rd, cancellationToken);
-
-//            _logger.LogInformation($"执行send结束, count:{count}<==");
-//        }
-//    }
-
-//    public interface ISendTaskHandler
-//    {
-//        Task SendAsync(int count, CancellationToken cancellationToken);
-//    }
-
-//    public class SendTaskService : BackgroundService
-//    {
-//        private readonly IServiceScopeFactory _serviceScopeFactory;
-
-//        public SendTaskService(IServiceScopeFactory serviceScopeFactory)
-//        {
-//            _serviceScopeFactory = serviceScopeFactory;
-//        }
-
-//        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
-//        {
-//            using var scope = _serviceScopeFactory.CreateScope();
-//            var provider = scope.ServiceProvider;
-//            var pipelineProvider = provider.GetRequiredService<ResiliencePipelineProvider<string>>();
-//            var pipeline = pipelineProvider.GetPipeline(StrategyDefaults.RetryStrategy);
-//            var handler = provider.GetRequiredService<ISendTaskHandler>();
-
-//            var count = 0;
-//            while (!stoppingToken.IsCancellationRequested)
-//            {
-//                await pipeline.ExecuteAsync(async token =>
-//                {
-//                    Console.WriteLine($"execute count:{count}");
-//                    await handler.SendAsync(count, token);
-//                    count++;
-//                });
-
-//                await Task.Delay(1000, stoppingToken);
-//            }
-//        }
-//    }
-//}
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Consul;
+using EasyCaching.Redis;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Polly;
+using Polly.Registry;
+using Polly.Retry;
+using StackExchange.Redis;
+using XF.Domain.Dependency;
+using XF.Domain.Exceptions;
+using XF.Domain.Queues;
+
+namespace DataSharing.Application.Services
+{
+    public class SendTaskHandler : ISendTaskHandler, IScopeDependency
+    {
+        //private readonly ResiliencePipelineProvider<string> _pipelineProvider;
+        private readonly IQueue _queue;
+        private readonly ILogger<SendTaskHandler> _logger;
+
+        public SendTaskHandler(
+            //ResiliencePipelineProvider<string> pipelineProvider,
+            IQueue queue,
+            ILogger<SendTaskHandler> logger)
+        {
+            //_pipelineProvider = pipelineProvider;
+            _queue = queue;
+            _logger = logger;
+        }
+
+        public async Task SendAsync(int count, CancellationToken cancellationToken)
+        {
+            _logger.LogInformation($"==>执行send开始, count:{count}");
+            //2
+            //var pipeline = _pipelineProvider.GetPipeline(StrategyDefaults.RetryStrategy);
+
+            //await pipeline.ExecuteAsync(async token =>
+            //{
+            //    var rd = Random.Shared.Next(1000, 10000);
+            //    if (rd <= 5000)
+            //    {
+            //        throw new UserFriendlyException($"count: {count}, 执行失败");
+            //    }
+            //});
+
+            //1
+            //var rd = Random.Shared.Next(1000, 10000);
+            //if (rd <= 5000)
+            //{
+            //    throw new UserFriendlyException("执行失败");
+            //}
+
+            //await Task.Delay(rd, cancellationToken);
+
+            //3
+            // Create an instance of builder that exposes various extensions for adding resilience strategies
+            //ResiliencePipeline pipeline = new ResiliencePipelineBuilder()
+            //    .AddRetry(new RetryStrategyOptions()) // Add retry using the default options
+            //    .AddTimeout(TimeSpan.FromSeconds(10)) // Add 10 seconds timeout
+            //    .Build(); // Builds the resilience pipeline
+
+            //// Execute the pipeline asynchronously
+            //await pipeline.ExecuteAsync(async token =>
+            //{
+            //    var rd = Random.Shared.Next(1000, 10000);
+            //    if (rd <= 5000)
+            //    {
+            //        throw new UserFriendlyException($"count: {count}, 执行失败");
+            //    }
+            //});
+
+            //4
+            //await _queue.EnqueueAsync("test-queue", "111", cancellationToken);
+            //await _queue.EnqueueAsync("test-queue", "222", cancellationToken);
+            //await _queue.EnqueueAsync("test-queue", "333", cancellationToken);
+
+            //var a1 = await _queue.DequeueAsync<string>("test-queue", cancellationToken);
+            //var a2 = await _queue.DequeueAsync<string>("test-queue", cancellationToken);
+            
+
+            _logger.LogInformation($"执行send结束, count:{count}<==");
+        }
+    }
+
+    public interface ISendTaskHandler
+    {
+        Task SendAsync(int count, CancellationToken cancellationToken);
+    }
+
+    public class SendTaskService : BackgroundService
+    {
+        private readonly IServiceScopeFactory _serviceScopeFactory;
+
+        public SendTaskService(IServiceScopeFactory serviceScopeFactory)
+        {
+            _serviceScopeFactory = serviceScopeFactory;
+        }
+
+        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+        {
+            using var scope = _serviceScopeFactory.CreateScope();
+            var provider = scope.ServiceProvider;
+            //var pipelineProvider = provider.GetRequiredService<ResiliencePipelineProvider<string>>();
+            //var pipeline = pipelineProvider.GetPipeline(StrategyDefaults.RetryStrategy);
+            var handler = provider.GetRequiredService<ISendTaskHandler>();
+
+            var count = 0;
+            while (!stoppingToken.IsCancellationRequested)
+            {
+                //await handler.SendAsync(count, stoppingToken);
+                ////await pipeline.ExecuteAsync(async token =>
+                ////{
+                ////    Console.WriteLine($"execute count:{count}");
+                ////    await handler.SendAsync(count, token);
+                ////});
+
+                //count++;
+                //await Task.Delay(1000, stoppingToken);
+            }
+        }
+    }
+}

+ 45 - 0
src/DataSharing.Application/Services/SendTaskLoadService.cs

@@ -0,0 +1,45 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using XF.Domain.Repository;
+
+namespace DataSharing.Application.Services
+{
+    public class SendTaskLoadService : BackgroundService
+    {
+        private readonly IServiceScopeFactory _serviceScopeFactory;
+        private const string SendTaskQueue = "send-task-queue";
+        private const int Delay = 60000;
+
+        public SendTaskLoadService(IServiceScopeFactory serviceScopeFactory)
+        {
+            _serviceScopeFactory = serviceScopeFactory;
+        }
+
+        /// <summary>
+        /// This method is called when the <see cref="T:Microsoft.Extensions.Hosting.IHostedService" /> starts. The implementation should return a task that represents
+        /// the lifetime of the long running operation(s) being performed.
+        /// </summary>
+        /// <param name="stoppingToken">Triggered when <see cref="M:Microsoft.Extensions.Hosting.IHostedService.StopAsync(System.Threading.CancellationToken)" /> is called.</param>
+        /// <returns>A <see cref="T:System.Threading.Tasks.Task" /> that represents the long running operations.</returns>
+        /// <remarks>See <see href="https://docs.microsoft.com/dotnet/core/extensions/workers">Worker Services in .NET</see> for implementation guidelines.</remarks>
+        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+        {
+            using var scope = _serviceScopeFactory.CreateScope();
+            var provider = scope.ServiceProvider;
+            //var TaskRepository = provider.GetService<IRepository<>>()
+
+            while (!stoppingToken.IsCancellationRequested)
+            {
+
+
+
+                await Task.Delay(Delay, stoppingToken);
+            }
+        }
+    }
+}

+ 2 - 5
src/DataSharing.Host/StartupExtensions.cs

@@ -39,8 +39,8 @@ internal static class StartupExtensions
             .AddApplication()
             ;
 
-        //Authentication
-        services.RegisterAuthentication(configuration);
+        ////Authentication
+        //services.RegisterAuthentication(configuration);
 
         services.AddControllers(options =>
             {
@@ -55,9 +55,6 @@ internal static class StartupExtensions
         //swagger
         services.RegisterSwagger();
 
-        //signalR
-        services.RegisterSignalR(configuration);
-
         /* CORS */
         services.RegisterCors(configuration, CorsOrigins);