【后端开发实习】用MongoDB和Redis实现消息队列搭建分布式邮件消息系统
创始人
2025-01-11 09:06:22
0

用Redis实现消息队列并搭建分布式邮件消息系统

  • 系统介绍
  • Redis实现消息队列
    • 思路分析
    • 代码实现
  • MongoDB监听数据变化
    • 思路分析
    • 代码实现
      • Mongoose测试连接
      • 监听mongodb数据变化
  • 注意点

系统介绍

本次要实现的是一个能够实现实时监控Mongodb中数据变化的系统,要能够在数据发生变动的时候实时将变动消息发送给指定的邮箱。

  • Node.js:用于开发的语言,既能用于前端开发,又能用来做后端开发。
  • Redis:用于搭建消息队列,实现消息的分布式。
  • MongoDB:持久化数据,同时实现触发条件的监听,当MongoDB中有新增数据的时候发送新增数据的邮件消息。

Redis实现消息队列

思路分析

主要使用的就是Redis-smq这个库,下面展示的就是主要使用的消息队列类,其中包括了很多队列种类,有先进先出、优先级先出等方式。
在这里插入图片描述
整个库的原理如下结构图,本次使用到的只有主线,就是发送和接收:
在这里插入图片描述

代码实现

const { transemail } = require('../email_list/email.js'); const redis = require('promise-redis-client'); const redisHost = 'localhost'; const redisPort = 6379;  // 配置 Redis 客户端 const createRedisClient = () => {     return new Promise((resolve, reject) => {         let client = redis.createClient({ host: redisHost, port: redisPort });         client.on('error', err => {             console.log('Redis 连接出错');             reject(err);         });         client.on('ready', () => {             console.log('Redis ready');             resolve(client);         });     }); };  async function startWaitMsg(redisClient) {     while (true) {         let res = null;         try {             res = await redisClient.brpop('bookChanges', 0);             console.log('收到消息', res);         } catch (err) {             console.log('brpop 出错,重新 brpop');             continue;         }         res = res.toString();         transemail(res);     } }  async function listenredis() {     try {         // 启动生产者         // startProducer();          // 创建 Redis 客户端         const redisClient = await createRedisClient();          // 启动消息监听         startWaitMsg(redisClient);     } catch (error) {         console.error('Error:', error);     } } //测试的时候使用的代码 listenredis().catch(console.error);  // 处理退出信号以关闭客户端 process.on('SIGINT', async () => {     console.log('Closing clients...');     process.exit(0); });  

MongoDB监听数据变化

思路分析

由于要实现实时检测,经过分析以后使用mongoose中的数据流监控最为合适,但是要实现这个方法需要用到watch方法,这个方法只有在mongodb有副本集的时候才能使用,因此还需要提前配置好mongodb才能进行这里下一步的操作,如果没有配置过mongodb的副本集的可以参考我的这篇博客。

  1. 用mongoose中的watch连接mongodb副本集数据库获取数据变化。
  2. 将数据变化发送到redis消息队列中。

首先在命令行中将服务启动:
在这里插入图片描述

代码实现

Mongoose测试连接

const mongoose = require('mongoose');  mongoose.connect('mongodb://localhost/test', {   useNewUrlParser: true,   useUnifiedTopology: true }).then(() => {   console.log('Successfully connected to MongoDB');    const bookSchema = new mongoose.Schema({     title: String,     author: String   });    const Book = mongoose.model('Book', bookSchema);    const bookChangeStream = Book.watch();    bookChangeStream.on('change', (change) => {     console.log('Collection changed:', change);     if (change.operationType === 'insert') {       console.log('New book added:', change.fullDocument);     }   }); }).catch((error) => {   console.log('Error connecting to MongoDB:', error); });  

在这里插入图片描述
测试结果:
在Mongo Campass中添加数据以后,在终端中出现如下消息:
在这里插入图片描述
证明测试成功,可以进行下一步操作啦!

监听mongodb数据变化

