1、协程调度器介绍
/// 协程调度器名称 std::string m_name; /// 互斥锁 MutexType m_mutex; /// 线程池 std::vector m_threads; /// 任务队列 std::list m_tasks; /// 线程池的线程ID数组 std::vector m_threadIds; /// 工作线程数量,不包含use_caller的主线程 size_t m_threadCount = 0; /// 活跃线程数 std::atomic m_activeThreadCount = {0}; /// idle线程数 std::atomic m_idleThreadCount = {0}; /// 是否use caller bool m_useCaller; /// use_caller为true时,调度器所在线程的调度协程 Fiber::ptr m_rootFiber; /// use_caller为true时,调度器所在线程的id int m_rootThread = 0; /// 是否正在停止 bool m_stopping = false;
void Scheduler::start() { SYLAR_LOG_DEBUG(g_logger) << "start"; MutexType::Lock lock(m_mutex); if (m_stopping) { SYLAR_LOG_ERROR(g_logger) << "Scheduler is stopped"; return; } SYLAR_ASSERT(m_threads.empty()); m_threads.resize(m_threadCount); for (size_t i = 0; i < m_threadCount; i++) { m_threads[i].reset(new Thread(std::bind(&Scheduler::run, this), m_name + "_" + std::to_string(i))); m_threadIds.push_back(m_threads[i]->getId()); } }
void Scheduler::run() { SYLAR_LOG_DEBUG(g_logger) << m_name << " run"; setThis(); set_hook_enable(true); // 少用系统调用 // if(sylar::GetThreadId() != m_rootThread) { // 第二个条件是将use_caller为true时,非caller线程中的t_scheduler_fiber赋值,caller线程中的t_scheduler_fiber在Scheduler初始化时已经赋值了 // if (!m_useCaller || (t_scheduler_fiber == nullptr && m_useCaller)) { t_scheduler_fiber = Fiber::GetThis(); // } //初始话一个没有任务时执行的协程 Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle, this))); Fiber::ptr cb_fiber; ScheduleTask task; while(true) { task.reset(); bool need_tickle = false; { MutexType::Lock lock(m_lock); //这个for的作用是找到一个可以给当前线程执行的任务 auto it = m_tasks.begin(); for(; it != m_tasks.end(); ++it) { if (it ->threadid != -1 && it->threadid != sylar::GetThreadId()) { need_tickle = true; continue; } SYLAR_ASSERT(it->cb || it->fiber); // 状态可以再加一个assert task = *it; ++m_activeThreadCount; m_tasks.erase(it++); break; } // 通知其他线程 need_tickle |= (it != m_tasks.end()); } if(need_tickle) { tickle(); } if(task.fiber) { // 这个任务是fiber task.fiber->swapIn(); --m_activeThreadCount; if(task.fiber->getState() == Fiber::READY) { schedule(task.fiber); } else if(task.fiber->getState() != Fiber::TERM && task.fiber->getState() != Fiber::EXCEPT) { task.fiber->setState(Fiber::HOLD); } // 让fiber资源尽早释放,不用等到下个fiber来的时候再释放 task.reset(); } else if(task.cb) { // 这个任务是callback if(cb_fiber) { cb_fiber->reset(task.cb, m_useCaller); } else { cb_fiber.reset(new Fiber(task.cb)); } task.reset(); cb_fiber->swapIn(); --m_activeThreadCount; if(cb_fiber->getState() == Fiber::READY) { schedule(task.fiber); } else if(cb_fiber->getState() != Fiber::TERM && cb_fiber->getState() != Fiber::EXCEPT) { cb_fiber->setState(Fiber::HOLD); } // 让fiber资源尽早释放,不用等到下个cb来的时候再释放 cb_fiber.reset(); } else { // 进到这个分支情况一定是任务队列空了,调度idle协程即可 if (idle_fiber->getState() == Fiber::TERM) { // 如果调度器没有调度任务,那么idle协程会不停地切换调度协程和idle协程,不会结束, //如果idle协程结束了,那一定是调度器停止了,在stop函数中控制 SYLAR_LOG_DEBUG(g_logger) << "idle fiber term"; break; } ++m_idleThreadCount; idle_fiber->swapIn(); --m_idleThreadCount; if(idle_fiber->getState() != Fiber::TERM && idle_fiber->getState() != Fiber::EXCEPT) { idle_fiber->setState(Fiber::HOLD); } } } // SYLAR_LOG_DEBUG(g_logger) << t_scheduler_fiber.use_count(); // t_scheduler_fiber = nullptr; }
void Scheduler::stop() { SYLAR_LOG_DEBUG(g_logger) << "stop"; if (stopping()) { return; } // 这里控制会控制idle协程的退出 m_stopping = true; /// 只能由创建线程发起scheduler的线程发起stop // 当m_useCaller为true时,创建线程发起scheduler的线程中GetThis() == this // 当m_useCaller为false时,创建线程发起scheduler的线程中GetThis() == nullptr if (m_useCaller) { SYLAR_ASSERT(GetThis() == this); } else { SYLAR_ASSERT(GetThis() != this); } for (size_t i = 0; i < m_thread_nums; i++) { tickle(); } // 这里的主要目的,use_caller是ture的时候,在这里让caller的调度协程进行任务调度 // 如果将这里删除,那么caller线程永远不会进行任务调度 if (m_scheduleFiber) { tickle(); m_scheduleFiber->swapIn(); } // 目的是防止调度线程先执行完,销毁了Scheduler对象 // 即使使用detach也是不可以的,这里涉及到多个线程使用Scheduler对象,如果线程还没退出对象销毁了,这个时候就会造成段错误 std::vector thrs; { MutexType::Lock lock(m_lock); thrs.swap(m_threads); } for (auto &i : thrs) { i->join(); } SYLAR_LOG_DEBUG(g_logger) << t_scheduler_fiber.use_count(); }