聊天服务器——muduo库源码阅读笔记
创始人
2025-01-10 10:36:21
0

muduo源码阅读

Timestamp类,时间戳类

时间戳主要是获得时间

成员变量int64_t microSecondsSinceEpoch_ 是时间

now()函数是获得当前时间

toString()函数是把时间转换成人看的string

Timestamp Timestamp::now() {   struct timeval tv;   gettimeofday(&tv, NULL);   int64_t seconds = tv.tv_sec;   return Timestamp(seconds * kMicroSecondsPerSecond + tv.tv_usec); //把时间秒换算成毫秒 } ​ string Timestamp::toString() const {   char buf[32] = {0};   int64_t seconds = microSecondsSinceEpoch_ / kMicroSecondsPerSecond;   int64_t microseconds = microSecondsSinceEpoch_ % kMicroSecondsPerSecond;   snprintf(buf, sizeof(buf), "%" PRId64 ".%06" PRId64 "", seconds, microseconds);   return buf; }

Log日志类

有一个日志等级

enum LogLevel {   TRACE,   DEBUG,   INFO,   WARN,   ERROR,   FATAL,   NUM_LOG_LEVELS, };

Channel类

Channel理解为通道,封装了sockfd和其感兴趣的event,如EPOLLIN、EPOLLOUT事件,还绑定了poller返回的具体事件。当拿到文件描述符时,就去拿对应的channel,当文件描述符发生读、写事件,系统就会去channel里拿对应的读、写回调函数。

channel与eventloop打交道,epoll与eventloop打交道

  EventLoop* loop_;   const int  fd_;      //因为是文件描述符绑定,所以必须有fd文件描述符   int        events_;  //注册的事件   int        revents_; //epoll或者poll监听到的事件,   int        index_;   // used by Poller.   bool       logHup_;

四个回调函数,以及四个方法设置回调函数

ReadEventCallback readCallback_; EventCallback writeCallback_; EventCallback closeCallback_; EventCallback errorCallback_; ​ typedef std::function EventCallback; typedef std::function ReadEventCallback; ​ ​ void setReadCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }                    //std::move是移动,移动完之后,cb内容就消失了 void setWriteCallback(EventCallback cb) { writeCallback_ = std::move(cb); } void setCloseCallback(EventCallback cb) { closeCallback_ = std::move(cb); } void setErrorCallback(EventCallback cb) { errorCallback_ = std::move(cb); }

设置事件

void enableReading() { events_ |= kReadEvent; update(); }    //update()类似epollctrl void disableReading() { events_ &= ~kReadEvent; update(); } void enableWriting() { events_ |= kWriteEvent; update(); } void disableWriting() { events_ &= ~kWriteEvent; update(); } void disableAll() { events_ = kNoneEvent; update(); } bool isWriting() const { return events_ & kWriteEvent; } bool isReading() const { return events_ & kReadEvent; }

update()

loop_->updateChannel(this);   //channel的update是调用eventloop的updateChannel //eventloop的updateChannel是调用poller的updateChannel poller_->updateChannel(channel); //epoller的updateChannel就是调用epollctrl来修改文件描述符的关键 //update(EPOLL_CTL_DEL, channel); ==》 //::epoll_ctl(epollfd_, operation, fd, &event) ​ ​ void EPollPoller::updateChannel(Channel* channel) {   Poller::assertInLoopThread();   const int index = channel->index();   LOG_TRACE << "fd = " << channel->fd()     << " events = " << channel->events() << " index = " << index;   if (index == kNew || index == kDeleted)   {     // a new one, add with EPOLL_CTL_ADD     int fd = channel->fd();     if (index == kNew)     {       assert(channels_.find(fd) == channels_.end());       channels_[fd] = channel;     }     else // index == kDeleted     {       assert(channels_.find(fd) != channels_.end());       assert(channels_[fd] == channel);     } ​     channel->set_index(kAdded);     update(EPOLL_CTL_ADD, channel);   }   else   {     // update existing one with EPOLL_CTL_MOD/DEL     int fd = channel->fd();     (void)fd;     assert(channels_.find(fd) != channels_.end());     assert(channels_[fd] == channel);     assert(index == kAdded);     if (channel->isNoneEvent())     {       update(EPOLL_CTL_DEL, channel);       channel->set_index(kDeleted);     }     else     {       update(EPOLL_CTL_MOD, channel);     }   } } ​ ​ void EPollPoller::update(int operation, Channel* channel) {   struct epoll_event event;   memZero(&event, sizeof event);   event.events = channel->events();   event.data.ptr = channel;   int fd = channel->fd();   LOG_TRACE << "epoll_ctl op = " << operationToString(operation)     << " fd = " << fd << " event = { " << channel->eventsToString() << " }";   if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)   {     if (operation == EPOLL_CTL_DEL)     {       LOG_SYSERR << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;     }     else     {       LOG_SYSFATAL << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;     }   } } ​

handleEvent()

Channel::handleEvent(Timestamp receiveTime) ​ //eventloop里调用handleEvent //poller监听channel哪些事件发生了变化,然后上报给eventloop      //handleEvent就是根据目前fd上发生的事件,然后去调用对应的回调函数

