局域网内,客户端会进行udp广播,服务端会监听udp广播并回复,以此可以让客户端发现服务端。
发现之后,客户端可以发起建立tcp连接的请求,服务端响应请求,建立tcp连接。
然后客户端可以与服务端进行tcp消息的收发(数据以json包格式传输)。
注意:本文仅是服务端代码
封装了一个networkmanager类来完成网络通讯功能。
networkmanager.h
#pragma once #include #include #include #include #include #include #include #include #include #include class NetworkManager : public QWidget { Q_OBJECT public: NetworkManager(QWidget* parent = nullptr); void tcpSendCommand(QString ip, QString Command);//tcp发送指令 void removeTcpConnect(QString ip); signals: void signalProjectData(QString ip, QJsonObject jsonObject); void signalPerformanceData(QString ip, QJsonObject jsonObject); void signalStatusData(QString ip, QJsonObject jsonObject); void signalDisconnect(QString ip); private: class Impl; std::unique_ptr _impl = nullptr; };
networkmanager.cpp
#include "networkmanager.h" class NetworkManager::Impl : public QObject { Q_OBJECT public: explicit Impl(NetworkManager* obj); ~Impl() final; NetworkManager* _self = nullptr; QUdpSocket* _udpSocket = nullptr; QTcpServer* _tcpServer = nullptr; QMap _tcpSocketList;//tcp连接列表 public: void udpStartListening();//udp开始监听 void tcpStartListening();// tcp开始监听 public slots: void slotUdpReadData(); void slotTcpProcessConnection(); void slotReadSocket(); void slotDisconnectSocket(); }; NetworkManager::Impl::Impl(NetworkManager* obj) :_self(obj) {} NetworkManager::Impl::~Impl() = default; NetworkManager::NetworkManager(QWidget* parent): QWidget(parent), _impl(std::make_unique(this)) { _impl->_udpSocket = new QUdpSocket(this); _impl->_tcpServer = new QTcpServer(this); _impl->tcpStartListening(); _impl->udpStartListening(); } void NetworkManager::Impl::tcpStartListening() { // 有新的连接 绑定连接处理函数 connect(_tcpServer, &QTcpServer::newConnection, this, &Impl::slotTcpProcessConnection); if (_tcpServer->listen(QHostAddress::Any, 5556)) { // 端口号 qDebug() << "tcp Listening on port 5555"; } else { qDebug() << "TCP Error" << "Failed to Listen to port 5555"; } } void NetworkManager::Impl::udpStartListening()// udp监听 { connect(_udpSocket, &QUdpSocket::readyRead, this, &Impl::slotUdpReadData); if (_udpSocket->bind(QHostAddress::Any, 5555)) { // 端口号 qDebug() << "udp Listening on port 5555"; } else { qDebug()<< "UDP Error"<< "Failed to bind to port 5555"; } } void NetworkManager::Impl::slotUdpReadData() { while (_udpSocket->hasPendingDatagrams()) { QByteArray datagram; datagram.resize(_udpSocket->pendingDatagramSize()); QHostAddress sender; quint16 senderPort; // 读取数据包 _udpSocket->readDatagram(datagram.data(), datagram.size(), &sender, &senderPort); // 打印接收到的数据 qDebug() << "Received datagram from" << sender.toString() << ":" << senderPort; // 发送响应消息 QByteArray response = "udp responsed" ; _udpSocket->writeDatagram(response, QHostAddress(sender), senderPort); } } void NetworkManager::Impl::slotTcpProcessConnection()// tcp 连接处理 { // 通过TcpServer拿到一个socket对象clientsocket,用它来和客户端通信; QTcpSocket* newTcpSocket = _tcpServer->nextPendingConnection(); QString ip = newTcpSocket->peerAddress().toString(); QString ipv4 = QHostAddress(ip.mid(7)).toString();// 获取ip地址 qDebug() << ipv4; // 将连接存到map中 _tcpSocketList.insert(ipv4, newTcpSocket); connect(newTcpSocket, &QTcpSocket::readyRead, this, &Impl::slotReadSocket); connect(newTcpSocket, &QTcpSocket::disconnected, this, &Impl::slotDisconnectSocket); } void NetworkManager::Impl::slotReadSocket() { qDebug() << "tcp read data"; QTcpSocket* tempSocket = qobject_cast(sender()); if (tempSocket) { //先获取发送消息方的ip地址 QString ip = tempSocket->peerAddress().toString(); QString ipv4 = QHostAddress(ip.mid(7)).toString(); if (_tcpSocketList.contains(ipv4)) { //解析 QByteArray jsonData = _tcpSocketList[ipv4]->readAll();// 读取数据 QJsonDocument document = QJsonDocument::fromJson(jsonData); if (document.isNull()) { // 处理JSON解析错误 qDebug()<<"document is Null"; } else { if (document.isObject()) { QJsonObject jsonObject = document.object(); // 解析JSON文件 根据type 做不同处理 QString type = jsonObject["Type"].toString(); if (type == "Project") { emit _self->signalProjectData(ipv4, jsonObject); } else if (type == "Performance") { emit _self->signalPerformanceData(ipv4, jsonObject); } else if (type == "Status") { emit _self->signalStatusData(ipv4, jsonObject); } } } } } } // 处理断连 void NetworkManager::Impl::slotDisconnectSocket() { QTcpSocket* tempSocket = qobject_cast(sender()); QString ip = tempSocket->peerAddress().toString(); QString ipv4 = QHostAddress(ip.mid(7)).toString(); qDebug() << "tcp disconnect:" << ipv4; _tcpSocketList[ipv4]->close(); _tcpSocketList.remove(ipv4);//移除列表 // 发出信号 提示断连 emit _self->signalDisconnect(ipv4); } // 通信 发送指令 void NetworkManager::tcpSendCommand(QString ip, QString Command) { qDebug() << Command<<" "<_tcpSocketList.contains(ip)) { // 发送指令 qDebug() << "send command:"<< Command; _impl->_tcpSocketList[ip]->write(Command.toUtf8()); } } void NetworkManager::removeTcpConnect(QString ip) { // 移除该tcp连接 _impl->_tcpSocketList.remove(ip); } #include "networkmanager.moc"
这个服务端的网络通信代码,虽然实现了功能,但是还有以下不足:
1、收发消息是串行的。如果多台客户端连接,会将每个客户端的ip以及socket存入map,收发消息都是主线程在做,如果多个同时来消息,其实是串行处理的。
2、没有做心跳保持。如果客户端断线,服务器并不知道。
将通信交给子线程去做,实现并行消息收发,并且在子线程内加上心跳保持。
实现方案:
封装TcpServer
类,重写TcpServer
的incomingConnection
方法,每当有新的连接进来,会自动调用这个函数,我们重写这个函数,在这个函数中进行子线程的创建,使用子线程进行通信。
将子线程与通信的标识符,存在一个map中,方便对多个客户端通信。
重写Thread
,封装为TcpThread
,重写run
函数,在里面初始化该线程的socket,并连接相应的收发消息信号和槽函数。
重写TcpSocket
类,主要是在内部实现,收发消息信号的转发和处理。
networkmanager.h
#include "mytcpsocket.h" #include "tcpthread.h" #include #include #include #include #include #include #include #include #include #include class NetworkManager : public QWidget { Q_OBJECT public: NetworkManager(QWidget* parent = nullptr); void tcpSendCommand(QString ip, QString Command);//tcp发送指令 void removeTcpConnect(QString ip); signals: void signalProjectData(QString ip, const QJsonObject& jsonObject); void signalPerformanceData(QString ip, const QJsonObject& jsonObject); void signalStatusData(QString ip, const QJsonObject& jsonObject); void signalDisconnect(QString ip); private: class Impl; std::unique_ptr _impl = nullptr; };
networkmanager.cpp
#include "networkmanager.h" class NetworkManager::Impl : public QObject { Q_OBJECT public: explicit Impl(NetworkManager* obj); ~Impl() final; void udpStartListening();//udp开始监听 void tcpStartListening();// tcp开始监听 public: NetworkManager* _self = nullptr; QUdpSocket* _udpSocket = nullptr; MyTcpServer* _tcpServer = nullptr; QMap _tcpList;//tcp连接列表 ip和描述符 public slots: void slotUdpReadData(); void slotTcpProcessConnection(QString ip, qintptr sockDesc); void slotReadSocket(QString ip, qintptr sockDesc, const QByteArray& msg); void slotDisconnectSocket(QString ip, qintptr sockDesc); }; NetworkManager::Impl::Impl(NetworkManager* obj) :_self(obj) {} NetworkManager::Impl::~Impl() = default; NetworkManager::NetworkManager(QWidget* parent): QWidget(parent), _impl(std::make_unique(this)) { _impl->_udpSocket = new QUdpSocket(this); _impl->_tcpServer = new MyTcpServer(this);// 将网络管理类设置为父对象 _impl->tcpStartListening(); _impl->udpStartListening(); } // tcp 开始监听 void NetworkManager::Impl::tcpStartListening() { // 有新的连接 绑定连接处理函数 connect(_tcpServer, &MyTcpServer::signalNewTcpConnect, this, &Impl::slotTcpProcessConnection); // 收到新的消息 connect(_tcpServer, &MyTcpServer::signalTcpGetMsg, this, &Impl::slotReadSocket); // 客户端断连消息 connect(_tcpServer, &MyTcpServer::signalTcpDisconnect, this, &Impl::slotDisconnectSocket); if (_tcpServer->listen(QHostAddress::Any, 5556)) { // 端口号 qDebug() << "tcp Listening on port 5556"; } else { qDebug() << "TCP Error" << "Failed to Listen to port 5556"; } } // udp开始监听 void NetworkManager::Impl::udpStartListening() { // 收到udp消息 connect(_udpSocket, &QUdpSocket::readyRead, this, &Impl::slotUdpReadData); if (_udpSocket->bind(QHostAddress::Any, 5555)) { // 端口号 qDebug() << "udp Listening on port 5555"; } else { qDebug()<< "UDP Error"<< "Failed to bind to port 5555"; } } // udp消息读取槽函数 void NetworkManager::Impl::slotUdpReadData() { while (_udpSocket->hasPendingDatagrams()) { QByteArray datagram; datagram.resize(_udpSocket->pendingDatagramSize()); QHostAddress sender; quint16 senderPort; // 读取数据包 _udpSocket->readDatagram(datagram.data(), datagram.size(), &sender, &senderPort); // 回复响应消息 QByteArray response = "udp responsed" ; _udpSocket->writeDatagram(response, QHostAddress(sender), senderPort); } } // tcp 连接处理 void NetworkManager::Impl::slotTcpProcessConnection(QString ip, qintptr sockDesc) { qDebug() << "new connect:"< QJsonDocument document = QJsonDocument::fromJson(msg); if (document.isNull()) { // 处理JSON解析错误 qDebug() << "JSON is NULL"; } else { if (document.isObject()) { QJsonObject jsonObject = document.object(); // 根据key获取value QString type = jsonObject["Type"].toString(); if (type == "Project") { emit _self->signalProjectData(ip, jsonObject); } else if (type == "Performance") { emit _self->signalPerformanceData(ip, jsonObject); } else if (type == "Status") { emit _self->signalStatusData(ip, jsonObject); } } } } // 处理断开连接 void NetworkManager::Impl::slotDisconnectSocket(QString ip, qintptr sockDesc) { qDebug() << "tcp disconnect:" << ip; // 发出信号 提示断连 emit _self->signalDisconnect(ip); } // 通信 发送指令 void NetworkManager::tcpSendCommand(QString ip, QString Command) { if (_impl->_tcpList.contains(ip)) { // 发送指令 qDebug() << "send command:" << Command << " to " << ip; _impl->_tcpServer->sendData(_impl->_tcpList[ip], Command.toUtf8()); } else { qDebug() << "The ip address is not in the _tcpList. Unable to send message"; } } void NetworkManager::removeTcpConnect(QString ip) { // 移除该tcp连接 _impl->_tcpList.remove(ip); } #include "networkmanager.moc"
mytcpserver.h
#pragma once #include "mytcpsocket.h" #include "tcpthread.h" #include "networkmanager.h" #include #include #include class MyTcpServer : public QTcpServer { Q_OBJECT public: explicit MyTcpServer(QObject* parent = nullptr); void sendData(qintptr sockDesc, const QByteArray& msg); void slotTcpDisconnect(QString ip, qintptr sockDesc); signals: void signalNewTcpConnect(QString ip, qintptr sockDesc);// 有新的tcp连接 void signalTcpGetMsg(QString ip, qintptr Desc, const QByteArray& msg);// 收到新的消息 void signalTcpDisconnect(QString ip, qintptr sockDesc);// tcp断连 传给网络管理类 private: void incomingConnection(qintptr sockDesc) override; QMap _socketMap;//用来保存每一个客户端的通信线程 };
mytcpserver.cpp
重写了incomingConnection
方法
#include "mytcpserver.h" MyTcpServer::MyTcpServer(QObject* parent) : QTcpServer(parent) { } //当有新的连接进来的时候会自动调用这个函数,不需要你去绑定信号槽 void MyTcpServer::incomingConnection(qintptr sockDesc) { // 用于注册一个类型,使其可以被Qt的元系统识别(不写这个,线程通信会报错) qRegisterMetaType("qintptr"); //产生线程用于通信 TcpThread* tcpthread = new TcpThread(sockDesc); // 转移 tcpthread->moveToThread(tcpthread); //将标识符和对应的线程 存在map中 _socketMap.insert(sockDesc, tcpthread); //将新的tcp连接的ip发送给网络管理类 connect(tcpthread, &TcpThread::signalSendIp, this, &MyTcpServer::signalNewTcpConnect); // 底层socket收到消息时触发readyread,通过signalGetMsg将消息传给线程 // 线程通过signalThreadGetMsg将消息转发给tcpserver // tcpserver通过signalTcpGetMsg再将消息转发给网络管理类 connect(tcpthread, &TcpThread::signalThreadGetMsg, this, &MyTcpServer::signalTcpGetMsg); //线程中发出断开tcp连接,传给网络管理类,tcp断开连接 connect(tcpthread, &TcpThread::signalTcpDisconnect, this, &MyTcpServer::slotTcpDisconnect); // 开启线程 tcpthread->start(); } // 发送消息 void MyTcpServer::sendData(qintptr sockDesc, const QByteArray& msg) { if(_socketMap.contains(sockDesc)) { // 触发对应线程发送消息 emit _socketMap[sockDesc]->signalSendData(sockDesc, msg); } } void MyTcpServer::slotTcpDisconnect(QString ip, qintptr sockDesc) { _socketMap.remove(sockDesc); // 移除 emit signalTcpDisconnect(ip, sockDesc); // 发信号告诉网络管理类 tcp断连 }
tcpthread.h
#pragma once #include "mytcpsocket.h" #include #include #include #include #include class TcpThread : public QThread { Q_OBJECT public: //构造函数初始化套接字标识符 explicit TcpThread(qintptr sockDesc, QObject* parent = nullptr); ~TcpThread(); void run() override; signals: void signalTcpDisconnect(QString ip, qintptr sockDesc);// 线程给tcpserver发信号 tcp断连 void signalSendData(qintptr Desc, const QByteArray& msg); // 线程发送消息 void signalThreadGetMsg(QString ip, qintptr Desc, const QByteArray& msg); // 线程收到消息 void signalSendIp(QString ip, qintptr Desc); // 线程给tcpserver发信号 有新的连接(主要是发ip) public slots: void slotSendData(qintptr Desc, const QByteArray& msg); void slotReceiveData(qintptr Desc, const QByteArray& msg); void slotDisconnect(); void slotSendHeartbeat(); private: qintptr _socketDesc; QString _ip; QByteArray _imgData;// 用于大文件组包 MyTcpSocket* _socket = nullptr; QTimer* _heartbeatTimer;// 心跳包发送间隔 int _checkTime = 5;// 控制心跳断联时间 };
tcpthread.cpp
ps:这个里面的实现逻辑需要注意以下,在run中,主要是四个操作:收消息、发消息、断连处理、心跳保持
由于我这里收到的消息都是json包(和客户端约定好的格式),所以我拿收到的消息解析一下json,判断一下字段,即可知道是不是心跳包,如果是心跳包,我就直接对我的心跳计数更新,如果不是,那就说明是数据包,我就转发给主线程。
这个tcpthread类其实相当于mytcpsocket和mytcpserver的中间层,底层的mytcpsocket是真正完成收发消息的,tcpthread主要是负责管理这个socket的线程,它主要做的就是将收到的消息转发出去,以及接收外界的发消息指令,将指令转给socekt。
说明两个实现思路:
我定义了一个计时器,每隔两秒我会发送一条心跳消息emit _socket->signalSocketSendMsg(this->_socketDesc, "heartbeat");
客户端接收到心跳消息后,也会回我一个心跳回复,是个json,里面type字段是heartbeat
。我收到这个消息,这就完成了心跳保持。
但是代码里有个_checkTime
是什么意思呢?
如果我没收到心跳回复,那么并不能立刻说明客户端掉线了,有的时候可能网络波动,或者别的什么原因。所以我们想给心跳保持加一个容错时间。
我设置了一个变量:_checkTime
初始化=5(可根据情况自己定义)
当我发送一次心跳包,我就_checkTime--
我收到一次心跳包,我就_checkTime++
这样只要双方正常收发心跳,那么_checkTime
就一直是5,如果没有收到客户端的回复,那么_checkTime
就会一直减少。
而我在每次发送心跳包之前都会检查一下_checkTime
,如果小于0了,那么说明客户端掉线了。也就是说,我给客户端留的冗余时间是10秒
容错时间=_checkTime*心跳包发送频率
我们的业务中,收发的都是json包,并且其中有的时候可能json包中有图片(编码后的图片)
这个图片比较大,可能64kb-170kb
而tcpsocket
的readyRead
是有接收限制的(好像是最大可以接收64kb的数据,记不清了,可以去搜索一下)
这样的话图片一次就发不完,客户端需要多次发送。
一般来讲,这种大文件传输的情况正确的处理方法是:
客户端拆包,服务器组包。并且客户端发送之前需要先发送如文件名,文件大小等信息,客户端接收到后,再向客户端请求文件数据。
客户端接收到请求后,再发送文件数据,并且双方约定好每次传输的字节大小,接收完了之后,服务端以此为依据完成组包。
具体我就不赘述了,可以搜索别的去看。
我这里用的不是正常的处理方法,我这里取了个巧。只适用于我的业务场景。
由于我这里收发都是json包,所以我利用了json包的解析来完成了一个简易的组包。
具体实现思路:客户端不需要管,文件大不大,直接发就行,文件小的话一次就发了,文件大的话socket内部会自动帮你分为几次发送的。
而服务端接收到数据以后,拿去解析json,如果小文件,直接解析成功了。如果这次是大文件,一次没发完,那么就会解析失败。ok,关键就是这里,解析失败我就把这次的数据存起来,然后下次数据来了,由于不完整自然还会解析失败,我就累加上去(这也就相当于组包了),每次解析失败我都累加,并且累加完了我再次解析,如果解析成功了,那就说明发完了。那就正常转发消息即可,注意要清空用来组包的变量。
由此就可以完成一个简易的组包,这只适用于特殊情况下。
#include "tcpthread.h" TcpThread::TcpThread(qintptr sockDesc, QObject* parent) : QThread(parent) { this->_socketDesc = sockDesc; } void TcpThread::run() { // 打印线程id 测试用 /*qDebug() << "run:"; qDebug() << "Current thread ID:" << QThread::currentThreadId();*/ _socket = new MyTcpSocket(this->_socketDesc); //绑定套接字标识符绑定给自定义套接字对象 _socket->setSocketDescriptor(this->_socketDesc); // 拿到ip地址 QString ipHostAddress = _socket->peerAddress().toString(); _ip = QHostAddress(ipHostAddress.mid(7)).toString(); // 告诉tcpserver 有新的连接建立 并将ip传递出去 emit signalSendIp(_ip, this->_socketDesc); // 收到消息 // socket发出有消息的信号,然后触发线程中发出有消息的信号 connect(_socket, &MyTcpSocket::signalSocketGetMsg, this, &TcpThread::slotReceiveData); // 发送消息 // 当线程收到sendData信号时候,通知socket发送消息 connect(this, &TcpThread::signalSendData, _socket, &MyTcpSocket::slotSendData); // 断开连接 // 当套接字断开时,发送底层的disconnected信号 connect(_socket, &MyTcpSocket::disconnected, this, &TcpThread::slotDisconnect); // 心跳连接 _heartbeatTimer = new QTimer(this); connect(_heartbeatTimer, &QTimer::timeout, this, &TcpThread::slotSendHeartbeat); _heartbeatTimer->start(2*1000); // 2秒发送一次心跳 this->exec();//在Qt中启动消息机制 } TcpThread::~TcpThread() { delete _socket; } // 发送消息 void TcpThread::slotSendData(qintptr Desc, const QByteArray& msg) { // 触发socket发送消息的信号 将消息传递进去 if (_socket) { emit _socket->signalSocketSendMsg(Desc, msg); } } // 收到消息 void TcpThread::slotReceiveData(qintptr Desc, const QByteArray& msg) { // 解析一下 QJsonDocument document = QJsonDocument::fromJson(msg); if (document.isNull()) { // 如果是大数据(如图片) 可能一次发不完 需要进行合包 // 处理JSON解析错误 qDebug() << "JSON is NULL(Thread)"; _imgData += msg;// 累加合包 QJsonDocument imgDocument = QJsonDocument::fromJson(_imgData); if (!imgDocument.isNull()) { if (imgDocument.isObject()) { emit signalThreadGetMsg(_ip, Desc, _imgData);// 转发出去 qDebug() << "QByteArray:img data size" << _imgData.size(); _imgData.clear();// 清除 } } } else { //不是大文件 一次可以接收完 if (document.isObject()) { QJsonObject jsonObject = document.object(); QString type = jsonObject["Type"].toString(); if (type == "Heartbeat") // 如果是心跳包 直接拦截 { _checkTime++; // 是心跳回复 计数加1 } else { // 如果不是心跳包 说明是数据包 转发出去 emit signalThreadGetMsg(_ip, Desc, msg); } } } } // 断开连接 void TcpThread::slotDisconnect() { // 告诉tcpserver tcp断联 emit signalTcpDisconnect(_ip, this->_socketDesc); //让该线程中的套接字断开连接 _socket->disconnectFromHost();//断开连接 //线程退出 qDebug() << "thread quit"; this->quit(); } // 心跳保持 void TcpThread::slotSendHeartbeat() { qDebug() << "heartbeat-----------"<<_checkTime; if(_checkTime <= 0) // 检查心跳计时 { // 客户端掉线 qDebug() << "heartbeat:client disconnect"; _socket->disconnectFromHost();//断开连接 执行这个会触发socket的disconnect //线程退出 qDebug() << "thread quit"; this->quit(); } else { // 发送心跳包 emit _socket->signalSocketSendMsg(this->_socketDesc, "heartbeat"); _checkTime--; } }
mytcpsocket.h
#pragma once #include #include #include class MyTcpSocket : public QTcpSocket { Q_OBJECT public: explicit MyTcpSocket(qintptr socketDesc, QObject* parent = nullptr); public slots: void slotSendData(qintptr socketDesc, const QByteArray& data); signals: void signalSocketSendMsg(qintptr socketDesc, const QByteArray& msg);// 发送消息 void signalSocketGetMsg(qintptr socketDesc, const QByteArray& msg); // 接收消息 private: qintptr _sockDesc; };
mytcpsocket.cpp
#include "mytcpsocket.h" MyTcpSocket::MyTcpSocket(qintptr socketDesc, QObject* parent) : QTcpSocket(parent) { this->_sockDesc = socketDesc; // 线程内触发socket的发送消息信号 则执行对应发送消息的槽函数 connect(this, &MyTcpSocket::signalSocketSendMsg, this, &MyTcpSocket::slotSendData); // 收到消息 传递出去 connect(this, &MyTcpSocket::readyRead, this, [=] { QByteArray msg = readAll(); emit signalSocketGetMsg(_sockDesc, msg); }); } // 发送消息 void MyTcpSocket::slotSendData(qintptr socketDesc, const QByteArray& msg) { if (socketDesc == _sockDesc && !msg.isEmpty()) { this->write(msg); } }