前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Java8 中的 Stream 那么强大,那你知道它的原理是什么吗?

Java8 中的 Stream 那么强大,那你知道它的原理是什么吗?

作者头像
逆锋起笔
发布于 2021-11-12 02:04:10
发布于 2021-11-12 02:04:10
91100
代码可运行
举报
文章被收录于专栏:逆锋起笔逆锋起笔
运行总次数:0
代码可运行
作者:岁月安然

elsef.com/2019/09/16/Java8中Stream的原理分析

Java 8 API添加了一个新的抽象称为流Stream,可以让你以一种声明的方式处理数据。 Stream 使用一种类似用 SQL 语句从数据库查询数据的直观方式来提供一种对 Java 集合运算和表达的高阶抽象。 Stream API可以极大提高Java程序员的生产力,让程序员写出高效率、干净、简洁的代码。 本文会对Stream的实现原理进行剖析。

Stream的组成与特点

Stream(流)是一个来自数据源的元素队列并支持聚合操作:

  • 元素是特定类型的对象,形成一个队列。 Java中的Stream并_不会_向集合那样存储和管理元素,而是按需计算
  • 数据源流的来源可以是集合Collection、数组ArrayI/O channel, 产生器generator
  • 聚合操作类似SQL语句一样的操作, 比如filter, map, reduce, find, match, sorted

和以前的Collection操作不同, Stream操作还有两个基础的特征:

  • Pipelining: 中间操作都会返回流对象本身。这样多个操作可以串联成一个管道, 如同流式风格(fluent style)。这样做可以对操作进行优化, 比如延迟执行(laziness evaluation)和短路( short-circuiting)
  • 内部迭代:以前对集合遍历都是通过Iterator或者For-Each的方式, 显式的在集合外部进行迭代, 这叫做外部迭代。 Stream提供了内部迭代的方式, 通过访问者模式 (Visitor)实现。

和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。

Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。Java 的并行 API 演变历程基本如下:

1.0-1.4 中的 java.lang.Thread 5.0 中的 java.util.concurrent 6.0 中的 Phasers 等 7.0 中的 Fork/Join 框架 8.0 中的 Lambda

Stream具有平行处理能力,处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEach(out::println); 

可以看到一行简单的代码就帮我们实现了并行输出集合中元素的功能,但是由于并行执行的顺序是不可控的所以每次执行的结果不一定相同。

如果非得相同可以使用forEachOrdered方法执行终止操作:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEachOrdered(out::println);  

这里有一个疑问,如果结果需要有序,是否和我们的并行执行的初衷相悖?是的,这个场景下明显无需使用并行流,直接用串行流执行即可, 否则性能可能更差,因为最后又强行将所有并行结果进行了排序。

OK,下面我们先介绍一下Stream接口的相关知识。

BaseStream接口

Stream的父接口是BaseStream,后者是所有流实现的顶层接口,定义如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface BaseStream<T, S extends BaseStream<T, S>>
        extends AutoCloseable {
    Iterator<T> iterator();

    Spliterator<T> spliterator();

    boolean isParallel();

    S sequential();

    S parallel();

    S unordered();

    S onClose(Runnable closeHandler);

    void close();
}

其中,T为流中元素的类型,S为一个BaseStream的实现类,它里面的元素也是T并且S同样是自己:

S extends BaseStream<T, S>

是不是有点晕?

其实很好理解,我们看一下接口中对S的使用就知道了:如sequential()parallel()这两个方法,它们都返回了S实例,也就是说它们分别支持对当前流进行串行或者并行的操作,并返回「改变」后的流对象。

如果是并行一定涉及到对当前流的拆分,即将一个流拆分成多个子流,子流肯定和父流的类型是一致的。子流可以继续拆分子流,一直拆分下去…

也就是说这里的SBaseStream的一个实现类,它同样是一个流,比如StreamIntStreamLongStream等。

Stream接口

再来看一下Stream的接口声明:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public interface Stream<T> extends BaseStream<T, Stream<T>> 

参考上面的解释这里不难理解:即Stream<T>可以继续拆分为Stream<T>,我们可以通过它的一些方法来证实:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Stream<T> filter(Predicate<? super T> predicate);
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
Stream<T> sorted();
Stream<T> peek(Consumer<? super T> action);
Stream<T> limit(long maxSize);
Stream<T> skip(long n);
...