Poller类

poller是基类

typedef std::vector ChannelList;  //放channel的 ​ //三个纯虚函数,在底层实现 virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0; virtual void updateChannel(Channel* channel) = 0; virtual void removeChannel(Channel* channel) = 0; ​ //是否包含channel virtual bool hasChannel(Channel* channel) const; ​ // typedef std::map ChannelMap; ​ //poller所指向的eventloop EventLoop* ownerLoop_;

epollpoller

Timestamp poll(int timeoutMs, ChannelList* activeChannels) override //在这个函数会调用void fillActiveChannels(int numEvents, ChannelList* activeChannels) const; //fillActiveChannels是填写活跃的连接,将epoll监测到的活跃事件返回给activeChannel里 ​ //成员变量 int epollfd_; //静态成员变量,表示eventList的大小,后面可以扩容 static const int kInitEventListSize = 16; typedef std::vector EventList; ​ ​ //epollfd在构造的时候使用epoll_create1创建 epollfd_(::epoll_create1(EPOLL_CLOEXEC)), ​ ​ //poll函数就是等待事件发生 Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels) {   LOG_TRACE << "fd total count " << channels_.size();   //调用epoll_wait来等待事件发生,   int numEvents = ::epoll_wait(epollfd_,                                &*events_.begin(),                                static_cast(events_.size()),                                timeoutMs);   int savedErrno = errno;   Timestamp now(Timestamp::now());        //如果有事件发生   if (numEvents > 0)   {     LOG_TRACE << numEvents << " events happened";     fillActiveChannels(numEvents, activeChannels);     if (implicit_cast(numEvents) == events_.size())    //扩容操作     {       events_.resize(events_.size()*2);     }   }   else if (numEvents == 0)   {     LOG_TRACE << "nothing happened";   }   else   {     // error happens, log uncommon ones     if (savedErrno != EINTR)     {       errno = savedErrno;       LOG_SYSERR << "EPollPoller::poll()";     }   }   return now; } ​ ​ void EPollPoller::fillActiveChannels(int numEvents,                                      ChannelList* activeChannels) const {   assert(implicit_cast(numEvents) <= events_.size());   for (int i = 0; i < numEvents; ++i)   {     //把事件取出来,事件和channel是绑定的,因此转化为channel,然后放入到activeChannels里     Channel* channel = static_cast(events_[i].data.ptr); #ifndef NDEBUG     int fd = channel->fd();     ChannelMap::const_iterator it = channels_.find(fd);     assert(it != channels_.end());     assert(it->second == channel); #endif     channel->set_revents(events_[i].events);     activeChannels->push_back(channel);   } } ​ const int kNew = -1; const int kAdded = 1; const int kDeleted = 2; //上面三个在channel是index ​ ​ void EPollPoller::updateChannel(Channel* channel) {   Poller::assertInLoopThread();   const int index = channel->index();   LOG_TRACE << "fd = " << channel->fd()     << " events = " << channel->events() << " index = " << index;   if (index == kNew || index == kDeleted)   {     // a new one, add with EPOLL_CTL_ADD     int fd = channel->fd();     if (index == kNew)     {       assert(channels_.find(fd) == channels_.end());       channels_[fd] = channel;     }     else // index == kDeleted     {       assert(channels_.find(fd) != channels_.end());       assert(channels_[fd] == channel);     } ​     channel->set_index(kAdded);     update(EPOLL_CTL_ADD, channel);   }   else   {     // update existing one with EPOLL_CTL_MOD/DEL     int fd = channel->fd();     (void)fd;     assert(channels_.find(fd) != channels_.end());     assert(channels_[fd] == channel);     assert(index == kAdded);     if (channel->isNoneEvent())     {       update(EPOLL_CTL_DEL, channel);       channel->set_index(kDeleted);     }     else     {       update(EPOLL_CTL_MOD, channel);     }   } }

EventLoop类

eventfd是专门用于事件通知的fd

其他线程可以调用eventloop,所以isInLoopThread就是判断是否在本线程里

// 把回调函数放入到队列里 void queueInLoop(Functor cb); ​ //执行回调函数 void runInLoop(Functor cb); ​ //执行上层的回调函数 void EventLoop::doPendingFunctors() ​ //主要的函数loop,在循环里,用poller监测活跃的channel,监测到后,去执行对应的事件,最后调用上层的回调函数 void EventLoop::loop() {   assert(!looping_);   assertInLoopThread();   looping_ = true;   quit_ = false;  // FIXME: what if someone calls quit() before loop() ?   LOG_TRACE << "EventLoop " << this << " start looping"; ​   while (!quit_)   {     activeChannels_.clear();     pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);     ++iteration_;     if (Logger::logLevel() <= Logger::TRACE)     {       printActiveChannels();     }     // TODO sort channel by priority     eventHandling_ = true;     for (Channel* channel : activeChannels_)     {       currentActiveChannel_ = channel;       currentActiveChannel_->handleEvent(pollReturnTime_);     }     currentActiveChannel_ = NULL;     eventHandling_ = false;     doPendingFunctors();   } ​   //这里说明一下,当执行doPendingFunctors();回调时,又加入到5个回调,如果不处理,那么下一次循环中,就会阻塞在poller_->poll   //因此queueInLoop里就会wakeup()一下,往wakeupfd写一个事件,然后poller监测到后就不阻塞了                       LOG_TRACE << "EventLoop " << this << " stop looping";   looping_ = false; }

