Vpp的功能逻辑被划分为多个featuce arc(比如ipv4、IPv6、l2、MPLS等),每个feature arc是由一定顺序的node组成,每个node完成一个功能。本文主要是来介绍一下node的类型及调度模式。
个人理解:
从代码上来看只有Input类型的node节点注册的时候state设置成中断方式,才会出现中断和轮训的切换,默认全是轮询方式。 PRE_INPUT类型node只能按照轮询当时来调度。设置成state为中断也是一样的。
中断方式和轮询方式之间切换(类似内核NAPI机制)。
模式切换依据累计报文数量,在vlib_main_or_worker_loop启动中设置。
Pre_input类型只能在main线程上被调度,一般为Process类型提供输入。
inPut类型的node可以运行在main线程和worker线程,一般为internal类型的node提供报文输入。
流程是在vlib_main_or_worker_loop函数的while(1)循环中,如下面代码:
if (is_main)/*在main线程上处理pre—input类型node*/
vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
cpu_time_now = dispatch_node (vm, n,
VLIB_NODE_TYPE_PRE_INPUT,
VLIB_NODE_STATE_POLLING,
/* frame */ 0,
cpu_time_now);
/* 处理input类型nodes. */
vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
cpu_time_now = dispatch_node (vm, n,
VLIB_NODE_TYPE_INPUT,
VLIB_NODE_STATE_POLLING,
/* frame */ 0,
cpu_time_now);
中断类型node是将node的索引保存在pending_interrupt_node_runtime_indices数组中,依次遍历:
/*pending_interrupt_node_runtime_indices数组大小默认32个*/
uword l = _vec_len (nm->pending_interrupt_node_runtime_indices);
uword i;
if (l > 0)
{ ...........
for (i = 0; i < l; i++)
{ /*获取vlib_node_runtime_t的地址*/
n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
last_node_runtime_indices[i]);
cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
VLIB_NODE_STATE_INTERRUPT,
/* frame */ 0,
cpu_time_now);
}
}
}
INPUT 类型的 node 是如何把报文发送给 INTERNAL 类型的 node 的。整个过程可以分成以下几步:
1、确定把报文发送给第几个孩子节点
确定把报文送给哪个孩子节点处理,是由该 node 的业务逻辑决定的,通常有两种方式:
一是通过解析报文的内容来决定,比如 ethernet-input 节点通过解析报文是 ipv4 还是 ipv6 报文把报文发送给ip4-input 或 ip6-input 去处理;
二是通过查找转发表来决定,比如 ip4-lookup 通过查找路由表来决定把报文发送给 ip4-arp 还是 ip4-rewrite 节点来处理。
2.找到该孩子节点对应的用于存放报文的结构 vlib_frame_t
根据 node 图的初始化流程可知,确定把报文发送给第几个孩子节点之后,就可以获取该孩子节点对应的 vlib_next_frame_t 结构了。vlib_next_frame_t 结构中的 frame_index 字段指向vlib_frame_t 结构,vlib_frame_t 结构是存放报文索引的地方(也就vlib_buffer_t结构索引)。
3.把报文放入 vlib_frame_t 结构
vlib_frame_t 结构最后一个字段为可变数组,初始化的时候会分配一块内存,用于存放报文的索引。
4.创建 vlib_pending_frame_t 结构,并把它加入数组 vm->node_main. pending_frames 等待调度vlib_pending_frame_t 结构记录报文所在的结构 vlib_next_frame_t 的 index,以及处理这些报文的node 的 vlib_node_runtime_t 结构的索引,这样通过 vlib_pending_frame_t 结构里面的信息就可以把报文分发给指定的 node 处理了。
下面以ipv4 feature arc来介绍internal类型调度:
1、ipv4_input_node节点注册(hdp\src\vnet\ip\ip4_input.c)
/* ip_input_node节点注册 */
VLIB_REGISTER_NODE (ip4_input_node) =
{
.function = ip4_input,
.name = "ip4-input",
.vector_size = sizeof (u32),
.n_errors = IP4_N_ERROR,
.error_strings = ip4_error_strings,
.n_next_nodes = IP4_INPUT_N_NEXT,
.next_nodes = {
[IP4_INPUT_NEXT_DROP] = "error-drop",
[IP4_INPUT_NEXT_PUNT] = "error-punt",
[IP4_INPUT_NEXT_LOOKUP] = "ip4-lookup",
[IP4_INPUT_NEXT_LOOKUP_MULTICAST] = "mcast4-lookup",
[IP4_INPUT_NEXT_ICMP_ERROR] = "ip4-icmp-error",
},
.format_buffer = format_ip4_header,
.format_trace = format_ip4_input_trace,
};
2、节点处理函数处理: 函数vlib_get_next_frame功能是:找到该孩子节点对应的用于存放报文的结构 vlib_frame_t, 返回 :to_next :返回当前要存放报文索引位置,n_left_to_next:存放报文剩余空间大小。
vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
函数vlib_put_next_frame ,将报文挂接到先将报文挂到加入数组 vm->node_main.pending_frames 等待调度处理
vlib_put_next_frame (vm, node, next_index, n_left_to_next);
3、vm->node_main.pending_frames调度
/* internal 类型的node调度*/
for (i = 0; i < _vec_len (nm->pending_frames); i++)
cpu_time_now = dispatch_pending_node (vm, i, cpu_time_now);
/* Reset pending vector for next iteration. */
_vec_len (nm->pending_frames) = 0;
1、调度相关执行流程介绍
1)第一次调度所有的 process 执行的代码:dispatch_process 函数
process类型节点只能在main线程运行,首先执行在vlib_main_or_worker_loop函数中调用
主要功能:设置挂起点,调用处理函数。
dispatch_process->vlib_process_startup(设置挂起点)->()p->node_runtime->function
if (is_main)
{
uword i;
nm->current_process_index = ~0;
/*遍历所有process类型node*/
for (i = 0; i < vec_len (nm->processes); i++)
cpu_time_now = dispatch_process (vm, nm->processes[i], /* frame */ 0,
cpu_time_now);
}
2)恢复挂起的 process 的代码:dispatch_suspended_process 函数
需要恢复执行的 process 有两种原因:一是等待的时钟已经到时(即定时器到期),二是等待的事件已经发生。下面我们介绍过 process 等待时钟的时候会把自己加入定时器,让定时器到时后调度它重新执行,对应的代码在 main线程主循环里:
if (is_main)
{
/* Check if process nodes have expired from timing wheel. */
ASSERT (nm->data_from_advancing_timing_wheel != 0);
/*获取所有超时的 process,即数组 data_from_advancing_timing_wheel 中存放
的是所有超时的 process 的 index。Process 等待的事件发生时则是通过
vlib_process_signal_event_helper 函数向该数组里 data_from_advancing_timing_wheel
加入自己的index 来实现恢复执行的。也就是说,不管是因为等待的时钟到来还
是因为等待的事件发生了,都需要把自己的 index 加入数组 data_from_advancing_timing_wheel
才能实现恢复执行。*/
nm->data_from_advancing_timing_wheel =
TW (tw_timer_expire_timers_vec)
((TWT (tw_timer_wheel) *) nm->timing_wheel, vlib_time_now (vm),
nm->data_from_advancing_timing_wheel);
ASSERT (nm->data_from_advancing_timing_wheel != 0);
if (PREDICT_FALSE
(_vec_len (nm->data_from_advancing_timing_wheel) > 0))
{
uword i;
processes_timing_wheel_data:
for (i = 0; i < _vec_len (nm->data_from_advancing_timing_wheel);
i++)
{
u32 d = nm->data_from_advancing_timing_wheel[i];
u32 di = vlib_timing_wheel_data_get_index (d);
/*其实加入数组 data_from_advancing_timing_wheel 中的不是 process 的 index,
而是 process 的 index 左移一位再或上 0 或 1,1 表示该 process 等待的是时间事件
(timed even),1 当前没用,所以最后一位总是 0(等待时间发生)。
*/
if (vlib_timing_wheel_data_is_timed_event (d))
{
vlib_signal_timed_event_data_t *te =
pool_elt_at_index (nm->signal_timed_event_data_pool,
di);
vlib_node_t *n =
vlib_get_node (vm, te->process_node_index);
vlib_process_t *p =
vec_elt (nm->processes, n->runtime_index);
void *data;
/*时间事件调用vlib_process_signal_event_helper,转成等待时间已经发生,下一次来处理*/
data =
vlib_process_signal_event_helper (nm, n, p,
te->event_type_index,
te->n_data_elts,
te->n_data_elt_bytes);
if (te->n_data_bytes < sizeof (te->inline_event_data))
clib_memcpy (data, te->inline_event_data,
te->n_data_bytes);
else
{
clib_memcpy (data, te->event_data_as_vector,
te->n_data_bytes);
vec_free (te->event_data_as_vector);
}
pool_put (nm->signal_timed_event_data_pool, te);
}
else
{
cpu_time_now = clib_cpu_time_now ();
/*等待时间恢复,调用dispatch_suspended_process*/
cpu_time_now =
dispatch_suspended_process (vm, di, cpu_time_now);
}
}
_vec_len (nm->data_from_advancing_timing_wheel) = 0;
}
}
.................
/*mian线程判断时钟轮触发是否有数据,如果有直接调转处理*/
if (is_main && _vec_len (nm->data_from_advancing_timing_wheel) > 0)
goto processes_timing_wheel_data;
dispatch_suspended_process->vlib_process_resume 函数来恢复到挂起点。
static_always_inline uword
vlib_process_resume (vlib_process_t * p)
{
uword r;
p->flags &= ~(VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_CLOCK
| VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_EVENT
| VLIB_PROCESS_RESUME_PENDING);
r = clib_setjmp (&p->return_longjmp, VLIB_PROCESS_RETURN_LONGJMP_RETURN);
if (r == VLIB_PROCESS_RETURN_LONGJMP_RETURN)
/*恢复到挂起点*/
clib_longjmp (&p->resume_longjmp, VLIB_PROCESS_RESUME_LONGJMP_RESUME);
return r;
}
而挂起的原因有等待事件和等待时钟,分别对应的函数是vlib_process_wait_for_event 和 vlib_process_wait_for_event_or_clock。
always_inline uword *
vlib_process_wait_for_event (vlib_main_t * vm)
{
vlib_node_main_t *nm = &vm->node_main;
vlib_process_t *p;
uword r;
/*
判断事件位图 non_empty_event_type_bitmap 是否为空,如果不为空则表示有事件需要处理,直
接返回该事件位图 non_empty_event_type_bitmap。否则表示没有事件可以处理,此时需要把自
己挂起等待事件发生,先调用 clib_setjmp 设置事件发生后恢复执行时的“返回点”,再调用
clib_longjmp 跳转到调度该 process 之前设置的“返回点”,即跳出该 process 的执行回到 main
的主循环中。设置标志位 VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_EVENT 表示是因
为等待事件而挂起的。
*/
p = vec_elt (nm->processes, nm->current_process_index);
if (clib_bitmap_is_zero (p->non_empty_event_type_bitmap))
{
p->flags |= VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_EVENT;
r =
clib_setjmp (&p->resume_longjmp, VLIB_PROCESS_RESUME_LONGJMP_SUSPEND);
if (r == VLIB_PROCESS_RESUME_LONGJMP_SUSPEND)
clib_longjmp (&p->return_longjmp,
VLIB_PROCESS_RETURN_LONGJMP_SUSPEND);
}
return p->non_empty_event_type_bitmap;
}
always_inline f64
vlib_process_wait_for_event_or_clock (vlib_main_t * vm, f64 dt)
{
vlib_node_main_t *nm = &vm->node_main;
vlib_process_t *p;
f64 wakeup_time;
uword r;
p = vec_elt (nm->processes, nm->current_process_index);
if (vlib_process_suspend_time_is_zero (dt)
|| !clib_bitmap_is_zero (p->non_empty_event_type_bitmap))
return dt;
wakeup_time = vlib_time_now (vm) + dt;
/* Suspend waiting for both clock and event to occur. */
p->flags |= (VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_EVENT
| VLIB_PROCESS_IS_SUSPENDED_WAITING_FOR_CLOCK);
r = clib_setjmp (&p->resume_longjmp, VLIB_PROCESS_RESUME_LONGJMP_SUSPEND);
if (r == VLIB_PROCESS_RESUME_LONGJMP_SUSPEND)
{
/*设置恢复时钟间隔*/
p->resume_clock_interval = dt * 1e5;
/*跳转到main主线程中,返回挂起标识位*/
clib_longjmp (&p->return_longjmp, VLIB_PROCESS_RETURN_LONGJMP_SUSPEND);
}
/* Return amount of time still left to sleep.
If <= 0 then we've been waken up by the clock (and not an event). */
return wakeup_time - vlib_time_now (vm);
}
介绍事件机制之前,先来了解一下几个相关的数据结构:
每个 process 都可以定义自己的事件类型,如命令行 process 的事件类型为:每种事件类型对应一个任意数值,可以是从 0 开始也可以从任意数开始。向 process 发送事件的时候要指定事件的类型,以及需要该 process 处理的该事件对应的数据。事件数据的存储结构为数组 pending_event_data_by_type_index,下标是事件类型的值。由于事件类型可以指定定义且其值可以为任意值,因此需要一张映射表来把自定义的所有事件的值映射成从 0 开始的连续的值,这里用的映射表为哈希表 event_type_index_by_type_opaque,key是自定义的事件的值,value 为从 event_type_pool 池中分配的 elt 的索引,这个索引作为数组pending_event_data_by_type_index 的下标(后面称为事件类型索引),elt 存放的自定义的事件的值。同时,为了快速查找当前 process 有是否有时间或者有哪些事件需要处理,使用一个位图 non_empty_event_type_bitmap 来记录当前需要处理事件,事件类型索引对应的 bit 置 1 时表
示该事件等待被处理,否则不是。
向某个 process 发送事件的过称为:
1、 先在哈希表 event_type_index_by_type_opaque 中查找该事件对应的事件类型索引,如果找
不到则从 event_type_pool 池分配一个索引作为该事件的事件类型索引
2、 以事件类型索引为下标,把事件对应的数据加入数组 pending_event_data_by_type_index 中
3、 在位图 non_empty_event_type_bitmap 中设置事件类型索引对应的 bit 位。
/*
参数 node_index 为 process 对应的 node 的 index,
type_opaque 为自定义的事件的类型,
data 为该事件对应的数据,
直接调用 vlib_process_signal_event_data 函数发送事件,返回值 d 为存放事
件数据的位置,直接把 data 写进去即可。
*/
always_inline void
vlib_process_signal_event (vlib_main_t * vm,
uword node_index, uword type_opaque, uword data)
{
uword *d = vlib_process_signal_event_data (vm, node_index, type_opaque,
1 /* elts */ , sizeof (uword));
d[0] = data;
}
2、以BFD process为例学习process调度模型用法
1、bfd process类型node节点注册
/*BFD process类型node节点注册*/
VLIB_REGISTER_NODE (bfd_process_node, static) = {
.function = bfd_process,
.type = VLIB_NODE_TYPE_PROCESS,
.name = "bfd-process",
.n_next_nodes = 0,
.next_nodes = {},
};
2、bfd_process函数处理流程
/*
* bfd process node function
*/
static uword
bfd_process (vlib_main_t * vm, vlib_node_runtime_t * rt, vlib_frame_t * f)
{
bfd_main_t *bm = &bfd_main;
u32 *expired = 0;
uword event_type, *event_data = 0;
/* So we can send events to the bfd process
保存BFD process node的索引
*/
bm->bfd_process_node_index = bfd_process_node.index;
while (1)
{
u64 now = clib_cpu_time_now ();
u64 next_expire = timing_wheel_next_expiring_elt_time (&bm->wheel);
BFD_DBG ("timing_wheel_next_expiring_elt_time(%p) returns %lu",
&bm->wheel, next_expire);
if ((i64) next_expire < 0)
{ /*还没有过期,等待事件*/
BFD_DBG ("wait for event without timeout");
/*等待事件到来,如果没有事件,设置等待事件方式挂起。
这样就会跳转到dispatch_process->vlib_process_startup 返回
VLIB_PROCESS_RETURN_LONGJMP_SUSPEND挂起动作。
*/
(void) vlib_process_wait_for_event (vm);
event_type = vlib_process_get_events (vm, &event_data);
}
else
{
f64 timeout = ((i64) next_expire - (i64) now) / bm->cpu_cps;
BFD_DBG ("wait for event with timeout %.02f", timeout);
if (timeout < 0)
{
BFD_DBG ("negative timeout, already expired, skipping wait");
event_type = ~0;
}
else
{
/* 如果有事件,函数vlib_process_wait_for_event_or_clock,直接返回,处理事件。
如果没有事件,设置flags成挂起等待事件和等待时钟。跳转到dispatch_process->vlib_process_startup
函数中 返回VLIB_PROCESS_RETURN_LONGJMP_SUSPEND再次挂起动作。
*/
(void) vlib_process_wait_for_event_or_clock (vm, timeout);
event_type = vlib_process_get_events (vm, &event_data);
}
}
now = clib_cpu_time_now ();
switch (event_type)
{
case ~0: /* no events => timeout */
/* nothing to do here */
break;
case BFD_EVENT_RESCHEDULE:
/* nothing to do here - reschedule is done automatically after
* each event or timeout */
break;
case BFD_EVENT_NEW_SESSION:
if (!pool_is_free_index (bm->sessions, *event_data))
{
bfd_session_t *bs =
pool_elt_at_index (bm->sessions, *event_data);
bfd_send_periodic (vm, rt, bm, bs, now);
bfd_on_config_change (vm, rt, bm, bs, now); /*add new*/
bfd_set_timer (bm, bs, now, 1);
}
else
{
BFD_DBG ("Ignoring event for non-existent session index %u",
(u32) * event_data);
}
break;
case BFD_EVENT_CONFIG_CHANGED:
if (!pool_is_free_index (bm->sessions, *event_data))
{
bfd_session_t *bs =
pool_elt_at_index (bm->sessions, *event_data);
bfd_on_config_change (vm, rt, bm, bs, now);
}
else
{
BFD_DBG ("Ignoring event for non-existent session index %u",
(u32) * event_data);
}
break;
default:
clib_warning ("BUG: event type 0x%wx", event_type);
break;
}
BFD_DBG ("advancing wheel, now is %lu", now);
BFD_DBG ("timing_wheel_advance (%p, %lu, %p, 0);", &bm->wheel, now,
expired);
expired = timing_wheel_advance (&bm->wheel, now, expired, 0);
BFD_DBG ("Expired %d elements", vec_len (expired));
u32 *p = NULL;
vec_foreach (p, expired)
{
const u32 bs_idx = *p;
if (!pool_is_free_index (bm->sessions, bs_idx))
{
bfd_session_t *bs = pool_elt_at_index (bm->sessions, bs_idx);
bfd_on_timeout (vm, rt, bm, bs, now);
bfd_set_timer (bm, bs, now, 1);
}
}
if (expired)
{
_vec_len (expired) = 0;
}
if (event_data)
{
_vec_len (event_data) = 0;
}
}
return 0;
}
3、发送事件
/*创建一个新的bfd,通过vlib_process_signal_event 来发布一个事件,
会调到bfd_process函数去处理
*/
void bfd_session_start (bfd_main_t * bm, bfd_session_t * bs)
{
BFD_DBG ("\nStarting session: %U", format_bfd_session, bs);
bfd_set_effective_required_min_rx (bm, bs,
bs->config_required_min_rx_clocks);
bfd_recalc_tx_interval (bm, bs);
bfd_calc_report_tx_interval(bm, bs);
vlib_process_signal_event (bm->vlib_main, bm->bfd_process_node_index,
BFD_EVENT_NEW_SESSION, bs->bs_idx);
bfd_notify_listeners (bm, BFD_LISTEN_EVENT_CREATE, bs);
}
总结:
本文只是简单介绍了vpp的基本调度类型。并通过一些事例详细介绍了internal和preocess类型的基本用法和处理逻辑。这些都是最基本的。
本文分享自 DPDK VPP源码分析 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!