异步 Swift 代码需要能够和现有同步代码一起使用,这些同步代码使用 completion 回调或者 delegate 方法等技术来响应事件。在 continuations 上,异步任务可以挂起自身,同步代码能够捕获并调用 continuations 来恢复任务,响应事件。
Swift-evolution 关键点时间线:
Swift APIs 经常通过 callback 的方式提供异步代码执行操作。这可能是因为代码本身是在引入 async/await 之前编写的,也可能因为它与一些主要由事件驱动组成的系统相关联,在这种情况下,可能需要在内部使用 callback 的同时向程序提供异步接口。调用异步任务需要能够挂起其本身,同时为事件驱动同步系统提供一种机制来恢复它以响应事件。
Swift 库将会提供 API 用来为当前异步任务获取 continuation。获取任务的 continuation 会挂起该任务,并产生一个值,同步代码可以使用 handle 来恢复任务。最终给定的 API 基于 completion callback,例如:
func beginOperation(completion: (OperationResult) -> Void)
我们可以把上述beginOperation(completion:)
转为一个async
接口,即通过挂起该任务并在调用 callback 时,使用该任务的 continuation 恢复它,并把传进 callback 的参数转为异步函数的正常返回值:
func operation() async -> OperationResult {
// 挂起当前任务,并把它的 continuation 传给 closure,该 closure 会直接执行
return await withUnsafeContinuation { continuation in
// 调用同步基于回调的 API(the synchronous callback-based API)
beginOperation(completion: { result in
// 当执行回调时,恢复 continuation
continuation.resume(returning: result)
})
}
}
Swift 库提供了两个函数:withUnsafeContinuation
和withUnsafeThrowingContinuation
,它们均允许从异步代码内部调用基于 callback 的API。每个函数都接受一个 operation 闭包参数,基于 callback 的 API 将会调用该闭包。这个operation 闭包参数接受一个 continuation 实例,该 continuation 实例必须在 callback 中执行恢复操作,提供返回值或者抛出错误,它们会在异步任务恢复时,成为withUnsafeContinuation
或withUnsafeThrowingContinuation
的调用结果。
struct UnsafeContinuation<T, E: Error> {
func resume(returning: T)
func resume(throwing: E)
func resume(with result: Result<T, E>)
}
extension UnsafeContinuation where T == Void {
func resume() { resume(returning: ()) }
}
extension UnsafeContinuation where E == Error {
// Allow covariant use of a `Result` with a stricter error type than
// the continuation:
func resume<ResultError: Error>(with result: Result<T, ResultError>)
}
func withUnsafeContinuation<T>(
_ operation: (UnsafeContinuation<T, Never>) -> ()
) async -> T
func withUnsafeThrowingContinuation<T>(
_ operation: (UnsafeContinuation<T, Error>) throws -> ()
) async throws -> T
在当前任务上下文中,withUnsafe*Continuation
(表示withUnsafeContinuation
和withUnsafeThrowingContinuation
两个函数,下文类似)将会立即执行 operation 参数对应的闭包,并传入用于恢复任务的 continuation 参数值。operation
必须安排 continuation 在之后的某个点恢复。在operation
函数返回后,当前任务也已经挂起。当前任务必须通过调用 continuation 的resume
方法跳出挂起状态。注意resume
在将任务从暂停状态转换出来后,会立即把上下文的控制权返回给调用者,如果任务所在的执行器不重新调度它,任务本身实际上不会恢复执行。resume(throwing:)
可用来通过传递给定错误来恢复任务。为了方便起见,可以使用给定的Result
,resume(with:)
通过正常返回或者根据Result
状态引发错误来恢复任务。如果operation
在返回前引发了未捕获的错误,这就好像 operation 调用了resume(throwing:)
并出现错误一样。
如果withUnsafe*Continuation
返回类型是Void
,当调用resume(returning:)
函数时,必须指定()
的值。这样做会出现奇怪的代码(比如resume(returning: ())),所以Unsafe*Continuation<Void>
有另一个成员函数resume()
,让resume
调用可读性更强。
调用withUnsafeContinuation
之后,resume
函数在程序每个执行路径必须且仅调用一次。Unsafe*Continuation
是一个不安全的接口,因此如果在同一个 continuation 上多次调用resume
方法,会出现未定义的行为。任务在恢复执行之前都是挂起状态,如果 continuation 取消且从未调用resume
,此时任务在程序结束之前都一直保持挂起状态,会造成它所有的资源发生内存泄漏。包装器(Wrapper)可以提供对这些误用 continuation 的检查,库也会提供一个这样的包装器,如下所述。
例如,使用Unsafe*Continuation
API,可以包装这样的函数(例子为了表现 continuation API 的灵活性,故意编写的比较复杂):
func buyVegetables(
shoppingList: [String],
// a) if all veggies were in store, this is invoked *exactly-once*
onGotAllVegetables: ([Vegetable]) -> (),
// b) if not all veggies were in store, invoked one by one *one or more times*
onGotVegetable: (Vegetable) -> (),
// b) if at least one onGotVegetable was called *exactly-once*
// this is invoked once no more veggies will be emitted
onNoMoreVegetables: () -> (),
// c) if no veggies _at all_ were available, this is invoked *exactly once*
onNoVegetablesInStore: (Error) -> ()
)
// returns 1 or more vegetables or throws an error
func buyVegetables(shoppingList: [String]) async throws -> [Vegetable] {
try await withUnsafeThrowingContinuation { continuation in
var veggies: [Vegetable] = []
buyVegetables(
shoppingList: shoppingList,
onGotAllVegetables: { veggies in continuation.resume(returning: veggies) },
onGotVegetable: { v in veggies.append(v) },
onNoMoreVegetables: { continuation.resume(returning: veggies) },
onNoVegetablesInStore: { error in continuation.resume(throwing: error) },
)
}
}
let veggies = try await buyVegetables(shoppingList: ["onion", "bell pepper"])
由于把正确的 continuation 恢复操作调用编写到buyVegetables
函数复杂的 callback 中,我们可以为该函数提供更好的重载,并允许异步代码以更自然自上而下的方式与该函数交互。
Unsafe*Continuation
为连接同步和异步代码提供了一种轻量机制,但它容易误用,误用会以危险的方法破坏处理状态。为了在同步和异步代码开发接口时提供额外的安全性和指导,库会提供一个包装器,用来检查continuation
的不合法使用:
struct CheckedContinuation<T, E: Error> {
func resume(returning: T)
func resume(throwing: E)
func resume(with result: Result<T, E>)
}
extension CheckedContinuation where T == Void {
func resume()
}
extension CheckedContinuation where E == Error {
// Allow covariant use of a `Result` with a stricter error type than
// the continuation:
func resume<ResultError: Error>(with result: Result<T, ResultError>)
}
func withCheckedContinuation<T>(
_ operation: (CheckedContinuation<T, Never>) -> ()
) async -> T
func withCheckedThrowingContinuation<T>(
_ operation: (CheckedContinuation<T, Error>) throws -> ()
) async throws -> T
Unsafe*Continuation
API有意设计与Unsafe*Continuation
相同,这样代码就可以轻松在已检查和未检查之间切换。例如,上面buyVegetables
的例子可以通过把withUnsafeThrowingContinuation
换成withCheckedThrowingContinuation
选择检查:
// returns 1 or more vegetables or throws an error
func buyVegetables(shoppingList: [String]) async throws -> [Vegetable] {
try await withCheckedThrowingContinuation { continuation in
var veggies: [Vegetable] = []
buyVegetables(
shoppingList: shoppingList,
onGotAllVegetables: { veggies in continuation.resume(returning: veggies) },
onGotVegetable: { v in veggies.append(v) },
onNoMoreVegetables: { continuation.resume(returning: veggies) },
onNoVegetablesInStore: { error in continuation.resume(throwing: error) },
)
}
}
如果程序尝试多次恢复 continuation,Unsafe*Continuation
会导致未定义的行为,而CheckedContinuation
会导致陷入陷阱。CheckedContinuation
也会记录一个警告,如果 continuation 没有恢复任务就被丢弃,这会导致任务一直卡在挂起状态,它拥有的所有资源都会发生泄漏。无论程序的优化级别如何,都会进行这些检查。
Continuations 也能用来与事件驱动接口交互,这些接口比 callback 更复杂。只要整个过程遵循 continuation 被正确执行恢复操作一次的要求,continuation 可以在任何地方执行恢复操作。例如,当Operation
实现finish
操作时,会触发 continuation 的恢复操作:
class MyOperation: Operation {
let continuation: UnsafeContinuation<OperationResult, Never>
var result: OperationResult?
init(continuation: UnsafeContinuation<OperationResult, Never>) {
self.continuation = continuation
}
/* rest of operation populates `result`... */
override func finish() {
continuation.resume(returning: result!)
}
}
func doOperation() async -> OperationResult {
return await withUnsafeContinuation { continuation in
MyOperation(continuation: continuation).start()
}
}
下面例子来自 结构化并发提议 中,它把URLSession
封装到任务中,允许任务的取消控制 session 的取消,并使用 continuation 来响应网络活动中的数据和错误事件:
func download(url: URL) async throws -> Data? {
var urlSessionTask: URLSessionTask?
return try Task.withCancellationHandler {
urlSessionTask?.cancel()
} operation: {
let result: Data? = try await withUnsafeThrowingContinuation { continuation in
urlSessionTask = URLSession.shared.dataTask(with: url) { data, _, error in
if case (let cancelled as NSURLErrorCancelled)? = error {
continuation.resume(returning: nil)
} else if let error = error {
continuation.resume(throwing: error)
} else {
continuation.resume(returning: data)
}
}
urlSessionTask?.resume()
}
if let result = result {
return result
} else {
Task.cancel()
return nil
}
}
}
基于回调 API 的包装器也可以遵守其父/当前任务的取消操作,例如:
func fetch(items: Int) async throws -> [Items] {
let worker = ...
return try Task.withCancellationHandler(
handler: { worker?.cancel() }
) {
return try await withUnsafeThrowingContinuation { c in
worker.work(
onNext: { value in c.resume(returning: value) },
onCancelled: { value in c.resume(throwing: CancellationError()) },
)
}
}
}
如果任务允许有实例,可以获取调用fetch(items:)
函数的任务实例,并在 withUnsafeThrowingContinuation 内部有合适场景可以调用取消时,取消对该任务的调用。
CheckedContinuation
命名为Continuation
我们可以将CheckedContinuation
定位为执行同步/异步接口的"默认" API,方法是将 Checked 单词从名称中去掉。这当然符合 Swift 的常见理念,即首选安全接口,在性能是首要考虑因素的情况下,有选择得使用不安全接口。不过,有 2 个顾虑让我们没有这样做:
CheckedContinuation
的后果没有误用UnsafeContinuation
那么严重,但它仍然只尽力检查一些常见的误用模式,并且没有让继续误用的后果完全没有意义:丢弃没有执行恢复操作的 continuation 仍然会泄漏未恢复任务;尝试多次恢复 continuation 仍然会造成传到 continuation 中的信息丢失;如果with*Continuation
操作误用了 continuation,这仍然是一个严重的编程错误。CheckedContinuation
只会使错误更加明显。Continuation
类型占用了一个"好"名字,如果我们在将来的某个时候只移动类型,我们希望引入一个静态强制执行"恰好一次"属性的 continuation 类型。UnsafeContinuation
人们认为不应该暴露UnsafeContinuation
,因为可以用Checked
形式代替。我们认为只要用户验证了他们那些与性能敏感的 API 是正确的,就可以避免与这些 API 交互带来的检查成本。
CheckedContinuation
捕获所有误用, 或者记录所有误用CheckedContinuation
建议当程序在同一个 continuation 上尝试恢复同一个任务 2 次时进行捕获,但只在放弃 continuation 而未执行恢复操作时才记录警告。我们认为这是针对这些情况的正确权衡,原因如下:
CheckedContinuation
,多次执行恢复操作会破坏任务过程,并让它处于未定义状态。通过在任务多次恢复时捕获,CheckedContinuation
会把未定义行为变为定义良好的捕获情况。这点与标准库中其他 checked/unchecked 相似,比如!
和对于Optional
的unsafelyUnwrapped
。UnsafeContinuation
执行恢复操作失败,除了会泄漏挂起任务的资源,不会破坏任务;程序剩余的任务可以继续正常执行。而且,检测和报告这样泄漏的唯一办法是在类实现时使用deinit
方法。由于来自 ARC 优化的再计数可变性,执行 deinit 的确切点并非完全可预测。如果捕获deinit
方法,那么捕获是否执行以及何时执行可能会随着优化级别而变化,我们认为这不会带来好体验。 *Continuation
上公开更多Task
API, 或者允许在 continuation 中恢复Handle
Task
和Handle
API 对 handle 的持有者提供了任务状态的额外控制,特别是查询和设置取消状态,以及等待任务最终结果的能力。人们觉得为什么*Continuation
类型不公开这些功能。Continuation
的角色与Handle
大不相同,handle 代表且控制整个任务的生命周期,而 continuation 只代表任务生命周期中的单个挂起点。而且,*Continuation
API 主要设计用来允许与 Swift 中结构化并发模型之外的代码进行通信,任务之间的交互最好尽可能在该模型内处理。
注意*Continuation
本身也不需要支持任何任务 API。例如,某人希望某个任务在响应回调时取消其本身,他们可以通过在continuation
的 resume 类型(例如可选的nil
)插入哨兵来实现这一点:
let callbackResult: Result? = await withUnsafeContinuation { c in
someCallbackBasedAPI(
completion: { c.resume($0) },
cancellation: { c.resume(nil) })
}
if let result = callbackResult {
process(result)
} else {
cancel()
}
有些 API 除了接受 completion handler 和代理外,也允许程序控制在哪里调用 completion handler 和代理。例如,Apple 平台上的某些 API 为应该调用 completion handler 的调度队列使用参数。在这些情况下,如果原始的 API 能够在调度队列上(无论生命调度机制,比如线程或者 run loop)直接恢复任务,这是最佳场景,任务的执行器也会继续执行该任务。
为了做到这点,我们提供with*Continuation
的一个变体,除了提供 continuation,还提供任务期望在其上恢复执行的调度队列。with*Continuation
类型会提供一组unsafeResumeImmediately
API,这些 API 会在当前线程上立即回恢复当前任务的执行。它们有可能是这样:
// Given an API that takes a queue and completion handler:
func doThingAsynchronously(queue: DispatchQueue, completion: (ResultType) -> Void)
// We could wrap it in a Swift async function like:
func doThing() async -> ResultType {
await withUnsafeContinuationAndCurrentDispatchQueue { c, queue in
// Schedule to resume on the right queue, if we know it
doThingAsynchronously(queue: queue) {
c.unsafeResumeImmediately(returning: $0)
}
}
}
这种 API 必须很小心地使用,程序员也要很小心检查是否在正确的上下文中调用unsafeResumeImmediately
,并且在一段可能的无限时间内,从调用者中接管当前线程的控制权是安全的。如果在错误的上下文中执行任务,它会破坏当前已有代码,编译器和运行时所做的全部假设,最终导致错误很难调试。如果发现基于 continuation 适配器的"队列跳转"在实践中被证明是一个性能问题,我们可以将其作为核心提议的补充来研究。
第三次修改:
Continuation<T, E: Error>
类型代替单独的*Continuation<T>
和*ThrowingContinuation<T>
类型,Continuation<T, E: Error>
类型带有 Error 类型。resume()
方法,该方法相当于resume(returning: ())
方法,返回值为Void
类型。with*ThrowingContinuation
增加operation
block,该 block 有可能会抛出异常,如果从操作中传出了未捕获的错误,block 会立即恢复抛出错误的任务往下执行。第二次修改:
with*Continuation
和*Continuation.resume
的执行行为,即在挂起任务之前,with*Continuation
会立即在当前上下文中执行其操作参数,再取消挂起任务后,对应的resume
会立即返回它的调用方,任务将会由它的执行者调度。resume
时不必要的不变量;在with*Continuation
操作开始执行后的任何一个时间点,仅能有效调用一次resume
;当with*Continuation
操作返回时,不需要精确地调用resume
。Continuation
类型上增加resume()
。原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。