¿Cómo escuchar cambios en una colección de MongoDB?
Estoy creando una especie de sistema de cola de trabajos en segundo plano con MongoDB como almacén de datos. ¿Cómo puedo "escuchar" las inserciones en una colección de MongoDB antes de generar trabajadores para procesar el trabajo?
¿Necesito sondear cada pocos segundos para ver si hay algún cambio con respecto a la última vez, o hay alguna manera de que mi secuencia de comandos pueda esperar a que se produzcan inserciones?
Este es un proyecto PHP en el que estoy trabajando, pero no dudes en responder en Ruby o en un idioma independiente.
Lo que estás pensando suena mucho a desencadenantes. MongoDB no admite activadores; sin embargo, algunas personas han "desarrollado los suyos propios" utilizando algunos trucos. La clave aquí es el registro de operaciones.
Cuando ejecuta MongoDB en un conjunto de réplicas, todas las acciones de MongoDB se registran en un registro de operaciones (conocido como registro de operaciones). El registro de operaciones es básicamente una lista actualizada de las modificaciones realizadas a los datos. Los conjuntos de réplicas funcionan escuchando los cambios en este registro de operaciones y luego aplicando los cambios localmente.
¿Te suena esto familiar?
No puedo detallar todo el proceso aquí, son varias páginas de documentación, pero las herramientas que necesita están disponibles.
Primero, algunos artículos sobre el registro de operaciones - Breve descripción
- Diseño de la local
colección (que contiene el registro de operaciones)
También querrás aprovechar los cursores adaptables . Estos le proporcionarán una manera de escuchar los cambios en lugar de realizar encuestas para detectarlos. Tenga en cuenta que la replicación utiliza cursores adaptables, por lo que esta es una característica compatible.
MongoDB tiene lo que se llama capped collections
y tailable cursors
eso le permite a MongoDB enviar datos a los oyentes.
A capped collection
es esencialmente una colección que tiene un tamaño fijo y solo permite inserciones. Así es como se vería crear uno:
db.createCollection("messages", { capped: true, size: 100000000 })
Cursores adaptables de MongoDB ( publicación original de Jonathan H. Wage )
Rubí
coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
if doc = cursor.next_document
puts doc
else
sleep 1
end
end
PHP
$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
if ($cursor->hasNext()) {
$doc = $cursor->getNext();
print_r($doc);
} else {
sleep(1);
}
}
Pitón (por Robert Stewart)
from pymongo import Connection
import time
db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
try:
doc = cursor.next()
print doc
except StopIteration:
time.sleep(1)
Perl (por Max )
use 5.010;
use strict;
use warnings;
use MongoDB;
my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
if (defined(my $doc = $cursor->next))
{
say $doc;
}
else
{
sleep 1;
}
}
Recursos adicionales:
Tutorial de Ruby/Node.js que le guiará en la creación de una aplicación que escuche las inserciones en una colección limitada de MongoDB.
Un artículo que habla sobre cursores adaptables con más detalle.
Ejemplos de PHP, Ruby, Python y Perl sobre el uso de cursores adaptables.
Mira esto: Cambiar flujos
10 de enero de 2018: versión 3.6
*EDITAR: escribí un artículo sobre cómo hacer esto https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76
https://docs.mongodb.com/v3.6/changeStreams/
Es nuevo en mongodb 3.6 https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10
$ mongod --version
db version v3.6.2
Para utilizar changeStreams, la base de datos debe ser un conjunto de replicación.
Más información sobre conjuntos de replicación: https://docs.mongodb.com/manual/replication/
Su base de datos será " independiente " de forma predeterminada.
Cómo convertir un conjunto independiente en un conjunto de réplicas: https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/
El siguiente ejemplo es una aplicación práctica de cómo podría utilizar esto.
* Específicamente para Nodo.
/* file.js */
'use strict'
module.exports = function (
app,
io,
User // Collection Name
) {
// SET WATCH ON COLLECTION
const changeStream = User.watch();
// Socket Connection
io.on('connection', function (socket) {
console.log('Connection!');
// USERS - Change
changeStream.on('change', function(change) {
console.log('COLLECTION CHANGED');
User.find({}, (err, data) => {
if (err) throw err;
if (data) {
// RESEND ALL USERS
socket.emit('users', data);
}
});
});
});
};
/* END - file.js */
Enlaces útiles:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set
https://docs.mongodb.com/manual/tutorial/change-streams-example
https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams
Desde MongoDB 3.6 habrá una nueva API de notificaciones llamada Change Streams que puedes usar para esto. Vea esta publicación de blog para ver un ejemplo . Ejemplo de ello:
cursor = client.my_db.my_collection.changes([
{'$match': {
'operationType': {'$in': ['insert', 'replace']}
}},
{'$match': {
'newDocument.n': {'$gte': 1}
}}
])
# Loops forever.
for change in cursor:
print(change['newDocument'])