Implementación de un contador atómico de 64 bits con átomos de 32 bits

Resuelto ridiculous_fish asked hace 5 años • 1 respuestas

Me gustaría improvisar un contador atómico uint64 a partir de uint32 atómicos. El contador tiene un único escritor y múltiples lectores. El escritor es un manejador de señales por lo que no debe bloquear.

Mi idea es utilizar un recuento de generaciones con el bit bajo como bloqueo de lectura. El lector vuelve a intentarlo hasta que el recuento de generación sea estable durante la lectura y el bit bajo se desactive.

¿Es correcto el siguiente código en el diseño y uso del ordenamiento de la memoria? ¿Existe una mejor manera?

using namespace std;
class counter {
    atomic<uint32_t> lo_{};
    atomic<uint32_t> hi_{};
    atomic<uint32_t> gen_{};

    uint64_t read() const {
        auto acquire = memory_order_acquire;
        uint32_t lo, hi, gen1, gen2;
        do {
            gen1 = gen_.load(acquire);
            lo = lo_.load(acquire);
            hi = hi_.load(acquire);
            gen2 = gen_.load(acquire);
        } while (gen1 != gen2 || (gen1 & 1));
        return (uint64_t(hi) << 32) | lo;
    }

    void increment() {
        auto release = memory_order_release;
        gen_.fetch_add(1, release);
        uint32_t newlo = 1 + lo_.fetch_add(1, release);
        if (newlo == 0) {
            hi_.fetch_add(1, release);
        }
        gen_.fetch_add(1, release);
    }
};

editar : ¡vaya, arreglado!auto acquire = memory_order_release;

ridiculous_fish avatar Feb 10 '19 04:02 ridiculous_fish
Aceptado

Este es un patrón conocido, llamado SeqLock. https://en.wikipedia.org/wiki/Seqlock . (Con la simplificación de que solo hay un escritor, por lo que no se necesita soporte adicional para excluir escritores simultáneos). No está libre de bloqueos; un escritor durmiendo en el momento equivocado dejará a los lectores dando vueltas hasta que el escritor termine. Pero en el caso común en el que eso no sucede, tiene un rendimiento excelente sin competencia entre lectores que son verdaderamente de sólo lectura.

No necesita ni desea que el incremento de la carga útil utilice operaciones RMW atómicas . (A menos que esté en un sistema que pueda realizar una carga o adición atómica de 64 bits de forma económica, hágalo en lugar de un SeqLock).
Puede simplemente cargar ambas mitades con cargas atómicas de 32 bits, incrementarlas y almacenar atómicamente el resultado. (Con un orden económico relaxedo releasede memoria para la carga útil y usando una releasetienda para la segunda actualización del contador de secuencia, lo que usted llama el contador de "generación").

De manera similar, el contador de secuencia tampoco necesita ser un RMW atómico. (A menos que lo estés usando como spinlock con varios escritores)

El escritor único solo necesita cargas puras y tiendas puras con solo releasepedidos, que son (mucho) más baratas que el RMW atómico, o tiendas con seq_cstpedidos :

  • cargar el contador y el valor en cualquier orden
  • almacenar un nuevo contador (antiguo+1)
  • almacene el nuevo valor (o simplemente actualice la mitad inferior si desea realizar una bifurcación sin acarreo)
  • almacenar el contador final.

El orden de las tiendas en esos 3 puntos es lo único que importa. Una valla de escritura después de la primera tienda podría ser buena, porque realmente no queremos que el costo de hacer ambas tiendas sea de ambas mitades del valor release, en CPU donde eso es más caro que relaxed.


Desafortunadamente, para satisfacer las reglas de C++, tiene valueque ser atomic<T>, lo que hace que sea inconveniente lograr que el compilador genere el código más eficiente posible para cargar ambas mitades. por ejemplo, ARM ldrdo ldp/ stpload-pair no están garantizados como atómicos hasta ARMv8.4a , pero eso no importa. (Y los compiladores a menudo no optimizan dos cargas atómicas separadas de 32 bits en una carga más amplia).

Los valores que otros hilos leen mientras el contador de secuencia es impar son irrelevantes, pero nos gustaría evitar un comportamiento indefinido. Tal vez podríamos usar una unión de a volatile uint64_ty anatomic<uint64_t>


Escribí esta SeqLock<class T>plantilla de C++ para otra pregunta para la que no terminé de escribir una respuesta (averiguar qué versiones de ARM tienen carga y almacenamiento atómicos de 64 bits).