这些都是操作流的中间操作,它们的返回结果必须是流对象本身。

关闭流操作

BaseStream 实现了 AutoCloseable 接口,也就是 close() 方法会在流关闭时被调用。同时,BaseStream 中还给我们提供了onClose()方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/** * Returns an equivalent stream with an additional close handler. Close * handlers are run when the {@link #close()} method * is called on the stream, and are executed in the order they were * added. All close handlers are run, even if earlier close handlers throw * exceptions. If any close handler throws an exception, the first * exception thrown will be relayed to the caller of {@code close()}, with * any remaining exceptions added to that exception as suppressed exceptions * (unless one of the remaining exceptions is the same exception as the * first exception, since an exception cannot suppress itself.) May * return itself. * * <p>This is an <a href="package-summary.html#StreamOps">intermediate * operation</a>. * * @param closeHandler A task to execute when the stream is closed * @return a stream with a handler that is run if the stream is closed */
S onClose(Runnable closeHandler);

AutoCloseableclose()接口被调用的时候会触发调用流对象的onClose()方法,但有几点需要注意:

  • onClose() 方法会返回流对象本身,也就是说可以对改对象进行多次调用
  • 如果调用了多个onClose() 方法,它会按照调用的顺序触发,但是如果某个方法有异常则只会向上抛出第一个异常
  • 前一个 onClose() 方法抛出了异常不会影响后续 onClose() 方法的使用
  • 如果多个 onClose() 方法都抛出异常,只展示第一个异常的堆栈,而其他异常会被压缩,只展示部分信息

并行流和串行流

BaseStream接口中分别提供了并行流串行流两个方法,这两个方法可以任意调用若干次,也可以混合调用,但最终只会以最后一次方法调用的返回结果为准

参考parallel()方法的说明:

Returns an equivalent stream that is parallel. May return itself, either because the stream was already parallel, or because the underlying stream state was modified to be parallel.

所以多次调用同样的方法并不会生成新的流,而是直接复用当前的流对象。

下面的例子里以最后一次调用parallel()为准,最终是并行地计算sum

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
stream.parallel()
   .filter(...)
   .sequential()
   .map(...)
   .parallel()
   .sum();

ParallelStream背后的男人:ForkJoinPool

ForkJoin框架是从JDK7中新特性,它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService 接口。它使用了一个「无限队列」来保存需要执行的任务,而线程的数量则是通过构造函数传入, 如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。

ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm) 来解决问题,典型的应用比如_快速排序算法_。这里的要点在于,ForkJoinPool需要使用相对少的线程来处理大量的任务。比如要对1000万个数据进行排序,那么会将这个任务分割成两个500 万的排序任务一个针对这两组500万数据的合并任务

以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概2000000+个。

问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行,想象一下归并排序的过程。

所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法向 任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行

那么使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差异呢?

首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有「父子关系」的任务,比如使用4个线程来完成超过200万个任务。使用ThreadPoolExecutor 时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。

Work Stealing原理:

  1. 每个工作线程都有自己的工作队列WorkQueue;
  2. 这是一个双端队列dequeue,它是线程私有的;
  3. ForkJoinTask中fork的子任务,将放入运行该任务的工作线程的队头,工作线程将以LIFO的顺序来处理工作队列中的任务,即堆栈的方式;
  4. 为了最大化地利用CPU,空闲的线程将从其它线程的队列中「窃取」任务来执行
  5. 但是是从工作队列的尾部窃取任务,以减少和队列所属线程之间的竞争;
  6. 双端队列的操作:push()/pop()仅在其所有者工作线程中调用,poll()是由其它线程窃取任务时调用的;
  7. 当只剩下最后一个任务时,还是会存在竞争,是通过CAS来实现的;

用ForkJoinPool的眼光来看ParallelStream

Java 8为ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的CPU数量。当调用Arrays 类上添加的新方法时,自动并行化就会发生。比如用来排序一个数组的并行快速排序,用来对一个数组中的元素进行并行遍历。自动并行化也被运用在Java 8新添加的Stream API中。

比如下面的代码用来遍历列表中的元素并执行需要的操作:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
List<UserInfo> userInfoList =
        DaoContainers.getUserInfoDAO().queryAllByList(new UserInfoModel());
userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo);

