前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >非常优雅简单的isolate,一行代码轻松实现Isolate复用与异步任务执行

非常优雅简单的isolate,一行代码轻松实现Isolate复用与异步任务执行

原创
作者头像
用户4143901
发布2024-11-28 13:01:37
发布2024-11-28 13:01:37
1680
举报

背景介绍

dart是单线程语言,支持异步,dart线程中有两个队列,分别为微任务队列和事件队列。在 Dart 的事件循环中,首先会执行同步代码,这些代码会直接在当前执行栈中执行,当同步任务执行完毕后,dart会从两个队列中依次取出任务执行,优先执行微任务队列,随后执行事件队列,所以如果将耗时较长的任务都放在主线程中执行,那么无疑会影响系统性能,dart主线程异步只支持耗时较短的任务。Dart 本身也支持多线程编程,Isolate 作为一种类似线程的概念(为了方便理解,以下部分会简称线程),提供了多任务并行的能力,但其使用相对复杂,且创建和销毁 Isolate 的过程较为繁重,对性能也会造成一定的负担。因此,本文决定将 Isolate 封装成一个插件,方便在日常应用中快速使用,并支持 Isolate 的复用,类似于 Java 线程池的概念,以避免频繁创建和销毁 Isolate 带来的性能损耗。

主要功能

  1. 支持isolate数量自定义,支持isolate复用
  2. 支持isolate任务执行完成后主动切回调用者isolate(如在dart主线程调用任务执行完后会主动切回主线程)
  3. 当前isolate池无可用isolate时,支持添加到等待队列
  4. 使用简单,一行代码直接可创建执行任务使用方法1.更新pubspec.yaml文件并添加IsolatePool依赖项
代码语言:dart
复制
dependencies:
  isolate_easy_pool: ^0.0.7

2.初始化SDK,初始化时机建议尽早执行

代码语言:dart
复制
  IsolatePool.getInstance().init();

3.调用IsolatePool中的runTask方法执行异步任务,runTask方法体是在子isolate中的Future运行,如果想在主Isolate(主线程)中接收回调(线程池初始化必须在主isolate),可以使用await等待runTask执行,runTask执行完后会主动切回主Isolate(主线程),并且返回信息,runTask参数接收一个泛型方法返回类型,下面的return "Task completed!";这个返回类型可以是任意类型,runTask接收者必须与此类型一致("Task completed!"是一个string类型,String data也只能是String接收)

代码语言:dart
复制
  void startExecuteIsolatePoolTask(int i) async {
    // 运行一个简单的异步任务
    String data = await IsolatePool.getInstance().runTask(() async {
      //子线程任务
      await Future.delayed(const Duration(seconds: 10)); // 模拟异步任务
      //将信息返回给主线程
      return "Task completed!";
    });
    //dart主线程
    print("received====The $i task has been completed=${Isolate.current.debugName}");
  }

4.isolate池销毁,当确定不再使用isolate池时,可以调用dispose函数进行销毁,销毁后不能再次使用,建议在应用程序结束时调用

代码语言:dart
复制
IsolatePool.getInstance().dispose();

原理介绍

1.IsolatePool初始化

 初始化过程会调用Isolate.spawn并获取子isolate的sendPort以及创建子isolate的isolate中的receivePort,并将其一并封装存入集合内。init参数coreThreadSum代表的是线程池线程数,建议根据实际情况选择合适的Isolate数量(如CPU核心数,异步类型(CPU密集型、IO))等,默认4个Isolate。

isOpenLog,是否开启debug日志,默认不开启,建议debug模式开始,release模式关闭

代码语言:dart
复制
  Future<void> init([int coreThreadSum = 4, bool isOpenLog = false]) async {
    // 启动线程池中的工作线程
    if (!_isInit) {
      _isOpenLog = isOpenLog;
      _coreThreadSum = coreThreadSum;
      _isolates.clear();
      for (int i = 0; i < _coreThreadSum; i++) {
        final sendPort = await _iTask.spawnIsolate();
        _isolates.add(sendPort);
      }
      _isInit = true;
    }
  }

  Future<IsolateMessage> spawnIsolate() async {
    final receivePort = ReceivePort();
    Isolate isolate = await Isolate.spawn(_worker, receivePort.sendPort);
    // 获取工作线程的 SendPort
    final sendPort = await receivePort.first;
    return IsolateMessage(
        sendPort: sendPort, receivePort: receivePort, isolate: isolate);
  }

2.IsolatePool异步任务执行

1.通过调用IsolatePoll中的runTask函数,该函数参数是一个函数类型,并且返回值是Future<T>,该返回值会直接返回给调用者Isolate,runTask函数主要功能是判断Isolate池是否有空闲Isolate,如有空闲isoalte就直接将任务交给空闲isoalte执行,否则放入等待队列中

