WebFlux前置知识(三)

江南一点雨 ... 2021-11-10 09:26:28
  • WebFlux
大约 11 分钟

[TOC]

Stream 流式 API 也是 JDK8 开始引入的 API,用过的小伙伴都说爽,没用过的小伙伴则感到一脸懵逼。现在 JDK 已经到 16 了,可能很多人还不熟悉 Stream 流操作,这次趁着讲 WebFlux,松哥也来和大家一起回顾一下 Stream 流操作。

老实说,松哥在日常工作中,有时候刚好碰上了也会用 Stream 流,用着确实很爽,代码简洁,而且看起来高大上,确实是一个不错的东西。不过组里有很多人其实并不太熟悉 Stream 操作,所以也就没有去强行推广,但是作为一个合格的程序员,还是很有必要去学习一下 Stream 流操作的。

# 1.Stream 流简介

说到 Stream,大家可能很容易想到 Java IO 中的各种流操作,名字看起来很像,但这其实是两个完全不同的东西。

Java8 中的 Stream 不存储数据,它通过函数式编程模式来对集合进行链状流式操作。

# 2.基本玩法

先来个简单的例子感受下 Stream 的操作。

例如我有一个数组,如下:

int[] arr = {1, 2, 3, 4, 5, 6};
1

现在想给这个数组求平均数,如下:

double asDouble = IntStream.of(arr).average().getAsDouble();
1

一行代码搞定,首先调用 IntStream.of 方法获取流对象,然后调用 average 方法计算平均数,再调用 getAsDouble 方法获取结果。全程都是方法调用,不用自己写求平均数的逻辑。

再比如求和,如下:

int sum = IntStream.of(arr).sum();
1

也是直接调用 sum 方法即可。

而且我们还可以在流中对数据进行二次加工,例如给数组中的每个元素先求平方再求和:

double[] arr = {1, 2, 3, 4, 5, 6};
double sum = DoubleStream.of(arr).map(i -> Math.pow(i, 2)).sum();
1
2

先在 Map 中对每一个元素求平方,然后再求和。

这里就能涉及到两个概念:

  • 中间操作
  • 终止操作

所谓中间操作,就是中途求平方的操作,所谓终止操作就是最终计算出结果的操作,只要方法的返回值不是一个 Stream,那就是终止操作,否则就是中间操作。

# 3.Stream 的创建

获取一个流的方式多种多样,最常见的就是从集合或者数组中获取一个流,当然我们也可以直接自己创建一个流出来。

一起来看下。

# 集合

List 集合或者 Set 集合都可以直接搞一个流出来,方式如下:

List<String> list = new ArrayList<>();
Stream<String> s1 = list.stream();
Set<String> set = new HashSet<>();
Stream<String> s2 = set.stream();
1
2
3
4

集合中直接调用 stream 方法就可以获取到流。

# 数组

通过数组获取流的方式也很简单,如下:

IntStream stream = Arrays.stream(new int[]{11, 22, 33, 44, 55, 66});
1

# 数字 Stream

也可以直接利用 IntStream、LongStream 等对象创建一个数字 Stream,如下:

IntStream s1 = IntStream.of(1, 2, 3);
DoubleStream s2 = DoubleStream.of(1, 2, 3);
LongStream s3 = LongStream.of(1L, 2L, 3L);
1
2
3

# 自己创建

Random random = new Random();
Supplier<Integer> supplier = () -> random.nextInt(100);
Stream<Integer> stream = Stream.generate(supplier).limit(5);
1
2
3

调用 Stream.generate 方法可以自己创建一个流,自己创建的时候需要提供一个 Supplier,通过调用 Supplier 中的 get 方法自动获取到元素。

无论哪种创建方式,大家需要明白的是,Stream 并不会保存数据,它只会对数据进行加工。

# 4.Stream 的中间操作