对于列表中的元素的操作都会以并行的方式执行。forEach方法会为每个元素的计算操作创建一个任务,该任务会被前文中提到的ForkJoinPool中的commonPool处理。以上的并行计算逻辑当然也可以使用ThreadPoolExecutor完成,但是就代码的可读性和代码量而言,使用ForkJoinPool明显更胜一筹。

对于ForkJoinPool通用线程池的线程数量,通常使用默认值就可以了,即运行时计算机的处理器数量。也可以通过设置系统属性:-Djava.util.concurrent .ForkJoinPool.common.parallelism=N (N为线程数量),来调整ForkJoinPool的线程数量。

值得注意的是,当前执行的线程也会被用来执行任务,所以最终的线程个数为N+1,1就是当前的主线程

这里就有一个问题,如果你在并行流的执行计算使用了_阻塞操作_,如I/O,那么很可能会导致一些问题:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static String query(String question) {
  List<String> engines = new ArrayList<String>();
  engines.add("http://www.google.com/?q=");
  engines.add("http://duckduckgo.com/?q=");
  engines.add("http://www.bing.com/search?q=");
   
  // get element as soon as it is available
  Optional<String> result = engines.stream().parallel().map((base) - {
    String url = base + question;
    // open connection and fetch the result
    return WS.url(url).get();
  }).findAny();
  return result.get();
}

这个例子很典型,让我们来分析一下:

  • 这个并行流计算操作将由主线程和JVM默认的ForkJoinPool.commonPool()来共同执行。
  • map中是一个阻塞方法,需要通过访问HTTP接口并得到它的response,所以任何一个worker线程在执行到这里的时候都会阻塞并等待结果。
  • 所以当此时再其他地方通过并行流方式调用计算方法的时候,将会受到此处阻塞等待的方法的影响。
  • 目前的ForkJoinPool的实现并未考虑补偿等待那些阻塞在等待新生成的线程的工作worker线程,所以最终ForkJoinPool.commonPool()中的线程将备用光并且阻塞等待。

正如我们上面那个列子的情况分析得知,lambda的执行并不是瞬间完成的,所有使用parallel streams的程序都有可能成为阻塞程序的源头, 并且在执行过程中程序中的其他部分将无法访问这些workers,这意味着任何依赖parallel streams的程序在什么别的东西占用着common ForkJoinPool时将会变得不可预知并且暗藏危机。

小结:

  1. 当需要处理递归分治算法时,考虑使用ForkJoinPool。
  2. 仔细设置不再进行任务划分的阈值,这个阈值对性能有影响。
  3. Java 8中的一些特性会使用到ForkJoinPool中的通用线程池。在某些场合下,需要调整该线程池的默认的线程数量
  4. lambda应该尽量避免副作用,也就是说,避免突变基于堆的状态以及任何IO
  5. lambda应该互不干扰,也就是说避免修改数据源(因为这可能带来线程安全的问题)
  6. 避免访问在流操作生命周期内可能会改变的状态

并行流的性能

并行流框架的性能受以下因素影响:

  • 数据大小:数据够大,每个管道处理时间够长,并行才有意义;
  • 数据结构:每个管道操作都是基于初始数据源,通常是集合,将不同的集合数据源分割会有一定消耗;
  • 装箱:处理基本类型比装箱类型要快;
  • 核的数量:默认情况下,核数量越多,底层fork/join线程池启动线程就越多;
  • 单元处理开销:花在流中每个元素身上的时间越长,并行操作带来的性能提升越明显;

源数据结构分为以下3组:

  • 性能好:ArrayList、数组或IntStream.range(数据支持随机读取,能轻易地被任意分割)
  • 性能一般:HashSetTreeSet(数据不易公平地分解,大部分也是可以的)
  • 性能差:LinkedList(需要遍历链表,难以对半分解)、Stream.iterateBufferedReader.lines(长度未知,难以分解)

注意:下面几个部分节选自:Streams 的幕后原理,顺便感谢一下作者_Brian Goetz_,写的太通透了。推荐:拥抱 Java 8 并行流吧,让执行速度飞起!

NQ模型

要确定并行性是否会带来提速,需要考虑的最后两个因素是:可用的数据量和针对每个数据元素执行的计算量。

