Limitación de tareas asincrónicas

Resuelto Josh Wyant asked hace 10 años • 3 respuestas

Me gustaría ejecutar un montón de tareas asíncronas, con un límite en la cantidad de tareas que pueden estar pendientes de completarse en un momento dado.

Supongamos que tiene 1000 URL y solo desea tener 50 solicitudes abiertas a la vez; pero tan pronto como se completa una solicitud, abre una conexión a la siguiente URL de la lista. De esta manera, siempre habrá exactamente 50 conexiones abiertas a la vez, hasta que se agote la lista de URL.

También quiero utilizar una cantidad determinada de subprocesos si es posible.

Se me ocurrió un método de extensión ThrottleTasksAsyncque hace lo que quiero. ¿Existe ya una solución más sencilla? Supongo que este es un escenario común.

Uso:

class Program
{
    static void Main(string[] args)
    {
        Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait();

        Console.WriteLine("Press a key to exit...");
        Console.ReadKey(true);
    }
}

Aquí está el código:

static class IEnumerableExtensions
{
    public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun)
    {
        var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>());

        var semaphore = new SemaphoreSlim(maxConcurrentTasks);

        // Run the throttler on a separate thread.
        var t = Task.Run(() =>
        {
            foreach (var item in enumerable)
            {
                // Wait for the semaphore
                semaphore.Wait();
                blockingQueue.Add(item);
            }

            blockingQueue.CompleteAdding();
        });

        var taskList = new List<Task<Result_T>>();

        Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism },
        _ =>
        {
            Enumerable_T item;

            if (blockingQueue.TryTake(out item, 100))
            {
                taskList.Add(
                    // Run the task
                    taskToRun(item)
                    .ContinueWith(tsk =>
                        {
                            // For effect
                            Thread.Sleep(2000);

                            // Release the semaphore
                            semaphore.Release();

                            return tsk.Result;
                        }
                    )
                );
            }
        });

        // Await all the tasks.
        return await Task.WhenAll(taskList);
    }

    static IEnumerable<bool> IterateUntilTrue(Func<bool> condition)
    {
        while (!condition()) yield return true;
    }
}

El método utiliza BlockingCollectiony SemaphoreSlimpara hacerlo funcionar. El acelerador se ejecuta en un subproceso y todas las tareas asíncronas se ejecutan en el otro subproceso. Para lograr el paralelismo, agregué un parámetro maxDegreeOfParallelism que se pasa a un Parallel.ForEachbucle reutilizado como whilebucle.

La versión antigua era:

foreach (var master = ...)
{
    var details = ...;
    Parallel.ForEach(details, detail => {
        // Process each detail record here
    }, new ParallelOptions { MaxDegreeOfParallelism = 15 });
    // Perform the final batch updates here
}

Pero el grupo de subprocesos se agota rápidamente y no se puede hacer async/ await.

Bonificación: para solucionar el problema de BlockingCollectionque se genera una excepción Take()cuando CompleteAdding()se llama, estoy usando la TryTakesobrecarga con un tiempo de espera. Si no usara el tiempo de espera en TryTake, anularía el propósito de usar un BlockingCollectionya que TryTakeno se bloqueará. ¿Existe una mejor manera? Lo ideal sería que hubiera un TakeAsyncmétodo.

Josh Wyant avatar Mar 19 '14 05:03 Josh Wyant
Aceptado

Como se sugiere, utilice TPL Dataflow .

A TransformBlock<TInput, TOutput>puede ser lo que estás buscando.

Usted define a MaxDegreeOfParallelismpara limitar cuántas cadenas se pueden transformar (es decir, cuántas URL se pueden descargar) en paralelo. Luego publicas las URL en el bloque y, cuando terminas, le dices al bloque que terminaste de agregar elementos y buscas las respuestas.

var downloader = new TransformBlock<string, HttpResponse>(
        url => Download(url),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 }
    );

var buffer = new BufferBlock<HttpResponse>();
downloader.LinkTo(buffer);

foreach(var url in urls)
    downloader.Post(url);
    //or await downloader.SendAsync(url);

