如何在.Net Core API项目中跨多个线程限制对HttpClient的所有传出异步调用

How to Throttle all outgoing asynchronous calls to HttpClient across multiple threads in .net Core API project(如何在.Net Core API项目中跨多个线程限制对HttpClient的所有传出异步调用)

本文介绍了如何在.Net Core API项目中跨多个线程限制对HttpClient的所有传出异步调用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在设计一个.Net核心Web API,它使用一个我不能控制的外部API。我找到了一些关于堆栈溢出的很好的答案,它们允许我在同一线程中使用信号量来限制对这个外部API的请求。我想知道如何最好地将这种限制扩展到应用程序范围,而不是只针对特定的任务列表进行限制。我一直在学习HttpMessageHandler,这似乎是拦截所有传出消息并应用节流的一种可能方式。但我担心的是我可能不理解的线程安全和锁定问题。我包括了我当前的节流代码,希望这可能有助于理解我正在尝试做什么,但跨多个线程,并且不断添加任务,而不是预定义的任务列表。

private static async Task<List<iMISPagedResultResponse>> GetAsyncThrottled(List<int> pages, int throttle, IiMISClient client, string url, int limit)
{
        var rtn = new List<PagedResultResponse>();
        var allTasks = new List<Task>();
        var throttler = new SemaphoreSlim(initialCount: throttle);
        foreach (var page in pages)
        {
            await throttler.WaitAsync();
            allTasks.Add(
                Task.Run(async () =>
                {
                    try
                    {
                        var result = await GetPagedResult(client, url, page);
                        return result;
                    }
                    finally
                    {
                        throttler.Release();
                    }
                }));
        }
        await Task.WhenAll(allTasks);
        foreach (var task in allTasks)
        {
            var result = ((Task<PagedResultResponse>)task).Result;
            rtn.Add(result);
        }
        return rtn;
}

推荐答案

概念性问题

  • SemaphoreSlim是线程安全的,因此将其用作跨多个线程的并行度节流不存在线程安全或锁定问题。
  • HttpMessageHandler确实是outbound middleware mechanism to intercept calls placed through HttpClient。因此,它们是使用SemaphoreSlim对http调用应用并行性限制的理想方式。

简单实现

因此ThrottlingDelegatingHandler可能如下所示:

public class ThrottlingDelegatingHandler : DelegatingHandler
{
    private SemaphoreSlim _throttler;

    public ThrottlingDelegatingHandler(SemaphoreSlim throttler)
    {
        _throttler = throttler ?? throw new ArgumentNullException(nameof(throttler));
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        if (request == null) throw new ArgumentNullException(nameof(request));

        await _throttler.WaitAsync(cancellationToken);
        try
        {
            return await base.SendAsync(request, cancellationToken);
        }
        finally
        {
            _throttler.Release();
        }
    }
}

将实例作为单一实例创建和维护:

int maxParallelism = 10;
var throttle = new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)); 
DelegatingHandler应用于要通过其并行限制调用的HttpClient的所有实例:
HttpClient throttledClient = new HttpClient(throttle);

HttpClient不必是单例:只需要throttle实例。

为简洁起见,我省略了Dot Net Core的DI代码,但您将使用.Net Core的容器注册单例ThrottlingDelegatingHandler实例,在使用点通过DI获取该单例,并在您如上所示构造的HttpClient中使用它。

但是:

更好的实施:使用HttpClientFactory(.NET Core 2.1+)

以上仍然回避了一个问题:您将如何管理HttpClient生命周期:

  • 单例(应用作用域)HttpClient%sdo not pick up DNS updates。除非您终止并重新启动您的应用程序(可能不受欢迎),否则您的应用程序将无法识别DNS更新。
  • 频繁创建和释放模式using (HttpClient client = ) { },can cause socket exhaustion。

HttpClientFactory的设计目标之一是管理HttpClient实例及其委托处理程序的生命周期,以避免这些问题。

在.NET Core 2.1中,您可以使用HttpClientFactoryConfigureServices(IServiceCollection services)中的Startup类中将其连接起来,如下所示:

int maxParallelism = 10;
services.AddSingleton<ThrottlingDelegatingHandler>(new ThrottlingDelegatingHandler(new SemaphoreSlim(maxParallelism)));

services.AddHttpClient("MyThrottledClient")
    .AddHttpMessageHandler<ThrottlingDelegatingHandler>();

(";MyThrottledClient";这里是一个named-client approach,仅为简短起见;typed clients避免使用字符串命名。)

在使用点,通过DI(reference)获取一个IHttpClientFactory,然后调用

var client = _clientFactory.CreateClient("MyThrottledClient");

获取预配置了单例ThrottlingDelegatingHandlerHttpClient实例。

以这种方式通过HttpClient实例获取的所有调用都将(在整个应用程序中通用)限制到最初配置的int maxParallelism

和HttpClientFactory神奇地处理所有HttpClient生命周期问题。

更好的实现:将Polly与IHttpClientFactory结合使用以获得所有这些开箱即用的功能

Polly是deeply integrated with IHttpClientFactory,Polly还提供Bulkhead policy其中works as a parallelism throttle by an identical SemaphoreSlim mechanism。

因此,作为手动滚动ThrottlingDelegatingHandler的替代方案,您也可以只将Polly隔板策略与IHttpClientFactory一起使用。在您的Startup类中,只需:

int maxParallelism = 10;
var throttler = Policy.BulkheadAsync<HttpResponseMessage>(maxParallelism, Int32.MaxValue);

services.AddHttpClient("MyThrottledClient")
    .AddPolicyHandler(throttler);

如前所述从HttpClientFactory获取预配置的HttpClient实例。如前所述,通过这样的HttpClient实例的所有调用将被并行限制到已配置的maxParallelism

此外,Polly隔板策略还提供了配置您希望同时允许多少个操作为主信号量中的一个执行槽"排队"的能力。因此,例如:

var throttler = Policy.BulkheadAsync<HttpResponseMessage>(10, 100);
当按上述方式配置到HttpClient中时,将允许10个并行http调用,并允许多达100个http调用为一个执行槽"排队"。这可以防止出现故障的下游系统导致上游排队呼叫的资源过度膨胀,从而为高吞吐量系统提供额外的弹性。

若要将Polly选项与HttpClientFactory一起使用,请引入Microsoft.Extensions.Http.PollyPollyNuget包。

引用:Polly deep doco on Polly and IHttpClientFactory;Bulkhead policy。


附录重新任务

问题使用Task.Run(...)并提到:

使用外部API的.Net核心Web API

和:

不断添加任务,而不是预定义的任务列表。

如果您的.Net核心Web API对于.Net核心Web API处理的每个请求只使用一次外部API,并且您采用了本答案其余部分讨论的方法,那么将下游外部http调用卸载到具有Task.Run(...)的新Task将是不必要的,并且只会在额外的Task实例和线程切换中产生开销。.NET核心将已经在线程池上的多个线程上运行传入请求。

这篇关于如何在.Net Core API项目中跨多个线程限制对HttpClient的所有传出异步调用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:如何在.Net Core API项目中跨多个线程限制对HttpClient的所有传出异步调用

基础教程推荐