Stream 简介
Stream 是一种为 lambda 设计的一种操作流水线。创建一个 Stream 之后,你可以添加你想要的多个中间操作到流水线中,然后使用终结操作,原始数据就会通过你设计的 Stream 流水线,输出给终结操作,然后输出数据。
所以,看本文之前,请确保你已经对 lambda 已经足够了解。
lambda 解析:https://yuque.com/page/luan.ma/lambda/
Stream 的类型
Stream 有分普通流和数值流,之间没有继承关系,普通流用一个泛型表示流中的数据结构类型,如 Stream
数值流主要是避免重复的装箱拆箱,统一用原始数值类型(无法应用泛型指定类型),int long double,我们在做终结操作的时候需要统一装箱 .box() 转成普通流
Stream 的生命周期
创建流 -> 中间操作 -> 终结操作
Stream 的特点
无存储。stream不是一种数据结构,它只是某种数据源的一个视图,数据源可以是一个数组,Java 容器或 I/O channel 等。
为函数式编程而生。对stream的任何修改都不会修改背后的数据源,比如对stream执行过滤操作并不会删除被过滤的元素,而是会产生一个不包含被过滤元素的新stream。
惰式执行。stream上的操作(中间操作)并不会立即执行,只有等到用户真正需要结果的时候(终结操作)才会执行。
可消费性。stream只能被“消费”一次,一旦遍历过就会失效(终结操作就是消费操作),就像容器的迭代器那样,想要再次遍历必须重新生成。
区分中间操作和结束操作最简单的方法,就是看方法的返回值,返回值为stream的大都是中间操作,否则是结束操作。
创建流
从 Colletion
.stream()
.parallelStream()
从数组
Arrays.stream(T array)
Stream.of()
从输入流
- BufferedReader.lines()
从目录树
- Files.walk(Paths.get(“C:\“))
创建各种数值流
Random.ints()
IntStream.of()
IntStream.range()
…Stream.***()
自己创建流(可创建无穷流)
Stream.generate() 丢进一个类似迭代器的东西即可
Stream.iterate(0, n -> n + 3).limit(10). forEach(x -> System.out.print(x + “ “)); 创建一个自己迭代的流
中间操作
并行化
- .parallel()
装箱操作
- .boxed() 把数值流转回普通流,才能执行终结操作
转换操作
一对一普通转换 .map()
一对多转换 .flatMap() 本质上是把每个对象转换成流,流会自动合并
1 | Stream<List<Integer>> stream = Stream.of(Arrays.asList(1,2), Arrays.asList(3, 4, 5)); |
- 直接转成 数值流 .mapToInt .flatMapToInt
4) 排序操作 .sorted
5) 对每一个对象操作 .peek
6) 保留前 n 项 .limit()
无穷流必须执行限流操作,否则将进入死循环
7) 去掉前 n 项 .skip()
8) 筛选操作 .filter()
true 留,false 被删除
终结操作
终结操作后 Stream 将会被消费完成,不能再执行中间操作
转数组 .toArray()
- stream.toArray(String[]::new)
转 Collection/String .collect()
forEach 逐一消费所有项目
无法提前结束循环,只能用 return 提前结束当前循环两两结合操作 .reduce()
.max
.min
.findFirst
.findAny
match 检查
allMatch:Stream 中全部元素符合传入的 predicate,返回 true
anyMatch:Stream 中只要有一个元素符合传入的 predicate,返回 true
noneMatch:Stream 中没有一个元素符合传入的 predicate,返回 true
reduce 操作
reduce操作可以实现从一组元素中生成一个值,sum()
、max()
、min()
、count()
等都是reduce操作,将他们单独设为函数只是因为常用。reduce()
的方法定义有三种重写形式:
Optional<T> reduce(BinaryOperator<T> accumulator)
T reduce(T identity, BinaryOperator<T> accumulator)
<U> U reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner)
虽然函数定义越来越长,但语义不曾改变,多的参数只是为了指明初始值(参数identity),或者是指定并行执行时多个部分结果的合并方式(参数combiner)。reduce()
最常用的场景就是从一堆值中生成一个值。用这么复杂的函数去求一个最大或最小值,你是不是觉得设计者有病。其实不然,因为“大”和“小”或者“求和”有时会有不同的语义。而Optional是(一个)值的容器,可以避免 null 值的问题,下面会提到。
需求:从一组单词中找出最长的单词。这里“大”的含义就是“长”。
1 | // 找出最长的单词 |
需求:求出一组单词的长度之和。这是个“求和”操作,操作对象输入类型是String,而结果类型是Integer。
1 | // 求单词长度之和 |
Collect 操作
Collect 是终结操作的一个函数,最为强大,不仅可以将流转化成各种数据结构,也可以再补充中间操作不能进行许多操作。
收集器(Collector)是为Stream.collect()
方法量身打造的工具接口(类)。考虑一下将一个Stream转换成一个容器(或者Map)需要做哪些工作?我们至少需要两样东西:
目标容器是什么?是ArrayList还是HashSet,或者是个TreeMap。
新元素如何添加到容器中?是
List.add()
还是Map.put()
。
如果并行的进行规约,还需要告诉collect() 3. 多个部分结果如何合并成一个。
结合以上分析,collect()方法定义为<R> R collect(Supplier<R> supplier, BiConsumer<R,? super T> accumulator, BiConsumer<R,R> combiner)
,三个参数依次对应上述三条分析。不过每次调用_collect()都要传入这三个参数太麻烦,收集器Collector 就是对这三个参数的简单封装,所以_collect()的另一定义为<R,A> R collect(Collector<? super T,A,R> collector)
。Collectors工具类可通过静态方法生成各种常用的 Collector。
举例来说,如果要将Stream规约成List可以通过如下两种方式实现:
1 | List<String> list = stream.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);// 方式1 |
常用的转 Collection / String,Collectors 为辅助类
转 list stream.collect(Collectors.toList());
转 set stream.collect(Collectors.toSet());
转其他 stream.collect(Collectors.toCollection(Stack::new));
转 String stream.collect(Collectors.joining()).toString();
转 map
前面已经说过Stream背后依赖于某种数据源,数据源可以是数组、容器等,但不能是Map。反过来从Stream生成Map是可以的,但我们要想清楚Map的key和value分别代表什么,根本原因是我们要想清楚要干什么。通常在三种情况下collect()
的结果会是Map:
使用
Collectors.toMap()
生成的收集器,用户需要指定如何生成Map的key和value。使用
Collectors.partitioningBy()
生成的收集器,对元素进行二分区操作时用到。使用
Collectors.groupingBy()
生成的收集器,对元素做group操作时用到。
情况 1:使用toMap()
生成的收集器,这种情况是最直接的,前面例子中已提到,这是和Collectors.toCollection()
并列的方法。如下代码展示将学生列表转换成由<学生,GPA>组成的Map。。
1 | // 使用toMap()统计学生GPA |
情况 2:使用partitioningBy()
生成的收集器,这种情况适用于将Stream
中的元素依据某个二值逻辑(满足条件,或不满足)分成互补相交的两部分,比如男女性别、成绩及格与否等。下列代码展示将学生分成成绩及格或不及格的两部分。拉出来之后用 get(true) 和 get(false) 拉出去两个列表。
1 | // Partition students into passing and failing |
情况 3:使用groupingBy()
生成的收集器,这是比较灵活的一种情况。跟 SQL 中的group by语句类似,这里的groupingBy()也是按照某个属性对数据进行分组,属性相同的元素会被对应到_Map 的同一个_key上。下列代码展示将员工按照部门进行分组:
1 | // Group employees by department |
以上只是分组的最基本用法,有些时候仅仅分组是不够的。在 SQL 中使用group by是为了协助其他查询,比如1. 先将员工按照部门分组,2. 然后统计每个部门员工的人数。Java 类库设计者也考虑到了这种情况,增强版的groupingBy()
能够满足这种需求。增强版的groupingBy()
允许我们对元素分组之后再执行某种运算,比如求和、计数、平均值、类型转换等。这种先将元素分组的收集器叫做上游收集器,之后执行其他运算的收集器叫做下游收集器(downstream Collector)。我们可以简单理解,下游收集器就是对 map 的 values 做了一个 forEach
1 | // 使用下游收集器统计每个部门的人数 |
上面代码的逻辑是不是越看越像 SQL?高度非结构化。还有更狠的,下游收集器还可以包含更下游的收集器,这绝不是为了炫技而增加的把戏,而是实际场景需要。考虑将员工按照部门分组的场景,如果我们想得到每个员工的名字(字符串),而不是一个个_Employee对象_,可通过如下方式做到:
1 | // 按照部门对员工分布组,并只保留员工的名字 |
Optional 容器
一般用法:
- 新建一个 可空 Optional,ifPresent 非空则执行 xxx 操作
Optional.ofNullable(text).ifPresent(System.out::println);
- 从 reduce 等 stream 终结函数返回
2) 检查 Optional 是否为空,一般和三元符配合使用,可同时照顾到非空和空
isPresent()?1:0;
- orElse() 用法,取值,如果为空,则为默认值(默认值马上获得,传入的是真实值)
String name = Optional.ofNullable(nullName).orElse(“john”);
- orElseGet() 取值,如果为空,则为默认值,默认值为一个获取方法
Optional.ofNullable(text).orElseGet(this::getMyDefault);
当容器内的值为 null 时,orElse() 和 orElseGet() 完全相同,当容器内值不为 null 时,则 orElseGet() 不会执行相关的函数
.filter() 过滤,如果.filter()内容为真,则返回内容,如果为假,则容器内为空。支持链式操作
boolean is2017 = yearOptional.filter(y -> y == 2017).isPresent();.map() 转换,不用判断非空
int size = listOptional .map(List::size).orElse(0);.flatMap() 多层 Optional 自动拆开
Stream 底层实现
Stream 实际上是一个流水线(Pipelines),那么他的链式调用+惰性执行的原理是什么呢?
所谓流水线,就是先装配,后启动,一次完成。而不是一步一步迭代实现,这样最大的弊端是没有办法应对复杂的数据结构。效率也十分低
我们举个例子
1 | List<String> test = Arrays.asList("liu","zhang","huang","chen","lix","fuc"); |
这是一组流水线 Stream 拆开来生成多个 Stream 变量。我们知道,Stream 实际上是一个接口,那么,我们调用了这些函数之后,到底返回了一个什么对象呢?我们直接用 IDE 告诉我们答案
可以看到,首先这里有一个双向链表的结构,每次中间操作,都会增加一个新的 AbstractPipeline,然后记录第一个 AbstractPipeline 和 上一个 AbstractPipeline,上一个 AbstractPipeline 也会记录当前新增的 AbstractPipeline。
而另一方面,根据增加的操作不同,也会有不同的 AbstractPipeline 子类,包括 ReferencePipeline, SliceOps, SortedOps, StatelessOp 等等,只是实现的层级不同,我们稍后在纠结这些。
并且,这些实现类内部会有一个 核心的逻辑方法opWrapSink(int flags, Sink<P_OUT> sink
,会把逻辑打包成一个 Sink 对象,这个 Sink 对象还接收另外一个 Sink 对象作为构造函数参数。
我们拿 .filter() 举例,内置了一个函数会返回 Sink 对象,目前还是惰性执行,所以没有立刻生成:
1 |
|
Sink 对象源码,我们最关注的是构造函数,可以看到它又藏了另外一个 sink
1 | static abstract class ChainedReference<T, E_OUT> implements Sink<T> { |
当我们走到终结操作的时候,会先执行一个这样的操作:
1 |
|
好了,千辛万苦,我们终于得到了这么一个 Sink,这个 Sink 保存了所有的中间流操作和最后一个 reduce 规约操作的所有操作对象。也就是说,我们的流水线建成了。
拿到这个 Sink 之后,我们就可以愉快的进行迭代了
1 | // AbstractPipelie.copyInto() |
遍历调用 Sink 的 begin() 钩子,主要是用来准备数据结构,每个 Sink 的 begin 都会递归调用下游的 begin
1 | // SliceOps |
调用 forEachRemaining() 方法
1 | // Spliterators |
最后调用 end() 方法封口,同样是递归调用
1 | // SotedOps() |
经过这些处理之后,会被丢进 reduce 操作 或者是 collect 操作收集 流中的数据。
关于并且流时候的情况,调用了 Fork/Join 框架,比较复杂,以后再更。