Foreach paralelo con lambda asíncrona

Resuelto clausndk asked hace 11 años • 10 respuestas

Me gustaría manejar una colección en paralelo, pero tengo problemas para implementarla y, por lo tanto, espero ayuda.

El problema surge si quiero llamar a un método marcado como asíncrono en C#, dentro de la lambda del bucle paralelo. Por ejemplo:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

El problema ocurre cuando el recuento es 0, porque todos los subprocesos creados son en realidad solo subprocesos en segundo plano y la Parallel.ForEachllamada no espera a que se complete. Si elimino la palabra clave async, el método se ve así:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = await GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

Funciona, pero desactiva por completo la función de espera y tengo que realizar un manejo manual de excepciones. (Eliminado por brevedad).

¿Cómo puedo implementar un Parallel.ForEachbucle que utilice la palabra clave await dentro de lambda? ¿Es posible?

El prototipo del método Parallel.ForEach toma un Action<T>parámetro as, pero quiero que espere mi lambda asincrónica.

clausndk avatar Feb 28 '13 20:02 clausndk
Aceptado

Si sólo quieres un paralelismo simple, puedes hacer esto:

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

Si necesita algo más complejo, consulte la publicación de Stephen ToubForEachAsync .

Stephen Cleary avatar Feb 28 '2013 13:02 Stephen Cleary

Puede utilizar el ParallelForEachAsyncmétodo de extensión del paquete AsyncEnumerator NuGet :

using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;

Descargo de responsabilidad: soy el autor de la biblioteca AsyncEnumerator, que es de código abierto y tiene licencia del MIT, y publico este mensaje solo para ayudar a la comunidad.

Serge Semenov avatar Aug 26 '2016 21:08 Serge Semenov

Una de las nuevas API de .NET 6 es Parallel.ForEachAsync , una forma de programar trabajo asincrónico que le permite controlar el grado de paralelismo:

var urls = new [] 
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://stackoverflow.com"
};

var client = new HttpClient();

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
    var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", url);

    var response = await client.GetAsync(url);

    if (response.IsSuccessStatusCode)
    {
        using var target = File.OpenWrite(targetPath);

        await response.Content.CopyToAsync(target);
    }
});

Otro ejemplo en el blog de Scott Hanselman .

La fuente , como referencia.

Majid Shahabfar avatar Aug 24 '2021 04:08 Majid Shahabfar

Con SemaphoreSlimusted puede lograr el control del paralelismo.

var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
  await throttler.WaitAsync();
  try
  {
     var response = await GetData(item);
     bag.Add(response);
  }
  finally
  {
     throttler.Release();
  }
});
await Task.WhenAll(tasks);
var count = bag.Count;
Ganso Doido avatar Jul 02 '2019 21:07 Ganso Doido

El método de extensión más simple posible compilado a partir de otras respuestas y el artículo al que hace referencia la respuesta aceptada:

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync();
        try
        {
            await asyncAction(item).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

ACTUALIZACIÓN: aquí hay una modificación simple que también admite un token de cancelación como se solicita en los comentarios (no probado)

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync(cancellationToken);
        if (cancellationToken.IsCancellationRequested) return;

        try
        {
            await asyncAction(item, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}
Alex from Jitbit avatar Mar 15 '2021 09:03 Alex from Jitbit