Limitación de tareas asincrónicas
Me gustaría ejecutar un montón de tareas asíncronas, con un límite en la cantidad de tareas que pueden estar pendientes de completarse en un momento dado.
Supongamos que tiene 1000 URL y solo desea tener 50 solicitudes abiertas a la vez; pero tan pronto como se completa una solicitud, abre una conexión a la siguiente URL de la lista. De esta manera, siempre habrá exactamente 50 conexiones abiertas a la vez, hasta que se agote la lista de URL.
También quiero utilizar una cantidad determinada de subprocesos si es posible.
Se me ocurrió un método de extensión ThrottleTasksAsync
que hace lo que quiero. ¿Existe ya una solución más sencilla? Supongo que este es un escenario común.
Uso:
class Program
{
static void Main(string[] args)
{
Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();
Console.WriteLine("Press a key to exit...");
Console.ReadKey(true);
}
}
Aquí está el código:
static class IEnumerableExtensions
{
public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
{
var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());
var semaphore = new SemaphoreSlim(maxConcurrentTasks);
// Run the throttler on a separate thread.
var t = Task.Run(() =>
{
foreach (var item in enumerable)
{
// Wait for the semaphore
semaphore.Wait();
blockingQueue.Add(item);
}
blockingQueue.CompleteAdding();
});
var taskList = new List<Task<Result_T>>();
Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
_ =>
{
Enumerable_T item;
if (blockingQueue.TryTake(out item, 100))
{
taskList.Add(
// Run the task
taskToRun(item)
.ContinueWith(tsk =>
{
// For effect
Thread.Sleep(2000);
// Release the semaphore
semaphore.Release();
return tsk.Result;
}
)
);
}
});
// Await all the tasks.
return await Task.WhenAll(taskList);
}
static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
{
while (!condition()) yield return true;
}
}
El método utiliza BlockingCollection
y SemaphoreSlim
para hacerlo funcionar. El acelerador se ejecuta en un subproceso y todas las tareas asíncronas se ejecutan en el otro subproceso. Para lograr el paralelismo, agregué un parámetro maxDegreeOfParallelism que se pasa a un Parallel.ForEach
bucle reutilizado como while
bucle.
La versión antigua era:
foreach (var master = ...)
{
var details = ...;
Parallel.ForEach(details, detail => {
// Process each detail record here
}, new ParallelOptions { MaxDegreeOfParallelism = 15 });
// Perform the final batch updates here
}
Pero el grupo de subprocesos se agota rápidamente y no se puede hacer async
/ await
.
Bonificación:
para solucionar el problema de BlockingCollection
que se genera una excepción Take()
cuando CompleteAdding()
se llama, estoy usando la TryTake
sobrecarga con un tiempo de espera. Si no usara el tiempo de espera en TryTake
, anularía el propósito de usar un BlockingCollection
ya que TryTake
no se bloqueará. ¿Existe una mejor manera? Lo ideal sería que hubiera un TakeAsync
método.
Como se sugiere, utilice TPL Dataflow .
A TransformBlock<TInput, TOutput>
puede ser lo que estás buscando.
Usted define a MaxDegreeOfParallelism
para limitar cuántas cadenas se pueden transformar (es decir, cuántas URL se pueden descargar) en paralelo. Luego publicas las URL en el bloque y, cuando terminas, le dices al bloque que terminaste de agregar elementos y buscas las respuestas.
var downloader = new TransformBlock<string, HttpResponse>(
url => Download(url),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
);
var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);
foreach(var url in urls)
downloader.Post(url);
//or await downloader.SendAsync(url);
downloader.Complete();
await downloader.Completion;
IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
//process responses
}
Nota: El TransformBlock
buffer tanto su entrada como su salida. Entonces, ¿por qué necesitamos vincularlo a un BufferBlock
?
Porque TransformBlock
no se completará hasta que se hayan consumido todos los elementos ( HttpResponse
) y await downloader.Completion
se bloqueará. En su lugar, dejamos que downloader
reenvíe toda su salida a un bloque de búfer dedicado; luego esperamos a que se downloader
complete e inspeccionamos el bloque de búfer.
Según lo solicitado, aquí está el código con el que terminé.
El trabajo se configura en una configuración maestro-detalle y cada maestro se procesa como un lote. Cada unidad de trabajo se pone en cola de esta manera:
var success = true;
// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
await masterBuffer.SendAsync(master);
}
// Finished sending master records
masterBuffer.Complete();
// Now, wait for all the batches to complete.
await batchAction.Completion;
return success;
Los maestros se almacenan en búfer uno a la vez para ahorrar trabajo para otros procesos externos. Los detalles de cada maestro se envían para trabajar a través del masterTransform
TransformManyBlock
. También se crea A BatchedJoinBlock
para recopilar los detalles en un lote.
El trabajo real se realiza de forma detailTransform
TransformBlock
asincrónica, 150 a la vez. BoundedCapacity
está configurado en 300 para garantizar que no se almacenen demasiados Masters al comienzo de la cadena, y al mismo tiempo deja espacio para que se pongan en cola suficientes registros detallados para permitir que se procesen 150 registros a la vez. El bloque envía un object
a sus objetivos, porque se filtra a través de los enlaces dependiendo de si es a Detail
o Exception
.
Recopila batchAction
ActionBlock
el resultado de todos los lotes y realiza actualizaciones masivas de la base de datos, registro de errores, etc. para cada lote.
Habrá varios BatchedJoinBlock
, uno para cada maestro. Dado que cada uno ISourceBlock
se genera de forma secuencial y cada lote solo acepta la cantidad de registros de detalle asociados con un maestro, los lotes se procesarán en orden. Cada bloque solo genera un grupo y se desvincula al finalizar. Sólo el último bloque por lotes propaga su finalización al final ActionBlock
.
La red de flujo de datos:
// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;
// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });
// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
var records = await StoredProcedures.GetObjectsAsync(masterRecord);
// Filter the master records based on some criteria here
var filteredRecords = records;
// Only propagate completion to the last batch
var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;
// Create a batch join block to encapsulate the results of the master record.
var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });
// Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });
// Unlink batchjoinblock upon completion.
// (the returned task does not need to be awaited, despite the warning.)
batchjoinblock.Completion.ContinueWith(task =>
{
detailLink1.Dispose();
detailLink2.Dispose();
batchLink.Dispose();
});
return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
try
{
// Perform the action for each detail here asynchronously
await DoSomethingAsync();
return detail;
}
catch (Exception e)
{
success = false;
return e;
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });
// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
var details = batch.Item1.Cast<Detail>();
var errors = batch.Item2.Cast<Exception>();
// Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });