前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >如何在C++17中实现stackless coroutine以及相关的任务调度器

如何在C++17中实现stackless coroutine以及相关的任务调度器

作者头像
fangfang
发布2021-10-29 15:19:30
发布2021-10-29 15:19:30
2K00
代码可运行
举报
文章被收录于专栏:方方的杂货铺方方的杂货铺
运行总次数:0
代码可运行

1. 前言

C++协程一直是大家比较关注的一个技术点, 在C++20 coroutine属性正式推出之前, 就已经有很多项目实装了, 实现机制也略也差异, 下面先来简单看下比较常见的实现方式:

1.1 基于ucontext的实现

一种是Linux下不跨平台的ucontext实现, 当然也包括从ucontext机制衍生出来的一系列实现(删除不必要的状态save, restore代码, 提高性能, 或者直接使用平台相关的汇编指令实现ucontext类似的效果(boost.context实现)) emsp; 这种方式实现的都是我们平时所说的full stack模式, stack size一般设置成业务能够平稳运转的大小, stack的处理是比较自动的, 但如果stack上的变量过多, 协程函数比较复杂, 默认协程栈大小设置的偏小的话就容易出现爆栈的问题, 栈大小设置的过大, Stack交换又会带来性能问题. 一个比较特殊的情况, 游戏GamePlay相关的业务使用这种模式, 由于本身的复杂度和需求迭代的诉求, 比较容易出现stack size(内存占用)和性能难以平衡的问题.

1.2 基于switch case的duff device实现

这是一种比较hack的实现机制, 这种方式本身也是sackless的, lua的vm就用该方式来实现lua的coroutine特性, 在有GC的情况下, stackless本身不是障碍(数据不需要在栈上定义), Lua本身的协程机制也工作得非常稳定. 对于无GC的情况, 也有部分项目使用这种模式来实现自己的协程, 但对比stackful的协程使用上会稍显麻烦, 原来可以正常使用的stack变量需要手动处理, 存储在特定的地方.

1.3 C++20的coroutine实现

C++20的coroutine特性也慢慢被大家熟知, c++20使用的也是stackless的coroutine实现, 对比stackful的模式, C++20的stackless实现泛用性会更好一点, 另外除了依赖compiler对代码的自动处理外, 不依赖不同平台的底层机制, 这样也是一种更健康的特性增加方式. 对比duff device, c++20的coroutine会自动对协程函数内的代码做处理, 将相关的stack变量变为heap变量, 这样也解决了stackless协程对stack变量的支持问题(本篇主要是基于C++17的corountine实现, C++20的后续的文章再做展开了).

1.4 本文的侧重点

C++ coroutine不同os的实现分析, 相关特性的介绍, 都有大量的相关资料, 本文主要针对如何更好的使用coroutine, 如何利用coroutine特性来实现一个业务侧简单易用的协程调度器, 不会对coroutine的相关特性做太详细的展开, 也会结合具体的案例(Rpc调用链), 来看一下怎么用协程来简化多个节点之间的异步请求处理. 会重点关注在可控可扩展的任务调度器本身.

2. 使用背景介绍

rstudio c++ framework对协程的支持主要是通过两层功能来做的, 底层的stackless coroutine机制 + 上层的Scheduler机制.

目前Scheduler提供的主要特性如下:

代码语言:javascript
代码运行次数:0
运行
复制
//获取当前运行的协程关联的SchedTask对象
#define rco_self_task()
//获取当前运行的协程关联的Scheduler对象
#define rco_current_scheduer()
//挂起当前协程到Immediate Queue
#define rco_yield()
//协程的起始指令(一般位于协程代码的最前端)
#define rco_begin()             co_begin()
//协程的结束指令(一般位于协程代码的最末尾)
#define rco_return              co_return
//等待系统返回一个ResumeObject对象后继续执行
#define rco_await(...)
//等待下一帧开始后继续执行
#define rco_yield_next_frame()
//立即结束当前协程(后续协程代码将不会被执行)
#define rco_kill_self() 
//等待指定时间(毫秒为单位)后继续执行
#define rco_yield_sleep(SLEEP_TIME_MS)  
//创建一个子协程
#define rco_yield_create_task(IS_CHILD, FUNC, ...)
//等待一个Rpc请求返回后继续执行
#define rco_yield_rpc_call(PROXY, FUNC_NAME, ARGS, TIMEOUT_MS)
//等待一个协程结束后继续执行
#define rco_yield_wait_task_finish(TARGET_TASK_ID, TIMEOUT_MS)
//获取系统传递的ResumeObject对象
#define rco_get_resume_object(ResumeObjectType)     
//最近一次系统等待是否成功执行
#define rco_last_invoke_suc()       
//向系统发出完成事件并立即结束当前协程
#define rco_emit_finish_event(FinishEvent)

