Foreach paralelo con lambda asíncrona
Me gustaría manejar una colección en paralelo, pero tengo problemas para implementarla y, por lo tanto, espero ayuda.
El problema surge si quiero llamar a un método marcado como asíncrono en C#, dentro de la lambda del bucle paralelo. Por ejemplo:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}
var count = bag.Count;
El problema ocurre cuando el recuento es 0, porque todos los subprocesos creados son en realidad solo subprocesos en segundo plano y la Parallel.ForEach
llamada no espera a que se complete. Si elimino la palabra clave async, el método se ve así:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
// some pre stuff
var responseTask = await GetData(item);
responseTask.Wait();
var response = responseTask.Result;
bag.Add(response);
// some post stuff
}
var count = bag.Count;
Funciona, pero desactiva por completo la función de espera y tengo que realizar un manejo manual de excepciones. (Eliminado por brevedad).
¿Cómo puedo implementar un Parallel.ForEach
bucle que utilice la palabra clave await dentro de lambda? ¿Es posible?
El prototipo del método Parallel.ForEach toma un Action<T>
parámetro as, pero quiero que espere mi lambda asincrónica.
Si sólo quieres un paralelismo simple, puedes hacer esto:
var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;
Si necesita algo más complejo, consulte la publicación de Stephen ToubForEachAsync
.
Puede utilizar el ParallelForEachAsync
método de extensión del paquete AsyncEnumerator NuGet :
using Dasync.Collections;
var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;
Descargo de responsabilidad: soy el autor de la biblioteca AsyncEnumerator, que es de código abierto y tiene licencia del MIT, y publico este mensaje solo para ayudar a la comunidad.
Una de las nuevas API de .NET 6 es Parallel.ForEachAsync , una forma de programar trabajo asincrónico que le permite controlar el grado de paralelismo:
var urls = new []
{
"https://dotnet.microsoft.com",
"https://www.microsoft.com",
"https://stackoverflow.com"
};
var client = new HttpClient();
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", url);
var response = await client.GetAsync(url);
if (response.IsSuccessStatusCode)
{
using var target = File.OpenWrite(targetPath);
await response.Content.CopyToAsync(target);
}
});
Otro ejemplo en el blog de Scott Hanselman .
La fuente , como referencia.
Con SemaphoreSlim
usted puede lograr el control del paralelismo.
var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
await throttler.WaitAsync();
try
{
var response = await GetData(item);
bag.Add(response);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
var count = bag.Count;
El método de extensión más simple posible compilado a partir de otras respuestas y el artículo al que hace referencia la respuesta aceptada:
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
var tasks = source.Select(async item =>
{
await throttler.WaitAsync();
try
{
await asyncAction(item).ConfigureAwait(false);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
}
ACTUALIZACIÓN: aquí hay una modificación simple que también admite un token de cancelación como se solicita en los comentarios (no probado)
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
var tasks = source.Select(async item =>
{
await throttler.WaitAsync(cancellationToken);
if (cancellationToken.IsCancellationRequested) return;
try
{
await asyncAction(item, cancellationToken).ConfigureAwait(false);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
}