Bloqueo asíncrono basado en una clave

Resuelto James South asked hace 9 años • 5 respuestas

Estoy intentando resolver un problema que surgió con mi biblioteca ImageProcessor aquí donde obtengo errores intermitentes de acceso a archivos al agregar elementos al caché.

System.IO.IOException: el proceso no puede acceder al archivo 'D:\home\site\wwwroot\app_data\cache\0\6\5\f\2\7\065f27fc2c8e843443d210a1e84d1ea28bbab6c4.webp' porque está siendo utilizado por otro proceso .

Escribí una clase diseñada para realizar un bloqueo asincrónico basado en una clave generada por una URL con hash, pero parece que me he perdido algo en la implementación.

Mi clase de bloqueo

public sealed class AsyncDuplicateLock
{
    /// <summary>
    /// The collection of semaphore slims.
    /// </summary>
    private static readonly ConcurrentDictionary<object, SemaphoreSlim> SemaphoreSlims
                            = new ConcurrentDictionary<object, SemaphoreSlim>();

    /// <summary>
    /// Locks against the given key.
    /// </summary>
    /// <param name="key">
    /// The key that identifies the current object.
    /// </param>
    /// <returns>
    /// The disposable <see cref="Task"/>.
    /// </returns>
    public IDisposable Lock(object key)
    {
        DisposableScope releaser = new DisposableScope(
        key,
        s =>
        {
            SemaphoreSlim locker;
            if (SemaphoreSlims.TryRemove(s, out locker))
            {
                locker.Release();
                locker.Dispose();
            }
        });

        SemaphoreSlim semaphore = SemaphoreSlims.GetOrAdd(key, new SemaphoreSlim(1, 1));
        semaphore.Wait();
        return releaser;
    }

    /// <summary>
    /// Asynchronously locks against the given key.
    /// </summary>
    /// <param name="key">
    /// The key that identifies the current object.
    /// </param>
    /// <returns>
    /// The disposable <see cref="Task"/>.
    /// </returns>
    public Task<IDisposable> LockAsync(object key)
    {
        DisposableScope releaser = new DisposableScope(
        key,
        s =>
        {
            SemaphoreSlim locker;
            if (SemaphoreSlims.TryRemove(s, out locker))
            {
                locker.Release();
                locker.Dispose();
            }
        });

        Task<IDisposable> releaserTask = Task.FromResult(releaser as IDisposable);
        SemaphoreSlim semaphore = SemaphoreSlims.GetOrAdd(key, new SemaphoreSlim(1, 1));

        Task waitTask = semaphore.WaitAsync();

        return waitTask.IsCompleted
                   ? releaserTask
                   : waitTask.ContinueWith(
                       (_, r) => (IDisposable)r,
                       releaser,
                       CancellationToken.None,
                       TaskContinuationOptions.ExecuteSynchronously,
                       TaskScheduler.Default);
    }

    /// <summary>
    /// The disposable scope.
    /// </summary>
    private sealed class DisposableScope : IDisposable
    {
        /// <summary>
        /// The key
        /// </summary>
        private readonly object key;

        /// <summary>
        /// The close scope action.
        /// </summary>
        private readonly Action<object> closeScopeAction;

        /// <summary>
        /// Initializes a new instance of the <see cref="DisposableScope"/> class.
        /// </summary>
        /// <param name="key">
        /// The key.
        /// </param>
        /// <param name="closeScopeAction">
        /// The close scope action.
        /// </param>
        public DisposableScope(object key, Action<object> closeScopeAction)
        {
            this.key = key;
            this.closeScopeAction = closeScopeAction;
        }

        /// <summary>
        /// Disposes the scope.
        /// </summary>
        public void Dispose()
        {
            this.closeScopeAction(this.key);
        }
    }
}

Uso: dentro de un HttpModule

private readonly AsyncDuplicateLock locker = new AsyncDuplicateLock();

using (await this.locker.LockAsync(cachedPath))
{
    // Process and save a cached image.
}

¿Alguien puede detectar dónde me he equivocado? Me preocupa estar entendiendo mal algo fundamental.

La fuente completa de la biblioteca está almacenada en Github aquí.

James South avatar Jun 30 '15 19:06 James South
Aceptado

Como señaló el otro respondedor , el código original elimina el SemaphoreSlimde ConcurrentDictionaryantes de publicar el semáforo. Entonces, hay demasiada rotación de semáforos: se están eliminando del diccionario cuando aún podrían estar en uso (no adquiridos, pero ya recuperados del diccionario).

El problema con este tipo de "bloqueo de mapeo" es que es difícil saber cuándo el semáforo ya no es necesario. Una opción es no desechar nunca los semáforos; esa es la solución fácil, pero puede que no sea aceptable en su situación. Otra opción, si los semáforos están realmente relacionados con instancias de objetos y no con valores (como cadenas), es adjuntarlos mediante efemérides; sin embargo, creo que esta opción tampoco sería aceptable en su escenario.