Esto intenta verificar si el objetivo ya admite operaciones atómicas sin bloqueo para atomic<T>evitar que lo use cuando no tenga sentido. (Desactívelo para realizar pruebas con el fin de definir IGNORE_SIZECHECK.) TODO: vuelva a hacerlo de forma transparente, tal vez con una especialización de plantilla, en lugar de usar un archivo static_assert.

Proporcioné una inc()función Tque admite un ++operador. TODO sería un apply()que acepta una lambda para hacer algo Ty almacenar el resultado entre actualizaciones del contador de secuencia.

// **UNTESTED**

#include <atomic>

#ifdef UNIPROCESSOR
// all readers and writers run on the same core (or same software thread)
// ordering instructions at compile time is all that's necessary
#define ATOMIC_FENCE std::atomic_signal_fence
#else
// A reader can be running on another core while writing.
// Memory barriers or ARMv8 acquire / release loads / store are needed
#define ATOMIC_FENCE std::atomic_thread_fence
#endif
// using fences instead of .store(std::memory_order_release) will stop the compiler
// from taking advantage of a release-store instruction instead of separate fence, like on AArch64
// But fences allow it to be optimized away to just compile-time ordering for the single thread or unirprocessor case.


// SINGLE WRITER only.
// uses volatile + barriers for the data itself, like pre-C++11
template <class T>
class SeqLocked
{
#ifndef IGNORE_SIZECHECK
    // sizeof(T) > sizeof(unsigned)
    static_assert(!std::atomic<T>::is_always_lock_free, "A Seq Lock with a type small enough to be atomic on its own is totally pointless, and we don't have a specialization that replaces it with a straight wrapper for atomic<T>");
#endif

       // C++17 doesn't have a good way to express a load that doesn't care about tearing
       //  without explicitly writing it as multiple small parts and thus gimping the compiler if it can use larger loads
    volatile T data;          // volatile should be fine on any implementation where pre-C++11 lockless code was possible with volatile,
                              //  even though Data Race UB does apply to volatile variables in ISO C++11 and later.
                            // even non-volatile normally works in practice, being ordered by compiler barriers.

    std::atomic<unsigned> seqcount{0};  // Even means valid, odd means modification in progress.
                                        //  unsigned definitely wraps around at a power of 2 on overflow

public:
    T get() const {
        unsigned c0, c1;
        T tmp;
        // READER RETRY LOOP
        do {
            c0 = seqcount.load(std::memory_order_acquire);     // or for your signal-handler use-case, relaxed load followed by ATOMIC_FENCE(std::memory_order_acquire);

            tmp = (T)data;       // load

            ATOMIC_FENCE(std::memory_order_acquire);  // LoadLoad barrier
            c1 = seqcount.load(std::memory_order_relaxed);
        } while(c0&1 || c0 != c1);     // retry if the counter changed or is odd

        return tmp;
    }

    // TODO: a version of this that takes a lambda for the operation on tmp
    T inc()     // WRITER
    {
        unsigned orig_count = seqcount.load(std::memory_order_relaxed);
        // we're the only writer, avoid an atomic RMW.

        seqcount.store(orig_count+1, std::memory_order_relaxed);
        ATOMIC_FENCE(std::memory_order_release);     // 2-way barrier *after* the store, not like a release store.  Or like making data=tmp a release operation.
        // make sure the counter becomes odd *before* any data change

        T tmp = data;  // load into a non-volatile temporary
        ++tmp;         // make any change to it
        data = tmp;    // store

        seqcount.store(orig_count+2, std::memory_order_release);  // or use ATOMIC_FENCE(std::memory_order_release); *before* this, so the UNIPROCESSOR case can just do compile-time ordering

        return tmp;
    }

    void set(T newval) {
        unsigned orig_count = seqcount.load(std::memory_order_relaxed);

        seqcount.store(orig_count+1, std::memory_order_relaxed);
        ATOMIC_FENCE(std::memory_order_release);
        // make sure the data stores appear after the first counter update.

        data = newval;    // store

        ATOMIC_FENCE(std::memory_order_release);
        seqcount.store(orig_count+2, std::memory_order_relaxed);  // Or use mo_release here, better on AArch64
    }

};


/***** test callers *******/
#include <stdint.h>

struct sixteenbyte {
    //unsigned arr[4];
    unsigned long  a,b,c,d;
    sixteenbyte() = default;
    sixteenbyte(const volatile sixteenbyte &old)
         : a(old.a), b(old.b), c(old.c), d(old.d) {}
    //arr(old.arr) {}
};

void test_inc(SeqLocked<uint64_t> &obj) {  obj.inc(); }
sixteenbyte test_get(SeqLocked<sixteenbyte> &obj) { return obj.get(); }
//void test_set(SeqLocked<sixteenbyte> &obj, sixteenbyte val) { obj.set(val); }