下面我们先从stackless coroutine开始来看一下框架协程机制的整体实现思路.

3. stackless coroutine实现

当前框架的无栈协程实现是基于switch case的duff device特性来实现的, 通过对应的case label, 可以在重入一个函数的时候跳转到不同的label, 从而实现协程的挂起和恢复, 提供最原始的协程机制支撑. 当然, 这种通过hack方式达成的机制一般来说会多多少少带来一些使用上的限制, 框架目前的实现是分成两步来弱化限制, 让整个机制的使用更方便的:

  1. 借助C++新特性, 既然case label实现的无栈协程无法保存栈变量, 那么我们就使用额外的设施来取代stack变量, 对于C++20, 相当于通过手动的方式完成了c++20 stackless coroutine compiler做的那部分工作(主要是提取stack变量转为heap变量存储和使用.)
  2. 外围包装调度器, 实现子协程, 各种针对业务特化的特性, 如sleep, rpc request等, 另外也有集中的地方对当前系统的所有协程做集中的管理和调度.

我们先来看一个简单的示例代码:

代码语言:javascript
代码运行次数:0
运行
复制
co_start([](std::pair<int, int>& p, int& c, std::string& local, LocalStruct& locals) -> RStudioCoTask::RpcCoroutineTask
            {
                LOG_INFO(0, "lambda_f");
                return nullptr;
            }, std::make_pair(1, 2), 3, "", LocalStruct{});

如上所示, 通过额外扩展的函数参数表(上例中的 p, c, local, locals等变量), 虽然我们没法使用栈变量(比较好的一点大部分情况下明确栈变量编译器都会直接报错, 有直接的提示), 我们可以通过参数表来声明需要在协程中使用到的临时变量, 相当于通过手动的方式去解决C++20 compiler编译器自动处理的那一部分. 略微麻烦一点, 但当前的整个实现, 除了刚才说的栈变量需要用特殊的方式使用外, 已经可以比较好的使用函数式编程的方式去组织编写业务代码了.

3.1 stackless coroutine核心机制

整体机制利用了c/c++一个比较trick的特性, duff device特性, 通过上图中的宏也可以看到, switch 的case label可以穿插do while(), 看着比较奇怪, 平时我们也很少这样来写代码, 但编译器确实是能够正确支持这种行为的. 一方面我们的宏实现 (do{ case: }while(0))能够正确运行, 另外因为duff device特性, 像for(){ co_yield();} 这种循环中嵌套yield()的情况也能正常运行了.

对于一段代码, 整个协程化的机制大致如下:

  1. 宏展开, 形成switch(coroutine_state()){case 挂起点: }这种结构的代码, 通过关键宏的辅助, 我们可以实现每次resume CoPromise对象保存的function执行的时候都能够正确跳转到上一次的挂起点继续往下执行.
  2. 构建核心的CoPromise对象, 如上图所示的参数表的参数会被自动保存到CoPromise对象中(作为一个std::tuple<>), 代码段本身也会作为一个std::function存储在CoPromise对象上.
  3. 执行CoPromise对象的resume, 直到协程执行完毕.

3.2 stackless coroutine核心对象

coroutine的核心对象主要有以下几个:

3.2.1 CoroutineHandle

代码语言:javascript
代码运行次数:0
运行
复制
class RSTUDIO_APP_SERVICE_API CoroutineHandle
{
public:
    bool done() const;
    int get_state() const;
    void resume();
    void destroy();
    const void* get_user_data() const;
    void set_user_data(const void* ud);
};

外部直接访问的协程对象, 相当于CoPromise的代理句柄, 内部保存了CoPromise对象, 外界可以通过它访问当前协程的执行状态, 以及对协程进行resume(), destroy()等操作.

3.2.2 CoroutineTaskBase