在我们最初的并行分解描述中,我们采用的概念是拆分来源,直到分段足够小,以致解决该分段上的问题的顺序方法更高效。分段大小必须依赖于所解决的问题,确切的讲,取决于每个元素完成的工作量。例如,计算一个字符串的长度涉及的工作比计算字符串的 SHA-1 哈希值要少得多。为每个元素完成的工作越多,“大到足够利用并行性” 的阈值就越低。类似地,拥有的数据越多, 拆分的分段就越多,而不会与 “太小” 阈值发生冲突。

一个简单但有用的并行性能模型是 NQ 模型,其中 N 是数据元素数量,Q 是为每个元素执行的工作量。乘积 N*Q 越大,就越有可能获得并行提速。对于具有很小的 Q 的问题,比如对数字求和,您通常可能希望看到 N > 10,000 以获得提速;随着 Q 增加,获得提速所需的数据大小将会减小。

并行化的许多阻碍(比如拆分成本、组合成本或遇到顺序敏感性)都可以通过 Q 更高的操作来缓解。尽管拆分某个 LinkedList 特征的结果可能很糟糕,但只要拥有足够大的 Q,仍然可能获得并行提速。

遇到顺序

遇到顺序指的是来源分发元素的顺序是否对计算至关重要。一些来源(比如基于哈希的集合和映射)没有有意义的遇到顺序。流标志 ORDERED 描述了流是否有有意义的遇到顺序。JDK 集合的 spliterator 会根据集合的规范来设置此标志;一些中间操作可能注入 ORDERED (sorted()) 或清除它 (unordered())。

如果流没有遇到顺序,大部分流操作都必须遵守该顺序。对于顺序执行,会「自动保留遇到顺序」,因为元素会按遇到它们的顺序自然地处理。甚至在并行执行中,许多操作(无状态中间操作和一些终止操作(比如 reduce())),遵守遇到顺序不会产生任何实际成本。但对于其他操作(有状态中间操作,其语义与遇到顺序关联的终止操作,比如 findFirst()forEachOrdered()), 在并行执行中遵守遇到顺序的责任可能很重大。如果流有一个已定义的遇到顺序,但该顺序对结果没有意义, 那么可以通过使用 unordered() 操作删除 ORDERED 标志,加速包含顺序敏感型操作的管道的顺序执行。

作为对遇到顺序敏感的操作的示例,可以考虑 limit(),它会在指定大小处截断一个流。在顺序执行中实现 limit() 很简单:保留一个已看到多少元素的计数器,在这之后丢弃任何元素。但是在并行执行中,实现 limit() 要复杂得多;您需要保留前 N 个元素。此要求大大限制了利用并行性的能力;如果输入划分为多个部分,您只有在某个部分之前的所有部分都已完成后,才知道该部分的结果是否将包含在最终结果中。因此,该实现一般会错误地选择不使用所有可用的核心,或者缓存整个试验性结果,直到您达到目标长度。

如果流没有遇到顺序,limit() 操作可以自由选择任何 N 个元素,这让执行效率变得高得多。知道元素后可立即将其发往下游, 无需任何缓存,而且线程之间唯一需要执行的协调是发送一个信号来确保未超出目标流长度。

遇到顺序成本的另一个不太常见的示例是排序。如果遇到顺序有意义,那么 sorted() 操作会实现一种稳定 排序 (相同的元素按照它们进入输入时的相同顺序出现在输出中),而对于无序的流,稳定性(具有成本)不是必需的。 distinct() 具有类似的情况:如果流有一个遇到顺序,那么对于多个相同的输入元素,distinct() 必须发出其中的第一个, 而对于无序的流,它可以发出任何元素 — 同样可以获得高效得多的并行实现。

在您使用 collect() 聚合时会遇到类似的情形。如果在无序流上执行 collect(groupingBy()) 操作, 与任何键对应的元素都必须按它们在输入中出现的顺序提供给下游收集器。此顺序对应用程序通常没有什么意义,而且任何顺序都没有意义。在这些情况下,可能最好选择一个并发 收集器(比如 groupingByConcurrent()),它可以忽略遇到顺序, 并让所有线程直接收集到一个共享的并发数据结构中(比如 ConcurrentHashMap),而不是让每个线程收集到它自己的中间映射中, 然后再合并中间映射(这可能产生很高的成本)。

什么时候该使用并行流