中间操作可以分为两大类:

  • map 或者 filter 会从输入流中获取每一个元素,并且在输出流中得到一个结果,这些操作没有内部状态,称为无状态操作。
  • reduce、sum、max 这些操作都需要内部状态来累计计算结果,所以称为有状态操作。

分别来看下:

无状态操作

  • map/mapToXxx
  • flatMap/flatMapToXxx
  • filter
  • peek

有状态操作

  • distinct
  • sorted
  • limit/skip

map

Stream.map() 是 Stream 中最常用的一个转换方法,可以把一个 Stream 对象转为另外一个 Stream 对象。map 方法所接收的参数就是一个 Function 对象,松哥在前面文章中和大家介绍过 Function 对象了,就是有输入有输出(参见WebFlux 前置知识(一) (opens new window)),了解了 map 的参数,那么 map 的功能就很明白了,就是对数据进行二次加工。

举个栗子,例如把一个字符串数组转为数字:

String[] arr = {"1", "2", "3"};
Stream<String> s1 = Arrays.stream(arr);
Stream<Integer> s2 = s1.map(i -> Integer.valueOf(i));
1
2
3

再比如一个数字流,给所有的元素乘 2,如下:

IntStream.of(1, 2, 3).map(i -> 2 * i).forEach(System.out::println);
1

最后的 forEach 就是将元素打印出来。

JDK 中也提供了一些现成的格式转换,如下图:

这样可以直接将元素转为 Double、Long、Obj 等类型,例如下面这样:

String[] arr = {"1", "2", "3"};
Stream<String> s1 = Arrays.stream(arr);
s1.mapToLong(i -> Long.parseLong(i)).forEach(System.out::println);
1
2
3

flatMap

flatMap 可以把 Stream 中的每个元素都映射为一个 Stream,然后再把这多个 Stream 合并为一个 Stream。

例如如下代码,返回的 Stream 中的元素是数组:

Stream<Integer[]> s = Stream.of(new Integer[]{1, 2, 3}, new Integer[]{4, 5, 6}, new Integer[]{7, 8, 9});
s.forEach(System.out::println);
1
2

通过 flatMap 我们可以将 Stream 中的元素变为 Integer:

Stream<Integer> s = Stream.of(new Integer[]{1, 2, 3}, new Integer[]{4, 5, 6}, new Integer[]{7, 8, 9}).flatMap( i -> Arrays.stream(i));
s.forEach(System.out::println);
1
2

filter

filter 操作会对一个 Stream 中的所有元素一一进行判断,不满足条件的就被过滤掉了,剩下的满足条件的元素就构成了一个新的 Stream。

例如要找到数组中所有大于 3 的元素,如下:

IntStream.of(2, 3, 4, 5, 6, 7).filter(i -> i > 3).forEach(System.out::println);
1

filter 方法接收的参数是 Predicate 接口函数,关于 Predicate 接口函数,大家可以参考WebFlux 前置知识(一) (opens new window))一文。

peek

peek 的入参是 Consumer,没有返回值,因此当我们要对元素内部进行处理时,使用 peek 是比较合适的,这个时候可以不用 map(map 的入参是 Function,它是有返回值的)。peek 方法本身会继续返回流,可以对数据继续进行处理。

举个简单的数据转换的例子吧(最终返回的数据并不会被转换):

IntStream.of(2, 3, 4, 5, 6, 7).filter(i -> i > 3).peek(String::valueOf).forEach(i-> System.out.println(i));
1

peek 方法的感觉就像数据中途被消费了一次。

distinct

这个是去重。由于去重操作需要获取到其他元素的值(比较之后才知道是否重复),所以这个是有状态操作。如下:

IntStream.of(2, 3, 4, 3, 7, 6, 2, 5, 6, 7).distinct().forEach(System.out::println);
1

sorted

sorted 是排序,因为也需要知道其他元素的值,然后才能去重,所以这个也是有状态操作,如下:

