[TOC]
Stream 流式 API 也是 JDK8 开始引入的 API,用过的小伙伴都说爽,没用过的小伙伴则感到一脸懵逼。现在 JDK 已经到 16 了,可能很多人还不熟悉 Stream 流操作,这次趁着讲 WebFlux,松哥也来和大家一起回顾一下 Stream 流操作。
老实说,松哥在日常工作中,有时候刚好碰上了也会用 Stream 流,用着确实很爽,代码简洁,而且看起来高大上,确实是一个不错的东西。不过组里有很多人其实并不太熟悉 Stream 操作,所以也就没有去强行推广,但是作为一个合格的程序员,还是很有必要去学习一下 Stream 流操作的。
# 1.Stream 流简介
说到 Stream,大家可能很容易想到 Java IO 中的各种流操作,名字看起来很像,但这其实是两个完全不同的东西。
Java8 中的 Stream 不存储数据,它通过函数式编程模式来对集合进行链状流式操作。
# 2.基本玩法
先来个简单的例子感受下 Stream 的操作。
例如我有一个数组,如下:
int[] arr = {1, 2, 3, 4, 5, 6};
现在想给这个数组求平均数,如下:
double asDouble = IntStream.of(arr).average().getAsDouble();
一行代码搞定,首先调用 IntStream.of 方法获取流对象,然后调用 average 方法计算平均数,再调用 getAsDouble 方法获取结果。全程都是方法调用,不用自己写求平均数的逻辑。
再比如求和,如下:
int sum = IntStream.of(arr).sum();
也是直接调用 sum 方法即可。
而且我们还可以在流中对数据进行二次加工,例如给数组中的每个元素先求平方再求和:
double[] arr = {1, 2, 3, 4, 5, 6};
double sum = DoubleStream.of(arr).map(i -> Math.pow(i, 2)).sum();
先在 Map 中对每一个元素求平方,然后再求和。
这里就能涉及到两个概念:
- 中间操作
- 终止操作
所谓中间操作,就是中途求平方的操作,所谓终止操作就是最终计算出结果的操作,只要方法的返回值不是一个 Stream,那就是终止操作,否则就是中间操作。
# 3.Stream 的创建
获取一个流的方式多种多样,最常见的就是从集合或者数组中获取一个流,当然我们也可以直接自己创建一个流出来。
一起来看下。
# 集合
List 集合或者 Set 集合都可以直接搞一个流出来,方式如下:
List<String> list = new ArrayList<>();
Stream<String> s1 = list.stream();
Set<String> set = new HashSet<>();
Stream<String> s2 = set.stream();
集合中直接调用 stream 方法就可以获取到流。
# 数组
通过数组获取流的方式也很简单,如下:
IntStream stream = Arrays.stream(new int[]{11, 22, 33, 44, 55, 66});
# 数字 Stream
也可以直接利用 IntStream、LongStream 等对象创建一个数字 Stream,如下:
IntStream s1 = IntStream.of(1, 2, 3);
DoubleStream s2 = DoubleStream.of(1, 2, 3);
LongStream s3 = LongStream.of(1L, 2L, 3L);
# 自己创建
Random random = new Random();
Supplier<Integer> supplier = () -> random.nextInt(100);
Stream<Integer> stream = Stream.generate(supplier).limit(5);
调用 Stream.generate 方法可以自己创建一个流,自己创建的时候需要提供一个 Supplier,通过调用 Supplier 中的 get 方法自动获取到元素。
无论哪种创建方式,大家需要明白的是,Stream 并不会保存数据,它只会对数据进行加工。
# 4.Stream 的中间操作
中间操作可以分为两大类:
- map 或者 filter 会从输入流中获取每一个元素,并且在输出流中得到一个结果,这些操作没有内部状态,称为无状态操作。
- reduce、sum、max 这些操作都需要内部状态来累计计算结果,所以称为有状态操作。
分别来看下:
无状态操作
- map/mapToXxx
- flatMap/flatMapToXxx
- filter
- peek
有状态操作
- distinct
- sorted
- limit/skip
map
Stream.map() 是 Stream 中最常用的一个转换方法,可以把一个 Stream 对象转为另外一个 Stream 对象。map 方法所接收的参数就是一个 Function 对象,松哥在前面文章中和大家介绍过 Function 对象了,就是有输入有输出(参见WebFlux 前置知识(一)),了解了 map 的参数,那么 map 的功能就很明白了,就是对数据进行二次加工。
举个栗子,例如把一个字符串数组转为数字:
String[] arr = {"1", "2", "3"};
Stream<String> s1 = Arrays.stream(arr);
Stream<Integer> s2 = s1.map(i -> Integer.valueOf(i));
再比如一个数字流,给所有的元素乘 2,如下:
IntStream.of(1, 2, 3).map(i -> 2 * i).forEach(System.out::println);
最后的 forEach 就是将元素打印出来。
JDK 中也提供了一些现成的格式转换,如下图:
这样可以直接将元素转为 Double、Long、Obj 等类型,例如下面这样:
String[] arr = {"1", "2", "3"};
Stream<String> s1 = Arrays.stream(arr);
s1.mapToLong(i -> Long.parseLong(i)).forEach(System.out::println);
flatMap
flatMap 可以把 Stream 中的每个元素都映射为一个 Stream,然后再把这多个 Stream 合并为一个 Stream。
例如如下代码,返回的 Stream 中的元素是数组:
Stream<Integer[]> s = Stream.of(new Integer[]{1, 2, 3}, new Integer[]{4, 5, 6}, new Integer[]{7, 8, 9});
s.forEach(System.out::println);
通过 flatMap 我们可以将 Stream 中的元素变为 Integer:
Stream<Integer> s = Stream.of(new Integer[]{1, 2, 3}, new Integer[]{4, 5, 6}, new Integer[]{7, 8, 9}).flatMap( i -> Arrays.stream(i));
s.forEach(System.out::println);
filter
filter 操作会对一个 Stream 中的所有元素一一进行判断,不满足条件的就被过滤掉了,剩下的满足条件的元素就构成了一个新的 Stream。
例如要找到数组中所有大于 3 的元素,如下:
IntStream.of(2, 3, 4, 5, 6, 7).filter(i -> i > 3).forEach(System.out::println);
filter 方法接收的参数是 Predicate 接口函数,关于 Predicate 接口函数,大家可以参考WebFlux 前置知识(一))一文。
peek
peek 的入参是 Consumer,没有返回值,因此当我们要对元素内部进行处理时,使用 peek 是比较合适的,这个时候可以不用 map(map 的入参是 Function,它是有返回值的)。peek 方法本身会继续返回流,可以对数据继续进行处理。
举个简单的数据转换的例子吧(最终返回的数据并不会被转换):
IntStream.of(2, 3, 4, 5, 6, 7).filter(i -> i > 3).peek(String::valueOf).forEach(i-> System.out.println(i));
peek 方法的感觉就像数据中途被消费了一次。
distinct
这个是去重。由于去重操作需要获取到其他元素的值(比较之后才知道是否重复),所以这个是有状态操作。如下:
IntStream.of(2, 3, 4, 3, 7, 6, 2, 5, 6, 7).distinct().forEach(System.out::println);
sorted
sorted 是排序,因为也需要知道其他元素的值,然后才能去重,所以这个也是有状态操作,如下:
IntStream.of(2, 3, 4, 3, 7, 6, 2, 5, 6, 7).distinct().sorted().forEach(System.out::println);
limit/skip
limit 和 skip 配合操作有点像数据库中的分页,skip 表示跳过 n 个元素,limit 表示取出 n 个元素。例如下面这个例子:
Arrays.asList('A', 'B', 'C', 'D', 'E', 'F').stream().skip(2).limit(3).forEach(System.out::println);
这个会跳过 A 和 B,最终打印出 C D E。这也是一种有状态操作。
# 5.Stream 终止操作
终止操作就是最终计算出结果的操作,只要方法的返回值不是一个 Stream,那就是终止操作,否则就是中间操作。
终止操作又分为两类:
- 短路操作:不用处理全部元素就可以返回结果。
- 非短路操作:必须处理所有元素才能得到最终结果。
各自都包含哪些操作,我们分别来看下:
非短路操作
- forEach/forEachOrdered
- collect/toArray
- reduce
- min/max/count
短路操作
- findFirst/findAny
- allMatch/anyMatch/noneMatch
forEach/forEachOrdered
forEach 和 forEachOrdered 都是接收一个 Consumer 类型的参数,完成对参数的消费,不同的是,在并行流中,forEachOrdered 会保证执行顺序。
例如如下一段代码:
int[] arr = {1, 2, 3, 4, 5, 6, 7, 8, 9};
Arrays.stream(arr).parallel().forEach(System.out::println);
Arrays.stream(arr).parallel().forEachOrdered(System.out::println);
前者打印出来的顺序不一定是 123456789,后者一定是。
collect/toArray
这两个都是收集器,可以将执行结果转为一个 List 集合或者一个数组:
List<Integer> list = Stream.of(1, 2, 3, 4).filter(p -> p > 2).collect(Collectors.toList());
System.out.println(list);
reduce
reduce 是 Stream 的一个聚合方法,它可以把一个 Stream 的所有元素按照聚合函数聚合成一个结果。reduce 方法传入的对象是BinaryOperator 接口,它定义了一个apply 方法,负责把上次累加的结果和本次的元素进行运算,并返回累加的结果。
举个简单的例子,数组求和,当然可以直接调用 sum 计算,我们这里也可以调用 reduce 来实现,如下:
Optional<Integer> optional = Stream.of(1, 2, 3, 4).reduce((i, j) -> i + j);
System.out.println(optional.orElse(-1));
reduce 的参数是 BinaryOperator,这个接收两个参数,第一个参数是之前计算的结果,第二个参数是本次参与计算的元素,两者累加求和。
再比如给一段话中间加上.,方式如下:
Optional<String> s = Stream.of("wwwjavaboyorg".split("")).reduce((i, j) -> i + "." + j);
System.out.println(s.orElse(""));
最终执行结果如下:
w.w.w.j.a.v.a.b.o.y.o.r.g
min/max/count
这个就比较简单了,就是求最大值最小值,统计总个数,如下表示统计总个数:
Stream<Integer> s = Stream.of(1, 2, 3, 4);
long count = s.count();
System.out.println("count = " + count);
如下表示统计最小值:
Stream<Integer> s = Stream.of(1, 2, 3, 4);
Optional<Integer> min = s.min(Comparator.comparingInt(i -> i));
System.out.println("min.get() = " + min.get());
findFirst/findAny
这两个就是返回流中的第一个、任意一个元素,findAny 要在并行流中测试才有效果,举个栗子:
for (int i = 0; i < 10; i++) {
Optional<Integer> first = Stream.of(1, 2, 3, 4).parallel().findFirst();
System.out.println("first.get() = " + first.get());
}
System.out.println("=============");
for (int i = 0; i < 10; i++) {
Optional<Integer> first = Stream.of(1, 2, 3, 4).parallel().findAny();
System.out.println("first.get() = " + first.get());
}
allMatch/anyMatch/noneMatch
allMatch、anyMatch、noneMatch 用来判断所有元素、任意元素或者没有元素满足给定的条件。这三个方法的参数都是一个 Predicate 接口函数。
boolean b = Stream.of(1, 2, 3, 4).allMatch(i -> i > 5);
System.out.println("b = " + b);
# 6.并行流
通常情况下,对 Stream 的元素进行处理是单线程的,即一个一个元素进行处理。有时候我们希望可以并行处理 Stream 元素,因为在元素数量非常大的情况,并行处理可以大大加快处理速度。
把一个普通 Stream 转换为可以并行处理的 Stream 非常简单,只需要用 parallel 方法进行转换:
new Random().ints().limit(50).parallel().forEach(i->{
System.out.println(Thread.currentThread().getName() + "--->" + i);
});
这样数据在后台就是并行打印的。
# 7.收集器
收集器可以将计算结果重新整理收集到一个集合中,这个集合可以是一个 List/Set 获取其他,并且还可以在收集的过程中对数据进行处理。
例如我有一个 users 集合,里边保存了用户数据,用户有 username、age 以及 gender 三个属性,如下代码分别表示:
- 提取出用户对象中的 age 属性组成新的集合并返回。
- 提取出用户对象中的 username 属性组成新的集合并返回。
- 提取出用户对象中的 gender 属性组成新的集合并返回(这里是一个 Set 集合,所以会自动去重)。
List<Integer> ages = users.stream().map(User::getAge).collect(Collectors.toList());
System.out.println("ages = " + ages);
List<String> usernames = users.stream().map(User::getUsername).collect(Collectors.toList());
System.out.println("usernames = " + usernames);
Set<String> genders = users.stream().map(User::getGender).collect(Collectors.toSet());
System.out.println("genders = " + genders);
Collectors.toList()
最终返回的是 ArrayList,Collectors.toSet()
最终返回的是 HashSet。
如果我们想返回一个 Vector 或者 TreeSet,也是可以的,如下:
List<Integer> ages = users.stream().map(User::getAge).collect(Collectors.toList());
System.out.println("ages = " + ages);
List<String> usernames = users.stream().map(User::getUsername).collect(Collectors.toCollection(Vector::new));
System.out.println("usernames = " + usernames);
TreeSet<String> genders = users.stream().map(User::getGender).collect(Collectors.toCollection(TreeSet::new));
System.out.println("genders = " + genders);
也可以获取某一个字段的统计信息:
IntSummaryStatistics ageStatistics = users.stream().collect(Collectors.summarizingInt(User::getAge));
System.out.println("ageStatistics = " + ageStatistics);
这个统计信息中包含:总和、最小值、平均值以及最大值等:
ageStatistics = IntSummaryStatistics{count=20, sum=1222, min=9, average=61.100000, max=96}
还可以对数据进行分块,将男女不同性别统计出来:
Map<Boolean, List<User>> map = users.stream().collect(Collectors.partitioningBy(u -> u.getGender().equals("男")));
System.out.println("map = " + map);
也可以按照性别对数据进行分组,如下:
Map<String, List<User>> map2 = users.stream().collect(Collectors.groupingBy(User::getGender));
System.out.println("map2 = " + map2);
分组后,Map 中的 key 就是性别;分块后,Map 中的 key 就是 true/false。
再比如统计男女的人数:
Map<String, Long> map2 = users.stream().collect(Collectors.groupingBy(User::getGender,Collectors.counting()));
System.out.println("map2 = " + map2);
好啦,今天我们就先聊这么多~