作为协程的返回值的基类类型, 可以对协程的起始行为和结束行为进行控制, 如启动时是否挂起, 结束时是否自动destroy等, 使用Scheduler后这部分都统一使用CoTaskForScheduler作为返回值, 起始行为和结束行为是固定的, 起始的时候协程自动挂起(方便附加外部的额外数据), 结束的时候不自动删除CoPromise对象(由Scheduler负责删除).

3.2.3 CoPromise

协程对象的基类, CoroutineHandle上提供的能力都是通过它来完成的.

3.2.4 template class CoPromiseImpl : public CoPromise

协程对象的具体实现, 通过模板, 我们可以很好的完成 参数列表 -> std::tuple<>, 外部函数-> std::function<>的转换存储, 我们重点关注resume()的实现:

代码语言:javascript
代码运行次数:0
运行
复制
virtual void resume() override
{
    if (done())
    {
        return;
    }
    if (m_function)
    {
        push_promise(this);
        apply(m_function, m_params);
        pop_promise();
    }
    return;
}

apply()主要完成展开m_params并调用m_function的操作, 具体代码:

代码语言:javascript
代码运行次数:0
运行
复制
template<typename F, typename T, std::size_t... I>
auto apply_impl(F f, T& t, std::index_sequence<I...>)
{
    return f(std::get<I>(t)...);
}
template<typename F, typename T>
auto apply(F f, T& t)
{
    return apply_impl(f, t, std::make_index_sequence<std::tuple_size<T>::value>());
}

需要注意的是push_promise()和pop_promise(), 这两个函数存在的目的是正确的设置当前协程栈栈顶的协程对象, 这样我们前面看到的宏中使用的 rstudio::co_task::current_promise() 才能正确的获取到当前的协程对象, 正确的执行相关的逻辑代码段.

3.3 Corutine小结

当前这版stackless coroutine的实现参考了c++20的实现, 苗老师也合理考虑了迁移到C++20的成本, 从易用性, 可控性, 提供的功能的角度感觉都是比较好的.

4. Scheduler实现

4.1 Scheduler实现的动机

一些复杂的机制, 如子协程的创建和等待, 依赖外围系统的Sleep机制, 异步Rpc机制等, 肯定不适合直接在最底层的stackless coroutine上直接实现, 目前框架采用的方式是在外围再包装一层Scheduler, 来完成一些复杂功能和业务强相关功能的封装和实现, 这样底层的stackless coroutine侧重基础机制的实现, 外围的Scheduler侧重复杂逻辑的实现和控制, 从机制和实现上来说都会相对清晰.

4.2 Scheduler核心机制

如上图所示, Scheduler主要提供对SchedTask的管理, 以及三个基础机制方便协程相关业务机制的实现:

  1. Yield机制: 用于协程挂起时向Scheduler返回控制指令
  2. Resume机制: 用于对挂起等待的协程传递值, 方便业务侧代码针对不同情况进行处理
  3. FinishEvent机制: 利用事件机制, 向外围业务系统发起通知, 告知协程已经执行结束并返回执行结果.

4.3 Scheduler核心对象

4.3.1 SchedTask

代码语言:javascript
代码运行次数:0
运行
复制
class RSTUDIO_APP_SERVICE_API SchedTask
{
public:
    uint64_t GetId() const;
    int Run();
    bool IsDone() const;
    void BindSleepHandle(jobs::task_info_ptr handle);

    template<typename AwaitHandleType>
    auto BindAwaitHandle(AwaitHandleType&& awaitHandle) -> std::enable_if_t<std::is_base_of<IAwaitHandle, AwaitHandleType>::value>;

    void ClearAwaitHandle();

    bool HasAwaitHandle() const;

    const reflection::UserObject& GetAwaitHandle() const;

    const reflection::UserObject& GetExtraFinishObject() const;

    template<typename AwaitEventType>
    auto BindResumeObject(AwaitEventType&& awaitEvent) -> std::enable_if_t<std::is_base_of<ResumeObject, AwaitEventType>::value>;

    template<typename AwaitEventType>
    auto GetResumeObjectAsType()->std::enable_if_t<std::is_base_of<ResumeObject, AwaitEventType>::value, AwaitEventType*>;

    bool HasResumeObject() const noexcept;

    void ClearResumeObject();

