¿Crear una cola de bloqueo <T> en .NET?

Resuelto Eric Schoonover asked hace 15 años • 10 respuestas

Tengo un escenario en el que tengo varios subprocesos que se agregan a una cola y varios subprocesos leen desde la misma cola. Si la cola alcanza un tamaño específico, todos los hilos que llenan la cola se bloquearán al agregarlos hasta que se elimine un elemento de la cola.

La siguiente solución es la que estoy usando ahora y mi pregunta es: ¿Cómo se puede mejorar? ¿Existe algún objeto que ya permita este comportamiento en el BCL que debería usar?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}
Eric Schoonover avatar Feb 10 '09 05:02 Eric Schoonover
Aceptado

Eso parece muy inseguro (muy poca sincronización); ¿Qué tal algo como:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(editar)

En realidad, querrás encontrar una forma de cerrar la cola para que los lectores comiencen a salir limpiamente, tal vez algo así como un indicador bool; si está configurado, una cola vacía simplemente regresa (en lugar de bloquearse):

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}
Marc Gravell avatar Feb 09 '2009 22:02 Marc Gravell

Use .net 4 BlockingCollection, para poner en cola use Add(), para quitar de la cola use Take(). Utiliza internamente ConcurrentQueue sin bloqueo. Más información aquí Técnica de cola rápida y mejor para productores/consumidores BlockingCollection vs cola concurrente

xhafan avatar Dec 09 '2011 15:12 xhafan

Puede utilizar BlockingCollection y ConcurrentQueue en System.Collections.Concurrent Namespace

 public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
    /// <summary>
    /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
    /// </summary>
    public ProducerConsumerQueue()  
        : base(new ConcurrentQueue<T>())
    {
    }

  /// <summary>
  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
  /// </summary>
  /// <param name="maxSize"></param>
    public ProducerConsumerQueue(int maxSize)
        : base(new ConcurrentQueue<T>(), maxSize)
    {
    }



}
Andreas avatar Feb 06 '2013 10:02 Andreas

"¿Cómo se puede mejorar esto?"

Bueno, debes observar cada método de tu clase y considerar qué pasaría si otro hilo llamara simultáneamente a ese método o a cualquier otro método. Por ejemplo, coloca un candado en el método Eliminar, pero no en el método Agregar. ¿Qué sucede si un hilo se agrega al mismo tiempo que otro hilo se elimina? Cosas malas.

Considere también que un método puede devolver un segundo objeto que proporciona acceso a los datos internos del primer objeto (por ejemplo, GetEnumerator). Imagine que un hilo pasa por ese enumerador y otro hilo modifica la lista al mismo tiempo. No es bueno.

Una buena regla general es hacer que esto sea más sencillo y correcto reduciendo el número de métodos en la clase al mínimo absoluto.

En particular, no herede otra clase contenedora, porque expondrá todos los métodos de esa clase, proporcionando una manera para que la persona que llama corrompa los datos internos o vea cambios parcialmente completos en los datos (igual de malo, porque los datos parece corrupto en ese momento). Oculta todos los detalles y sé completamente despiadado al permitir el acceso a ellos.

Le recomiendo encarecidamente que utilice soluciones disponibles en el mercado: obtenga un libro sobre subprocesos o utilice una biblioteca de terceros. De lo contrario, dado lo que estás intentando, estarás depurando tu código durante mucho tiempo.

Además, ¿no tendría más sentido que Eliminar devuelva un elemento (por ejemplo, el que se agregó primero, ya que es una cola), en lugar de que la persona que llama elija un elemento específico? Y cuando la cola está vacía, quizás Eliminar también debería bloquearse.

Actualización: ¡la respuesta de Marc realmente implementa todas estas sugerencias! :) Pero dejaré esto aquí ya que puede ser útil para comprender por qué su versión es tan mejorada.

Daniel Earwicker avatar Feb 09 '2009 22:02 Daniel Earwicker

Simplemente hice esto usando las Extensiones Reactivas y recordé esta pregunta:

public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext();
        return _enumerator.Current;
    }
}

No necesariamente del todo seguro, pero sí muy sencillo.

Mark Rendle avatar May 07 '2010 14:05 Mark Rendle