前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >结合 golang 中分析协程框架的实现

结合 golang 中分析协程框架的实现

原创
作者头像
ge3m0r
发布2024-06-15 22:41:06
460
发布2024-06-15 22:41:06

协程

协程可以说是 golang 中的有名的框架,本文主要分析 Github 项目 Ntyco 协程框架的实现,由于本人目前 golang 写的不多,因此不会对 golang 的源码进行分析,只是根据 golang 的协程调度来分析 c 语言版本调度。

协程的基本元素

golang 中大名鼎鼎的协程有这样三个元素 G M P,G 表示 goroutinue 协程,m 是内核元素, p 表示处理器,用来管理和执行协程。

如果使用 c 语言的话,那么内核线程使用 epoll 进行管理最好,而在 golang ,当然我们主要实现的就是 goroutine 和 process ,说白了,我们需要设计一个协程的数据结构并对其进行操作,然后实现一个协程调度其对齐进行调度,并且和内核之间进行通信。

源码分析

项目中的代码结构如下:

协程代码结构
协程代码结构

所有的实现代码都在 core 目录下。

所有实现代码结构
所有实现代码结构

其中最关键的代码在 nty_coroutine 、 nty_schedule 和 nty_socket 中,其中 nty_tree ,是对红黑树的各种操作实现,nty_queue 是对队列的操作实现。

我们先从协程开始,关于协程的数据结构定义如下:

代码语言:c
复制
typedef struct _nty_coroutine {  //协程的定义

	//private
	
#ifdef _USE_UCONTEXT 
	ucontext_t ctx;
#else
	nty_cpu_ctx ctx;
#endif
	proc_coroutine func; //调度的函数
	void *arg;  //参数
	void *data;  //数据
	size_t stack_size;
	size_t last_stack_size;
	
	nty_coroutine_status status;  //协程状态
	nty_schedule *sched;  //调度器

	uint64_t birth;  //诞生时间
	uint64_t id;   //协程id
#if CANCEL_FD_WAIT_UINT64
	int fd;
	unsigned short events;  //POLL_EVENT
#else
	int64_t fd_wait;
#endif
	char funcname[64];   //函数名
	struct _nty_coroutine *co_join;  //加入的协程类型

	void **co_exit_ptr;
	void *stack;
	void *ebp;
	uint32_t ops;
	uint64_t sleep_usecs;

	RB_ENTRY(_nty_coroutine) sleep_node;  //红黑树维护的沉睡节点
	RB_ENTRY(_nty_coroutine) wait_node;  //红黑树维护的等待节点

	LIST_ENTRY(_nty_coroutine) busy_next;  //繁忙的接下一个位置

	TAILQ_ENTRY(_nty_coroutine) ready_next; //等待
	TAILQ_ENTRY(_nty_coroutine) defer_next;
	TAILQ_ENTRY(_nty_coroutine) cond_next;

	TAILQ_ENTRY(_nty_coroutine) io_next;
	TAILQ_ENTRY(_nty_coroutine) compute_next;

	struct {
		void *buf;
		size_t nbytes;
		int fd;
		int ret;
		int err;
	} io;  // io 结构

	struct _nty_coroutine_compute_sched *compute_sched;  //计算的调度
	int ready_fds;
	struct pollfd *pfds;
	nfds_t nfds;
} nty_coroutine;

说几个关键的内容一个是协程的状态,golang 中协程状态有三种,一种是 sleep 沉睡,另一种是 ready 在准备,然后正在运行,这里用了红黑树维护各个节点,没然后用队列维护正在准备的节点。

定义的状态如下:

代码语言:c
复制
typedef enum {//协程状态的类型
	NTY_COROUTINE_STATUS_WAIT_READ,
	NTY_COROUTINE_STATUS_WAIT_WRITE,
	NTY_COROUTINE_STATUS_NEW,
	NTY_COROUTINE_STATUS_READY,
	NTY_COROUTINE_STATUS_EXITED,
	NTY_COROUTINE_STATUS_BUSY,
	NTY_COROUTINE_STATUS_SLEEPING,
	NTY_COROUTINE_STATUS_EXPIRED,
	NTY_COROUTINE_STATUS_FDEOF,
	NTY_COROUTINE_STATUS_DETACH,
	NTY_COROUTINE_STATUS_CANCELLED,
	NTY_COROUTINE_STATUS_PENDING_RUNCOMPUTE,
	NTY_COROUTINE_STATUS_RUNCOMPUTE,
	NTY_COROUTINE_STATUS_WAIT_IO_READ,
	NTY_COROUTINE_STATUS_WAIT_IO_WRITE,
	NTY_COROUTINE_STATUS_WAIT_MULTI
} nty_coroutine_status;

上述状态就是各种细分的协程状态。

协程定义之后,就要对数据结构进行操作,然后让协程跟我们的调度进行进行交互。

