¿Crear una cola de bloqueo <T> en .NET?
Tengo un escenario en el que tengo varios subprocesos que se agregan a una cola y varios subprocesos leen desde la misma cola. Si la cola alcanza un tamaño específico, todos los hilos que llenan la cola se bloquearán al agregarlos hasta que se elimine un elemento de la cola.
La siguiente solución es la que estoy usando ahora y mi pregunta es: ¿Cómo se puede mejorar? ¿Existe algún objeto que ya permita este comportamiento en el BCL que debería usar?
internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
//todo: might be worth changing this into a proper QUEUE
private AutoResetEvent _FullEvent = new AutoResetEvent(false);
internal T this[int i]
{
get { return (T) List[i]; }
}
private int _MaxSize;
internal int MaxSize
{
get { return _MaxSize; }
set
{
_MaxSize = value;
checkSize();
}
}
internal BlockingCollection(int maxSize)
{
MaxSize = maxSize;
}
internal void Add(T item)
{
Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.WaitOne();
List.Add(item);
Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));
checkSize();
}
internal void Remove(T item)
{
lock (List)
{
List.Remove(item);
}
Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
}
protected override void OnRemoveComplete(int index, object value)
{
checkSize();
base.OnRemoveComplete(index, value);
}
internal new IEnumerator GetEnumerator()
{
return List.GetEnumerator();
}
private void checkSize()
{
if (Count < MaxSize)
{
Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.Set();
}
else
{
Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.Reset();
}
}
}
Eso parece muy inseguro (muy poca sincronización); ¿Qué tal algo como:
class SizeQueue<T>
{
private readonly Queue<T> queue = new Queue<T>();
private readonly int maxSize;
public SizeQueue(int maxSize) { this.maxSize = maxSize; }
public void Enqueue(T item)
{
lock (queue)
{
while (queue.Count >= maxSize)
{
Monitor.Wait(queue);
}
queue.Enqueue(item);
if (queue.Count == 1)
{
// wake up any blocked dequeue
Monitor.PulseAll(queue);
}
}
}
public T Dequeue()
{
lock (queue)
{
while (queue.Count == 0)
{
Monitor.Wait(queue);
}
T item = queue.Dequeue();
if (queue.Count == maxSize - 1)
{
// wake up any blocked enqueue
Monitor.PulseAll(queue);
}
return item;
}
}
}
(editar)
En realidad, querrás encontrar una forma de cerrar la cola para que los lectores comiencen a salir limpiamente, tal vez algo así como un indicador bool; si está configurado, una cola vacía simplemente regresa (en lugar de bloquearse):
bool closing;
public void Close()
{
lock(queue)
{
closing = true;
Monitor.PulseAll(queue);
}
}
public bool TryDequeue(out T value)
{
lock (queue)
{
while (queue.Count == 0)
{
if (closing)
{
value = default(T);
return false;
}
Monitor.Wait(queue);
}
value = queue.Dequeue();
if (queue.Count == maxSize - 1)
{
// wake up any blocked enqueue
Monitor.PulseAll(queue);
}
return true;
}
}
Use .net 4 BlockingCollection, para poner en cola use Add(), para quitar de la cola use Take(). Utiliza internamente ConcurrentQueue sin bloqueo. Más información aquí Técnica de cola rápida y mejor para productores/consumidores BlockingCollection vs cola concurrente
Puede utilizar BlockingCollection y ConcurrentQueue en System.Collections.Concurrent Namespace
public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
/// <summary>
/// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
/// </summary>
public ProducerConsumerQueue()
: base(new ConcurrentQueue<T>())
{
}
/// <summary>
/// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
/// </summary>
/// <param name="maxSize"></param>
public ProducerConsumerQueue(int maxSize)
: base(new ConcurrentQueue<T>(), maxSize)
{
}
}
"¿Cómo se puede mejorar esto?"
Bueno, debes observar cada método de tu clase y considerar qué pasaría si otro hilo llamara simultáneamente a ese método o a cualquier otro método. Por ejemplo, coloca un candado en el método Eliminar, pero no en el método Agregar. ¿Qué sucede si un hilo se agrega al mismo tiempo que otro hilo se elimina? Cosas malas.
Considere también que un método puede devolver un segundo objeto que proporciona acceso a los datos internos del primer objeto (por ejemplo, GetEnumerator). Imagine que un hilo pasa por ese enumerador y otro hilo modifica la lista al mismo tiempo. No es bueno.
Una buena regla general es hacer que esto sea más sencillo y correcto reduciendo el número de métodos en la clase al mínimo absoluto.
En particular, no herede otra clase contenedora, porque expondrá todos los métodos de esa clase, proporcionando una manera para que la persona que llama corrompa los datos internos o vea cambios parcialmente completos en los datos (igual de malo, porque los datos parece corrupto en ese momento). Oculta todos los detalles y sé completamente despiadado al permitir el acceso a ellos.
Le recomiendo encarecidamente que utilice soluciones disponibles en el mercado: obtenga un libro sobre subprocesos o utilice una biblioteca de terceros. De lo contrario, dado lo que estás intentando, estarás depurando tu código durante mucho tiempo.
Además, ¿no tendría más sentido que Eliminar devuelva un elemento (por ejemplo, el que se agregó primero, ya que es una cola), en lugar de que la persona que llama elija un elemento específico? Y cuando la cola está vacía, quizás Eliminar también debería bloquearse.
Actualización: ¡la respuesta de Marc realmente implementa todas estas sugerencias! :) Pero dejaré esto aquí ya que puede ser útil para comprender por qué su versión es tan mejorada.
Simplemente hice esto usando las Extensiones Reactivas y recordé esta pregunta:
public class BlockingQueue<T>
{
private readonly Subject<T> _queue;
private readonly IEnumerator<T> _enumerator;
private readonly object _sync = new object();
public BlockingQueue()
{
_queue = new Subject<T>();
_enumerator = _queue.GetEnumerator();
}
public void Enqueue(T item)
{
lock (_sync)
{
_queue.OnNext(item);
}
}
public T Dequeue()
{
_enumerator.MoveNext();
return _enumerator.Current;
}
}
No necesariamente del todo seguro, pero sí muy sencillo.