IntStream.of(2, 3, 4, 3, 7, 6, 2, 5, 6, 7).distinct().sorted().forEach(System.out::println);
1

limit/skip

limit 和 skip 配合操作有点像数据库中的分页,skip 表示跳过 n 个元素,limit 表示取出 n 个元素。例如下面这个例子:

Arrays.asList('A', 'B', 'C', 'D', 'E', 'F').stream().skip(2).limit(3).forEach(System.out::println);
1

这个会跳过 A 和 B,最终打印出 C D E。这也是一种有状态操作。

# 5.Stream 终止操作

终止操作就是最终计算出结果的操作,只要方法的返回值不是一个 Stream,那就是终止操作,否则就是中间操作。

终止操作又分为两类:

  • 短路操作:不用处理全部元素就可以返回结果。
  • 非短路操作:必须处理所有元素才能得到最终结果。

各自都包含哪些操作,我们分别来看下:

非短路操作

  • forEach/forEachOrdered
  • collect/toArray
  • reduce
  • min/max/count

短路操作

  • findFirst/findAny
  • allMatch/anyMatch/noneMatch

forEach/forEachOrdered

forEach 和 forEachOrdered 都是接收一个 Consumer 类型的参数,完成对参数的消费,不同的是,在并行流中,forEachOrdered 会保证执行顺序。

例如如下一段代码:

int[] arr = {1, 2, 3, 4, 5, 6, 7, 8, 9};
Arrays.stream(arr).parallel().forEach(System.out::println);
Arrays.stream(arr).parallel().forEachOrdered(System.out::println);
1
2
3

前者打印出来的顺序不一定是 123456789,后者一定是。

collect/toArray

这两个都是收集器,可以将执行结果转为一个 List 集合或者一个数组:

List<Integer> list = Stream.of(1, 2, 3, 4).filter(p -> p > 2).collect(Collectors.toList());
System.out.println(list);
1
2

reduce

reduce 是 Stream 的一个聚合方法,它可以把一个 Stream 的所有元素按照聚合函数聚合成一个结果。reduce 方法传入的对象是BinaryOperator 接口,它定义了一个apply 方法,负责把上次累加的结果和本次的元素进行运算,并返回累加的结果。

举个简单的例子,数组求和,当然可以直接调用 sum 计算,我们这里也可以调用 reduce 来实现,如下:

Optional<Integer> optional = Stream.of(1, 2, 3, 4).reduce((i, j) -> i + j);
System.out.println(optional.orElse(-1));
1
2

reduce 的参数是 BinaryOperator,这个接收两个参数,第一个参数是之前计算的结果,第二个参数是本次参与计算的元素,两者累加求和。

再比如给一段话中间加上.,方式如下:

Optional<String> s = Stream.of("wwwjavaboyorg".split("")).reduce((i, j) -> i + "." + j);
System.out.println(s.orElse(""));
1
2

最终执行结果如下:

w.w.w.j.a.v.a.b.o.y.o.r.g
1

min/max/count

这个就比较简单了,就是求最大值最小值,统计总个数,如下表示统计总个数:

Stream<Integer> s = Stream.of(1, 2, 3, 4);
long count = s.count();
System.out.println("count = " + count);
1
2
3

如下表示统计最小值:

Stream<Integer> s = Stream.of(1, 2, 3, 4);
Optional<Integer> min = s.min(Comparator.comparingInt(i -> i));
System.out.println("min.get() = " + min.get());
1
2
3

findFirst/findAny

这两个就是返回流中的第一个、任意一个元素,findAny 要在并行流中测试才有效果,举个栗子:

for (int i = 0; i < 10; i++) {
    Optional<Integer> first = Stream.of(1, 2, 3, 4).parallel().findFirst();
    System.out.println("first.get() = " + first.get());
}
System.out.println("=============");
for (int i = 0; i < 10; i++) {
    Optional<Integer> first = Stream.of(1, 2, 3, 4).parallel().findAny();
    System.out.println("first.get() = " + first.get());
}
1
2
3
4
5
6
7
8
9