代码语言:c
复制
//初始化协程
static void nty_coroutine_init(nty_coroutine *co) { //协程初始化

#ifdef _USE_UCONTEXT
	getcontext(&co->ctx); //获取协程上下文
	co->ctx.uc_stack.ss_sp = co->sched->stack;
	co->ctx.uc_stack.ss_size = co->sched->stack_size;
	co->ctx.uc_link = &co->sched->ctx;
	// printf("TAG21\n");
	makecontext(&co->ctx, (void (*)(void)) _exec, 1, (void*)co);
	// printf("TAG22\n");
#else
	void **stack = (void **)(co->stack + co->stack_size);

	stack[-3] = NULL;
	stack[-2] = (void *)co;

	co->ctx.esp = (void*)stack - (4 * sizeof(void*));
	co->ctx.ebp = (void*)stack - (3 * sizeof(void*));
	co->ctx.eip = (void*)_exec;
#endif
	co->status = BIT(NTY_COROUTINE_STATUS_READY);  //将协程状态设置为准备状态
	
}

//协程的调度函数
void nty_coroutine_yield(nty_coroutine *co) {  //协程调度函数
	co->ops = 0;  //协程操作标志,表示正在调度
#ifdef _USE_UCONTEXT
	if ((co->status & BIT(NTY_COROUTINE_STATUS_EXITED)) == 0) {  //协程退出
		_save_stack(co);  //保存栈
	}
	swapcontext(&co->ctx, &co->sched->ctx); //交换上下文
#else
	_switch(&co->sched->ctx, &co->ctx);
#endif
}

这里只展示了对于协程的调度以及对于协程的初始化,另外还有协程创建,状态转换,加入不同状态的队列中,这一部分的内容相对较为容易。

协程调度

协程定义出来之后,我们需要这样的前置只是,协程到底要怎么调度,这就是我们需要对栈进行操作,在 x86 处理器上,我们汇编代码都是在栈上进行处理的,如果了解过 liunx 操作系统,我们就知道操作系统在从用户态切换到内核态就需要进行系统调用,这个时候会保留进程的上下文,然后从内核态执行完毕后就恢复过来。

因此我们在协程调度的数据结构中定义如下。

代码语言:c
复制
typedef struct _nty_schedule {  //协程调度结构
	uint64_t birth;  //协程诞生时间
#ifdef _USE_UCONTEXT //ucontext_t 实现上下文
	ucontext_t ctx;
#else  //cpu 保存上下文
	nty_cpu_ctx ctx;
#endif
	void *stack;  
	size_t stack_size;   //栈大小
	int spawned_coroutines;
	uint64_t default_timeout;  //默认过期时间
	struct _nty_coroutine *curr_thread;  //当前协程
	int page_size;

	int poller_fd;   //poll fd
	int eventfd;    //时间的fd
	struct epoll_event eventlist[NTY_CO_MAX_EVENTS];  //epoll 时间
	int nevents;

	int num_new_events;  //新时间个数
	pthread_mutex_t defer_mutex; //锁

	nty_coroutine_queue ready;  //准备好的队列
	nty_coroutine_queue defer;  //结束队列

	nty_coroutine_link busy;    //繁忙的协程
	
	nty_coroutine_rbtree_sleep sleeping; //沉睡的协程
	nty_coroutine_rbtree_wait waiting; //等待的协程

	//private 

} nty_schedule;  //调度器的定义

我们来看一下这个协程调度器的大概作用,首先就是跟协程进行交互,对协程的上下文进行保存加载。

另外就是跟内核线程进行交互,他需要将协程的内容加入内核 epoll 中进行调度。

先从调度器的创建开始