代码语言:dart
复制
  Future<T> runTask<T>(Future<T> Function() task) async {
    if (!_isInit) {
      await init();
    }
    if (_isDestroy) {
      _printf("The current isolate pool has been destroyed");
      final completer = Completer<T>();
      return completer.future;
    } else {
      // 获取空闲线程的 SendPort
      if (_isolates.isNotEmpty) {
        _printf("run tasking");
        // 创建任务完成的 Completer
        final completer = Completer<T>();
        return _runTask(task, completer);
      } else {
        //当前不存在空闲线程,加入等待队列
        final completer = Completer<T>();
        _waitQueue.addLast(IsolateQueue(func: task, completer: completer));
        _printf("join wait queue,wait queue count ${_waitQueue.length}");
        return completer.future;
      }
    }
  }

2.从上面可以看出真正执行异步任务的是_runTask<T>函数,该函数通过调用sendTask函数将任务发送给子isolate,刚刚在init函数内已经将子isolate中的sendPort拿到了,所以sendTask函数的功能是通过拿到的sendPort调用send函数将任务发送给子isolate

代码语言:dart
复制
  Future<T> _runTask<T>(
      Future<T> Function() task, Completer<T> completer) async {
    // 获取空闲线程的 SendPort
    final isolate = _isolates.isNotEmpty
        ? _isolates.removeAt(0)
        : await _iTask.spawnIsolate();
    _iTask.sendTask(task, isolate, (message) {
      completer.complete(message);
      _isolates.add(isolate); // 任务完成后,释放SendPort给线程池
      _checkExecuteNextTask();
    });
    return completer.future;
  }
    
  void sendTask(Future Function() task, IsolateMessage isolate,
      void Function(dynamic) callback) {
    final port = ReceivePort();
    isolate.receivePort = port;
    // 向Isolate发送任务
    isolate.sendPort.send(<dynamic>[task, port.sendPort]);
    isolate.msgBackPort = port;
    // 监听任务完成的通知
    port.listen((message) {
      callback(message);
    });
  }

3.子siolate通过接收发送的信息获取需要执行的任务信息执行任务,这里有个这个判断:result is String && result == IsolatePool.ISOLATE_DISPOSE,主要是应用于isoalte销毁

代码语言:dart
复制
  Future<void> _worker(SendPort sendPort) async {
    final receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);
    //isolate循环接收消息
    await for (var message in receivePort) {
      if (message == null) {
        break;
      }
      final task = message[0] as Future Function();
      final replyPort = message[1] as SendPort;
      // 执行任务
      final result = await task();
      //完成后回调给当前调用isloate
      replyPort.send(result);
      if (result is String && result == IsolatePool.ISOLATE_DISPOSE) {
        break;
      }
    }
    receivePort.close();
  }

4.当某一isolate任务执行完成后会对等待队列进行检测,如果等待队列还有任务需要被执行,立即从等待队列获取任务并交由该isolate执行

代码语言:dart
复制
  _checkExecuteNextTask() async {
    //该函数存在并发情况,加锁进行同步,防止任务执行异常
    await _lock.synchronized(() async {
      _printf("check execute next task");
      if (_waitQueue.isNotEmpty && _isolates.isNotEmpty) {
        final nextTask = _waitQueue.removeFirst(); // 取出下一个任务
        if (_waitQueue.isNotEmpty) {
          _checkExecuteNextTask();
        }
        final task = nextTask.func; // 获取任务
        final completer = nextTask.completer;
        _printf(
            "execute wait queue,isoloate free count ${_isolates.length},wait queue count ${_waitQueue.length}");
        _runTask(task, completer);
      }
    });
  }

5.IsolatePoll销毁:通过对子isolate发送指定类型信息(销毁指令),子isolate接收销毁指令(上述介绍的result == IsolatePool.ISOLATE_DISPOSE判断),会结束消息接收,终止任务,但是正在执行的任务不一定会立即被销毁,当前任务执行完成后才会被销毁

代码语言:dart
复制
  Future<void> dispose() async {
    // 停止所有的线程
    _printf("dispose all isolate");
    for (var isolate in _isolates) {
      final completer = Completer<String?>();
      _iTask.sendTask(
          () async {
            return ISOLATE_DISPOSE;
          },
          isolate,
          (message) {
            completer.complete(message);
            _destroy();
          });
    }
  }

总结;上述是插件的主要功能和原理介绍,插件使用方法非常简单,只需要一行代码便可实现isolate复用,异步,自动切回主isolate功能,极大的提高了开发效率。

最后,附上github源码地址:https://github.com/LiuYX-Skio/isolate_easy_pool

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景介绍
  • 主要功能
  • 原理介绍
    • 1.IsolatePool初始化
    • 2.IsolatePool异步任务执行
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档