Entonces, lo hacemos de la manera más difícil. :)

Hay algunos enfoques diferentes que funcionarían. Creo que tiene sentido abordarlo desde una perspectiva de conteo de referencias (contando cada semáforo en el diccionario). Además, queremos que la operación de decremento, conteo y eliminación sea atómica, por lo que solo uso una única lock(haciendo que el diccionario concurrente sea superfluo):

public sealed class AsyncDuplicateLock
{
  private sealed class RefCounted<T>
  {
    public RefCounted(T value)
    {
      RefCount = 1;
      Value = value;
    }

    public int RefCount { get; set; }
    public T Value { get; private set; }
  }

  private static readonly Dictionary<object, RefCounted<SemaphoreSlim>> SemaphoreSlims
                        = new Dictionary<object, RefCounted<SemaphoreSlim>>();

  private SemaphoreSlim GetOrCreate(object key)
  {
    RefCounted<SemaphoreSlim> item;
    lock (SemaphoreSlims)
    {
      if (SemaphoreSlims.TryGetValue(key, out item))
      {
        ++item.RefCount;
      }
      else
      {
        item = new RefCounted<SemaphoreSlim>(new SemaphoreSlim(1, 1));
        SemaphoreSlims[key] = item;
      }
    }
    return item.Value;
  }

  public IDisposable Lock(object key)
  {
    GetOrCreate(key).Wait();
    return new Releaser { Key = key };
  }

  public async Task<IDisposable> LockAsync(object key)
  {
    await GetOrCreate(key).WaitAsync().ConfigureAwait(false);
    return new Releaser { Key = key };
  }

  private sealed class Releaser : IDisposable
  {
    public object Key { get; set; }

    public void Dispose()
    {
      RefCounted<SemaphoreSlim> item;
      lock (SemaphoreSlims)
      {
        item = SemaphoreSlims[Key];
        --item.RefCount;
        if (item.RefCount == 0)
          SemaphoreSlims.Remove(Key);
      }
      item.Value.Release();
    }
  }
}
Stephen Cleary avatar Jul 02 '2015 21:07 Stephen Cleary

Los problemas en su implementación surgen de su deseo de eliminar del diccionario los casilleros no utilizados. Sería mucho más sencillo si pudieras dejar que cada uno SemaphoreSlimpermanezca en el diccionario para siempre (hasta que finalice el proceso). Suponiendo que esta no sea una opción viable, tienes dos obstáculos que superar:

  1. Cómo realizar un seguimiento de cuántos trabajadores utilizan cada semáforo, para saber cuándo es seguro eliminarlo.
  2. Cómo hacer lo anterior usando la ConcurrentDictionary<K,V>colección eficaz pero complicada.

La respuesta de Stephen Cleary muestra cómo resolver el primer problema usando una normal Dictionary<K,V>. Se almacena un contador de referencia junto con cada uno SemaphoreSlimy todo se sincroniza con la lockdeclaración en un único objeto de casillero. En esta respuesta mostraré cómo resolver el segundo problema.

El problema de la ConcurrentDictionary<K,V>colección es que protege de la corrupción sólo su estado interno, no los valores que contiene. Entonces, si usa una clase mutable como TValue, está abriendo la puerta a condiciones de carrera sutiles , especialmente si tiene la intención de almacenar en caché estos valores en un grupo y reutilizarlos. El truco que elimina las condiciones de carrera es convertir la TValueestructura en una estructura inmutable. De esta manera, esencialmente se convierte en parte del estado interno del diccionario y está protegido por él. En la AsyncDuplicateLockimplementación siguiente, TValuees un readonly struct, declarado también como un recordrendimiento¹ y conveniencia:

public class AsyncDuplicateLock
{
    private readonly ConcurrentDictionary<object, Entry> _semaphores = new();

    private readonly record struct Entry(SemaphoreSlim Semaphore, int RefCount);

    public readonly struct Releaser : IDisposable
    {
        private readonly AsyncDuplicateLock _parent;
        private readonly object _key;
        public Releaser(AsyncDuplicateLock parent, object key)
        {
            _parent = parent; _key = key;
        }
        public void Dispose() => _parent.Release(_key);
    }

    public async ValueTask<Releaser> LockAsync(object key)
    {
        Entry entry = _semaphores.AddOrUpdate(key,
            static _ => new Entry(new SemaphoreSlim(1, 1), 1),
            static (_, entry) => entry with { RefCount = entry.RefCount + 1 });

        await entry.Semaphore.WaitAsync().ConfigureAwait(false);
        return new Releaser(this, key);
    }

