本次要实现的是一个能够实现实时监控Mongodb中数据变化的系统,要能够在数据发生变动的时候实时将变动消息发送给指定的邮箱。
主要使用的就是Redis-smq这个库,下面展示的就是主要使用的消息队列类,其中包括了很多队列种类,有先进先出、优先级先出等方式。
整个库的原理如下结构图,本次使用到的只有主线,就是发送和接收:
const { transemail } = require('../email_list/email.js'); const redis = require('promise-redis-client'); const redisHost = 'localhost'; const redisPort = 6379; // 配置 Redis 客户端 const createRedisClient = () => { return new Promise((resolve, reject) => { let client = redis.createClient({ host: redisHost, port: redisPort }); client.on('error', err => { console.log('Redis 连接出错'); reject(err); }); client.on('ready', () => { console.log('Redis ready'); resolve(client); }); }); }; async function startWaitMsg(redisClient) { while (true) { let res = null; try { res = await redisClient.brpop('bookChanges', 0); console.log('收到消息', res); } catch (err) { console.log('brpop 出错,重新 brpop'); continue; } res = res.toString(); transemail(res); } } async function listenredis() { try { // 启动生产者 // startProducer(); // 创建 Redis 客户端 const redisClient = await createRedisClient(); // 启动消息监听 startWaitMsg(redisClient); } catch (error) { console.error('Error:', error); } } //测试的时候使用的代码 listenredis().catch(console.error); // 处理退出信号以关闭客户端 process.on('SIGINT', async () => { console.log('Closing clients...'); process.exit(0); });
由于要实现实时检测,经过分析以后使用mongoose中的数据流监控最为合适,但是要实现这个方法需要用到watch方法,这个方法只有在mongodb有副本集的时候才能使用,因此还需要提前配置好mongodb才能进行这里下一步的操作,如果没有配置过mongodb的副本集的可以参考我的这篇博客。
首先在命令行中将服务启动:
const mongoose = require('mongoose'); mongoose.connect('mongodb://localhost/test', { useNewUrlParser: true, useUnifiedTopology: true }).then(() => { console.log('Successfully connected to MongoDB'); const bookSchema = new mongoose.Schema({ title: String, author: String }); const Book = mongoose.model('Book', bookSchema); const bookChangeStream = Book.watch(); bookChangeStream.on('change', (change) => { console.log('Collection changed:', change); if (change.operationType === 'insert') { console.log('New book added:', change.fullDocument); } }); }).catch((error) => { console.log('Error connecting to MongoDB:', error); });
测试结果:
在Mongo Campass中添加数据以后,在终端中出现如下消息:
证明测试成功,可以进行下一步操作啦!
const redis = require('redis'); const mongoose = require('mongoose'); // 创建 Redis 客户端 const redisClient = redis.createClient({ host: 'localhost', port: 6379 }); // 连接到 Redis redisClient.connect(); //连接mongodb数据库并检测变化发送到redis消息队列 async function connectAndMonitorMongoDB(redisClient) { try { await mongoose.connect('mongodb://localhost/test', { useNewUrlParser: true, useUnifiedTopology: true }); console.log('Successfully connected to MongoDB'); const bookSchema = new mongoose.Schema({ title: String, author: String }); const Book = mongoose.model('Book', bookSchema); const bookChangeStream = Book.watch(); try{ bookChangeStream.on('change', (change) => { console.log('Collection changed:', change); console.log("type of change:",typeof(change)); msg = JSON.stringify(change.fullDocument); msg = msg.replace(/{|}/g, ''); msg = "New message received:"+msg; console.log("massage:",msg); console.log("type of message:",typeof(msg)); if (change.operationType === 'insert') { console.log('New book added:', msg); redisClient.lPush('bookChanges', msg, function(err, reply) { if (err) { console.log('Error storing JSON to Redis:', err); } else { console.log('JSON stored successfully, list length:', reply); }}) } }); }catch (err){ console.log("error while loading data into redis:", err) } } catch (error) { console.log('Error connecting to MongoDB:', error); } } // module.exports = { connectAndMonitorMongoDB }; async function main() { try { await connectAndMonitorMongoDB(redisClient); console.log('Monitoring MongoDB changes...'); } catch (error) { console.error('Failed to start monitoring:', error); } } main();
在nodejs中将JSON对象转换成字符串的JSON.Stringify函数并不是严格的转换成字符串而是带有一个大括号,然而这个在进行redis进队列的时候会有问题,因此需要用正则表达式去掉大括号:
msg = JSON.stringify(change.fullDocument); msg = msg.replace(/{|}/g, ''); msg = "New message received:"+msg;