    template<typename ExtraType>
    auto BindExtraFinishObject(ExtraType&& extraObj)->std::enable_if_t<std::is_base_of<CoEventExtra, ExtraType>::value>;

    bool IsLastInvokeSuc() const noexcept;

    bool IsLastInvokeTimeOut() const noexcept;

    bool IsLastInvokeFailed() const noexcept;

    void AddChildTask(uint64_t tid);

    void AddWaitNofityTask(uint64_t tid);

    const auto& GetChildTaskArray() const;

    const auto& GetWaitNotifyArray() const;

    void Terminate();

    Scheduler* GetManager() const;
protected:
    uint64_t                    mTaskId;
    co_task::CoroutineHandle    mCoHandle;
    Scheduler* mManager;
    std::vector<uint64_t>       mChildArray;
    std::vector<uint64_t>       mWaitNotifyArray;

    //value used to return from coroutine
    reflection::UserObject      mAwaitHandle;

    //value used to send to coroutine(now as a AwaitEvent)
    reflection::UserObject      mResumeObject;

    //value use to add for finish event(so external user can fill something in it)
    reflection::UserObject      mExtraFinishObject;

    jobs::task_info_ptr         mSleepHandle;

    bool                        mIsTerminate = false;
};

SchedTask完成对底层Coroutine对象的封装, 首先它是底层coroutine的容器, 我们在Task创建的时候对co_task::CoPromise对象和SchedTask做了绑定, 其次上面提到的三种机制关联的数据也是存储在其中的, SchedTask代码如下:

代码语言:javascript
代码运行次数:0
运行
复制
uint64_t Scheduler::CreateTaskFromHandle(co_task::CoroutineHandle handle)
{
    auto tid = ++mIdCount;
    SchedTask* schedTask = new SchedTask(tid, handle, this);
    handle.set_user_data(schedTask);

    mTaskMap.emplace(tid, schedTask);
    AddToImmRun(schedTask);

    return tid;
}

这样在协程执行的过程中, 我们都能很方便的拿到关联的SchedTask, 然后再拿到SchedTask上存储的数据, 来完成我们想要做的事情.

像其他几个成员变量, 是前面介绍的三种机制的支撑数据, 通过代码中的注释也能看到:

  1. reflection::UserObject mAwaitHandle: 用于协程向Scheduler返回控制指定(IAwaitHandle对象)
  2. reflection::UserObject mResumeObject: 主要用于异步等待的执行, 当一个异步等待成功执行的时候, 向协程传递值
  3. reflection::UserObject mExtraFinishObject: 三个机制里最特殊的一个机制, 用于外部系统在发起协程的时候在其中存入数据, 这部分数据会被当作FinishEvent的一部分一起传递给业务系统.

4.3.2 Scheduler

Scheduler的代码比较多, 主要就是SchedTask的管理器, 另外也完成对前面提到的三种机制的支持, 文章重点分析一下三种机制的实现代码.

4.3.2.1 Yield机制

代码语言:javascript
代码运行次数:0
运行
复制
void Scheduler::Update()
{
    //Handle need kill task first
    while(!mNeedKillArray.empty())
    {
        auto tid = *(mNeedKillArray.rbegin());
        mNeedKillArray.pop_back();
        auto* tmpTask = GetTaskById(tid);
        if (tmpTask != nullptr)
        {
            DestroyTask(tmpTask);
        }
    }

    mReadyTasks.insert(mReadyTasks.end(), mFrameStartTasks.begin(), mFrameStartTasks.end());
    mFrameStartTasks.clear();

    while (RSTUDIO_LIKELY(!mReadyTasks.empty()))
    {
        auto* schedTask = *(mReadyTasks.rbegin());
        mReadyTasks.pop_back();

        schedTask->Run();
        if (RSTUDIO_UNLIKELY(schedTask->IsDone()))
        {
            //Try to remove here
            DestroyTask(schedTask);
        }
        else
        {
            if (schedTask->HasAwaitHandle())
            {
                const auto& obj = schedTask->GetAwaitHandle();
                auto* awaitHandle = (IAwaitHandle*)obj.pointer();
                auto awaitMode = awaitHandle->Invoke(schedTask, this);

                switch (awaitMode)
                {
                    case rstudio::logic::AwaitMode::AwaitNever:
                        //The task need delivered to immediate queue
                        AddToImmRun(schedTask);
                        break;
                    case rstudio::logic::AwaitMode::AwaitNextframe:
                        //The task need delivered to nextframe queue
                        AddToNextFrameRun(schedTask);
                        break;
                    case rstudio::logic::AwaitMode::AwaitForNotifyNoTimeout:
                    case rstudio::logic::AwaitMode::AwaitForNotifyWithTimeout:
                        HandleTaskAwaitForNotify(schedTask, awaitMode, awaitHandle->GetTimeoutMs());
                        break;
                    case rstudio::logic::AwaitMode::AwaitDoNothing:
                        //Just do nothing
                        break;
                    default:
                        RSTUDIO_ERROR(CanNotRunToHereError());
                        break;
                }
            }
            else
            {
                //Just add to immediate run array
                AddToImmRun(schedTask);
            }
        }
    }
}

