¿Cómo escuchar cambios en una colección de MongoDB?

Resuelto Andrew asked hace 12 años • 12 respuestas

Estoy creando una especie de sistema de cola de trabajos en segundo plano con MongoDB como almacén de datos. ¿Cómo puedo "escuchar" las inserciones en una colección de MongoDB antes de generar trabajadores para procesar el trabajo?

¿Necesito sondear cada pocos segundos para ver si hay algún cambio con respecto a la última vez, o hay alguna manera de que mi secuencia de comandos pueda esperar a que se produzcan inserciones?

Este es un proyecto PHP en el que estoy trabajando, pero no dudes en responder en Ruby o en un idioma independiente.

Andrew avatar Mar 14 '12 03:03 Andrew
Aceptado

Lo que estás pensando suena mucho a desencadenantes. MongoDB no admite activadores; sin embargo, algunas personas han "desarrollado los suyos propios" utilizando algunos trucos. La clave aquí es el registro de operaciones.

Cuando ejecuta MongoDB en un conjunto de réplicas, todas las acciones de MongoDB se registran en un registro de operaciones (conocido como registro de operaciones). El registro de operaciones es básicamente una lista actualizada de las modificaciones realizadas a los datos. Los conjuntos de réplicas funcionan escuchando los cambios en este registro de operaciones y luego aplicando los cambios localmente.

¿Te suena esto familiar?

No puedo detallar todo el proceso aquí, son varias páginas de documentación, pero las herramientas que necesita están disponibles.

Primero, algunos artículos sobre el registro de operaciones - Breve descripción - Diseño de la localcolección (que contiene el registro de operaciones)

También querrás aprovechar los cursores adaptables . Estos le proporcionarán una manera de escuchar los cambios en lugar de realizar encuestas para detectarlos. Tenga en cuenta que la replicación utiliza cursores adaptables, por lo que esta es una característica compatible.

Gates VP avatar Mar 13 '2012 21:03 Gates VP

MongoDB tiene lo que se llama capped collectionsy tailable cursorseso le permite a MongoDB enviar datos a los oyentes.

A capped collectiones esencialmente una colección que tiene un tamaño fijo y solo permite inserciones. Así es como se vería crear uno:

db.createCollection("messages", { capped: true, size: 100000000 })

Cursores adaptables de MongoDB ( publicación original de Jonathan H. Wage )

Rubí

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}

Pitón (por Robert Stewart)

from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)

Perl (por Max )

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}

Recursos adicionales:

Tutorial de Ruby/Node.js que le guiará en la creación de una aplicación que escuche las inserciones en una colección limitada de MongoDB.

Un artículo que habla sobre cursores adaptables con más detalle.

Ejemplos de PHP, Ruby, Python y Perl sobre el uso de cursores adaptables.

Andrew avatar Apr 25 '2012 23:04 Andrew

Mira esto: Cambiar flujos

10 de enero de 2018: versión 3.6

*EDITAR: escribí un artículo sobre cómo hacer esto https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

https://docs.mongodb.com/v3.6/changeStreams/


Es nuevo en mongodb 3.6 https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10

$ mongod --version
db version v3.6.2

Para utilizar changeStreams, la base de datos debe ser un conjunto de replicación.

Más información sobre conjuntos de replicación: https://docs.mongodb.com/manual/replication/

Su base de datos será " independiente " de forma predeterminada.

Cómo convertir un conjunto independiente en un conjunto de réplicas: https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/


El siguiente ejemplo es una aplicación práctica de cómo podría utilizar esto.
* Específicamente para Nodo.

/* file.js */
'use strict'


module.exports = function (
    app,
    io,
    User // Collection Name
) {
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) {
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) {
            console.log('COLLECTION CHANGED');

            User.find({}, (err, data) => {
                if (err) throw err;

                if (data) {
                    // RESEND ALL USERS
                    socket.emit('users', data);
                }
            });
        });
    });
};
/* END - file.js */

Enlaces útiles:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set
https://docs.mongodb.com/manual/tutorial/change-streams-example

https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams

Rio Weber avatar Feb 07 '2018 01:02 Rio Weber

Desde MongoDB 3.6 habrá una nueva API de notificaciones llamada Change Streams que puedes usar para esto. Vea esta publicación de blog para ver un ejemplo . Ejemplo de ello:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])
Mitar avatar Sep 07 '2017 19:09 Mitar