C++协程一直是大家比较关注的一个技术点, 在C++20 coroutine属性正式推出之前, 就已经有很多项目实装了, 实现机制也略也差异, 下面先来简单看下比较常见的实现方式:
一种是Linux下不跨平台的ucontext实现, 当然也包括从ucontext机制衍生出来的一系列实现(删除不必要的状态save, restore代码, 提高性能, 或者直接使用平台相关的汇编指令实现ucontext类似的效果(boost.context实现)) emsp; 这种方式实现的都是我们平时所说的full stack模式, stack size一般设置成业务能够平稳运转的大小, stack的处理是比较自动的, 但如果stack上的变量过多, 协程函数比较复杂, 默认协程栈大小设置的偏小的话就容易出现爆栈的问题, 栈大小设置的过大, Stack交换又会带来性能问题. 一个比较特殊的情况, 游戏GamePlay相关的业务使用这种模式, 由于本身的复杂度和需求迭代的诉求, 比较容易出现stack size(内存占用)和性能难以平衡的问题.
这是一种比较hack的实现机制, 这种方式本身也是sackless的, lua的vm就用该方式来实现lua的coroutine特性, 在有GC的情况下, stackless本身不是障碍(数据不需要在栈上定义), Lua本身的协程机制也工作得非常稳定. 对于无GC的情况, 也有部分项目使用这种模式来实现自己的协程, 但对比stackful的协程使用上会稍显麻烦, 原来可以正常使用的stack变量需要手动处理, 存储在特定的地方.
C++20的coroutine特性也慢慢被大家熟知, c++20使用的也是stackless的coroutine实现, 对比stackful的模式, C++20的stackless实现泛用性会更好一点, 另外除了依赖compiler对代码的自动处理外, 不依赖不同平台的底层机制, 这样也是一种更健康的特性增加方式. 对比duff device, c++20的coroutine会自动对协程函数内的代码做处理, 将相关的stack变量变为heap变量, 这样也解决了stackless协程对stack变量的支持问题(本篇主要是基于C++17的corountine实现, C++20的后续的文章再做展开了).
C++ coroutine不同os的实现分析, 相关特性的介绍, 都有大量的相关资料, 本文主要针对如何更好的使用coroutine, 如何利用coroutine特性来实现一个业务侧简单易用的协程调度器, 不会对coroutine的相关特性做太详细的展开, 也会结合具体的案例(Rpc调用链), 来看一下怎么用协程来简化多个节点之间的异步请求处理. 会重点关注在可控可扩展的任务调度器本身.
rstudio c++ framework对协程的支持主要是通过两层功能来做的, 底层的stackless coroutine机制 + 上层的Scheduler机制.
目前Scheduler提供的主要特性如下:
//获取当前运行的协程关联的SchedTask对象
#define rco_self_task()
//获取当前运行的协程关联的Scheduler对象
#define rco_current_scheduer()
//挂起当前协程到Immediate Queue
#define rco_yield()
//协程的起始指令(一般位于协程代码的最前端)
#define rco_begin() co_begin()
//协程的结束指令(一般位于协程代码的最末尾)
#define rco_return co_return
//等待系统返回一个ResumeObject对象后继续执行
#define rco_await(...)
//等待下一帧开始后继续执行
#define rco_yield_next_frame()
//立即结束当前协程(后续协程代码将不会被执行)
#define rco_kill_self()
//等待指定时间(毫秒为单位)后继续执行
#define rco_yield_sleep(SLEEP_TIME_MS)
//创建一个子协程
#define rco_yield_create_task(IS_CHILD, FUNC, ...)
//等待一个Rpc请求返回后继续执行
#define rco_yield_rpc_call(PROXY, FUNC_NAME, ARGS, TIMEOUT_MS)
//等待一个协程结束后继续执行
#define rco_yield_wait_task_finish(TARGET_TASK_ID, TIMEOUT_MS)
//获取系统传递的ResumeObject对象
#define rco_get_resume_object(ResumeObjectType)
//最近一次系统等待是否成功执行
#define rco_last_invoke_suc()
//向系统发出完成事件并立即结束当前协程
#define rco_emit_finish_event(FinishEvent)
下面我们先从stackless coroutine开始来看一下框架协程机制的整体实现思路.
当前框架的无栈协程实现是基于switch case的duff device特性来实现的, 通过对应的case label, 可以在重入一个函数的时候跳转到不同的label, 从而实现协程的挂起和恢复, 提供最原始的协程机制支撑. 当然, 这种通过hack方式达成的机制一般来说会多多少少带来一些使用上的限制, 框架目前的实现是分成两步来弱化限制, 让整个机制的使用更方便的:
我们先来看一个简单的示例代码:
co_start([](std::pair<int, int>& p, int& c, std::string& local, LocalStruct& locals) -> RStudioCoTask::RpcCoroutineTask
{
LOG_INFO(0, "lambda_f");
return nullptr;
}, std::make_pair(1, 2), 3, "", LocalStruct{});
如上所示, 通过额外扩展的函数参数表(上例中的 p, c, local, locals等变量), 虽然我们没法使用栈变量(比较好的一点大部分情况下明确栈变量编译器都会直接报错, 有直接的提示), 我们可以通过参数表来声明需要在协程中使用到的临时变量, 相当于通过手动的方式去解决C++20 compiler编译器自动处理的那一部分. 略微麻烦一点, 但当前的整个实现, 除了刚才说的栈变量需要用特殊的方式使用外, 已经可以比较好的使用函数式编程的方式去组织编写业务代码了.
整体机制利用了c/c++一个比较trick的特性, duff device特性, 通过上图中的宏也可以看到, switch 的case label可以穿插do while(), 看着比较奇怪, 平时我们也很少这样来写代码, 但编译器确实是能够正确支持这种行为的. 一方面我们的宏实现 (do{ case: }while(0))能够正确运行, 另外因为duff device特性, 像for(){ co_yield();} 这种循环中嵌套yield()的情况也能正常运行了.
对于一段代码, 整个协程化的机制大致如下:
coroutine的核心对象主要有以下几个:
class RSTUDIO_APP_SERVICE_API CoroutineHandle
{
public:
bool done() const;
int get_state() const;
void resume();
void destroy();
const void* get_user_data() const;
void set_user_data(const void* ud);
};
外部直接访问的协程对象, 相当于CoPromise的代理句柄, 内部保存了CoPromise对象, 外界可以通过它访问当前协程的执行状态, 以及对协程进行resume(), destroy()等操作.
作为协程的返回值的基类类型, 可以对协程的起始行为和结束行为进行控制, 如启动时是否挂起, 结束时是否自动destroy等, 使用Scheduler后这部分都统一使用CoTaskForScheduler作为返回值, 起始行为和结束行为是固定的, 起始的时候协程自动挂起(方便附加外部的额外数据), 结束的时候不自动删除CoPromise对象(由Scheduler负责删除).
协程对象的基类, CoroutineHandle上提供的能力都是通过它来完成的.
协程对象的具体实现, 通过模板, 我们可以很好的完成 参数列表 -> std::tuple<>, 外部函数-> std::function<>的转换存储, 我们重点关注resume()的实现:
virtual void resume() override
{
if (done())
{
return;
}
if (m_function)
{
push_promise(this);
apply(m_function, m_params);
pop_promise();
}
return;
}
apply()主要完成展开m_params并调用m_function的操作, 具体代码:
template<typename F, typename T, std::size_t... I>
auto apply_impl(F f, T& t, std::index_sequence<I...>)
{
return f(std::get<I>(t)...);
}
template<typename F, typename T>
auto apply(F f, T& t)
{
return apply_impl(f, t, std::make_index_sequence<std::tuple_size<T>::value>());
}
需要注意的是push_promise()和pop_promise(), 这两个函数存在的目的是正确的设置当前协程栈栈顶的协程对象, 这样我们前面看到的宏中使用的 rstudio::co_task::current_promise() 才能正确的获取到当前的协程对象, 正确的执行相关的逻辑代码段.
当前这版stackless coroutine的实现参考了c++20的实现, 苗老师也合理考虑了迁移到C++20的成本, 从易用性, 可控性, 提供的功能的角度感觉都是比较好的.
一些复杂的机制, 如子协程的创建和等待, 依赖外围系统的Sleep机制, 异步Rpc机制等, 肯定不适合直接在最底层的stackless coroutine上直接实现, 目前框架采用的方式是在外围再包装一层Scheduler, 来完成一些复杂功能和业务强相关功能的封装和实现, 这样底层的stackless coroutine侧重基础机制的实现, 外围的Scheduler侧重复杂逻辑的实现和控制, 从机制和实现上来说都会相对清晰.
如上图所示, Scheduler主要提供对SchedTask的管理, 以及三个基础机制方便协程相关业务机制的实现:
class RSTUDIO_APP_SERVICE_API SchedTask
{
public:
uint64_t GetId() const;
int Run();
bool IsDone() const;
void BindSleepHandle(jobs::task_info_ptr handle);
template<typename AwaitHandleType>
auto BindAwaitHandle(AwaitHandleType&& awaitHandle) -> std::enable_if_t<std::is_base_of<IAwaitHandle, AwaitHandleType>::value>;
void ClearAwaitHandle();
bool HasAwaitHandle() const;
const reflection::UserObject& GetAwaitHandle() const;
const reflection::UserObject& GetExtraFinishObject() const;
template<typename AwaitEventType>
auto BindResumeObject(AwaitEventType&& awaitEvent) -> std::enable_if_t<std::is_base_of<ResumeObject, AwaitEventType>::value>;
template<typename AwaitEventType>
auto GetResumeObjectAsType()->std::enable_if_t<std::is_base_of<ResumeObject, AwaitEventType>::value, AwaitEventType*>;
bool HasResumeObject() const noexcept;
void ClearResumeObject();
template<typename ExtraType>
auto BindExtraFinishObject(ExtraType&& extraObj)->std::enable_if_t<std::is_base_of<CoEventExtra, ExtraType>::value>;
bool IsLastInvokeSuc() const noexcept;
bool IsLastInvokeTimeOut() const noexcept;
bool IsLastInvokeFailed() const noexcept;
void AddChildTask(uint64_t tid);
void AddWaitNofityTask(uint64_t tid);
const auto& GetChildTaskArray() const;
const auto& GetWaitNotifyArray() const;
void Terminate();
Scheduler* GetManager() const;
protected:
uint64_t mTaskId;
co_task::CoroutineHandle mCoHandle;
Scheduler* mManager;
std::vector<uint64_t> mChildArray;
std::vector<uint64_t> mWaitNotifyArray;
//value used to return from coroutine
reflection::UserObject mAwaitHandle;
//value used to send to coroutine(now as a AwaitEvent)
reflection::UserObject mResumeObject;
//value use to add for finish event(so external user can fill something in it)
reflection::UserObject mExtraFinishObject;
jobs::task_info_ptr mSleepHandle;
bool mIsTerminate = false;
};
SchedTask完成对底层Coroutine对象的封装, 首先它是底层coroutine的容器, 我们在Task创建的时候对co_task::CoPromise对象和SchedTask做了绑定, 其次上面提到的三种机制关联的数据也是存储在其中的, SchedTask代码如下:
uint64_t Scheduler::CreateTaskFromHandle(co_task::CoroutineHandle handle)
{
auto tid = ++mIdCount;
SchedTask* schedTask = new SchedTask(tid, handle, this);
handle.set_user_data(schedTask);
mTaskMap.emplace(tid, schedTask);
AddToImmRun(schedTask);
return tid;
}
这样在协程执行的过程中, 我们都能很方便的拿到关联的SchedTask, 然后再拿到SchedTask上存储的数据, 来完成我们想要做的事情.
像其他几个成员变量, 是前面介绍的三种机制的支撑数据, 通过代码中的注释也能看到:
Scheduler的代码比较多, 主要就是SchedTask的管理器, 另外也完成对前面提到的三种机制的支持, 文章重点分析一下三种机制的实现代码.
void Scheduler::Update()
{
//Handle need kill task first
while(!mNeedKillArray.empty())
{
auto tid = *(mNeedKillArray.rbegin());
mNeedKillArray.pop_back();
auto* tmpTask = GetTaskById(tid);
if (tmpTask != nullptr)
{
DestroyTask(tmpTask);
}
}
mReadyTasks.insert(mReadyTasks.end(), mFrameStartTasks.begin(), mFrameStartTasks.end());
mFrameStartTasks.clear();
while (RSTUDIO_LIKELY(!mReadyTasks.empty()))
{
auto* schedTask = *(mReadyTasks.rbegin());
mReadyTasks.pop_back();
schedTask->Run();
if (RSTUDIO_UNLIKELY(schedTask->IsDone()))
{
//Try to remove here
DestroyTask(schedTask);
}
else
{
if (schedTask->HasAwaitHandle())
{
const auto& obj = schedTask->GetAwaitHandle();
auto* awaitHandle = (IAwaitHandle*)obj.pointer();
auto awaitMode = awaitHandle->Invoke(schedTask, this);
switch (awaitMode)
{
case rstudio::logic::AwaitMode::AwaitNever:
//The task need delivered to immediate queue
AddToImmRun(schedTask);
break;
case rstudio::logic::AwaitMode::AwaitNextframe:
//The task need delivered to nextframe queue
AddToNextFrameRun(schedTask);
break;
case rstudio::logic::AwaitMode::AwaitForNotifyNoTimeout:
case rstudio::logic::AwaitMode::AwaitForNotifyWithTimeout:
HandleTaskAwaitForNotify(schedTask, awaitMode, awaitHandle->GetTimeoutMs());
break;
case rstudio::logic::AwaitMode::AwaitDoNothing:
//Just do nothing
break;
default:
RSTUDIO_ERROR(CanNotRunToHereError());
break;
}
}
else
{
//Just add to immediate run array
AddToImmRun(schedTask);
}
}
}
}
上面是Scheduler的Update()实现代码, 在每个task->Run()后, 外围代码会判断task上是否有设置的AwaitHandle对象, 有的话则调用关联的AwaitHandle的Invoke()方法, 同时Inovke方法的返回值也提供了对协程后续行为的控制, 主要是以下几种模式:
Resume机制主要是通过唤醒在Await队列中的协程的时候向关联的Task对象传递ResumeObject实现的:
//Not a real event notify here, just do need things
template <typename E>
auto ResumeTaskByAwaitObject(E&& awaitObj) -> std::enable_if_t<std::is_base_of<ResumeObject, E>::value>
{
auto tid = awaitObj.taskId;
if (IsTaskInAwaitSet(tid))
{
//Only in await set task can be resume
auto* task = GetTaskById(tid);
if (RSTUDIO_LIKELY(task != nullptr))
{
task->BindResumeObject(std::forward<E>(awaitObj));
AddToImmRun(task);
}
OnTaskAwaitNotifyFinish(tid);
}
}
然后再通过rco_get_resume_object()宏在协程代码中获取对应的ResumeObject. 宏的声明代码如下:
#define rco_get_resume_object(ResumeObjectType) rco_self_task()->GetResumeObjectAsType<ResumeObjectType>()
本身就是一个简单的传值取值的过程. 注意传递ResumeObject后, 我们也会马上将协程加入到mReadTasks队列中以方便在接下来的Update中唤醒它.
有一些特殊的场合, 可能需要协程执行完成后向业务系统发起通知并传递返回值, 比如Rpc Service的协程支持实现, 这里直接以RpcService的协程支持为例来说明好了.
首先是业务侧, 在创建完协程后, 需要给协程绑定后续协程执行完成后做进一步操作需要的数据:
ponder::Value val = method->DoInvoke(std::move(args));
auto& cotask = val.to<reflection::UserObject>().ref<logic::CoTaskForScheduler>();
logic::SchedTask* task = Scheduler::GetTaskFromCoTask(cotask);
assert(task);
CoRpcFinishEventExtra extraData;
extraData.connId = connPtr->GetId();
extraData.funcId = reqHead.funcNameId;
extraData.serviceId = reqHead.serviceNameId;
extraData.rpcReqId = reqHead.reqId;
task->BindExtraFinishObject(std::move(extraData));
这里将Connection id等信息通过ExtraFinishObject直接绑定到了Task上. 然后业务侧会在操作执行完成的时候使用rco_emit_finish_event()宏来结束协程并向业务系统发起通知:
logic::CoTaskForScheduler HeartBeatService::DoHeartBeat(logic::Scheduler& scheduler, int testVal)
{
auto retval = scheduler.CreateTask(
[testVal]() -> CoTaskForScheduler {
rco_begin();
rco_yield_sleep(1000);
printf("service yield call finish!\n");
rco_emit_finish_event(CoRpcFinishEvent(ponder::Value(testVal + 1)));
rco_return;
}
);
return retval.second;
}
rco_emit_finish_event()宏实际调用的是Scheduler上的EmitTaskFinishEvent()方法:
template<typename E>
auto EmitTaskFinishEvent(E&& event, SchedTask* task) -> std::enable_if_t<std::is_base_of<CoFinishEvent, E>::value>
{
assert(task != nullptr);
event.extraFinishObj = task->GetExtraFinishObject();
mEventManager.emit(std::forward<E>(event));
KillTask(task->GetId());
}
注意这个地方task上存储的ExtraFinishObject会作为event的一部分直接传递给业务系统, 并在发起事件后调用删除协程任务的方法.
最后通过事件机制, 业务系统获取到足够的信息后完成具体的业务操作, 对于 RpcService来说就是向请求方返回执行结果了:
void ServiceServer::receive(const CoRpcFinishEvent& ev)
{
assert(ev.extraFinishObj != ponder::UserObject::nothing && ev.extraFinishObj.getClass().get_id() == MetatypeHash::hash<CoRpcFinishEventExtra>());
auto& extraData = ev.extraFinishObj.ref<CoRpcFinishEventExtra>();
auto iter = mAccteptedConnectionMap.find(extraData.connId);
if (iter != mAccteptedConnectionMap.end())
{
if (ev.rpcResultType == network::RpcResponseResultType::RequestSuc)
{
ponder::Value tmpVal = ev.retValue;
ResponseRpcRequestNormal(iter->second, extraData.serviceId, extraData.rpcReqId, std::move(tmpVal));
}
else
{
ResponseRpcRequestWithError(iter->second, extraData.serviceId, extraData.rpcReqId, ev.rpcResultType);
}
}
}
本身机制是服务于业务需要的, 这里通过具体的业务系统实现来理解机制, 会相对简单.
mScheduler.CreateTask(
[clientProxy](std::pair<int, int>& p, int& c, std::string& local, LocalStruct& locals)-> logic::CoTaskForScheduler
{
rco_begin();
{
c = p.first + p.second + c;
locals.local_i = 1024;
locals.local_p = "balabala";
auto* task = rco_self_task();
////LOG_INFO(0, "step1 %d", p.first);
printf("step1 %d\n", p.first);
}
rco_yield_next_frame();
{
////LOG_INFO(0, "step2 %d", p.second);
printf("step2 %d\n", p.second);
c = 0;
while (c < 5)
{
printf("in while loop c=%d\n", c);
rco_yield_sleep(1000);
c++;
}
for (c = 0; c < 5; c++)
{
printf("in for loop c=%d\n", c);
rco_yield_next_frame();
}
}
rco_yield();
////rco_kill_self();
{
////LOG_INFO(0, "step3 %d", c);
printf("step3 %d\n", c);
rco_yield_create_task(false, []()-> logic::CoTaskForScheduler {
rco_begin();
printf("from child coroutine!\n");
rco_yield_sleep(2000);
printf("after child coroutine sleep\n");
rco_return;
});
////auto newTaskId = createEvent->newTaskId;
{
auto* createEvent = rco_get_resume_object(logic::CreateTaskResumeObject);
printf("new task create in coroutine: %llu\n", createEvent->newTaskId);
}
printf("Begin wait for task!\n");
rco_yield_wait_task_finish(
rco_get_resume_object(logic::CreateTaskResumeObject)->newTaskId,
10000
);
printf("After wait for task!\n");
}
rco_await(clientProxy->DoHeartBeat(3));
////__sched_yield_rpc_request(clientProxy, "DoHeartBeat", rstudio::reflection::Args{ 3 }, 1000);
{
////assert(rco_last_invoke_suc());
auto* rpcEvent = rco_get_resume_object(logic::RpcResumeObject);
if (rpcEvent->rpcResultType == rstudio::network::RpcResponseResultType::RequestSuc)
{
assert(rpcEvent->totalRet == 1);
assert(rpcEvent->retValue.to<int>() == 4);
printf("rpc coroutine run suc!\n");
}
else
{
printf("rpc coroutine run failed! result = %d \n", (int)rpcEvent->rpcResultType);
}
}
rco_yield_sleep(5000);
{
////LOG_INFO(0, "step4 %s", locals.local_p);
printf("step4 %s, after 5s sleep\n", locals.local_p);
}
rco_return;
}, std::make_pair(1, 2), 3, "", LocalStruct{});
执行结果:
step1 1
step2 2
in while loop c=0
in while loop c=1
in while loop c=2
in while loop c=3
in while loop c=4
in for loop c=0
in for loop c=1
in for loop c=2
in for loop c=3
in for loop c=4
step3 5
new task create in coroutine: 2
Begin wait for task!
from child coroutine!
after child coroutine sleep
After wait for task!
service yield call finish!
rpc coroutine run suc!
step4 balabala, after 5s sleep
因为不能像c++20的实现那样, 通过compiler多次调整代码的方式来实现协程机制, 所以像本地变量的使用等, 都是需要我们自己注意并手动声明解决的.
我们目前的做法是定义一个LocalStructs, 把需要用到的变量声明在其中, 然后再作为函数参数传递到函数中使用.
大部分情况本地变量的使用编译器会直接报错, 如 for(int i = 0; ...){rco_yield();}这种, 但如果在函数实现头部声明 int i = 0;再使用, 上面的代码会出现死循环, 原因是每次function执行的时候都会将i的值重置为0, 这个是一定要注意的.
行为异常, 优先检查是否使用了局部变量.
整体实现思路上是具备通用性的, 在默认使用GCC8.3的TOS3下面, 一些复杂业务场景, 比如游戏这种业务可能随需求发生变更, 导致复杂度更高, stack size不那么可控的情况下, 对比选择stackful的coroutine方案, 使用stackless的实现, 是一个更可控的方式.
在具备条件的情况下, 推荐直接使用C++20的coroutine特性, 笔者项目实装对比下来, 整体的业务侧编码舒适度, 以及可控性, 还是高非常多的. 后续的文章会承接本篇, 补充C++20相关实现的思路.