uint64_t test_get(SeqLocked<uint64_t> &obj) {
    return obj.get();
}

// void atomic_inc_u64_seq_cst(std::atomic<uint64_t> &a) { ++a; }
uint64_t u64_inc_relaxed(std::atomic<uint64_t> &a) {
    // same but without dmb barriers
    return 1 + a.fetch_add(1, std::memory_order_relaxed);
}

uint64_t u64_load_relaxed(std::atomic<uint64_t> &a) {
    // gcc uses LDREXD, not just LDRD?
    return a.load(std::memory_order_relaxed);
}

void u64_store_relaxed(std::atomic<uint64_t> &a, uint64_t val) {
    // gcc uses a LL/SC retry loop even for a pure store?
    a.store(val, std::memory_order_relaxed);
}

Compila el asm que queremos en el explorador del compilador Godbolt para ARM y otras ISA. Al menos para int64_t; Los tipos de estructuras más grandes pueden copiarse de manera menos eficiente debido a volatilereglas engorrosas.

Utiliza datos no atómicos volatile T datapara los datos compartidos. Técnicamente, este es un comportamiento indefinido de carrera de datos, pero todos los compiladores que usamos en la práctica estaban bien con el acceso multiproceso a volatileobjetos anterior a C++ 11. Y antes de C++11, la gente incluso dependía de la atomicidad para algunos tamaños. No lo hacemos , verificamos el contador y solo usamos el valor que leímos si no hubo escrituras simultáneas. (Ese es el objetivo de un SeqLock).

Un problema volatile T dataes que en ISO C++, T foo = datano se compilará para objetos de estructura a menos que proporcione un constructor de copia de un volatileobjeto, como

sixteenbyte(const volatile sixteenbyte &old)
         : a(old.a), b(old.b), c(old.c), d(old.d) {}

Esto es realmente molesto para nosotros, porque no nos importan los detalles de cómo se lee la memoria, solo que varias lecturas no se optimizan en una.

volatilees realmente la herramienta incorrecta aquí , y T datasería mejor tener suficiente protección para garantizar que la lectura realmente ocurra entre las lecturas del contador atómico. por ejemplo, podríamos hacer eso en GNU C con una asm("":::"memory");barrera del compilador contra el reordenamiento antes/después de los accesos. Eso permitiría al compilador copiar objetos más grandes con vectores SIMD, o lo que sea, lo cual no hará con volatileaccesos separados.

Creo que std::atomic_thread_fence(mo_acquire)también sería una barrera suficiente, pero no estoy 100% seguro.


En ISO C, puede copiar un volatileagregado (estructura) y el compilador emitirá cualquier conjunto que normalmente haría para copiar esa cantidad de bytes. Pero en C++, aparentemente no podemos tener cosas buenas.


Relacionado: sistemas de un solo núcleo con un escritor en un controlador de interrupciones

En un sistema integrado con un núcleo y algunas variables que solo se actualizan mediante controladores de interrupciones, es posible que tenga un escritor que pueda interrumpir al lector, pero no al revés. Eso permite algunas variaciones más económicas que utilizan el valor mismo para detectar lecturas rotas.

Consulte Lectura de una variable de 64 bits actualizada por un ISR , especialmente para un contador monótono. La sugerencia de Brendan de leer primero la mitad más significativa, luego la mitad inferior y luego la mitad más significativa nuevamente. Si coincide, tu lectura no fue rota de una manera importante. (Una escritura que no cambió la mitad superior no es un problema incluso si interrumpió al lector para cambiar la mitad inferior justo antes o después de que el lector la leyera).

O, en general, vuelva a leer el valor completo hasta que vea el mismo valor dos veces seguidas.

Ninguna de estas técnicas es segura para SMP: el reintento de lectura solo protege contra lecturas rotas, no contra escrituras rotas si el escritor almacenó las mitades por separado. Es por eso que SeqLock usa un tercer entero atómico como contador de secuencia. Funcionarían en cualquier caso en el que el escritor sea atómico. el lector, pero el lector no es atómico. El controlador de interrupciones versus el código principal es uno de esos casos, o el controlador de señales es equivalente.

Podría utilizar la mitad inferior de un contador monótono como número de secuencia, si no le importa incrementar en 2 en lugar de 1. (Tal vez requiera que los lectores realicen un desplazamiento de 64 bits a la derecha en 1 para recuperar el número real. Entonces eso no es bueno.)

Peter Cordes avatar Feb 09 '2019 22:02 Peter Cordes