结合RxJava处理单个元素的API调用和列表的API调用,关键在于利用RxJava的操作符将异步任务流式化,并处理并发、错误重试等场景。以下是完整的解决方案:
Observable
/Flowable
处理数据流。getUser(id)
)。getUser
)。subscribeOn
/observeOn
控制线程。onErrorResumeNext
或retry
处理。flatMap
+并发参数优化性能。Observable<User> fetchUser(int id) {
return apiService.getUser(id) // 假设返回Observable<User>
.subscribeOn(Schedulers.io())
.onErrorResumeNext(e -> {
// 错误处理(如返回空用户或重试)
return Observable.just(User.EMPTY);
});
}
场景1:顺序执行(逐个调用)
List<Integer> ids = Arrays.asList(1, 2, 3);
Observable.fromIterable(ids)
.concatMap(id -> fetchUser(id)) // 顺序执行
.toList()
.subscribe(users -> System.out.println(users));
场景2:并发执行(控制最大并发数)
Observable.fromIterable(ids)
.flatMap(id -> fetchUser(id), 5) // 最大并发5个请求
.toList()
.subscribe(users -> System.out.println(users));
场景3:批量请求(合并为单个API)
// 假设后端支持批量查询(如GET /users?ids=1,2,3)
Observable<List<User>> fetchUsers(List<Integer> ids) {
return apiService.getUsers(ids) // 返回Observable<List<User>>
.subscribeOn(Schedulers.io());
}
// 调用示例
fetchUsers(ids).subscribe(users -> System.out.println(users));
flatMap
/concatMap
:将单个元素转换为异步流。toList()
:收集所有结果合并为列表。retryWhen
:实现指数退避重试(示例见下文)。fetchUser(id)
.retryWhen(errors ->
errors.flatMap(e -> (e instanceof TimeoutException) ?
Observable.timer(1, TimeUnit.SECONDS) :
Observable.error(e)))
.subscribe();
Flowable
代替Observable
防止数据溢出。cache()
结果。timeout
避免长时间阻塞。// 模拟API服务
interface ApiService {
Observable<User> getUser(int id);
Observable<List<User>> getUsers(List<Integer> ids);
}
// 实际调用
List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);
// 方案1:并发控制(最大3个并行请求)
Observable.fromIterable(ids)
.flatMap(id -> apiService.getUser(id)
.onErrorReturnItem(User.EMPTY)
.subscribeOn(Schedulers.io()), 3)
.filter(user -> user != User.EMPTY)
.toList()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(users -> updateUI(users));
// 方案2:批量请求(更高效)
apiService.getUsers(ids)
.timeout(5, TimeUnit.SECONDS)
.retry(3)
.subscribe(users -> updateUI(users));
通过上述方法,可以灵活处理单元素与列表API调用的组合需求,兼顾性能与代码可维护性。
没有搜到相关的文章