谈了这么多,关于并行流parallelStream的使用注意事项需要格外注意,它并不是解决性能的万金油,相反,如果使用不当会严重影响性能。我会在另外一篇文章里单独谈这个问题。

逆锋起笔是一个专注于程序员圈子的技术平台,你可以收获最新技术动态最新内测资格BAT等大厂的经验精品学习资料职业路线副业思维,微信搜索逆锋起笔关注!

References

  • http://movingon.cn/2017/05/02/jdk8-Stream-BaseStream-%E6%BA%90%E7%A0%81%E9%9A%BE%E7%82%B9%E6%B5%85%E6%9E%901/
  • https://www.jianshu.com/p/bd825cb89e00
  • https://jrebel.com/rebellabs/java-parallel-streams-are-bad-for-your-health/
  • https://blog.csdn.net/weixx3/article/details/81266552
  • https://www.ibm.com/developerworks/cn/java/j-java-streams-5-brian-goetz/index.html
  • https://www.ibm.com/developerworks/cn/java/j-java-streams-3-brian-goetz/index.html
  • https://juejin.im/post/5dc5a148f265da4d4f65c191
  • https://stackoverrun.com/cn/q/10341100
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-11-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 逆锋起笔 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Java8 中的 Stream 那么彪悍,你知道它的原理是什么吗?
> 公众号:[Java小咖秀](https://t.1yb.co/jwkk),网站:[javaxks.com](https://www.javaxks.com)
Java小咖秀
2021/03/21
6680
Java 8 的Stream流那么强大,你知道它的原理吗
Java 8 API添加了一个新的抽象称为流Stream,可以让你以一种声明的方式处理数据。
玖柒的小窝
2021/12/09
8170
Java8并行http请求加快访问速度 原
1.通常我们在获取到一个list列表后需要一个挨着一个的进行遍历处理数据,如果每次处理都需要长时间操作,那整个流程下来时间就是每一次处理时间的总和。
chinotan
2019/04/03
2.7K0
Java8并行http请求加快访问速度
                                                                            原
深入浅出vue_深入浅出pandas
Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。原始版本的 Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的 Stream,用户只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。
全栈程序员站长
2022/11/04
4470
JAVA8 中的stream 是什么?
元素是特定类型的对象,形成一个队列。Java中的Stream并不会存储元素,而是按需计算。 数据源 流的来源。可以是集合,数组,I/O channel, 产生器generator 等。 聚合操作 类似SQL语句一样的操作, 比如filter, map, reduce, find, match, sorted等。 和以前的Collection操作不同, Stream操作还有两个基础的特征:
Java宝典
2021/01/14
1.4K0
深入浅出parallelStream
今天小强带来java8的Stream,Stream是java8中新增加的一个特性,被java猿统称为流。
程序员小强
2019/06/03
1.3K0
Lambda表达式最佳实践(2)Stream与ParallelStream
range(int startInclusive, int endExclusive) rangeClosed(int startInclusive, int endInclusive)
干货满满张哈希
2021/04/12
6490
【Java基础-1】 Java8新特性Stream详解
Java8的API中添加了一个新的特性: 流,即stream。stream是将数组或者集合的元素视为流,流在管道中流动过程中,对数据进行筛选、排序和其他操作。
云深i不知处
2020/09/16
1.2K0
把Stream流学透了你也能写出简洁高效的代码,快来点击进来看看吧(建议收藏)
  最近刚好有空给大家整理下JDK8的特性,这个在实际开发中的作用也是越来越重了,本文重点讲解下Stream API
用户4919348
2021/06/10
5100
把Stream流学透了你也能写出简洁高效的代码,快来点击进来看看吧(建议收藏)
Java 8 Stream计算原理
从Java 8 开始,我们可以使用Stream接口以及lambda表达式进行“流式计算”。它可以让我们对集合的操作更加简洁、更加可读、更加高效。
Jensen_97
2023/07/20
2330
Java 8 Stream计算原理
【Java8新特性】关于Java8的Stream API,看这一篇就够了!!
作者个人研发的在高并发场景下,提供的简单、稳定、可扩展的延迟消息队列框架,具有精准的定时任务和延迟队列处理功能。自开源半年多以来,已成功为十几家中小型企业提供了精准定时调度方案,经受住了生产环境的考验。为使更多童鞋受益,现给出开源框架地址:
冰河
2020/10/29
9070
【Java8新特性】关于Java8的Stream API,看这一篇就够了!!
Java8并行流:执行速度快的飞起!
并且还需要关注多个线程之间共享变量的修改问题。而 Java8 为我们提供了并行流,可以一键开启并行模式。是不是很酷呢?让我们来看看。
程序员白楠楠
2020/12/17
1.3K0
Java8的Stream流详解「建议收藏」
关于应用在Stream流上的操作,可以分成两种:Intermediate(中间操作)和Terminal(终止操作)。中间操作的返回结果都是Stream,故可以多个中间操作叠加;终止操作用于返回我们最终需要的数据,只能有一个终止操作。至于哪些方法是中间操作,哪些方法是终止操作,我们一会儿再说。
全栈程序员站长
2022/11/08
1.1K0
Java8的Stream流详解「建议收藏」
4.强大的 Stream API
注:在项目开发过程中,使用流式处理时,在数据量较大的情况下,通过并行流可以开启多个线程来执行处理,parallelStream与Stream的区别在于parallelStream开启了多线程的处理方式,所以当对方法替换的同时,必须关注方法处理的过程中,是否用到线程不安全的类型例如HashMap,ArrayList等待,方法内部使用不会出现线程安全问题,当变量在方法外部定义尤为重要,使用并行流需谨慎,时刻考虑线程安全问题。否则可能造成程序死锁,或数据的准确性。造成的后果完全取决于使用非线程安全类的效果。
Yuyy
2022/06/28
4500
4.强大的 Stream API
【JDK8 新特性 7】并行的Stream流&parallelStream背后的技术
上一篇文章:(1条消息) 【JDK8 新特性 6】收集Stream流中的结果_一切总会归于平淡的博客-CSDN博客
叫我阿杰好了
2022/11/07
1.1K0
【JDK8 新特性 7】并行的Stream流&parallelStream背后的技术
[源码解析] 当 Java Stream 遇见 Flink
在分析Alink源码的时候,发现Alink使用了 Java Stream,又去Flink源码搜索,发现Flink也有大量使用。一时兴起,想看看 Java Stream 和 Flink 这种流处理框架的异同点。当然这种比较还是注重于理念和设计思路上的。因为就应用领域和复杂程度来说, Java Stream 和 Flink 属于数量级别的差距。
罗西的思考
2020/09/07
1.1K0
跟我学 Java 8 新特性之 Stream 流(二)关键知识点
我们的第一篇文章,主要是通过一个Demo,让大家体验了一下使用流API的那种酣畅淋漓的感觉。如果你没有实践,我还是再次呼吁你动手敲一敲,自己实实在跑一遍上一篇的Demo。
Java技术栈
2018/07/31
5200
跟我学 Java 8 新特性之 Stream 流(二)关键知识点
由浅入深体验 Stream 流
    Stream 流是 Java 8 新提供给开发者的一组操作集合的 API,将要处理的元素集合看作一种流,流在管道中传输,并且可以在管道的节点上进行处理,比如筛选、排序、聚合等。元素流在管道中经过中间操作(intermediate operation)的处理,最后由终端操作 (terminal operation) 得到前面处理的结果。Stream 流可以极大的提高开发效率,也可以使用它写出更加简洁明了的代码。我自从接触过 Stream 流之后,可以说对它爱不释手。本文将由浅及深带您体验 Stream 流的使用。那么就让我们从流的简单使用来开启体验之旅。
用户2242639
2021/06/29
4770
[三]java8 函数式编程Stream 概念深入理解 Stream 运行原理 Stream设计思路
        流不是存储元素的数据结构;相反,它通过一个计算操作的管道,从一个数据源,如数据结构、数组、生成器函数或i/o通道中传递元素
noteless
2018/09/11
2K0
[三]java8 函数式编程Stream 概念深入理解 Stream 运行原理 Stream设计思路
Java8中的Stream()与ParallelStream()的区别
  为集合创建并行流,对于ParallelStream,需要知道的是里面的执行是异步的,并且使用的线程池是ForkJoinPool.common,可以通过设置-Djava.util.concurrent.ForkJoinPool.common.parallelism = N来调整线程池的大小;
用户10325771
2023/03/01
6560
推荐阅读
相关推荐
Java8 中的 Stream 那么彪悍,你知道它的原理是什么吗?
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文