    private void Release(object key)
    {
        Entry entry;
        while (true)
        {
            bool exists = _semaphores.TryGetValue(key, out entry);
            if (!exists)
                throw new InvalidOperationException("Key not found.");
            if (entry.RefCount > 1)
            {
                Entry newEntry = entry with { RefCount = entry.RefCount - 1 };
                if (_semaphores.TryUpdate(key, newEntry, entry))
                    break;
            }
            else
            {
                if (_semaphores.TryRemove(KeyValuePair.Create(key, entry)))
                    break;
            }
        }
        entry.Semaphore.Release();
    }
}

Observe que aumentar y disminuir RefCountimplica girar en un whilebucle. Esto se debe a que el hilo actual podría perder la carrera optimista con otros hilos para actualizar el diccionario, en cuyo caso lo intentará nuevamente hasta que tenga éxito. El giro es obvio en el Releasemétodo, pero también ocurre internamente en el LockAsyncmétodo. El AddOrUpdatemétodo emplea internamente una lógica similar en torno a la invocación del updateValueFactorydelegado.

Rendimiento: la implementación anterior es aproximadamente un 80% más rápida que una Dictionary<K,V>implementación basada en más simple, en condiciones de mucha contención. Esto se debe a que ConcurrentDictionary<K,V>utiliza múltiples objetos de casillero internamente, por lo que un subproceso que desea bloquear la clave "A"no tiene que esperar hasta que otro subproceso complete la adquisición o liberación de la clave "B". Sin embargo, es considerablemente más asignable. Si tiene alguna razón para mantener relajado al recolector de basura, una Dictionary<K,V>implementación basada en - le será más útil. Si desea la máxima velocidad y la máxima eficiencia de memoria, puede consultar la sexta revisión de esta respuesta, para una implementación basada en múltiples Dictionary<K,V>s.

Excepciones: cuando la SemaphoreSlimclase se usa incorrectamente, arroja un archivo SemaphoreFullException. Esto sucede cuando el semáforo se libera más veces de las que se ha adquirido. La AsyncDuplicateLockimplementación de esta respuesta se comporta de manera diferente en caso de mal uso: arroja un archivo InvalidOperationException("Key not found."). Esto sucede porque cuando se libera una clave tantas veces como se adquirió, el semáforo asociado se elimina del diccionario. Si esta implementación alguna vez arroja un error SemaphoreFullException, sería una indicación de un error.

Nota: Personalmente , no soy partidario del (mal) uso de la usingdeclaración para otros fines que no sean liberar recursos no administrados.

¹ Compara el s en muchas operaciones ( , y entre otras), usando el . Las estructuras por defecto no se comparan de manera eficiente , a menos que implementen la interfaz. Las estructuras de registro implementan esta interfaz, de manera similar a las tuplas de valores, por lo que se pueden comparar para determinar su igualdad de manera eficiente. En realidad, usar una tupla de valor como ( ) podría ser un poco más eficiente, porque los miembros de las tuplas de valor son campos, mientras que los miembros de las estructuras de registro son propiedades. Sin embargo, las estructuras de registro son más convenientes.ConcurrentDictionary<K,V>TValueAddOrUpdateTryUpdateTryRemoveEqualityComparer<TValue>.DefaultIEquatable<T>TValue(SemaphoreSlim, int)

Theodor Zoulias avatar Dec 11 '2020 17:12 Theodor Zoulias

Escribí una biblioteca llamada AsyncKeyedLockpara solucionar este problema común. Actualmente, la biblioteca admite su uso con el tipo object(para que pueda mezclar diferentes tipos) o el uso de genéricos para obtener una solución más eficiente. Permite tiempos de espera, tokens de cancelación y también agrupaciones para reducir las asignaciones. De manera subyacente utiliza un ConcurrentDictionaryy también permite configurar la capacidad inicial y la simultaneidad para este diccionario.

He comparado esto con las otras soluciones proporcionadas aquí y es más eficiente, en términos de velocidad, uso de memoria (asignaciones) y escalabilidad (internamente utiliza el más escalable ConcurrentDictionary). Se utiliza en varios sistemas en producción y en varias bibliotecas populares.

El código fuente está disponible en GitHub y empaquetado en NuGet .

El enfoque aquí es básicamente usar ConcurrentDictionarypara almacenar un IDisposableobjeto que tiene un contador y un archivo SemaphoreSlim. Una vez que este contador llega a 0, se elimina del diccionario y se elimina o se devuelve al grupo (si se utiliza la agrupación). Monitorse utiliza para bloquear este objeto cuando el contador se incrementa o disminuye.

Ejemplo de uso:

var locker = new AsyncKeyedLocker<string>(o =>
{
   o.PoolSize = 20;
   o.PoolInitialFill = 1;
});

string key = "my key";

// asynchronous code
using (await locker.LockAsync(key, cancellationToken))
{
   ...
}

// synchronous code
using (locker.Lock(key))
{
   ...
}

Descargar desde NuGet .

Mark Cilia Vincenti avatar Nov 20 '2022 09:11 Mark Cilia Vincenti