事件循环和TCP服务器:
muduo::net::EventLoop _baseloop;
muduo::net::TcpServer _server;
这些是muduo
库提供的核心组件,负责处理网络事件和管理TCP连接。消息分发和编码:
muduo::net::ProtobufDispatcher _dispatcher;
muduo::net::ProtobufCodec _codec;
这些是muduo
库中用于处理Protocol Buffers消息的工具,_dispatcher
负责根据消息类型调用对应的回调函数,_codec
用于处理消息的序列化与反序列化。连接管理器:
ConnectionManager _connectionManager;
管理所有的客户端连接,确保在处理消息时能获取到正确的连接信息。消费者管理器:
ConsumerManager _consumerManager;
管理所有的消费者,处理消息的订阅与消费逻辑。虚拟机:
VirtualHost _virtualHost;
在消息队列系统中,虚拟机管理交换机、队列等资源。线程池:
muduo::ThreadPool _threadPool;
用于处理耗时的任务,避免阻塞主线程的事件循环。 Broker(int port, std::string dbname, std::string msgdir) : _server(&_baseloop, muduo::net::InetAddress("127.0.0.1", port), "MyBroker", muduo::net::TcpServer::kReusePort), _dispatcher(std::bind(&Broker::onUnknowMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)), _codec(std::make_shared(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))), _connections(std::make_shared()), _consumers(std::make_shared()), _pool(std::make_shared(1)), _vhost(std::make_shared(virtualHost, dbname, msgdir)) { // 1.将所有的消息回调函数注册到_dispatcher中 //_dispatcher.registerMessageCallback(std::bind(&Broker::onOpenChannel,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onOpenChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onCloseChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onDeclareExchange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onRemoveExchange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onDeclareQueue, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onRemoveQueueReq, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onBind, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onUnBind, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onBasicPublish, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onBasicAck, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onBasicSubscribe, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onBasicCancel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); // 2._server设置连接回调函数和消息回调函数 _server.setConnectionCallback(std::bind(&Broker::onConnection, this, std::placeholders::_1)); _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); }
_server
: 创建一个TCP服务器,监听指定的地址,并绑定了连接回调函数和消息回调函数。_dispatcher
和_codec
: 负责消息的分发和编码处理。_dispatcher
,确保接收到特定类型的消息时能够调用对应的处理逻辑。void start() { _server.start(); _baseloop->loop(); }
void onConnection(const muduo::net::TcpConnectionPtr& conn) { if (conn->connected()) { // 处理新连接 std::shared_ptr connection = std::make_shared(conn); _connectionManager.addConnection(connection); } else { // 处理连接关闭 _connectionManager.removeConnection(conn); } }
Connection
对象,并将其添加到_connectionManager
。_connectionManager
中移除该连接。 void onUnknowMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp receiveTime) { LOG_INFO << "unknow message: " << message->GetTypeName(); conn->shutdown(); }
接收到来自客户端的不同类型的请求时,通过_dispatcher调用对应的消息处理回调函数
拿到连接,然后打开信道即可.
拿到连接+打开信道+利用信道调用服务端对应的函数即可.
using Task = std::function
;
ThreadPool
类的成员变量主要包括以下几部分:
任务队列 (std::vector
):
std::function
类型的可调用对象。线程相关 (std::vector
):
同步机制 (std::mutex _mutex
和 std::condition_variable _cond
):
_mutex
用于保护任务队列,防止多个线程同时访问任务队列时出现竞争条件。_cond
用于在线程等待和唤醒时进行同步控制,当任务队列中有新任务时,唤醒等待的线程。线程池状态 (std::atomic
):
true
时,线程池将停止接受新任务并退出。构造函数,用于初始化线程池并启动指定数量的线程。
析构函数,用于停止线程池并等待所有线程退出。
push
函数用于向线程池提交新任务。该函数是模板函数,能够接受任意类型的函数和参数,并将其封装为异步任务。
std::bind
将函数和参数绑定在一起,然后使用std::packaged_task
将其封装成异步任务。_tasks
中。_cond.notify_one()
唤醒一个等待中的线程去执行任务。std::future
:返回一个std::future
对象,用户可以通过它获取任务的执行结果。_cond.wait
等待,直到任务队列中有任务或线程池停止。_tasks
和临时任务列表tmp
,避免频繁加锁解锁。_is_stop
设置为true
,标志着线程池将停止工作。_cond.notify_all()
唤醒所有等待中的线程,以便它们可以检查停止标志并退出。join()
等待每个线程退出。#pragma once #include #include #include #include #include #include #include #include #include #include class ThreadPool { public: using ptr = std::shared_ptr; ThreadPool(int thread_num = 1) { // 初始化线程池,创建thread_num个线程,每个线程执行entry函数 _is_stop = false; for (int i = 0; i < thread_num; i++) { _threads.emplace_back(&ThreadPool::entry, this);//用这些参数来构造对象 } } ~ThreadPool() { stop(); } using Task = std::function; // 用户传入要执行的函数和参数,push内部封装成packaged_task异步任务,利用lambda生成可调用对象并放入队列中 template auto push(F &&func, Args &&...args) -> std::future { using return_type = decltype(func(args...)); auto obj = std::bind(std::forward(func), std::forward(args)...); auto ptask = std::make_shared>(obj); std::future future = ptask->get_future(); { std::unique_lock lock(_mutex); // 将可调用对象放入队列 _tasks.push_back([ptask]() { (*ptask)(); }); // 唤醒一个线程 _cond.notify_one(); } return future; } void stop() { if (_is_stop == true) return; _is_stop = true; _cond.notify_all(); for (auto &t : _threads) { t.join(); } } private: void entry() // 线程入口函数 { while (_is_stop == false) { // 一个线程一次将队列中的所有任务都取出,避免频繁加锁解锁 std::vector tmp; { std::unique_lock lock(_mutex); _cond.wait(lock, [this]() { return !_tasks.empty() || _is_stop; }); tmp.swap(_tasks); } for(auto &task:tmp) { task(); } } } private: // 任务队列 std::vector _tasks; // 同步互斥,锁相关 std::mutex _mutex; std::condition_variable _cond; // 一批线程 std::vector _threads; // 结束标志 std::atomic _is_stop; };
#pragma once #include "connection.hpp" #include "../include/muduo/net/Buffer.h" #include "../include/muduo/net/EventLoop.h" #include "../include/muduo/net/TcpServer.h" #include "../include/muduo/base/noncopyable.h" #include "../include/muduo/net/Callbacks.h" #include "proto/dispatcher.h" #include "proto/codec.h" #include "../include/muduo/base/Logging.h" #include "../include/muduo/base/Mutex.h" #include #include namespace mq { class Broker { private: const std::string virtualHost = "defaultHost"; // 默认虚拟机名称 // 1.muduo库服务器相关 // 服务器和事件循环 muduo::net::EventLoop _baseloop; muduo::net::TcpServer _server; // Protobuf消息分发器 ProtobufDispatcher _dispatcher; // Protobuf协议处理器,解决粘包等问题,提取出一个完整的请求报文,并交给_dispatcher处理 mq::ProtobufCodecPtr _codec; // 2.消息队列相关 ConnectionManager::ptr _connections; // 连接管理器 VirtualHost::ptr _vhost; // 虚拟机 ConsumerManager::ptr _consumers; // 消费者管理器 ThreadPool::ptr _pool; // 线程池 public: Broker(int port, std::string dbname, std::string msgdir) : _server(&_baseloop, muduo::net::InetAddress("127.0.0.1", port), "MyBroker", muduo::net::TcpServer::kReusePort), _dispatcher(std::bind(&Broker::onUnknowMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)), _codec(std::make_shared(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))), _connections(std::make_shared()), _consumers(std::make_shared()), _pool(std::make_shared(1)), _vhost(std::make_shared(virtualHost, dbname, msgdir)) { // 1.将所有的消息回调函数注册到_dispatcher中 //_dispatcher.registerMessageCallback(std::bind(&Broker::onOpenChannel,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onOpenChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onCloseChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onDeclareExchange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onRemoveExchange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onDeclareQueue, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onRemoveQueueReq, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onBind, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onUnBind, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onBasicPublish, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onBasicAck, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onBasicSubscribe, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); _dispatcher.registerMessageCallback(std::bind(&Broker::onBasicCancel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); // 2._server设置连接回调函数和消息回调函数 _server.setConnectionCallback(std::bind(&Broker::onConnection, this, std::placeholders::_1)); _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); } void start() { _server.start(); _baseloop.loop(); } private: void onConnection(const muduo::net::TcpConnectionPtr &conn) { if (conn->connected()) { DLOG("new connection from:%s ", conn->peerAddress().toIpPort().c_str()); _connections->addConnection(std::make_shared(), _vhost, _consumers, conn, _codec, _pool); } else { DLOG("connection from:%s closed", conn->peerAddress().toIpPort().c_str()); _connections->removeConnection(conn); } } void onUnknowMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp receiveTime) { LOG_INFO << "unknow message: " << message->GetTypeName(); conn->shutdown(); } //-----------------------一系列消息回调函数--------------------------------- // 1.打开/关闭信道' void onOpenChannel(const muduo::net::TcpConnectionPtr &conn, const OpenChannelPtr &req, muduo::Timestamp receiveTime) { auto connection = _connections->getConnection(conn); if (connection.get() == nullptr) { DLOG("onOpenChannel: connection is null"); conn->shutdown(); return; } connection->openChannel(req); } void onCloseChannel(const muduo::net::TcpConnectionPtr &conn, const CloseChannelReqPtr &req, muduo::Timestamp receiveTime) { auto connection = _connections->getConnection(conn); if (connection.get() == nullptr) { DLOG("onCloseChannel: connection is null"); conn->shutdown(); return; } connection->closeChannel(req); } // 2.声明/移除交换机 void onDeclareExchange(const muduo::net::TcpConnectionPtr &conn, const DeclareExchangeReqPtr &req, muduo::Timestamp receiveTime) { auto connection = _connections->getConnection(conn); if (connection.get() == nullptr) { DLOG("onDeclareExchange: connection is null"); conn->shutdown(); return; } auto channel = connection->getChannel(req->chid()); if (channel.get() == nullptr) { DLOG("onDeclareExchange: channel is null"); conn->shutdown(); return; } channel->declareExchange(req); } void onRemoveExchange(const muduo::net::TcpConnectionPtr &conn, const RemoveExchangeReqPtr &req, muduo::Timestamp receiveTime) { auto connection = _connections->getConnection(conn); if (connection.get() == nullptr) { DLOG("onRemoveExchange: connection is null"); conn->shutdown(); return; } auto channel = connection->getChannel(req->chid()); if (channel.get() == nullptr) { DLOG("onRemoveExchange: channel is null"); conn->shutdown(); return; } channel->removeExchange(req); } // 3.声明/移除队列 void onDeclareQueue(const muduo::net::TcpConnectionPtr &conn, const DeclareQueueReqPtr &req, muduo::Timestamp receiveTime) { auto connection = _connections->getConnection(conn); if (connection.get() == nullptr) { DLOG("onDeclareQueue: connection is null"); conn->shutdown(); return; } auto channel = connection->getChannel(req->chid()); if (channel.get() == nullptr) { DLOG("onDeclareQueue: channel is null"); conn->shutdown(); return; } channel->declareQueue(req); } void onRemoveQueueReq(const muduo::net::TcpConnectionPtr &conn, const RemoveQueueReqPtr &req, muduo::Timestamp receiveTime) { auto connection = _connections->getConnection(conn); if (connection.get() == nullptr) { DLOG("onRemoveQueueReq: connection is null"); conn->shutdown(); return; } auto channel = connection->getChannel(req->chid()); if (channel.get() == nullptr) { DLOG("onRemoveQueueReq: channel is null"); conn->shutdown(); return; } channel->removeQueue(req); } // 4.绑定/解除绑定 void onBind(const muduo::net::TcpConnectionPtr &conn, const BindReqPtr &req, muduo::Timestamp receiveTime) { auto connection = _connections->getConnection(conn); if (connection.get() == nullptr) { DLOG("onBind: connection is null"); conn->shutdown(); return; } auto channel = connection->getChannel(req->chid()); if (channel.get() == nullptr) { DLOG("onBind: channel is null"); conn->shutdown(); return; } channel->bind(req); } void onUnBind(const muduo::net::TcpConnectionPtr &conn, const UnbindReqPtr &req, muduo::Timestamp receiveTime) { auto connection = _connections->getConnection(conn); if (connection.get() == nullptr) { DLOG("onUnBind: connection is null"); conn->shutdown(); return; } auto channel = connection->getChannel(req->chid()); if (channel.get() == nullptr) { DLOG("onUnBind: channel is null"); conn->shutdown(); return; } channel->unbind(req); } // 5.发布/确认消息 void onBasicPublish(const muduo::net::TcpConnectionPtr &conn, const BasicPublishReqPtr &req, muduo::Timestamp receiveTime) { auto connection = _connections->getConnection(conn); if (connection.get() == nullptr) { DLOG("onBasicPublish: connection is null"); conn->shutdown(); return; } auto channel = connection->getChannel(req->chid()); if (channel.get() == nullptr) { DLOG("onBasicPublish: channel is null"); conn->shutdown(); return; } channel->basicPublish(req); } void onBasicAck(const muduo::net::TcpConnectionPtr &conn, const BasicAckReqPtr &req, muduo::Timestamp receiveTime) { auto connection = _connections->getConnection(conn); if (connection.get() == nullptr) { DLOG("onBasicAck: connection is null"); conn->shutdown(); return; } auto channel = connection->getChannel(req->chid()); if (channel.get() == nullptr) { DLOG("onBasicAck: channel is null"); conn->shutdown(); return; } channel->basicAck(req); } // 6.订阅和取消订阅 void onBasicSubscribe(const muduo::net::TcpConnectionPtr &conn, const BasicSubscribeReqPtr &req, muduo::Timestamp receiveTime) { auto connection = _connections->getConnection(conn); if (connection.get() == nullptr) { DLOG("onBasicSubscribe: connection is null"); conn->shutdown(); return; } auto channel = connection->getChannel(req->chid()); if (channel.get() == nullptr) { DLOG("onBasicSubscribe: channel is null"); conn->shutdown(); return; } channel->basicSubscribe(req); } void onBasicCancel(const muduo::net::TcpConnectionPtr &conn, const BasicCancelReqPtr &req, muduo::Timestamp receiveTime) { auto connection = _connections->getConnection(conn); if (connection.get() == nullptr) { DLOG("onBasicCancel: connection is null"); conn->shutdown(); return; } auto channel = connection->getChannel(req->chid()); if (channel.get() == nullptr) { DLOG("onBasicCancel: channel is null"); conn->shutdown(); return; } channel->basicCancel(req); } }; };