代码语言:c
复制
int nty_schedule_create(int stack_size) { //协程调度器的创建

	int sched_stack_size = stack_size ? stack_size : NTY_CO_MAX_STACKSIZE;  //创建一个传入的栈大小

	nty_schedule *sched = (nty_schedule*)calloc(1, sizeof(nty_schedule)); //创建一个调度器结构
	if (sched == NULL) {
		printf("Failed to initialize scheduler\n");
		return -1;
	}

	assert(pthread_setspecific(global_sched_key, sched) == 0); //设置线程数据键位调度器结构

	sched->poller_fd = nty_epoller_create();  //创建epoller
	if (sched->poller_fd == -1) {
		printf("Failed to initialize epoller\n");
		nty_schedule_free(sched);
		return -2;
	}

	nty_epoller_ev_register_trigger();   //对调度的协程时间进行操作,加入 epoll

	sched->stack_size = sched_stack_size; //调度器栈大小
	sched->page_size = getpagesize();  //页大小

#ifdef _USE_UCONTEXT
	int ret = posix_memalign(&sched->stack, sched->page_size, sched->stack_size); //分配页对齐内存
	assert(ret == 0);
#else
	sched->stack = NULL;
	bzero(&sched->ctx, sizeof(nty_cpu_ctx));
#endif

	sched->spawned_coroutines = 0; //创建的协程
	sched->default_timeout = 3000000u; //默认过期时间

	RB_INIT(&sched->sleeping); //红黑树初始化
	RB_INIT(&sched->waiting);

	sched->birth = nty_coroutine_usec_now(); //获取当前时间

	TAILQ_INIT(&sched->ready);  //队列初始化
	TAILQ_INIT(&sched->defer);
	LIST_INIT(&sched->busy);  //链表初始化

这里调度器就是对各种状态协程调度,同时针对内核线程的 epoll 也要进行交互。

注意,因为我们协程不可能只创建一个, golang 中,有多个 Process 进行处理,因此调度上也会需要对各调度的数据结构。

接着实现就是一些协程数据结构的对协程在各个节点之间转换操作,因为篇幅原因就不再赘述,我们最后来看,调度器跑起来的函数

代码语言:c
复制
void nty_schedule_run(void) {

	nty_schedule *sched = nty_coroutine_get_sched();  //获取调度
	if (sched == NULL) return ;

	while (!nty_schedule_isdone(sched)) {//当还有协程运行
		
		// 1. expired --> sleep rbtree
		nty_coroutine *expired = NULL;
		while ((expired = nty_schedule_expired(sched)) != NULL) {  //超过设定的过期时间
			nty_coroutine_resume(expired); //恢复协程然后执行
		}
		// 2. ready queue //再在准备的队列
		nty_coroutine *last_co_ready = TAILQ_LAST(&sched->ready, _nty_coroutine_queue);
		while (!TAILQ_EMPTY(&sched->ready)) {
			nty_coroutine *co = TAILQ_FIRST(&sched->ready);
			TAILQ_REMOVE(&co->sched->ready, co, ready_next);

			if (co->status & BIT(NTY_COROUTINE_STATUS_FDEOF)) {
				nty_coroutine_free(co);
				break;
			}

			nty_coroutine_resume(co);
			if (co == last_co_ready) break;
		}

		// 3. wait rbtree //对等待的队列
		nty_schedule_epoll(sched);
		while (sched->num_new_events) {
			int idx = --sched->num_new_events;
			struct epoll_event *ev = sched->eventlist+idx;
			
			int fd = ev->data.fd;
			int is_eof = ev->events & EPOLLHUP;
			if (is_eof) errno = ECONNRESET;

			nty_coroutine *co = nty_schedule_search_wait(fd);
			if (co != NULL) {
				if (is_eof) {
					co->status |= BIT(NTY_COROUTINE_STATUS_FDEOF);
				}
				nty_coroutine_resume(co);
			}

			is_eof = 0;
		}
	}

	nty_schedule_free(sched);
	
	return ;
}

上述就是一个针对睡眠,准备和等待队列的一个调度,睡眠的队列有个时间,当这个协程创建时间超过了这个时间,那么就要进入调度执行,对于准备的队列直接从队列中取然后执行,最后就是执行等待的队列。

内核epoll 的调度

在调度 run 函数中,我们开到最后是对 epoll 的调度,集中在 nty_schedule_epoll,nty_schedule_search_wait ,nty_coroutine_resume。

我们先来看看 nty_schedule_epoll 函数上:

代码语言:c
复制
static int nty_schedule_epoll(nty_schedule *sched) {

	sched->num_new_events = 0;

	struct timespec t = {0, 0};
	uint64_t usecs = nty_schedule_min_timeout(sched);
	if (usecs && TAILQ_EMPTY(&sched->ready)) {  //对 ready 的队列进行计算
		t.tv_sec = usecs / 1000000u;
		if (t.tv_sec != 0) {
			t.tv_nsec = (usecs % 1000u) * 1000u;
		} else {
			t.tv_nsec = usecs * 1000u;
		}
	} else {
		return 0;
	}

	int nready = 0;
	while (1) {
		nready = nty_epoller_wait(t);  
		if (nready == -1) {
			if (errno == EINTR) continue;
			else assert(0);
		}
		break;
	}

	sched->nevents = 0;
	sched->num_new_events = nready;

	return 0;
}

函数较为见到那,通过 nty_epoller_wait 得到 nready 的时间,然后将时间的数量设置为 nready ,然后调度器后边就是会处理时间。

另外就是 nty_schedule_search_wait 就是从 wait 的红黑树中找到就绪时间,最后就是协程的恢复执行的一个过程。

结尾

上述就是一个简单的协程框架的分析,源码整个部分较为复杂,很多细节没有讲到,建议大家结合 golang 的原理跟着查看,关键内容在这个调度器的实现上,源码中还有 poll socket 之类网络 I/O的封装,不过不再本文讲述的范围内了。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 协程
  • 协程的基本元素
  • 源码分析
    • 协程调度
      • 内核epoll 的调度
        • 结尾
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档