上面是Scheduler的Update()实现代码, 在每个task->Run()后, 外围代码会判断task上是否有设置的AwaitHandle对象, 有的话则调用关联的AwaitHandle的Invoke()方法, 同时Inovke方法的返回值也提供了对协程后续行为的控制, 主要是以下几种模式:

  1. rstudio::logic::AwaitMode::AwaitNever: 立即将协程加入回mReadyTask队列, 对应协程会被马上唤醒执行
  2. rstudio::logic::AwaitMode::AwaitNextframe: 将协程加入到下一帧执行的队列, 协程将会在下一帧被唤醒执行
  3. rstudio::logic::AwaitMode::AwaitForNotifyNoTimeout: 等待外界通知后再唤醒执行(无超时模式), 注意该模式下如果一直没收到通知, 相关协程会一直在队列中存在.
  4. rstudio::logic::AwaitMode::AwaitForNotifyWithTimeout:同3, 差别是存在一个超时时间, 超时时间到了也会唤醒协程, 业务方可以通过ResumeObject判断协程是被超时唤醒的.
  5. rstudio::logic::AwaitMode::AwaitDoNothing:特殊的AwaitHandle实现会使用该模式, 比如删除Task的实现, 都要删除Task了, 我们肯定不需要再将Task加入任何可唤醒队列了.

4.3.2.2 Resume机制

Resume机制主要是通过唤醒在Await队列中的协程的时候向关联的Task对象传递ResumeObject实现的:

代码语言:javascript
代码运行次数:0
运行
复制
//Not a real event notify here, just do need things
template <typename E>
auto ResumeTaskByAwaitObject(E&& awaitObj) -> std::enable_if_t<std::is_base_of<ResumeObject, E>::value>
{
    auto tid = awaitObj.taskId;
    if (IsTaskInAwaitSet(tid))
    {
        //Only in await set task can be resume
        auto* task = GetTaskById(tid);
        if (RSTUDIO_LIKELY(task != nullptr))
        {
            task->BindResumeObject(std::forward<E>(awaitObj));
            AddToImmRun(task);
        }

        OnTaskAwaitNotifyFinish(tid);
    }
}

然后再通过rco_get_resume_object()宏在协程代码中获取对应的ResumeObject. 宏的声明代码如下:

代码语言:javascript
代码运行次数:0
运行
复制
#define rco_get_resume_object(ResumeObjectType)                     rco_self_task()->GetResumeObjectAsType<ResumeObjectType>()

本身就是一个简单的传值取值的过程. 注意传递ResumeObject后, 我们也会马上将协程加入到mReadTasks队列中以方便在接下来的Update中唤醒它.

4.3.2.3 FinishEvent机制

有一些特殊的场合, 可能需要协程执行完成后向业务系统发起通知并传递返回值, 比如Rpc Service的协程支持实现, 这里直接以RpcService的协程支持为例来说明好了.

首先是业务侧, 在创建完协程后, 需要给协程绑定后续协程执行完成后做进一步操作需要的数据:

代码语言:javascript
代码运行次数:0
运行
复制
ponder::Value val = method->DoInvoke(std::move(args));
auto& cotask = val.to<reflection::UserObject>().ref<logic::CoTaskForScheduler>();
logic::SchedTask* task = Scheduler::GetTaskFromCoTask(cotask);
assert(task);

CoRpcFinishEventExtra extraData;
extraData.connId = connPtr->GetId();
extraData.funcId = reqHead.funcNameId;
extraData.serviceId = reqHead.serviceNameId;
extraData.rpcReqId = reqHead.reqId;