eventloop都一个threadid,这个threadid是通过currentthread::tid()获得的

wakeupfd是通过createEventfd()创建的

createEventfd()是用linux的eventfd创建,当监听到写事件时,就会唤醒wakeup线程

单reactor和多reactor,

如果是单reactor,上层接受连接、读写,都是在一个eventloop里执行的,因此直接执行回调函数

如果是多reactor,比如上层接受连接,然后下发给子reactor,读写关闭等回调是放到子reactor里,子reactor是通过queueInloop来实现的

void EventLoop::runInLoop(Functor cb) {   if (isInLoopThread())   {     cb();   }   else   {     queueInLoop(std::move(cb));   } }

//将上层的回调函数插入到vector的 pendingFunctors里 void EventLoop::queueInLoop(Functor cb) {   {   MutexLockGuard lock(mutex_);   pendingFunctors_.push_back(std::move(cb));   } ​   //当回调正在执行时,说明上面刚加入的回调是在下一轮被执行,而不是马上执行。因此需要唤醒一下,让下一次loop()里的poller->poll()不再阻塞   if (!isInLoopThread() || callingPendingFunctors_)   {     wakeup();   } }

handleRead是wakeup的回调函数

//构造的时候设置了回调 wakeupChannel_->setReadCallback(     std::bind(&EventLoop::handleRead, this)); // we are always reading the wakeupfd wakeupChannel_->enableReading(); ​ ​ void EventLoop::handleRead() {   uint64_t one = 1;   ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);   if (n != sizeof one)   {     LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";   } } ​ ​ //唤醒就是向wakeupFd写事件,wakeupchannel就会发生读事件 void EventLoop::wakeup() {   uint64_t one = 1;   ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);   if (n != sizeof one)   {     LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";   } } ​ ​ void EventLoop::doPendingFunctors() {   std::vector functors;        //callingPendingFunctors_表示正在执行回调   callingPendingFunctors_ = true;   {     MutexLockGuard lock(mutex_);     functors.swap(pendingFunctors_);  //交换的方式减少锁的临界区域范围,提升效率     //如果是先加锁,把所有的回调函数都执行完,然后再解锁。可能会导致死锁,因为在queueInLoop里,会出现在执行回调函数过程中     //还加入回调,加入回调也是加锁,然后就进入死锁。一个线程获得了一个锁,想再次获得该锁,就会死锁。     //因此陈硕通过加锁交换,解锁,再唤醒的方式   } ​   for (const Functor& functor : functors)   {     functor();   }   callingPendingFunctors_ = false; } ​

thread类

成员变量

using ThreadFunc = std::function; ​ bool started_; bool joined_; std::shared_ptr thread_; pid_t tid_;       // 在线程创建时再绑定 ThreadFunc func_; // 线程回调函数 std::string name_; static std::atomic_int numCreated_;

构造函数的时候,先把function放进去,start()的时候,才启动线程开启function

比较重要的是用信号量来保证线程创建的有序性,start()是启动线程

void Thread::start()                                                        // 一个Thread对象 记录的就是一个新线程的详细信息 {     started_ = true;     sem_t sem;     sem_init(&sem, false, 0);                                               // false指的是 不设置进程间共享     // 开启线程     thread_ = std::shared_ptr(new std::thread([&]() {         tid_ = CurrentThread::tid();                                        // 获取线程的tid值         sem_post(&sem);         func_();                                                            // 开启一个新线程 专门执行该线程函数     })); ​     // 这里必须等待获取上面新创建的线程的tid值     sem_wait(&sem); }

EventLoopThread类

成员变量

 using ThreadInitCallback = std::function;  ​  EventLoop *loop_;  bool exiting_;  Thread thread_;  std::mutex mutex_;             // 互斥锁  std::condition_variable cond_; // 条件变量  ThreadInitCallback callback_;

EventLoopThread对应的function,这个函数会以回调函数的形式放入到thread_里

在构造函数里,有这样的一条语句。function是放进去了,不过没有执行,这就是回调函数的好处

thread_(std::bind(&EventLoopThread::threadFunc, this), name)

// 下面这个方法是在单独的新线程里运行的 void EventLoopThread::threadFunc() {     EventLoop loop; // 创建一个独立的EventLoop对象 和上面的线程是一一对应的 级one loop per thread ​     if (callback_)     {         callback_(&loop);     } ​     {         std::unique_lock lock(mutex_);         loop_ = &loop;         cond_.notify_one();     }     loop.loop();    // 执行EventLoop的loop() 开启了底层的Poller的poll()          //只有等eventloop   quit()之后才会执行下面的语句,不然会一直阻塞在上面的语句     std::unique_lock lock(mutex_);     loop_ = nullptr; }

