Question:
The task is quite simple: there are N ordered requests that need to be executed asynchronously, with a limit on how many run concurrently, and the results must come back as N responses in the same order.
In practice this could be network or database work: there are lots of requests, but you don't want to bring your machine, network, or server to its knees. So a limit is placed on the number of simultaneously active asynchronous tasks.
The head-on solution looks obvious.
public async Task<Tout[]> RunSemaphoreAsync<Tin, Tout>(IEnumerable<Tin> items, Func<Tin, Task<Tout>> func, int degree)
{
    using SemaphoreSlim semaphore = new(degree);
    return await Task.WhenAll(items.Select(async item =>
    {
        await semaphore.WaitAsync();
        try
        {
            return await func(item);
        }
        finally
        {
            semaphore.Release();
        }
    }));
}
Usage example:
Say there is a work item that takes something, does something, and returns something.
public async Task<int> RunJobAsync(int n)
{
    await Task.Yield();
    return n + 1;
}
And here is a sample run:
IEnumerable<int> numbers = Enumerable.Range(0, 100);
int[] result = await RunSemaphoreAsync(numbers, RunJobAsync, Environment.ProcessorCount * 2);
Console.WriteLine(string.Join(",", result));
Everything works like clockwork, quickly and as expected.
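A side note on the ordering requirement: it is satisfied here because Task.WhenAll returns the results in the same order as the input tasks, regardless of the order in which they complete. A minimal sketch (the type and method names are mine, just for illustration):

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

class WhenAllOrderDemo
{
    // Tasks deliberately finish in reverse order (smaller n waits longer),
    // yet Task.WhenAll hands the results back in the original input order.
    public static Task<int[]> RunAsync()
    {
        var tasks = Enumerable.Range(0, 5).Select(async n =>
        {
            await Task.Delay((5 - n) * 20);
            return n * 10;
        });
        return Task.WhenAll(tasks);
    }

    static async Task Main()
    {
        int[] results = await RunAsync();
        Console.WriteLine(string.Join(",", results)); // prints 0,10,20,30,40
    }
}
```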
Everything above is fine, but one day, while reading yet again about various implementations of the Producer/Consumer pattern, I had the idea of using workers instead of a semaphore. Why not, after all.
The result is the following method.
public async Task<Tout[]> RunWorkersAsync<Tin, Tout>(IEnumerable<Tin> items, Func<Tin, Task<Tout>> func, int degree)
{
    List<Task<Tout>> tasks = new();
    using (var source = items.GetEnumerator())
    {
        Task[] jobs = new Task[degree];
        for (int i = 0; i < degree; i++)
        {
            jobs[i] = ((Func<Task>)(async () =>
            {
                while (true)
                {
                    Task<Tout> task;
                    lock (source)
                    {
                        if (source.MoveNext())
                        {
                            task = func(source.Current);
                            tasks.Add(task);
                        }
                        else
                            break;
                    }
                    await task;
                }
            }))();
        }
        await Task.WhenAll(jobs);
    }
    return tasks.Select(t => t.Result).ToArray();
}
It works just as beautifully as the first candidate. So which one is better?
I decided to measure the overhead.
I'm no master at writing benchmarks, but when has that ever stopped anyone? 🙂
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;

class Program
{
    static void Main(string[] args)
    {
        var result = BenchmarkRunner.Run<MyBenchmarks>();
        Console.ReadKey();
    }
}

[MemoryDiagnoser]
public class MyBenchmarks
{
    private readonly List<int> numbers = Enumerable.Range(0, 2000).ToList();
    private readonly int degree = Environment.ProcessorCount * 2;

    [Benchmark]
    public Task SemaphoreTest()
    {
        return RunSemaphoreAsync(numbers, RunJobAsync, degree);
    }

    [Benchmark]
    public Task WorkersTest()
    {
        return RunWorkersAsync(numbers, RunJobAsync, degree);
    }

    public async Task<int> RunJobAsync(int n)
    {
        await Task.Yield();
        return n + 1;
    }

    public async Task<Tout[]> RunSemaphoreAsync<Tin, Tout>(IEnumerable<Tin> items, Func<Tin, Task<Tout>> func, int degree)
    {
        using SemaphoreSlim semaphore = new(degree);
        return await Task.WhenAll(items.Select(async item =>
        {
            await semaphore.WaitAsync();
            try
            {
                return await func(item);
            }
            finally
            {
                semaphore.Release();
            }
        }));
    }

    public async Task<Tout[]> RunWorkersAsync<Tin, Tout>(IEnumerable<Tin> items, Func<Tin, Task<Tout>> func, int degree)
    {
        List<Task<Tout>> tasks = new();
        using (var source = items.GetEnumerator())
        {
            Task[] jobs = new Task[degree];
            for (int i = 0; i < degree; i++)
            {
                jobs[i] = ((Func<Task>)(async () =>
                {
                    while (true)
                    {
                        Task<Tout> task;
                        lock (source)
                        {
                            if (source.MoveNext())
                            {
                                task = func(source.Current);
                                tasks.Add(task);
                            }
                            else
                                break;
                        }
                        await task;
                    }
                }))();
            }
            await Task.WhenAll(jobs);
        }
        return tasks.Select(t => t.Result).ToArray();
    }
}
And then I got an interesting result.
BenchmarkDotNet=v0.13.0, OS=Windows 10.0.19043.1081 (21H1/May2021Update)
Intel Core i7-4700HQ CPU 2.40GHz (Haswell), 1 CPU, 8 logical and 4 physical cores
.NET SDK=5.0.301
[Host] : .NET 5.0.7 (5.0.721.25508), X64 RyuJIT
DefaultJob : .NET 5.0.7 (5.0.721.25508), X64 RyuJIT
| Method | Mean | Error | StdDev | Gen 0 | Gen 1 | Gen 2 | Allocated |
|---|---|---|---|---|---|---|---|
| SemaphoreTest | 1,780.2 us | 4.23 us | 3.95 us | 140.6250 | 41.0156 | – | 519 KB |
| WorkersTest | 943.2 us | 18.37 us | 26.92 us | 74.2188 | 19.5313 | – | 262 KB |
My eyes refuse to believe it. Why did the semaphore-based method lose so badly? Is the benchmark botched, or the implementation? Please judge.
P.S. I didn't run straight to SO; first I searched for complaints about SemaphoreSlim being slow… and found none, but I did find this: https://github.com/dotnet/runtime/pull/55262 . In other words, the semaphore gets a little faster in .NET 6.
Answer:
The code for RunWorkersAsync and RunSemaphoreAsync distributes work differently between threads.
In RunWorkersAsync, only the first iteration of each worker's while loop runs on the initial thread; after that, the enumeration proceeds on several threads, because everything after an await resumes on a thread-pool thread. The code is no longer tied to any single shared thread.
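That key point, that everything after an await (with no SynchronizationContext, as in a console app) resumes on a thread-pool thread, can be checked with a small probe. The names below are mine, just for illustration:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class ContinuationDemo
{
    // Reports whether the code before and after the await ran on a thread-pool thread.
    public static async Task<(bool beforePool, bool afterPool)> ProbeAsync()
    {
        bool before = Thread.CurrentThread.IsThreadPoolThread;
        // With no SynchronizationContext captured, the continuation after
        // this await is scheduled onto the thread pool.
        await Task.Yield();
        bool after = Thread.CurrentThread.IsThreadPoolThread;
        return (before, after);
    }

    static async Task Main()
    {
        var (before, after) = await ProbeAsync();
        Console.WriteLine($"Before await on pool thread: {before}");
        Console.WriteLine($"After await on pool thread:  {after}");
    }
}
```

When started from a non-pool thread (like a console app's main thread), the first value is false and the second is true, which is exactly why the workers detach from the starting thread after their first await.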
In RunSemaphoreAsync, the initial thread runs the following for every element:
items.Select(async item => {
    await semaphore.WaitAsync(); // the await and its continuation run on a thread-pool thread
Accordingly, in this variant the initial thread becomes a bottleneck: the handlers cannot consume work any faster than this single thread produces it.
This can be mitigated by forcing a yield right at the start of the lambda:
public async Task<Tout[]> RunSemaphoreAsync<Tin, Tout>(IEnumerable<Tin> items, Func<Tin, Task<Tout>> func, int degree)
{
    using SemaphoreSlim semaphore = new(degree);
    return await Task.WhenAll(items.Select(async item =>
    {
        await Task.Yield();
        await semaphore.WaitAsync();
        try
        {
            return await func(item);
        }
        finally
        {
            semaphore.Release();
        }
    }));
}
The results will be somewhat better, but the single-thread bottleneck itself still won't disappear.