task->BindExtraFinishObject(std::move(extraData));

这里将Connection id等信息通过ExtraFinishObject直接绑定到了Task上. 然后业务侧会在操作执行完成的时候使用rco_emit_finish_event()宏来结束协程并向业务系统发起通知:

代码语言:javascript
代码运行次数:0
运行
复制
logic::CoTaskForScheduler HeartBeatService::DoHeartBeat(logic::Scheduler& scheduler, int testVal)
{

    auto retval = scheduler.CreateTask(
        [testVal]() -> CoTaskForScheduler {
            rco_begin();

            rco_yield_sleep(1000);

            printf("service yield call finish!\n");

            rco_emit_finish_event(CoRpcFinishEvent(ponder::Value(testVal + 1)));

            rco_return;
        }
    );
    return retval.second;
}

rco_emit_finish_event()宏实际调用的是Scheduler上的EmitTaskFinishEvent()方法:

代码语言:javascript
代码运行次数:0
运行
复制
template<typename E>
auto EmitTaskFinishEvent(E&& event, SchedTask* task) -> std::enable_if_t<std::is_base_of<CoFinishEvent, E>::value>
{
    assert(task != nullptr);
    event.extraFinishObj = task->GetExtraFinishObject();
    mEventManager.emit(std::forward<E>(event));

    KillTask(task->GetId());
}

注意这个地方task上存储的ExtraFinishObject会作为event的一部分直接传递给业务系统, 并在发起事件后调用删除协程任务的方法.

最后通过事件机制, 业务系统获取到足够的信息后完成具体的业务操作, 对于 RpcService来说就是向请求方返回执行结果了:

代码语言:javascript
代码运行次数:0
运行
复制
void ServiceServer::receive(const CoRpcFinishEvent& ev)
{
    assert(ev.extraFinishObj != ponder::UserObject::nothing && ev.extraFinishObj.getClass().get_id() == MetatypeHash::hash<CoRpcFinishEventExtra>());
    auto& extraData = ev.extraFinishObj.ref<CoRpcFinishEventExtra>();
    auto iter = mAccteptedConnectionMap.find(extraData.connId);
    if (iter != mAccteptedConnectionMap.end())
    {
        if (ev.rpcResultType == network::RpcResponseResultType::RequestSuc)
        {
            ponder::Value tmpVal = ev.retValue;
            ResponseRpcRequestNormal(iter->second, extraData.serviceId, extraData.rpcReqId, std::move(tmpVal));
        }
        else
        {
            ResponseRpcRequestWithError(iter->second, extraData.serviceId, extraData.rpcReqId, ev.rpcResultType);
        }
    }
}

本身机制是服务于业务需要的, 这里通过具体的业务系统实现来理解机制, 会相对简单.

4.4 示例代码

代码语言:javascript
代码运行次数:0
运行
复制
mScheduler.CreateTask(
            [clientProxy](std::pair<int, int>& p, int& c, std::string& local, LocalStruct& locals)-> logic::CoTaskForScheduler
            {
                rco_begin();
                {
                    c = p.first + p.second + c;
                    locals.local_i = 1024;
                    locals.local_p = "balabala";

                    auto* task = rco_self_task();

                    ////LOG_INFO(0, "step1 %d", p.first);
                    printf("step1 %d\n", p.first);
                }
                rco_yield_next_frame();
                {
                    ////LOG_INFO(0, "step2 %d", p.second);
                    printf("step2 %d\n", p.second);
                    c = 0;
                    while (c < 5)
                    {
                        printf("in while loop c=%d\n", c); 
                        rco_yield_sleep(1000);
                        c++;
                    }

                    for (c = 0; c < 5; c++)
                    {
                        printf("in for loop c=%d\n", c);
                        rco_yield_next_frame();
                    }
                }
                rco_yield();

                ////rco_kill_self();
                {
                    ////LOG_INFO(0, "step3 %d", c);
                    printf("step3 %d\n", c);

                    rco_yield_create_task(false, []()-> logic::CoTaskForScheduler {
                        rco_begin();

                        printf("from child coroutine!\n");
                        rco_yield_sleep(2000);
                        printf("after child coroutine sleep\n");


                        rco_return;
                        });

                    ////auto newTaskId = createEvent->newTaskId;
                    {
                        auto* createEvent = rco_get_resume_object(logic::CreateTaskResumeObject);
                        printf("new task create in coroutine: %llu\n", createEvent->newTaskId);
                    }
                    printf("Begin wait for task!\n");
                    rco_yield_wait_task_finish(
                            rco_get_resume_object(logic::CreateTaskResumeObject)->newTaskId,
                            10000
                        );
                    printf("After wait for task!\n");
                }

                rco_await(clientProxy->DoHeartBeat(3));
                ////__sched_yield_rpc_request(clientProxy, "DoHeartBeat", rstudio::reflection::Args{ 3 }, 1000);
                {
                    ////assert(rco_last_invoke_suc());
                    auto* rpcEvent = rco_get_resume_object(logic::RpcResumeObject);

                    if (rpcEvent->rpcResultType == rstudio::network::RpcResponseResultType::RequestSuc)
                    {
                        assert(rpcEvent->totalRet == 1);
                        assert(rpcEvent->retValue.to<int>() == 4);
                        printf("rpc coroutine run suc!\n");
                    }
                    else
                    {
                        printf("rpc coroutine run failed! result = %d \n", (int)rpcEvent->rpcResultType);
                    }
                }
                rco_yield_sleep(5000);
                {
                    ////LOG_INFO(0, "step4 %s", locals.local_p);
                    printf("step4 %s, after 5s sleep\n", locals.local_p);
                }
                rco_return;
            }, std::make_pair(1, 2), 3, "", LocalStruct{});