然后startloop()才是执行,会去调用thread的start(), thread的start()才是创建真正的线程

EventLoop *EventLoopThread::startLoop() {     thread_.start(); // 启用底层线程Thread类对象thread_中通过start()创建的线程 ​     //上面的start创建线程,是Thread类是调用std::thread来创建的,线程真正的创建在背后的内核,所以我们要一直等线程创建好。     EventLoop *loop = nullptr;     {         std::unique_lock lock(mutex_);         while(loop_ == nullptr)         {             //一直等待真正的线程创建好             cond_.wait(lock);         }         //线程创建好之后就可以返回对应的eventloop,说明eventloop的线程创建好了         loop = loop_;     }     return loop; }

总结:会把回调函数threadFunc先放入到thread_,但是这个时候还没有去创建线程,然后调用startLoop()函数时,再去调用thread的的start(),这个时候回去创建真正的线程。startLoop函数会返回创建好的eventloop指针,但是必须等线程创建好,这个时候就使用锁和条件变量,在回调函数threadFunc里等eventloop创建好后,会使用cond.notify_one();通知startloop线程创建好了*

EventLoopThreadPool

成员变量

EventLoop *baseLoop_; // 用户使用muduo创建的loop 如果线程数为1 那直接使用用户创建的loop 否则创建多EventLoop std::string name_; bool started_; int numThreads_; int next_; // 轮询的下标 std::vector> threads_;    //线程vector std::vector loops_;                           //EventLoop的vector

void EventLoopThreadPool::start(const ThreadInitCallback &cb) {     started_ = true; ​     for(int i = 0; i < numThreads_; ++i)     {         char buf[name_.size() + 32];         snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);         EventLoopThread *t = new EventLoopThread(cb, buf);          //绑定回调函数cb         threads_.push_back(std::unique_ptr(t));         loops_.push_back(t->startLoop());                           // 底层创建线程 绑定一个新的EventLoop 并返回该loop的地址     } ​     if(numThreads_ == 0 && cb)                                      // 整个服务端只有一个线程运行baseLoop     {         cb(baseLoop_);     } }

这个baseloop就是一开始程序运行的那个eventloop

// 如果工作在多线程中,baseLoop_(mainLoop)会默认以轮询的方式分配Channel给subLoop EventLoop *EventLoopThreadPool::getNextLoop() {     EventLoop *loop = baseLoop_;    // 如果只设置一个线程 也就是只有一个mainReactor 无subReactor 那么轮询只有一个线程 getNextLoop()每次都返回当前的baseLoop_ ​     if(!loops_.empty())             // 通过轮询获取下一个处理事件的loop     {         loop = loops_[next_];         ++next_;         if(next_ >= loops_.size())         {             next_ = 0;         }     }     return loop; }

getAllLoops就是返回所有的eventloop

InetAddress类

主要是封装sockaddr_in,封装地址、端口等等

还有输出地址、端口

Socket类

主要是封装文件描述符socket

提供socket相关的操作函数

bind listen accept

int fd() const { return sockfd_; } void bindAddress(const InetAddress &localaddr); void listen(); int accept(InetAddress *peeraddr); ​ void shutdownWrite(); ​ void setTcpNoDelay(bool on); void setReuseAddr(bool on); void setReusePort(bool on); void setKeepAlive(bool on);

Acceptor类

Acceptor类用来接受连接并处理连接

成员变量EventLoop是baseLoop,在TcpServer里会传入baseLoop给Acceptor

using NewConnectionCallback = std::function; ​ EventLoop *loop_; // Acceptor用的就是用户定义的那个baseLoop 也称作mainLoop Socket acceptSocket_; Channel acceptChannel_; NewConnectionCallback NewConnectionCallback_; bool listenning_; int idleFd_;       //用于当文件描述符过多时的操作,当文件描述符过多时,就丢到黑洞里

构造函数

Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)     : loop_(loop)              //loop是上层传入的     , acceptSocket_(createNonblocking())       //调用函数来创建socket     , acceptChannel_(loop, acceptSocket_.fd())     , listenning_(false)     , idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)) {     acceptSocket_.setReuseAddr(true);     acceptSocket_.setReusePort(true);     acceptSocket_.bindAddress(listenAddr);     // TcpServer::start() => Acceptor.listen() 如果有新用户连接 要执行一个回调(accept => connfd => 打包成Channel => 唤醒subloop)     // baseloop监听到有事件发生 => acceptChannel_(listenfd) => 执行该回调函数     acceptChannel_.setReadCallback(         std::bind(&Acceptor::handleRead, this));     //当有连接发生时,eventloop会监测到,然后socketfd的channel会去执行回调函数,这个回调函数就是handleRead }

//创建socket,用于构造函数 static int createNonblocking() {     int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP);     if (sockfd < 0)     {         LOG_FATAL("%s:%s:%d listen socket create err:%d\n", __FILE__, __FUNCTION__, __LINE__, errno);     }     return sockfd; }

// listenfd有事件发生了,就是有新用户连接了 //该函数功能是响应和接受连接请求 void Acceptor::handleRead() {     InetAddress peerAddr;     //调用accept     int connfd = acceptSocket_.accept(&peerAddr);     if (connfd >= 0)     {         //NewConnectionCallback_上层回调,TcpServer传入的         if (NewConnectionCallback_)         {             NewConnectionCallback_(connfd, peerAddr); // 轮询找到subLoop 唤醒并分发当前的新客户端的Channel         }         else         {             ::close(connfd);         }     }     else     {         LOG_SYSERR << "in Acceptor::handleRead"; // Read the section named "The special problem of // accept()ing when you can't" in libev's doc. // By Marc Lehmann, author of libev. if (errno == EMFILE) {   ::close(idleFd_);   idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);   ::close(idleFd_);   idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); }     } }

析构函数

Acceptor::~Acceptor() {  acceptChannel_.disableAll();    // 把从Poller中感兴趣的事件删除掉  acceptChannel_.remove();        // 调用EventLoop->removeChannel => Poller->removeChannel 把Poller的ChannelMap对应的部分删除    ::close(idleFd_); }

监听

void Acceptor::listen() {     listenning_ = true;     acceptSocket_.listen();         // listen     acceptChannel_.enableReading(); // acceptChannel_注册至Poller !重要 }

Buffer类

buffer内部是一个std::vector

前8个字节预留着用于记录数据长度,readIndex 指向可读取数据的初始位置,writeIndex指向空闲区的起始位置

image-20240422143959233

成员变量

std::vector buffer_; size_t readerIndex_; size_t writerIndex_; ​ static const size_t kCheapPrepend = 8; static const size_t kInitialSize = 1024;

每次读数据,readerIndex_向右移动。每次写数据,writerIndex向右移动。

//返回可读区域 size_t readableBytes() const { return writerIndex_ - readerIndex_; } //返回可写区域 size_t writableBytes() const { return buffer_.size() - writerIndex_; } //返回预留区域 size_t prependableBytes() const { return readerIndex_; }

取数据的操作

//取数据的操作 void retrieve(size_t len) {     if (len < readableBytes())     {         readerIndex_ += len; // 说明应用只读取了可读缓冲区数据的一部分,就是len长度 还剩下readerIndex+=len到writerIndex_的数据未读     }     else // len == readableBytes()     {         retrieveAll();     } } ​ //当把所有的数据取出来的时候,就把reader指针和writer指针归位,归到开头 void retrieveAll() {     readerIndex_ = kCheapPrepend;     writerIndex_ = kCheapPrepend; }

下面的函数是把数据取出来,并转化为string类型

// 把onMessage函数上报的Buffer数据 转成string类型的数据返回 std::string retrieveAllAsString() { return retrieveAsString(readableBytes()); } std::string retrieveAsString(size_t len) {     std::string result(peek(), len);     retrieve(len); // 上面一句把缓冲区中可读的数据已经读取出来 这里肯定要对缓冲区进行复位操作     return result; }

要插入数据,先看一下可写区大小够不够,如果不够,那么就要扩容

// buffer_.size - writerIndex_ void ensureWritableBytes(size_t len) {     if (writableBytes() < len)     {         makeSpace(len); // 扩容     } }

扩容操作,先看一下可写区和预留区的总和是否大于len,如果大于,就说明buffer容量还够,我们只需要把reader指针归位,writer指针往前移动一点距离。如果小于,就说明buffer容量不够,我们需要进行resize扩容

void makeSpace(size_t len) {     /**      * | kCheapPrepend |xxx| reader | writer |                     // xxx标示reader中已读的部分      * | kCheapPrepend | reader |          len          |      **/     if (writableBytes() + prependableBytes() < len + kCheapPrepend) // 也就是说 len > xxx + writer的部分     {         buffer_.resize(writerIndex_ + len);     }     else // 这里说明 len <= xxx + writer 把reader搬到从xxx开始 使得xxx后面是一段连续空间     {         size_t readable = readableBytes(); // readable = reader的长度         std::copy(begin() + readerIndex_,                   begin() + writerIndex_,  // 把这一部分数据拷贝到begin+kCheapPrepend起始处                   begin() + kCheapPrepend);         readerIndex_ = kCheapPrepend;         writerIndex_ = readerIndex_ + readable;     } }

添加数据

// 把[data, data+len]内存上的数据添加到writable缓冲区当中 void append(const char *data, size_t len) {     ensureWritableBytes(len);     std::copy(data, data+len, beginWrite());     writerIndex_ += len; }

从fd读数据,先开两个缓冲区。内核缓冲区里有数据,应该要尽快一次性读进来。而不是先扩容,再读数据,不然后面内核缓冲区还会继续来数据。

/**  * 从fd上读取数据 Poller工作在LT模式  * Buffer缓冲区是有大小的! 但是从fd上读取数据的时候 却不知道tcp数据的最终大小  *  * @description: 从socket读到缓冲区的方法是使用readv先读至buffer_,  * Buffer_空间如果不够会读入到栈上65536个字节大小的空间,然后以append的  * 方式追加入buffer_。既考虑了避免系统调用带来开销,又不影响数据的接收。  **/ ssize_t Buffer::readFd(int fd, int *saveErrno) {     // 栈额外空间,用于从套接字往出读时,当buffer_暂时不够用时暂存数据,待buffer_重新分配足够空间后,在把数据交换给buffer_。     char extrabuf[65536] = {0}; // 栈上内存空间 65536/1024 = 64KB ​     /*     struct iovec {         ptr_t iov_base; // iov_base指向的缓冲区存放的是readv所接收的数据或是writev将要发送的数据         size_t iov_len; // iov_len在各种情况下分别确定了接收的最大长度以及实际写入的长度     };     */ ​     // 使用iovec分配两个连续的缓冲区     struct iovec vec[2];     const size_t writable = writableBytes(); // 这是Buffer底层缓冲区剩余的可写空间大小 不一定能完全存储从fd读出的数据 ​     // 第一块缓冲区,指向可写空间     vec[0].iov_base = begin() + writerIndex_;     vec[0].iov_len = writable;     // 第二块缓冲区,指向栈空间     vec[1].iov_base = extrabuf;     vec[1].iov_len = sizeof(extrabuf); ​     // when there is enough space in this buffer, don't read into extrabuf.     // when extrabuf is used, we read 128k-1 bytes at most.     // 这里之所以说最多128k-1字节,是因为若writable为64k-1,那么需要两个缓冲区 第一个64k-1 第二个64k 所以做多128k-1     // 如果第一个缓冲区>=64k 那就只采用一个缓冲区 而不使用栈空间extrabuf[65536]的内容     const int iovcnt = (writable < sizeof(extrabuf)) ? 2 : 1;     const ssize_t n = ::readv(fd, vec, iovcnt); ​     if (n < 0)     {         *saveErrno = errno;     }     else if (n <= writable) // Buffer的可写缓冲区已经够存储读出来的数据了     {         writerIndex_ += n;     }     else // extrabuf里面也写入了n-writable长度的数据     {         writerIndex_ = buffer_.size();         append(extrabuf, n - writable); // 对buffer_扩容 并将extrabuf存储的另一部分数据追加至buffer_     }     return n; }

把数据写到fd上

// inputBuffer_.readFd表示将对端数据读到inputBuffer_中,移动writerIndex_指针 // outputBuffer_.writeFd标示将数据写入到outputBuffer_中,从readerIndex_开始,可以写readableBytes()个字节 ssize_t Buffer::writeFd(int fd, int *saveErrno) {     ssize_t n = ::write(fd, peek(), readableBytes());     if (n < 0)     {         *saveErrno = errno;     }     return n; }

TcpConnection类

  • TcpServer => Acceptor => 有一个新用户连接,通过accept函数拿到connfd

  • => TcpConnection设置回调 => 设置到Channel => Poller => Channel回调

往下层设置的回调

eventloop的doPendingFunctors();解决的上层回调就是这些

void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; } void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; } void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; } void setCloseCallback(const CloseCallback &cb) { closeCallback_ = cb; } void setHighWaterMarkCallback(const HighWaterMarkCallback &cb, size_t highWaterMark) { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }

下面这四个是注册到channel中

void handleRead(Timestamp receiveTime); void handleWrite(); void handleClose(); void handleError();

成员变量,socket是与channel绑定在一起

// Socket Channel 这里和Acceptor类似    Acceptor => mainloop    TcpConnection => subloop std::unique_ptr socket_; std::unique_ptr channel_;
const InetAddress localAddr_; const InetAddress peerAddr_; ​  EventLoop *loop_; // 这里是baseloop还是subloop由TcpServer中创建的线程数决定 若为多Reactor 该loop_指向subloop 若为单Reactor 该loop_指向baseloop  const std::string name_;  std::atomic_int state_;  bool reading_;

回调函数

// 这些回调TcpServer也有 用户通过写入TcpServer注册 TcpServer再将注册的回调传递给TcpConnection TcpConnection再将回调注册到Channel中 ConnectionCallback connectionCallback_;       // 有新连接时的回调 MessageCallback messageCallback_;             // 有读写消息时的回调 WriteCompleteCallback writeCompleteCallback_; // 消息发送完成以后的回调 HighWaterMarkCallback highWaterMarkCallback_; //控制收发速度 CloseCallback closeCallback_; size_t highWaterMark_;
// 数据缓冲区 Buffer inputBuffer_;    // 接收数据的缓冲区 Buffer outputBuffer_;   // 发送数据的缓冲区 用户send向outputBuffer_发

handleReader

// 将内核缓冲区中的数据读入到inputbuffer中,并调用用户注册的messageCallback // 读是相对服务器而言的 当对端客户端有数据到达 服务器端检测到EPOLLIN 就会触发该fd上的回调 handleRead取读走对端发来的数据 void TcpConnection::handleRead(Timestamp receiveTime) {     int savedErrno = 0;     //从fd上读数据到inputBuffer     ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);     if (n > 0) // 有数据到达     {         // 已建立连接的用户有可读事件发生了 调用用户传入的回调操作onMessage shared_from_this就是获取了TcpConnection的智能指针         messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);     }     else if (n == 0) // 客户端断开     {         handleClose();     }     else // 出错了     {         errno = savedErrno;         LOG_ERROR("TcpConnection::handleRead");         handleError();     } } ​

//当内核缓冲区由满变为不满的时候,就会产生写事件 //只要有写事件,就会一直写,这个是重点,所以写完之后,一定要关闭写事件 void TcpConnection::handleWrite() {     if (channel_->isWriting())     {         int savedErrno = 0;         ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno);         if (n > 0)         {             outputBuffer_.retrieve(n);             //readableBytes等于0,说明buffer里的所有数据都写进去了             //如果不等于0,说明内核缓冲区里已经满了,写不进去了,             if (outputBuffer_.readableBytes() == 0)             {                 //写好了,就要关闭。因为只要有写事件,就会一直写下去。                 channel_->disableWriting();                 if (writeCompleteCallback_)                 {                     // TcpConnection对象在其所在的subloop中 向pendingFunctors_中加入回调                     loop_->queueInLoop(                         std::bind(writeCompleteCallback_, shared_from_this()));                 }                 if (state_ == kDisconnecting)                 {                     shutdownInLoop(); // 在当前所属的loop中把TcpConnection删除掉                 }             }         }         else         {             LOG_ERROR("TcpConnection::handleWrite");         }     }     else     {         LOG_ERROR("TcpConnection fd=%d is down, no more writing", channel_->fd());     } }

下面这个函数并没有用到

void TcpConnection::send(const std::string &buf) {     if (state_ == kConnected)     {         if (loop_->isInLoopThread()) // 这种是对于单个reactor的情况 用户调用conn->send时 loop_即为当前线程         {             sendInLoop(buf.c_str(), buf.size());         }         else         {             loop_->runInLoop(                 std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size()));         }     } }

/**  * 发送数据 应用写的快 而内核发送数据慢 需要把待发送数据写入缓冲区,而且设置了水位回调  **/ void TcpConnection::sendInLoop(const void *data, size_t len) {     ssize_t nwrote = 0;     size_t remaining = len;     bool faultError = false; ​     if (state_ == kDisconnected) // 之前调用过该connection的shutdown 不能再进行发送了     {         LOG_ERROR("disconnected, give up writing");     } ​     // 表示channel_第一次开始写数据或者缓冲区没有待发送数据     if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)     {         nwrote = ::write(channel_->fd(), data, len);         if (nwrote >= 0)         {             remaining = len - nwrote;             if (remaining == 0 && writeCompleteCallback_)             {                 // 既然在这里数据全部发送完成,就不用再给channel设置epollout事件了                 loop_->queueInLoop(                     std::bind(writeCompleteCallback_, shared_from_this()));             }         }         else // nwrote < 0         {             nwrote = 0;             if (errno != EWOULDBLOCK) // EWOULDBLOCK表示非阻塞情况下没有数据后的正常返回 等同于EAGAIN             {                 LOG_ERROR("TcpConnection::sendInLoop");                 if (errno == EPIPE || errno == ECONNRESET) // SIGPIPE RESET                 {                     faultError = true;                 }             }         }     }     /**      * 说明当前这一次write并没有把数据全部发送出去 剩余的数据需要保存到缓冲区当中      * 然后给channel注册EPOLLOUT事件,Poller发现tcp的发送缓冲区有空间后会通知      * 相应的sock->channel,调用channel对应注册的writeCallback_回调方法,      * channel的writeCallback_实际上就是TcpConnection设置的handleWrite回调,      * 把发送缓冲区outputBuffer_的内容全部发送完成      **/     if (!faultError && remaining > 0)     {         // 目前发送缓冲区剩余的待发送的数据的长度         size_t oldLen = outputBuffer_.readableBytes();         if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_)         {             loop_->queueInLoop(                 std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));         }         outputBuffer_.append((char *)data + nwrote, remaining);         if (!channel_->isWriting())         {             channel_->enableWriting(); // 这里一定要注册channel的写事件 否则poller不会给channel通知epollout         }     } }

// 连接建立 void connectEstablished(); // 连接销毁 void connectDestroyed();

TcpServer

服务器

回调函数

//在callback.h文件里定义的有定义: using ConnectionCallback = std::function; using CloseCallback = std::function; using WriteCompleteCallback = std::function;

这些回调函数是用户定义的,传递给TcpConnection,TcpConnection传递给Channel。因此需要用户写回调函数,然后层层传递下去

ConnectionCallback connectionCallback_;       //有新连接时的回调 MessageCallback messageCallback_;             // 有读写事件发生时的回调 WriteCompleteCallback writeCompleteCallback_; // 消息发送完成后的回调 ​ void setThreadInitCallback(const ThreadInitCallback &cb) { threadInitCallback_ = cb; } void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; } void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; } void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }

setThreadNum,就是调用threadPool里的setThreadNum

// 设置底层subloop的个数 void TcpServer::setThreadNum(int numThreads) {     threadPool_->setThreadNum(numThreads); }

// 开启服务器监听 void TcpServer::start() {     if (started_++ == 0)    // 防止一个TcpServer对象被start多次     {         threadPool_->start(threadInitCallback_);    // 启动底层的loop线程池         //这个loop是mainloop,用来监听连接         loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));     } }

成员变量:

using ConnectionMap = std::unordered_map; ​ EventLoop *loop_; // baseloop 用户自定义的loop ​ const std::string ipPort_; const std::string name_; ​ std::unique_ptr acceptor_; // 运行在mainloop 任务就是监听新连接事件 ​ std::shared_ptr threadPool_; // one loop per thread ​ std::atomic_int started_; ​ int nextConnId_; ConnectionMap connections_; // 保存所有的连接

构造函数

TcpServer::TcpServer(EventLoop *loop,                      const InetAddress &listenAddr,                      const std::string &nameArg,                      Option option)     : loop_(CheckLoopNotNull(loop))     , ipPort_(listenAddr.toIpPort())     , name_(nameArg)     //比较重要的两个acceptor和threadPool,     , acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))     , threadPool_(new EventLoopThreadPool(loop, name_))     , connectionCallback_()     , messageCallback_()     , nextConnId_(1)     , started_(0) {     // 当有新用户连接时,Acceptor类中绑定的acceptChannel_会有读事件发生,执行handleRead()调用TcpServer::newConnection回调     acceptor_->setNewConnectionCallback(         std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2)); }

TcpServer::newConnection

先从线程池中调用getNextLoop(),拿到一个eventLoop来管理新连接fd对应的channel。

创建新的地址和新的socket,然后创建新的连接TcpConnection。

把TcpConnection放入到对应的map里,并设置对应的函数回调

// 有一个新用户连接,acceptor会执行这个回调操作,负责将mainLoop接收到的请求连接(acceptChannel_会有读事件发生)通过回调轮询分发给subLoop去处理 void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr) {     // 轮询算法 选择一个subLoop 来管理connfd对应的channel     EventLoop *ioLoop = threadPool_->getNextLoop();     char buf[64] = {0};     snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);     ++nextConnId_;  // 这里没有设置为原子类是因为其只在mainloop中执行 不涉及线程安全问题     std::string connName = name_ + buf; ​     LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s\n",              name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());          // 通过sockfd获取其绑定的本机的ip地址和端口信息     sockaddr_in local;     ::memset(&local, 0, sizeof(local));     socklen_t addrlen = sizeof(local);     if(::getsockname(sockfd, (sockaddr *)&local, &addrlen) < 0)     {         LOG_ERROR("sockets::getLocalAddr");     } ​     InetAddress localAddr(local);     TcpConnectionPtr conn(new TcpConnection(ioLoop,                                             connName,                                             sockfd,                                             localAddr,                                             peerAddr));     connections_[connName] = conn;     // 下面的回调都是用户设置给TcpServer => TcpConnection的,至于Channel绑定的则是TcpConnection设置的四个,handleRead,handleWrite... 这下面的回调用于handlexxx函数中     conn->setConnectionCallback(connectionCallback_);     conn->setMessageCallback(messageCallback_);     conn->setWriteCompleteCallback(writeCompleteCallback_); ​     // 设置了如何关闭连接的回调     conn->setCloseCallback(         std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)); ​     ioLoop->runInLoop(         std::bind(&TcpConnection::connectEstablished, conn)); } ​

相关内容

热门资讯

亚马逊云科技 Amazon B... 前言大模型应用发展迅速,部署一套AI应用的需求也越来越多,从头部署花费时...
7款读文献的AI神器,可总结分... 1、包阅一款国内文档分析工具,可以上传文档实时分析。支持各种文件类型,如...
Datawhale AI夏令营... 讯飞机器翻译挑战赛题赛题数据分析NLP前置知识GRUSeq2SeqEncoder编码器Decoder...
玩家交流《Wepoke总结》软... 玩家交流《Wepoke总结》软件透明挂!(软件)透明挂机器人(2025已更新)(哔哩哔哩);致您一封...
20240621 每日AI必读... 🤖GPT-4 通过图灵测试!!研究人员称人们在图灵测试中...
黑科技代打!微扑克模拟器辅助脚... 【福星临门,好运相随】;黑科技代打!微扑克模拟器辅助脚本(辅助挂)原来真的是有挂(真的有挂)详细教程...
何恺明新作再战AI生成:入职M... 梦晨 发自 凹非寺量子位 | 公众号 QbitAI何恺明入职MIT副教授后,首次带队的...
YOLOv8白皮书-第Y8周:... 本文为365天深度学习训练营中的学习记录博客 原作者:K同学啊|接辅导、项目定制请根据...
记者爆料《Wepoke小程序》... 您好,这款游戏可以开挂的,确实是有挂的,需要了解加微【841106723】很多玩家在这款游戏中打牌都...
一分钟揭秘《德州版Wepoke... 一分钟揭秘《德州版Wepoke》软件透明挂!(透明挂)软件渠道(2020已更新)(哔哩哔哩);软件透...