Foreword

Microsoft.AspNetCore.ConcurrencyLimiter was added in ASP.NET Core 3.0. It queues incoming requests to avoid exhausting the thread pool. In day-to-day development we often configure a web server's maximum connection count and request queue size; in this post we look at how to implement a concurrency limit and a queue-length limit in middleware form.
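For comparison, Kestrel itself exposes server-level connection limits. A minimal sketch of that configuration (assuming the standard ASP.NET Core 3.x generic host and a `Startup` class; the value 100 is just an example) might look like this:

```csharp
public static IHostBuilder CreateHostBuilder(string[] args) =>
    Host.CreateDefaultBuilder(args)
        .ConfigureWebHostDefaults(webBuilder =>
        {
            webBuilder.ConfigureKestrel(options =>
            {
                // server-level cap on open connections (null = unlimited)
                options.Limits.MaxConcurrentConnections = 100;
            });
            webBuilder.UseStartup<Startup>();
        });
```

The middleware discussed below works at a different level: it limits how many requests are *processed* at once and queues the rest, rather than refusing connections outright.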

Queue policy

Add the NuGet package

Install-Package Microsoft.AspNetCore.ConcurrencyLimiter

public void ConfigureServices(IServiceCollection services)
{
    services.AddQueuePolicy(options =>
    {
        // maximum number of concurrent requests
        options.MaxConcurrentRequests = 2;
        // maximum number of requests that may wait in the queue
        options.RequestQueueLimit = 1;
    });
    services.AddControllers();
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }

    // register the limiter early so it guards everything that follows
    app.UseConcurrencyLimiter();

    app.UseHttpsRedirection();

    app.UseRouting();

    app.UseAuthorization();

    app.UseEndpoints(endpoints =>
    {
        endpoints.MapControllers();
    });

    // terminal delegate used to demonstrate the limiter
    app.Run(async context =>
    {
        await Task.Delay(100); // simulate 100 ms of work

        await context.Response.WriteAsync("Hello World!");
    });
}

With the simple configuration above we can add a concurrency limit and a queue-length limit to our application. The next question is: how is it implemented?

public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
{
    services.Configure(configure);
    services.AddSingleton<IQueuePolicy, QueuePolicy>();
    return services;
}

QueuePolicy is built on SemaphoreSlim. A semaphore (SemaphoreSlim / Semaphore) allows a limited number of threads to execute a protected section of code concurrently: it is initialized with a maximum count, each thread that acquires it decrements the count, and each release increments it again. A thread that requests access while the count is zero must wait until a slot is released.
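As a standalone illustration of that behaviour (not code from the library), a SemaphoreSlim initialized with a count of 2 lets at most two tasks run the protected section at the same time:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class SemaphoreDemo
{
    // at most 2 tasks may run the protected section at once
    private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(2);

    static async Task WorkAsync(int id)
    {
        await _semaphore.WaitAsync();   // decrement the count, or wait if it is 0
        try
        {
            Console.WriteLine($"task {id} entered");
            await Task.Delay(100);      // simulated work
        }
        finally
        {
            _semaphore.Release();       // increment the count again
        }
    }

    static async Task Main()
    {
        // tasks 3 and 4 cannot enter until 1 or 2 releases the semaphore
        await Task.WhenAll(WorkAsync(1), WorkAsync(2), WorkAsync(3), WorkAsync(4));
    }
}
```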

/// <summary>
///     Constructs QueuePolicy from the configured options.
/// </summary>
/// <param name="options"></param>
public QueuePolicy(IOptions<QueuePolicyOptions> options)
{
    _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
    if (_maxConcurrentRequests <= 0)
    {
        throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer.");
    }

    _requestQueueLimit = options.Value.RequestQueueLimit;
    if (_requestQueueLimit < 0)
    {
        throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");
    }

    // use SemaphoreSlim to limit concurrency
    _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
}

The ConcurrencyLimiterMiddleware

/// <summary>
/// Invokes the logic of the middleware.
/// </summary>
/// <param name="context">The <see cref="HttpContext"/>.</param>
/// <returns>A <see cref="Task"/> that completes when the request leaves.</returns>
public async Task Invoke(HttpContext context)
{
    var waitInQueueTask = _queuePolicy.TryEnterAsync();

    // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
    bool result;

    if (waitInQueueTask.IsCompleted)
    {
        ConcurrencyLimiterEventSource.Log.QueueSkipped();
        result = waitInQueueTask.Result;
    }
    else
    {
        using (ConcurrencyLimiterEventSource.Log.QueueTimer())
        {
            result = await waitInQueueTask;
        }
    }

    if (result)
    {
        try
        {
            await _next(context);
        }
        finally
        {
            _queuePolicy.OnExit();
        }
    }
    else
    {
        ConcurrencyLimiterEventSource.Log.RequestRejected();
        ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
        context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
        await _onRejected(context);
    }
}

Every request first calls _queuePolicy.TryEnterAsync(). That method takes a private lock and then checks whether the total number of requests in flight has reached the limit (the request queue size plus the maximum number of concurrent requests). If the limit has been reached, the request is rejected immediately and the middleware responds with a 503 status:

if (result)
{
    try
    {
        await _next(context);
    }
    finally
    {
        _queuePolicy.OnExit();
    }
}
else
{
    ConcurrencyLimiterEventSource.Log.RequestRejected();
    ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
    context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
    await _onRejected(context);
}

If the limit has not been reached, the request puts no excessive pressure on the server and should be handled. The policy then calls await _serverSemaphore.WaitAsync(): if the semaphore still has a free slot, the request is granted access and enters the protected code; otherwise it waits here asynchronously until the semaphore is released.

public async ValueTask<bool> TryEnterAsync()
{
    lock (_totalRequestsLock)
    {
        if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
        {
            return false;
        }
        TotalRequests++;
    }

    await _serverSemaphore.WaitAsync();
    return true;
}

Once the request has been processed, the middleware calls _queuePolicy.OnExit(), which releases the semaphore via _serverSemaphore.Release() and decrements the total request count.
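OnExit itself is short. Sketched from the behaviour just described (assuming the same _serverSemaphore, _totalRequestsLock, and TotalRequests members shown above), it looks roughly like this:

```csharp
public void OnExit()
{
    // free a semaphore slot so a waiting request can proceed
    _serverSemaphore.Release();

    // decrement the in-flight count under the same lock used by TryEnterAsync
    lock (_totalRequestsLock)
    {
        TotalRequests--;
    }
}
```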

Stack policy

Let's look at the other approach, the stack policy. How does it work? First, here is how to use it:

public void ConfigureServices(IServiceCollection services)
{
    services.AddStackPolicy(options =>
    {
        // maximum number of concurrent requests
        options.MaxConcurrentRequests = 2;
        // maximum number of requests that may wait in the queue
        options.RequestQueueLimit = 1;
    });
    services.AddControllers();
}

With the configuration above we can apply the stack-based policy to our application. Let's see how it is implemented.

public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
{
    services.Configure(configure);
    services.AddSingleton<IQueuePolicy, StackPolicy>();
    return services;
}

As you can see, this policy is implemented by the StackPolicy class. Let's look at its main methods.

/// <summary>
///     Constructs StackPolicy from the configured options.
/// </summary>
/// <param name="options"></param>
public StackPolicy(IOptions<QueuePolicyOptions> options)
{
    // buffer holding the queued (pending) requests
    _buffer = new List<ResettableBooleanCompletionSource>();
    // maximum queue capacity
    _maxQueueCapacity = options.Value.RequestQueueLimit;
    // maximum number of concurrent requests
    _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
    // free server slots, initially equal to the concurrency limit
    _freeServerSpots = options.Value.MaxConcurrentRequests;
}

When a request reaches the middleware, _queuePolicy.TryEnterAsync() is called. It first checks whether there are free server slots: if _freeServerSpots > 0, it decrements the count and returns true immediately, letting the middleware move on to the next step. If the queue is already full (_queueLength equals the configured queue size), the oldest pending request is bounced: it is completed with false (and will receive a 503) and the new request takes its place. Newer requests displace older ones, which is what gives this policy its stack-like (LIFO) behaviour.

public ValueTask<bool> TryEnterAsync()
{
    lock (_bufferLock)
    {
        if (_freeServerSpots > 0)
        {
            _freeServerSpots--;
            return _trueTask;
        }

        // if the queue is full, bounce the oldest pending request
        if (_queueLength == _maxQueueCapacity)
        {
            _hasReachedCapacity = true;
            _buffer[_head].Complete(false);
            _queueLength--;
        }

        var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
        _cachedResettableTCS = null;

        if (_hasReachedCapacity || _queueLength < _buffer.Count)
        {
            _buffer[_head] = tcs;
        }
        else
        {
            _buffer.Add(tcs);
        }
        _queueLength++;

        // increment _head for next time
        _head++;
        if (_head == _maxQueueCapacity)
        {
            _head = 0;
        }
        return tcs.GetValueTask();
    }
}

When a request finishes, _queuePolicy.OnExit() is called: the most recently queued request is popped off the stack and allowed to proceed, and the queue length is decremented.

public void OnExit()
{
    lock (_bufferLock)
    {
        if (_queueLength == 0)
        {
            _freeServerSpots++;

            if (_freeServerSpots > _maxConcurrentRequests)
            {
                _freeServerSpots--;
                throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
            }

            return;
        }

        // step backwards and launch a new task
        if (_head == 0)
        {
            _head = _maxQueueCapacity - 1;
        }
        else
        {
            _head--;
        }

        // exit: pop the most recent request off the stack and let it proceed
        _buffer[_head].Complete(true);
        _queueLength--;
    }
}

Summary

Based on the characteristics of the stack structure, in practice only two operations are usually performed on a stack:

  • Adding an element to the top of the stack, called pushing;
  • Removing the top element from the stack, called popping.

There are two ways to implement the queue storage structure:

  • Sequential queue: a queue built on an array-based (sequential) list;
  • Chained queue: a queue built on a linked list.
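These two structures are what give the policies their behaviour: QueuePolicy admits waiters in FIFO order, while StackPolicy serves (and evicts) waiters in LIFO order. The difference can be shown with the standard .NET collections (illustrative only, not code from the library):

```csharp
using System;
using System.Collections.Generic;

class StackVsQueueDemo
{
    static void Main()
    {
        // FIFO: the first request queued is the first one served
        var queue = new Queue<string>();
        queue.Enqueue("request-1");
        queue.Enqueue("request-2");
        Console.WriteLine(queue.Dequeue()); // request-1

        // LIFO: the most recently queued request is the first one served
        var stack = new Stack<string>();
        stack.Push("request-1");
        stack.Push("request-2");
        Console.WriteLine(stack.Pop());     // request-2
    }
}
```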