执行结果:

代码语言:javascript
代码运行次数:0
运行
复制
step1 1
step2 2
in while loop c=0
in while loop c=1
in while loop c=2
in while loop c=3
in while loop c=4
in for loop c=0
in for loop c=1
in for loop c=2
in for loop c=3
in for loop c=4
step3 5
new task create in coroutine: 2
Begin wait for task!
from child coroutine!
after child coroutine sleep
After wait for task!
service yield call finish!
rpc coroutine run suc!
step4 balabala, after 5s sleep

5. 相关的限制

因为不能像c++20的实现那样, 通过compiler多次调整代码的方式来实现协程机制, 所以像本地变量的使用等, 都是需要我们自己注意并手动声明解决的.

我们目前的做法是定义一个LocalStructs, 把需要用到的变量声明在其中, 然后再作为函数参数传递到函数中使用.

大部分情况本地变量的使用编译器会直接报错, 如 for(int i = 0; ...){rco_yield();}这种, 但如果在函数实现头部声明 int i = 0;再使用, 上面的代码会出现死循环, 原因是每次function执行的时候都会将i的值重置为0, 这个是一定要注意的.

行为异常, 优先检查是否使用了局部变量.

整体实现思路上是具备通用性的, 在默认使用GCC8.3的TOS3下面, 一些复杂业务场景, 比如游戏这种业务可能随需求发生变更, 导致复杂度更高, stack size不那么可控的情况下, 对比选择stackful的coroutine方案, 使用stackless的实现, 是一个更可控的方式.

在具备条件的情况下, 推荐直接使用C++20的coroutine特性, 笔者项目实装对比下来, 整体的业务侧编码舒适度, 以及可控性, 还是高非常多的. 后续的文章会承接本篇, 补充C++20相关实现的思路.

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 前言
    • 1.1 基于ucontext的实现
    • 1.2 基于switch case的duff device实现
    • 1.3 C++20的coroutine实现
    • 1.4 本文的侧重点
  • 2. 使用背景介绍
  • 3. stackless coroutine实现
    • 3.1 stackless coroutine核心机制
    • 3.2 stackless coroutine核心对象
    • 3.2.1 CoroutineHandle
    • 3.2.2 CoroutineTaskBase
    • 3.2.3 CoPromise
    • 3.2.4 template class CoPromiseImpl : public CoPromise
    • 3.3 Corutine小结
  • 4. Scheduler实现
    • 4.1 Scheduler实现的动机
    • 4.2 Scheduler核心机制
    • 4.3 Scheduler核心对象
    • 4.3.1 SchedTask
    • 4.3.2 Scheduler
    • 4.3.2.1 Yield机制
    • 4.3.2.2 Resume机制
    • 4.3.2.3 FinishEvent机制
    • 4.4 示例代码
  • 5. 相关的限制
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档