allMatch/anyMatch/noneMatch

allMatch、anyMatch、noneMatch 用来判断所有元素、任意元素或者没有元素满足给定的条件。这三个方法的参数都是一个 Predicate 接口函数。

boolean b = Stream.of(1, 2, 3, 4).allMatch(i -> i > 5);
System.out.println("b = " + b);
1
2

# 6.并行流

通常情况下,对 Stream 的元素进行处理是单线程的,即一个一个元素进行处理。有时候我们希望可以并行处理 Stream 元素,因为在元素数量非常大的情况,并行处理可以大大加快处理速度。

把一个普通 Stream 转换为可以并行处理的 Stream 非常简单,只需要用 parallel 方法进行转换:

new Random().ints().limit(50).parallel().forEach(i->{
    System.out.println(Thread.currentThread().getName() + "--->" + i);
});
1
2
3

这样数据在后台就是并行打印的。

# 7.收集器

收集器可以将计算结果重新整理收集到一个集合中,这个集合可以是一个 List/Set 获取其他,并且还可以在收集的过程中对数据进行处理。

例如我有一个 users 集合,里边保存了用户数据,用户有 username、age 以及 gender 三个属性,如下代码分别表示:

  • 提取出用户对象中的 age 属性组成新的集合并返回。
  • 提取出用户对象中的 username 属性组成新的集合并返回。
  • 提取出用户对象中的 gender 属性组成新的集合并返回(这里是一个 Set 集合,所以会自动去重)。
List<Integer> ages = users.stream().map(User::getAge).collect(Collectors.toList());
System.out.println("ages = " + ages);
List<String> usernames = users.stream().map(User::getUsername).collect(Collectors.toList());
System.out.println("usernames = " + usernames);
Set<String> genders = users.stream().map(User::getGender).collect(Collectors.toSet());
System.out.println("genders = " + genders);
1
2
3
4
5
6

Collectors.toList() 最终返回的是 ArrayList,Collectors.toSet() 最终返回的是 HashSet。

如果我们想返回一个 Vector 或者 TreeSet,也是可以的,如下:

List<Integer> ages = users.stream().map(User::getAge).collect(Collectors.toList());
System.out.println("ages = " + ages);
List<String> usernames = users.stream().map(User::getUsername).collect(Collectors.toCollection(Vector::new));
System.out.println("usernames = " + usernames);
TreeSet<String> genders = users.stream().map(User::getGender).collect(Collectors.toCollection(TreeSet::new));
System.out.println("genders = " + genders);
1
2
3
4
5
6

也可以获取某一个字段的统计信息:

IntSummaryStatistics ageStatistics = users.stream().collect(Collectors.summarizingInt(User::getAge));
System.out.println("ageStatistics = " + ageStatistics);
1
2

这个统计信息中包含:总和、最小值、平均值以及最大值等:

ageStatistics = IntSummaryStatistics{count=20, sum=1222, min=9, average=61.100000, max=96}
1

还可以对数据进行分块,将男女不同性别统计出来:

Map<Boolean, List<User>> map = users.stream().collect(Collectors.partitioningBy(u -> u.getGender().equals("男")));
System.out.println("map = " + map);
1
2

也可以按照性别对数据进行分组,如下:

Map<String, List<User>> map2 = users.stream().collect(Collectors.groupingBy(User::getGender));
System.out.println("map2 = " + map2);
1
2

分组后,Map 中的 key 就是性别;分块后,Map 中的 key 就是 true/false。

再比如统计男女的人数:

Map<String, Long> map2 = users.stream().collect(Collectors.groupingBy(User::getGender,Collectors.counting()));
System.out.println("map2 = " + map2);
1
2

好啦,今天我们就先聊这么多~