downloader.Complete();
await downloader.Completion;

IList<HttpResponse> responses;
if (buffer.TryReceiveAll(out responses))
{
    //process responses
}

Nota: El TransformBlockbuffer tanto su entrada como su salida. Entonces, ¿por qué necesitamos vincularlo a un BufferBlock?

Porque TransformBlockno se completará hasta que se hayan consumido todos los elementos ( HttpResponse) y await downloader.Completionse bloqueará. En su lugar, dejamos que downloaderreenvíe toda su salida a un bloque de búfer dedicado; luego esperamos a que se downloadercomplete e inspeccionamos el bloque de búfer.

dcastro avatar Mar 18 '2014 22:03 dcastro

Según lo solicitado, aquí está el código con el que terminé.

El trabajo se configura en una configuración maestro-detalle y cada maestro se procesa como un lote. Cada unidad de trabajo se pone en cola de esta manera:

var success = true;

// Start processing all the master records.
Master master;
while (null != (master = await StoredProcedures.ClaimRecordsAsync(...)))
{
    await masterBuffer.SendAsync(master);
}

// Finished sending master records
masterBuffer.Complete();

// Now, wait for all the batches to complete.
await batchAction.Completion;

return success;

Los maestros se almacenan en búfer uno a la vez para ahorrar trabajo para otros procesos externos. Los detalles de cada maestro se envían para trabajar a través del masterTransform TransformManyBlock. También se crea A BatchedJoinBlockpara recopilar los detalles en un lote.

El trabajo real se realiza de forma detailTransform TransformBlockasincrónica, 150 a la vez. BoundedCapacityestá configurado en 300 para garantizar que no se almacenen demasiados Masters al comienzo de la cadena, y al mismo tiempo deja espacio para que se pongan en cola suficientes registros detallados para permitir que se procesen 150 registros a la vez. El bloque envía un objecta sus objetivos, porque se filtra a través de los enlaces dependiendo de si es a Detailo Exception.

Recopila batchAction ActionBlockel resultado de todos los lotes y realiza actualizaciones masivas de la base de datos, registro de errores, etc. para cada lote.

Habrá varios BatchedJoinBlock, uno para cada maestro. Dado que cada uno ISourceBlockse genera de forma secuencial y cada lote solo acepta la cantidad de registros de detalle asociados con un maestro, los lotes se procesarán en orden. Cada bloque solo genera un grupo y se desvincula al finalizar. Sólo el último bloque por lotes propaga su finalización al final ActionBlock.

La red de flujo de datos:

// The dataflow network
BufferBlock<Master> masterBuffer = null;
TransformManyBlock<Master, Detail> masterTransform = null;
TransformBlock<Detail, object> detailTransform = null;
ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null;

// Buffer master records to enable efficient throttling.
masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 });

// Sequentially transform master records into a stream of detail records.
masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord =>
{
    var records = await StoredProcedures.GetObjectsAsync(masterRecord);

    // Filter the master records based on some criteria here
    var filteredRecords = records;

    // Only propagate completion to the last batch
    var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0;

    // Create a batch join block to encapsulate the results of the master record.
    var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 });

    // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block.
    var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail);
    var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception);
    var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

    // Unlink batchjoinblock upon completion.
    // (the returned task does not need to be awaited, despite the warning.)
    batchjoinblock.Completion.ContinueWith(task =>
    {
        detailLink1.Dispose();
        detailLink2.Dispose();
        batchLink.Dispose();
    });

    return filteredRecords;
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

// Process each detail record asynchronously, 150 at a time.
detailTransform = new TransformBlock<Detail, object>(async detail => {
    try
    {
        // Perform the action for each detail here asynchronously
        await DoSomethingAsync();

        return detail;
    }
    catch (Exception e)
    {
        success = false;
        return e;
    }

}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 });

// Perform the proper action for each batch
batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch =>
{
    var details = batch.Item1.Cast<Detail>();
    var errors = batch.Item2.Cast<Exception>();

    // Do something with the batch here
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true });
masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });
Josh Wyant avatar Mar 28 '2014 18:03 Josh Wyant