dart是单线程语言,支持异步,dart线程中有两个队列,分别为微任务队列和事件队列。在 Dart 的事件循环中,首先会执行同步代码,这些代码会直接在当前执行栈中执行,当同步任务执行完毕后,dart会从两个队列中依次取出任务执行,优先执行微任务队列,随后执行事件队列,所以如果将耗时较长的任务都放在主线程中执行,那么无疑会影响系统性能,dart主线程异步只支持耗时较短的任务。Dart 本身也支持多线程编程,Isolate 作为一种类似线程的概念(为了方便理解,以下部分会简称线程),提供了多任务并行的能力,但其使用相对复杂,且创建和销毁 Isolate 的过程较为繁重,对性能也会造成一定的负担。因此,本文决定将 Isolate 封装成一个插件,方便在日常应用中快速使用,并支持 Isolate 的复用,类似于 Java 线程池的概念,以避免频繁创建和销毁 Isolate 带来的性能损耗。
dependencies:
isolate_easy_pool: ^0.0.7
2.初始化SDK,初始化时机建议尽早执行
IsolatePool.getInstance().init();
3.调用IsolatePool中的runTask方法执行异步任务,runTask方法体是在子isolate中的Future运行,如果想在主Isolate(主线程)中接收回调(线程池初始化必须在主isolate),可以使用await等待runTask执行,runTask执行完后会主动切回主Isolate(主线程)
,并且返回信息,runTask参数接收一个泛型方法返回类型,下面的return "Task completed!";这个返回类型可以是任意类型,runTask接收者必须与此类型一致("Task completed!"是一个string类型,String data也只能是String接收)
void startExecuteIsolatePoolTask(int i) async {
// 运行一个简单的异步任务
String data = await IsolatePool.getInstance().runTask(() async {
//子线程任务
await Future.delayed(const Duration(seconds: 10)); // 模拟异步任务
//将信息返回给主线程
return "Task completed!";
});
//dart主线程
print("received====The $i task has been completed=${Isolate.current.debugName}");
}
4.isolate池销毁,当确定不再使用isolate池时,可以调用dispose函数进行销毁
,销毁后不能再次使用,建议在应用程序结束时调用
IsolatePool.getInstance().dispose();
初始化过程会调用Isolate.spawn并获取子isolate的sendPort以及创建子isolate的isolate中的receivePort,并将其一并封装存入集合内。init参数coreThreadSum代表的是线程池线程数,建议根据实际情况选择合适的Isolate数量(如CPU核心数,异步类型(CPU密集型、IO))等,默认4个Isolate。
isOpenLog,是否开启debug日志,默认不开启,建议debug模式开始,release模式关闭
Future<void> init([int coreThreadSum = 4, bool isOpenLog = false]) async {
// 启动线程池中的工作线程
if (!_isInit) {
_isOpenLog = isOpenLog;
_coreThreadSum = coreThreadSum;
_isolates.clear();
for (int i = 0; i < _coreThreadSum; i++) {
final sendPort = await _iTask.spawnIsolate();
_isolates.add(sendPort);
}
_isInit = true;
}
}
Future<IsolateMessage> spawnIsolate() async {
final receivePort = ReceivePort();
Isolate isolate = await Isolate.spawn(_worker, receivePort.sendPort);
// 获取工作线程的 SendPort
final sendPort = await receivePort.first;
return IsolateMessage(
sendPort: sendPort, receivePort: receivePort, isolate: isolate);
}
1.通过调用IsolatePoll中的runTask函数,该函数参数是一个函数类型,并且返回值是Future<T>,该返回值会直接返回给调用者Isolate
,runTask函数主要功能是判断Isolate池是否有空闲Isolate,如有空闲isoalte就直接将任务交给空闲isoalte执行,否则放入等待队列中
Future<T> runTask<T>(Future<T> Function() task) async {
if (!_isInit) {
await init();
}
if (_isDestroy) {
_printf("The current isolate pool has been destroyed");
final completer = Completer<T>();
return completer.future;
} else {
// 获取空闲线程的 SendPort
if (_isolates.isNotEmpty) {
_printf("run tasking");
// 创建任务完成的 Completer
final completer = Completer<T>();
return _runTask(task, completer);
} else {
//当前不存在空闲线程,加入等待队列
final completer = Completer<T>();
_waitQueue.addLast(IsolateQueue(func: task, completer: completer));
_printf("join wait queue,wait queue count ${_waitQueue.length}");
return completer.future;
}
}
}
2.从上面可以看出真正执行异步任务的是_runTask<T>函数,该函数通过调用sendTask函数将任务发送给子isolate
,刚刚在init函数内已经将子isolate中的sendPort拿到了,所以sendTask函数的功能是通过拿到的sendPort调用send函数将任务发送给子isolate
Future<T> _runTask<T>(
Future<T> Function() task, Completer<T> completer) async {
// 获取空闲线程的 SendPort
final isolate = _isolates.isNotEmpty
? _isolates.removeAt(0)
: await _iTask.spawnIsolate();
_iTask.sendTask(task, isolate, (message) {
completer.complete(message);
_isolates.add(isolate); // 任务完成后,释放SendPort给线程池
_checkExecuteNextTask();
});
return completer.future;
}
void sendTask(Future Function() task, IsolateMessage isolate,
void Function(dynamic) callback) {
final port = ReceivePort();
isolate.receivePort = port;
// 向Isolate发送任务
isolate.sendPort.send(<dynamic>[task, port.sendPort]);
isolate.msgBackPort = port;
// 监听任务完成的通知
port.listen((message) {
callback(message);
});
}
3.子siolate通过接收发送的信息获取需要执行的任务信息执行任务
,这里有个这个判断:result is String && result == IsolatePool.ISOLATE_DISPOSE,主要是应用于isoalte销毁
Future<void> _worker(SendPort sendPort) async {
final receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
//isolate循环接收消息
await for (var message in receivePort) {
if (message == null) {
break;
}
final task = message[0] as Future Function();
final replyPort = message[1] as SendPort;
// 执行任务
final result = await task();
//完成后回调给当前调用isloate
replyPort.send(result);
if (result is String && result == IsolatePool.ISOLATE_DISPOSE) {
break;
}
}
receivePort.close();
}
4.当某一isolate任务执行完成后会对等待队列进行检测,如果等待队列还有任务需要被执行,立即从等待队列获取任务并交由该isolate执行
_checkExecuteNextTask() async {
//该函数存在并发情况,加锁进行同步,防止任务执行异常
await _lock.synchronized(() async {
_printf("check execute next task");
if (_waitQueue.isNotEmpty && _isolates.isNotEmpty) {
final nextTask = _waitQueue.removeFirst(); // 取出下一个任务
if (_waitQueue.isNotEmpty) {
_checkExecuteNextTask();
}
final task = nextTask.func; // 获取任务
final completer = nextTask.completer;
_printf(
"execute wait queue,isoloate free count ${_isolates.length},wait queue count ${_waitQueue.length}");
_runTask(task, completer);
}
});
}
5.IsolatePoll销毁:通过对子isolate发送指定类型信息(销毁指令),子isolate接收销毁指令
(上述介绍的result == IsolatePool.ISOLATE_DISPOSE判断),会结束消息接收,终止任务
,但是正在执行的任务不一定会立即被销毁,当前任务执行完成后才会被销毁
Future<void> dispose() async {
// 停止所有的线程
_printf("dispose all isolate");
for (var isolate in _isolates) {
final completer = Completer<String?>();
_iTask.sendTask(
() async {
return ISOLATE_DISPOSE;
},
isolate,
(message) {
completer.complete(message);
_destroy();
});
}
}
总结;上述是插件的主要功能和原理介绍,插件使用方法非常简单,只需要一行代码便可实现isolate复用,异步,自动切回主isolate功能,极大的提高了开发效率。
最后,附上github源码地址:https://github.com/LiuYX-Skio/isolate_easy_pool
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。