本篇完成的模块是TCP服务器的设计和token验证
因为聊天服务要维持一个长链接,方便服务器和客户端双向通信,那么就需要一个TCPMgr来管理TCP连接
而实际开发中网络模块一般以单例模式使用,那我们就基于单例基类和可被分享类创建一个自定义的TcpMgr类,在QT工程中新建TcpMgr类,会生成头文件和源文件,头文件修改如下
#ifndef TCPMGR_H #define TCPMGR_H #include #include "singleton.h" #include "global.h" class TcpMgr:public QObject, public Singleton, public std::enable_shared_from_this { Q_OBJECT public: TcpMgr(); private: QTcpSocket _socket; QString _host; uint16_t _port; QByteArray _buffer; bool _b_recv_pending; quint16 _message_id; quint16 _message_len; public slots: void slot_tcp_connect(ServerInfo); void slot_send_data(ReqId reqId, QString data); signals: void sig_con_success(bool bsuccess); void sig_send_data(ReqId reqId, QString data); }; #endif // TCPMGR_H
接下来我们在构造函数中连接网络请求的各种信号
TcpMgr::TcpMgr():_host(""),_port(0),_b_recv_pending(false),_message_id(0),_message_len(0) { QObject::connect(&_socket, &QTcpSocket::connected, [&]() { qDebug() << "Connected to server!"; // 连接建立后发送消息 emit sig_con_success(true); }); QObject::connect(&_socket, &QTcpSocket::readyRead, [&]() { // 当有数据可读时,读取所有数据 // 读取所有数据并追加到缓冲区 _buffer.append(_socket.readAll()); QDataStream stream(&_buffer, QIODevice::ReadOnly); stream.setVersion(QDataStream::Qt_5_0); forever { //先解析头部 if(!_b_recv_pending){ // 检查缓冲区中的数据是否足够解析出一个消息头(消息ID + 消息长度) if (_buffer.size() < static_cast(sizeof(quint16) * 2)) { return; // 数据不够,等待更多数据 } // 预读取消息ID和消息长度,但不从缓冲区中移除 stream >> _message_id >> _message_len; //将buffer 中的前四个字节移除 _buffer = _buffer.mid(sizeof(quint16) * 2); // 输出读取的数据 qDebug() << "Message ID:" << _message_id << ", Length:" << _message_len; } //buffer剩余长读是否满足消息体长度,不满足则退出继续等待接受 if(_buffer.size() < _message_len){ _b_recv_pending = true; return; } _b_recv_pending = false; // 读取消息体 QByteArray messageBody = _buffer.mid(0, _message_len); qDebug() << "receive body msg is " << messageBody ; _buffer = _buffer.mid(_message_len); } }); // 处理错误(适用于Qt 5.15之前的版本) QObject::connect(&_socket, static_cast(&QTcpSocket::error), [&](QTcpSocket::SocketError socketError) { qDebug() << "Error:" << _socket.errorString() ; switch (socketError) { case QTcpSocket::ConnectionRefusedError: qDebug() << "Connection Refused!"; emit sig_con_success(false); break; case QTcpSocket::RemoteHostClosedError: qDebug() << "Remote Host Closed Connection!"; break; case QTcpSocket::HostNotFoundError: qDebug() << "Host Not Found!"; emit sig_con_success(false); break; case QTcpSocket::SocketTimeoutError: qDebug() << "Connection Timeout!"; emit sig_con_success(false); break; case QTcpSocket::NetworkError: qDebug() << "Network Error!"; break; default: qDebug() << "Other Error!"; break; } }); // 处理连接断开 QObject::connect(&_socket, &QTcpSocket::disconnected, [&]() { qDebug() << "Disconnected from server."; }); QObject::connect(this, &TcpMgr::sig_send_data, this, &TcpMgr::slot_send_data); }
连接对端服务器
void TcpMgr::slot_tcp_connect(ServerInfo si) { qDebug()<< "receive tcp connect signal"; // 尝试连接到服务器 qDebug() << "Connecting to server..."; _host = si.Host; _port = static_cast(si.Port.toUInt()); _socket.connectToHost(si.Host, _port); }
因为客户端发送数据可能在任何线程,为了保证线程安全,我们在要发送数据时发送TcpMgr的sig_send_data信号,然后实现接受这个信号的槽函数
void TcpMgr::slot_send_data(ReqId reqId, QString data) { uint16_t id = reqId; // 将字符串转换为UTF-8编码的字节数组 QByteArray dataBytes = data.toUtf8(); // 计算长度(使用网络字节序转换) quint16 len = static_cast(data.size()); // 创建一个QByteArray用于存储要发送的所有数据 QByteArray block; QDataStream out(&block, QIODevice::WriteOnly); // 设置数据流使用网络字节序 out.setByteOrder(QDataStream::BigEndian); // 写入ID和长度 out << id << len; // 添加字符串数据 block.append(data); // 发送数据 _socket.write(block); }
然后修改LoginDialog中的initHandlers中的收到服务器登陆回复后的逻辑,这里发送信号准备发起长链接到聊天服务器
void LoginDialog::initHttpHandlers() { //注册获取登录回包逻辑 _handlers.insert(ReqId::ID_LOGIN_USER, [this](QJsonObject jsonObj){ int error = jsonObj["error"].toInt(); if(error != ErrorCodes::SUCCESS){ showTip(tr("参数错误"),false); enableBtn(true); return; } auto user = jsonObj["user"].toString(); //发送信号通知tcpMgr发送长链接 ServerInfo si; si.Uid = jsonObj["uid"].toInt(); si.Host = jsonObj["host"].toString(); si.Port = jsonObj["port"].toString(); si.Token = jsonObj["token"].toString(); _uid = si.Uid; _token = si.Token; qDebug()<< "user is " << user << " uid is " << si.Uid <<" host is " << si.Host << " Port is " << si.Port << " Token is " << si.Token; emit sig_connect_tcp(si); }); }
在LoginDialog构造函数中连接信号,包括建立tcp连接,以及收到TcpMgr连接成功或者失败的信号处理
//连接tcp连接请求的信号和槽函数 connect(this, &LoginDialog::sig_connect_tcp, TcpMgr::GetInstance().get(), &TcpMgr::slot_tcp_connect); //连接tcp管理者发出的连接成功信号 connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_con_success, this, &LoginDialog::slot_tcp_con_finish);
LoginDialog收到连接结果的槽函数
void LoginDialog::slot_tcp_con_finish(bool bsuccess) { if(bsuccess){ showTip(tr("聊天服务连接成功,正在登录..."),true); QJsonObject jsonObj; jsonObj["uid"] = _uid; jsonObj["token"] = _token; QJsonDocument doc(jsonObj); QString jsonString = doc.toJson(QJsonDocument::Indented); //发送tcp请求给chat server TcpMgr::GetInstance()->sig_send_data(ReqId::ID_CHAT_LOGIN, jsonString); }else{ showTip(tr("网络异常"),false); enableBtn(true); } }
在这个槽函数中我们发送了sig_send_data信号并且通知TcpMgr将数据发送给服务器。
一个TCP服务器必然会有连接的接收,维持,收发数据等逻辑。那我们就要基于asio完成这个服务的搭建。主服务是这个样子的
#include "LogicSystem.h" #include #include #include #include "AsioIOServicePool.h" #include "CServer.h" #include "ConfigMgr.h" using namespace std; bool bstop = false; std::condition_variable cond_quit; std::mutex mutex_quit; int main() { try { auto &cfg = ConfigMgr::Inst(); auto pool = AsioIOServicePool::GetInstance(); boost::asio::io_context io_context; boost::asio::signal_set signals(io_context, SIGINT, SIGTERM); signals.async_wait([&io_context, pool](auto, auto) { io_context.stop(); pool->Stop(); }); auto port_str = cfg["SelfServer"]["Port"]; CServer s(io_context, atoi(port_str.c_str())); io_context.run(); } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << endl; } }
CServer类的声明
#include #include "CSession.h" #include #include
构造函数中监听对方连接
CServer::CServer(boost::asio::io_context& io_context, short port):_io_context(io_context), _port(port), _acceptor(io_context, tcp::endpoint(tcp::v4(),port)) { cout << "Server start success, listen on port : " << _port << endl; StartAccept(); }
接受连接的函数
void CServer::StartAccept() { auto &io_context = AsioIOServicePool::GetInstance()->GetIOService(); shared_ptr new_session = make_shared(io_context, this); _acceptor.async_accept(new_session->GetSocket(), std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1)); }
从AsioIOServicePool中返回一个可用的iocontext构造Session,然后将接受的新链接的socket写入这个Session保管
AsioIOServicePool已经在前面讲解很多次了,它的声明如下
#include #include #include "Singleton.h" class AsioIOServicePool:public Singleton { friend Singleton; public: using IOService = boost::asio::io_context; using Work = boost::asio::io_context::work; using WorkPtr = std::unique_ptr; ~AsioIOServicePool(); AsioIOServicePool(const AsioIOServicePool&) = delete; AsioIOServicePool& operator=(const AsioIOServicePool&) = delete; // 使用 round-robin 的方式返回一个 io_service boost::asio::io_context& GetIOService(); void Stop(); private: AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency()); std::vector _ioServices; std::vector _works; std::vector _threads; std::size_t _nextIOService; };
AsioIOServicePool具体实现
#include "AsioIOServicePool.h" #include using namespace std; AsioIOServicePool::AsioIOServicePool(std::size_t size):_ioServices(size), _works(size), _nextIOService(0){ for (std::size_t i = 0; i < size; ++i) { _works[i] = std::unique_ptr(new Work(_ioServices[i])); } //遍历多个ioservice,创建多个线程,每个线程内部启动ioservice for (std::size_t i = 0; i < _ioServices.size(); ++i) { _threads.emplace_back([this, i]() { _ioServices[i].run(); }); } } AsioIOServicePool::~AsioIOServicePool() { std::cout << "AsioIOServicePool destruct" << endl; } boost::asio::io_context& AsioIOServicePool::GetIOService() { auto& service = _ioServices[_nextIOService++]; if (_nextIOService == _ioServices.size()) { _nextIOService = 0; } return service; } void AsioIOServicePool::Stop(){ //因为仅仅执行work.reset并不能让iocontext从run的状态中退出 //当iocontext已经绑定了读或写的监听事件后,还需要手动stop该服务 for (auto& work : _works) { //把服务先停止 work->get_io_context().stop(); work.reset(); } for (auto& t : _threads) { t.join(); } }
CServer的处理连接逻辑
void CServer::HandleAccept(shared_ptr new_session, const boost::system::error_code& error){ if (!error) { new_session->Start(); lock_guard lock(_mutex); _sessions.insert(make_pair(new_session->GetUuid(), new_session)); } else { cout << "session accept failed, error is " << error.what() << endl; } StartAccept(); }
上面的逻辑接受新链接后执行Start函数,新链接接受数据,然后Server继续监听新的连接
void CSession::Start(){ AsyncReadHead(HEAD_TOTAL_LEN); }
先读取头部数据
void CSession::AsyncReadHead(int total_len) { auto self = shared_from_this(); asyncReadFull(HEAD_TOTAL_LEN, [self, this](const boost::system::error_code& ec, std::size_t bytes_transfered) { try { if (ec) { std::cout << "handle read failed, error is " << ec.what() << endl; Close(); _server->ClearSession(_uuid); return; } if (bytes_transfered < HEAD_TOTAL_LEN) { std::cout << "read length not match, read [" << bytes_transfered << "] , total [" << HEAD_TOTAL_LEN << "]" << endl; Close(); _server->ClearSession(_uuid); return; } _recv_head_node->Clear(); memcpy(_recv_head_node->_data, _data, bytes_transfered); //获取头部MSGID数据 short msg_id = 0; memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN); //网络字节序转化为本地字节序 msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id); std::cout << "msg_id is " << msg_id << endl; //id非法 if (msg_id > MAX_LENGTH) { std::cout << "invalid msg_id is " << msg_id << endl; _server->ClearSession(_uuid); return; } short msg_len = 0; memcpy(&msg_len, _recv_head_node->_data + HEAD_ID_LEN, HEAD_DATA_LEN); //网络字节序转化为本地字节序 msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len); std::cout << "msg_len is " << msg_len << endl; //id非法 if (msg_len > MAX_LENGTH) { std::cout << "invalid data length is " << msg_len << endl; _server->ClearSession(_uuid); return; } _recv_msg_node = make_shared(msg_len, msg_id); AsyncReadBody(msg_len); } catch (std::exception& e) { std::cout << "Exception code is " << e.what() << endl; } }); }
上面的逻辑里调用asyncReadFull读取整个长度,然后解析收到的数据,前两个字节为id,之后两个字节为长度,最后n个长度字节为消息内容
//读取完整长度 void CSession::asyncReadFull(std::size_t maxLength, std::function handler ) { ::memset(_data, 0, MAX_LENGTH); asyncReadLen(0, maxLength, handler); }
读取指定长度
//读取指定字节数 void CSession::asyncReadLen(std::size_t read_len, std::size_t total_len, std::function handler) { auto self = shared_from_this(); _socket.async_read_some(boost::asio::buffer(_data + read_len, total_len-read_len), [read_len, total_len, handler, self](const boost::system::error_code& ec, std::size_t bytesTransfered) { if (ec) { // 出现错误,调用回调函数 handler(ec, read_len + bytesTransfered); return; } if (read_len + bytesTransfered >= total_len) { //长度够了就调用回调函数 handler(ec, read_len + bytesTransfered); return; } // 没有错误,且长度不足则继续读取 self->asyncReadLen(read_len + bytesTransfered, total_len, handler); }); }
读取头部成功后,其回调函数内部调用了读包体的逻辑
void CSession::AsyncReadBody(int total_len) { auto self = shared_from_this(); asyncReadFull(total_len, [self, this, total_len](const boost::system::error_code& ec, std::size_t bytes_transfered) { try { if (ec) { std::cout << "handle read failed, error is " << ec.what() << endl; Close(); _server->ClearSession(_uuid); return; } if (bytes_transfered < total_len) { std::cout << "read length not match, read [" << bytes_transfered << "] , total [" << total_len<<"]" << endl; Close(); _server->ClearSession(_uuid); return; } memcpy(_recv_msg_node->_data , _data , bytes_transfered); _recv_msg_node->_cur_len += bytes_transfered; _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0'; cout << "receive data is " << _recv_msg_node->_data << endl; //此处将消息投递到逻辑队列中 LogicSystem::GetInstance()->PostMsgToQue(make_shared(shared_from_this(), _recv_msg_node)); //继续监听头部接受事件 AsyncReadHead(HEAD_TOTAL_LEN); } catch (std::exception& e) { std::cout << "Exception code is " << e.what() << endl; } }); }
读取包体完成后,在回调中继续读包头。以此循环往复直到读完所有数据。如果对方不发送数据,则回调函数就不会触发。不影响程序执行其他工作,因为我们采用的是asio异步的读写操作
当然我们解析完包体后会调用LogicSystem单例将解析好的消息封装为逻辑节点传递给逻辑层进行处理
我们在逻辑层处理
void LogicSystem::RegisterCallBacks() { _fun_callbacks[MSG_CHAT_LOGIN] = std::bind(&LogicSystem::LoginHandler, this, placeholders::_1, placeholders::_2, placeholders::_3); } void LogicSystem::LoginHandler(shared_ptr session, const short &msg_id, const string &msg_data) { Json::Reader reader; Json::Value root; reader.parse(msg_data, root); std::cout << "user login uid is " << root["uid"].asInt() << " user token is " << root["token"].asString() << endl; std::string return_str = root.toStyledString(); session->Send(return_str, msg_id); }
并在构造函数中注册这些处理流程
LogicSystem::LogicSystem():_b_stop(false){ RegisterCallBacks(); _worker_thread = std::thread (&LogicSystem::DealMsg, this); }
到此,完成了ChatServer收到QT客户端发送过来的长链接请求,并解析读取的数据,将收到的数据通过tcp发送给对端
在proto文件里新增登陆验证服务
message LoginReq{ int32 uid = 1; string token= 2; } message LoginRsp { int32 error = 1; int32 uid = 2; string token = 3; } service StatusService { rpc GetChatServer (GetChatServerReq) returns (GetChatServerRsp) {} rpc Login(LoginReq) returns(LoginRsp); }
接下来是调用grpc命令生成新的pb文件覆盖原有的,并且也拷贝给StatusServer一份
我们完善登陆逻辑,先去StatusServer验证token是否合理,如果合理再从内存中寻找用户信息,如果没找到则从数据库加载一份
void LogicSystem::LoginHandler(shared_ptr session, const short &msg_id, const string &msg_data) { Json::Reader reader; Json::Value root; reader.parse(msg_data, root); auto uid = root["uid"].asInt(); std::cout << "user login uid is " << uid << " user token is " << root["token"].asString() << endl; //从状态服务器获取token匹配是否准确 auto rsp = StatusGrpcClient::GetInstance()->Login(uid, root["token"].asString()); Json::Value rtvalue; Defer defer([this, &rtvalue, session]() { std::string return_str = rtvalue.toStyledString(); session->Send(return_str, MSG_CHAT_LOGIN_RSP); }); rtvalue["error"] = rsp.error(); if (rsp.error() != ErrorCodes::Success) { return; } //内存中查询用户信息 auto find_iter = _users.find(uid); std::shared_ptr user_info = nullptr; if (find_iter == _users.end()) { //查询数据库 user_info = MysqlMgr::GetInstance()->GetUser(uid); if (user_info == nullptr) { rtvalue["error"] = ErrorCodes::UidInvalid; return; } _users[uid] = user_info; } else { user_info = find_iter->second; } rtvalue["uid"] = uid; rtvalue["token"] = rsp.token(); rtvalue["name"] = user_info->name; }
在StatusServer验证token之前,我们需要在StatusServer中的GetServer的服务里将token写入内存
Status StatusServiceImpl::GetChatServer(ServerContext* context, const GetChatServerReq* request, GetChatServerRsp* reply) { std::string prefix("llfc status server has received : "); const auto& server = getChatServer(); reply->set_host(server.host); reply->set_port(server.port); reply->set_error(ErrorCodes::Success); reply->set_token(generate_unique_string()); insertToken(request->uid(), reply->token()); return Status::OK; }
接下来我们实现登陆验证服务
Status StatusServiceImpl::Login(ServerContext* context, const LoginReq* request, LoginRsp* reply) { auto uid = request->uid(); auto token = request->token(); std::lock_guard guard(_token_mtx); auto iter = _tokens.find(uid); if (iter == _tokens.end()) { reply->set_error(ErrorCodes::UidInvalid); return Status::OK; } if (iter->second != token) { reply->set_error(ErrorCodes::TokenInvalid); return Status::OK; } reply->set_error(ErrorCodes::Success); reply->set_uid(uid); reply->set_token(token); return Status::OK; }
这样当GateServer访问StatusServer的Login服务做验证后,就可以将数据返回给QT前端了
QT 的客户端TcpMgr收到请求后要进行对应的逻辑处理。所以我们在TcpMgr的构造函数中调用initHandlers注册消息
void TcpMgr::initHandlers() { //auto self = shared_from_this(); _handlers.insert(ID_CHAT_LOGIN_RSP, [this](ReqId id, int len, QByteArray data){ qDebug()<< "handle id is "<< id << " data is " << data; // 将QByteArray转换为QJsonDocument QJsonDocument jsonDoc = QJsonDocument::fromJson(data); // 检查转换是否成功 if(jsonDoc.isNull()){ qDebug() << "Failed to create QJsonDocument."; return; } QJsonObject jsonObj = jsonDoc.object(); if(!jsonObj.contains("error")){ int err = ErrorCodes::ERR_JSON; qDebug() << "Login Failed, err is Json Parse Err" << err ; emit sig_login_failed(err); return; } int err = jsonObj["error"].toInt(); if(err != ErrorCodes::SUCCESS){ qDebug() << "Login Failed, err is " << err ; emit sig_login_failed(err); return; } UserMgr::GetInstance()->SetUid(jsonObj["uid"].toInt()); UserMgr::GetInstance()->SetName(jsonObj["name"].toString()); UserMgr::GetInstance()->SetToken(jsonObj["token"].toString()); emit sig_swich_chatdlg(); }); }
并且增加处理请求
void TcpMgr::handleMsg(ReqId id, int len, QByteArray data) { auto find_iter = _handlers.find(id); if(find_iter == _handlers.end()){ qDebug()<< "not found id ["<< id << "] to handle"; return ; } find_iter.value()(id,len,data); }
为管理用户数据,需要创建一个UserMgr类,统一管理用户数据,我们这么声明
#ifndef USERMGR_H #define USERMGR_H #include #include #include class UserMgr:public QObject,public Singleton, public std::enable_shared_from_this { Q_OBJECT public: friend class Singleton; ~ UserMgr(); void SetName(QString name); void SetUid(int uid); void SetToken(QString token); private: UserMgr(); QString _name; QString _token; int _uid; }; #endif // USERMGR_H
简单实现几个功能
#include "usermgr.h" UserMgr::~UserMgr() { } void UserMgr::SetName(QString name) { _name = name; } void UserMgr::SetUid(int uid) { _uid = uid; } void UserMgr::SetToken(QString token) { _token = token; } UserMgr::UserMgr() { }
详细和复杂的管理后续不断往这里补充就行了
登陆界面响应TcpMgr返回的登陆请求,在其构造函数中添加
//连接tcp管理者发出的登陆失败信号 connect(TcpMgr::GetInstance().get(), &TcpMgr::sig_login_failed, this, &LoginDialog::slot_login_failed);
并实现槽函数
void LoginDialog::slot_login_failed(int err) { QString result = QString("登录失败, err is %1") .arg(err); showTip(result,false); enableBtn(true); }
到此完成了登陆的请求和响应,接下来要实现响应登陆成功后跳转到聊天界面。下一篇先实现聊天布局。