const redis = require('redis'); const mongoose = require('mongoose'); // 创建 Redis 客户端 const redisClient = redis.createClient({ 	host: 'localhost', 	port: 6379   });      // 连接到 Redis redisClient.connect();    //连接mongodb数据库并检测变化发送到redis消息队列 async function connectAndMonitorMongoDB(redisClient) {   try {     await mongoose.connect('mongodb://localhost/test', {       useNewUrlParser: true,       useUnifiedTopology: true     });     console.log('Successfully connected to MongoDB');      const bookSchema = new mongoose.Schema({       title: String,       author: String     });      const Book = mongoose.model('Book', bookSchema);      const bookChangeStream = Book.watch(); 	try{ 		bookChangeStream.on('change', (change) => { 			console.log('Collection changed:', change); 			console.log("type of change:",typeof(change)); 			msg = JSON.stringify(change.fullDocument); 			msg = msg.replace(/{|}/g, ''); 			msg = "New message received:"+msg; 			console.log("massage:",msg); 			console.log("type of message:",typeof(msg)); 			if (change.operationType === 'insert') { 			  console.log('New book added:', msg); 			  redisClient.lPush('bookChanges', msg, function(err, reply) { 				if (err) { 				  console.log('Error storing JSON to Redis:', err); 				} else { 				  console.log('JSON stored successfully, list length:', reply); 				}}) 			} 		  }); 	}catch (err){ 		console.log("error while loading data into redis:", err) 	}   } catch (error) {     console.log('Error connecting to MongoDB:', error);   } }  // module.exports = { connectAndMonitorMongoDB }; async function main() {   try {     await connectAndMonitorMongoDB(redisClient);     console.log('Monitoring MongoDB changes...');   } catch (error) {     console.error('Failed to start monitoring:', error);   } }  main(); 

注意点

在nodejs中将JSON对象转换成字符串的JSON.Stringify函数并不是严格的转换成字符串而是带有一个大括号,然而这个在进行redis进队列的时候会有问题,因此需要用正则表达式去掉大括号:

msg = JSON.stringify(change.fullDocument); msg = msg.replace(/{|}/g, ''); msg = "New message received:"+msg; 

相关内容

热门资讯

第七阶段指导!情怀娱乐辅助器,... 第七阶段指导!情怀娱乐辅助器,越乡游破解(有挂开挂辅助挂);无需打开直接搜索加(薇:13670430...
一分钟秒懂!微信广东雀神挂件辅... 您好:微信广东雀神挂件辅助这款游戏可以开挂的,确实是有挂的,很多玩家在这款游戏中打牌都会发现很多用户...
透视代打!德州透视竞技联盟,w... 透视代打!德州透视竞技联盟,we-poker软件,透明挂教程!(有挂开挂辅助平台)1、下载安装好德州...
第七要领!微乐麻将脚本透视,雀... 第七要领!微乐麻将脚本透视,雀姬辅助脚本(有挂开挂辅助下载);无需打开直接搜索加薇136704302...
玩家亲测!奇迹辅助思维脚本,贪... 玩家亲测!奇迹辅助思维脚本,贪玩娱乐科技(有挂开挂辅助下载);无需打开直接搜索加(薇:1367043...
十刹那掌握!哥哥打大a辅助工具... 您好:杭州都莱大菠萝插件这款游戏可以开挂的,确实是有挂的,很多玩家在这款游戏中打牌都会发现很多用户的...
第4要领!火神大厅辅助器,皇豪... 第4要领!火神大厅辅助器,皇豪互娱控制系统app(有挂开挂辅助脚本)1、下载安装好皇豪互娱控制系统a...
透视免费!皇豪互众智能辅助器破... 透视免费!皇豪互众智能辅助器破解,微信小程序财神十三章特殊牌,必胜教程(有挂开挂辅助挂);无需打开直...
9分钟体悟!科乐辅助,湖湘互娱... 9分钟体悟!科乐辅助,湖湘互娱牛牛(有挂开挂辅助软件);无需打开直接搜索打开薇:136704302 ...
玩家实测!传送五激k辅助靠谱吗... 玩家实测!传送五激k辅助靠谱吗,快玩炸翻天辅助工具(有挂开挂辅助脚本)1、下载安装好传送五激k辅助靠...