先看下用例源码:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <uv.h>
#define FIB_UNTIL 25
uv_loop_t *loop;
uv_work_t fib_reqs[FIB_UNTIL];
long fib_(long t) {
if (t == 0 || t == 1)
return 1;
else
return fib_(t-1) + fib_(t-2);
}
void fib(uv_work_t *req) {
int n = *(int *) req->data;
if (random() % 2)
sleep(1);
else
sleep(3);
long fib = fib_(n);
fprintf(stderr, "%dth fibonacci is %lu\n", n, fib);
}
void after_fib(uv_work_t *req, int status) {
if (status == UV_ECANCELED)
fprintf(stderr, "Calculation of %d cancelled.\n", *(int *) req->data);
}
void signal_handler(uv_signal_t *req, int signum)
{
printf("Signal received!\n");
int i;
for (i = 0; i < FIB_UNTIL; i++) {
// 取消在这里调用
uv_cancel((uv_req_t*) &fib_reqs[i]);
}
uv_signal_stop(req);
}
int main() {
loop = uv_default_loop();
int data[FIB_UNTIL];
int i;
// 依次提交任务到线程池的工作队列中
for (i = 0; i < FIB_UNTIL; i++) {
data[i] = i;
fib_reqs[i].data = (void *) &data[i];
uv_queue_work(loop, &fib_reqs[i], fib, after_fib);
}
uv_signal_t sig;
uv_signal_init(loop, &sig);
// 接收到终止信号后把没有结束的任务取消掉
uv_signal_start(&sig, signal_handler, SIGINT);
return uv_run(loop, UV_RUN_DEFAULT);
}
看下呗插入到线程池的任务如何取消:
int uv_cancel(uv_req_t* req) {
struct uv__work* wreq;
uv_loop_t* loop;
switch (req->type) {
case UV_FS:
loop = ((uv_fs_t*) req)->loop;
wreq = &((uv_fs_t*) req)->work_req;
break;
case UV_GETADDRINFO:
loop = ((uv_getaddrinfo_t*) req)->loop;
wreq = &((uv_getaddrinfo_t*) req)->work_req;
break;
case UV_GETNAMEINFO:
loop = ((uv_getnameinfo_t*) req)->loop;
wreq = &((uv_getnameinfo_t*) req)->work_req;
break;
case UV_RANDOM:
loop = ((uv_random_t*) req)->loop;
wreq = &((uv_random_t*) req)->work_req;
break;
case UV_WORK:
loop = ((uv_work_t*) req)->loop;
wreq = &((uv_work_t*) req)->work_req;
break;
default:
return UV_EINVAL;
}
// 根据类型转换指针类型
return uv__work_cancel(loop, req, wreq);
}
static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
int cancelled;
uv_mutex_lock(&mutex);
uv_mutex_lock(&w->loop->wq_mutex);
// 先看是否支持取消 被插入队列中且work函数尚未执行
cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;
if (cancelled)
QUEUE_REMOVE(&w->wq);
uv_mutex_unlock(&w->loop->wq_mutex);
uv_mutex_unlock(&mutex);
if (!cancelled)
return UV_EBUSY;
// 替换work函数 插入loop的就绪wq中 通知loop执行它
w->work = uv__cancelled;
uv_mutex_lock(&loop->wq_mutex);
QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
uv_async_send(&loop->wq_async);
uv_mutex_unlock(&loop->wq_mutex);
return 0;
}
// 终止线程
static void uv__cancelled(struct uv__work* w) {
abort();
}
void uv__work_done(uv_async_t* handle) {
struct uv__work* w;
uv_loop_t* loop;
QUEUE* q;
QUEUE wq;
int err;
loop = container_of(handle, uv_loop_t, wq_async);
uv_mutex_lock(&loop->wq_mutex);
QUEUE_MOVE(&loop->wq, &wq);
uv_mutex_unlock(&loop->wq_mutex);
while (!QUEUE_EMPTY(&wq)) {
q = QUEUE_HEAD(&wq);
QUEUE_REMOVE(q);
w = container_of(q, struct uv__work, wq);
// 这里就是上文决定是否是取消的任务
err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
w->done(w, err);
}
}
总结:cancel把会work从待处理的队列中移动到loop的wq就绪队列中,然后让loop的异步任务handler来处理它
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。