JavaStream数据处理:延迟执行与不可变

2021年8月19日07:27:37 发表评论 63 views

忽然想不起来Stream中的累加应该怎么写?

无奈只能面向谷歌编程,花费了我宝贵的三分钟之后,学会了,很简单。

自从我用上JDK8以后,Stream就是我最常用的特性,各种流式操作用的飞起,然而这次事以后我忽然觉得Stream对我真的很陌生。

可能大家都一样,对最常用到的东西,也最容易将其忽略,哪怕你要准备面试估计也肯定想不起来要看一下Stream这种东西。

不过我既然注意到了,就要重新梳理一遍它,也算是对我的整体知识体系的查漏补缺。

花了很多功夫来写这篇Stream,希望大家和我一块重新认识并学习一下Stream,了解API也好,了解内部特性也罢,怕什么真理无穷,进一步有进一步的欢喜。

在本文中我将Stream的内容分为以下几个部分:

JavaStream数据处理:延迟执行与不可变

初看这个导图大家可能对转换流操作和终结流操作这两个名词有点蒙,其实这是我将Stream中的所有API分成两类,每一类起了一个对应的名字(参考自Java8相关书籍,见文末):

  • 转换流操作 :例如filter和map方法,将一个Stream转换成另一个Stream,返回值都是Stream。
  • 终结流操作 :例如count和collect方法,将一个Stream汇总为我们需要的结果,返回值都不是Stream。

其中转换流操作的API我也分了两类,文中会有详细例子说明,这里先看一下定义,有一个大概印象:

  1. 无状态 :即此方法的执行无需依赖前面方法执行的结果集。
  2. 有状态 :即此方法的执行需要依赖前面方法执行的结果集。

由于Stream内容过多,所以我将Stream拆成了上下两篇,本篇是第一篇,内容翔实,用例简单且丰富。

第二篇的主题虽然只有一个终结操作,但是终结操作API比较复杂,所以内容也翔实,用例也简单且丰富,从篇幅上来看两者差不多,敬请期待。


 :由于我本机的电脑是JDK11,而且写的时候忘了切换到JDK8,所以在用例中大量出现的List.of()在JDK8是没有的,它等同于JDK8中的Arrays.asList()

1. 为什么要使用Stream?

一切还要源于JDK8的发布,在那个函数式编程语言如火如荼的时代,Java由于它的臃肿而饱受诟病(强面向对象),社区迫切需要Java能加入函数式语言特点改善这种情况,终于在2014年Java发布了JDK8。

在JDK8中,我认为最大的新特性就是加入了函数式接口和lambda表达式,这两个特性取自函数式编程。

这两个特点的加入使Java变得更加简单与优雅,用函数式对抗函数式,巩固Java老大哥的地位,简直是师夷长技以制夷。

而Stream,就是JDK8又依托于上面的两个特性为集合类库做的 一个类库,它能让我们通过lambda表达式更简明扼要的以流水线的方式去处理集合内的数据,可以很轻松的完成诸如:过滤、分组、收集、归约这类操作,所以我愿将Stream称为函数式接口的最佳实践。

1.1 更清晰的代码结构

Stream拥有更清晰的代码结构,为了更好的讲解Stream怎么就让代码变清晰了,这里假设我们有一个非常简单的需求:在一个集合中找到所有大于2的元素 。

先来看看没使用Stream之前:

        List<Integer> list = List.of(1, 2, 3);
        
        List<Integer> filterList = new ArrayList<>();
        
        for (Integer i : list) {
            if (i > 2) {
                filterList.add(i);
            }
        }
        
        System.out.println(filterList);

上面的代码很好理解,我就不过多解释了,其实也还好了,因为我们的需求比较简单,如果需求再多点呢?

每多一个要求,那么if里面就又要加一个条件了,而我们开发中往往对象上都有很多字段,那么条件可能有四五个,最后可能会变成这样:

        List<Integer> list = List.of(1, 2, 3);

        List<Integer> filterList = new ArrayList<>();

        for (Integer i : list) {
            if (i > 2 && i < 10 && (i % 2 == 0)) {
                filterList.add(i);
            }
        }

        System.out.println(filterList);

if里面塞了很多条件,看起来就变得乱糟糟了,其实这也还好,最要命的是项目中往往有很多类似的需求,它们之间的区别只是某个条件不一样,那么你就需要复制一大坨代码,改吧改吧就上线了,这就导致代码里有大量重复的代码。

如果你Stream,一切都会变得清晰易懂:

        List<Integer> list = List.of(1, 2, 3).stream()
                .filter(i -> i > 2)
                .filter(i -> i < 10)
                .filter(i -> i % 2 == 0)
                .collect(toList());

这段代码你只需要关注我们最关注的东西:筛选条件就够了,filter这个方法名能让你清楚的知道它是个过滤条件,collect这个方法名也能看出来它是一个收集器,将最终结果收集到一个List里面去。

同时你可能发现了,为什么上面的代码中不用写循环?

因为Stream会帮助我们进行隐式的循环,这被称为:内部迭代,与之对应的就是我们常见的外部迭代了。

所以就算你不写循环,它也会进行一遍循环。

1.2 不必关心变量状态

Stream在设计之初就被设计为不可变的,它的不可变有两重含义:

  1. 由于每次Stream操作都会生成一个新的Stream,所以Stream是不可变的,就像String。
  2. 在Stream中只保存原集合的引用,所以在进行一些会修改元素的操作时,是通过原元素生成一份新的新元素,所以Stream 的任何操作都不会影响到原对象。

第一个含义可以帮助我们进行链式调用,实际上我们使用Stream的过程中往往会使用链式调用,而第二个含义则是函数式编程中的一大特点:不修改状态。

无论对Stream做怎么样的操作,它最终都不会影响到原集合,它的返回值也是在原集合的基础上进行计算得来的。

所以在Stream中我们不必关心操作原对象集合带来的种种副作用,用就完了。

关于函数式编程可以查阅阮一峰的函数式编程初探

1.3 延迟执行与优化

Stream只在遇到终结操作的时候才会执行,比如:

        List.of(1, 2, 3).stream()
                .filter(i -> i > 2)
                .peek(System.out::println);

这么一段代码是不会执行的,peek方法可以看作是forEach,这里我用它来打印Stream中的元素。

因为filter方法和peek方法都是转换流方法,所以不会触发执行。

如果我们在后面加入一个count方法就能正常执行:

        List.of(1, 2, 3).stream()
                .filter(i -> i > 2)
                .peek(System.out::println)
                .count();

count方法是一个终结操作,用于计算出Stream中有多少个元素,它的返回值是一个long型。

Stream的这种没有终结操作就不会执行的特性被称为延迟执行

与此同时,Stream还会对API中的无状态方法进行名为循环合并的优化,具体例子详见第三节。

2. 创建Stream

为了文章的完整性,我思来想去还是加上了创建Stream这一节,这一节主要介绍一些创建Stream的常用方式,Stream的创建一般可以分为两种情况:

  1. 使用Steam接口创建
  2. 通过集合类库创建

同时还会讲一讲Stream的并行流与连接,都是创建Stream,却具有不同的特点。

2.1 通过Stream接口创建

Stream作为一个接口,它在接口中定义了定义了几个静态方法为我们提供创建Stream的API:

    public static<T> Stream<T> of(T... values) {
        return Arrays.stream(values);
    }

首先是of方法,它提供了一个泛型可变参数,为我们创建了带有泛型的Stream流,同时在如果你的参数是基本类型的情况下会使用自动包装对基本类型进行包装:

        Stream<Integer> integerStream = Stream.of(1, 2, 3);

        Stream<Double> doubleStream = Stream.of(1.1d, 2.2d, 3.3d);

        Stream<String> stringStream = Stream.of("1", "2", "3");

当然,你也可以直接创建一个空的Stream,只需要调用另一个静态方法——empty(),它的泛型是一个Object:

        Stream<Object> empty = Stream.empty();

以上都是我们让我们易于理解的创建方式,还有一种方式可以创建一个无限制元素数量的Stream——generate():

    public static<T> Stream<T> generate(Supplier<? extends T> s) {
        Objects.requireNonNull(s);
        return StreamSupport.stream(
                new StreamSpliterators.InfiniteSupplyingSpliterator.OfRef<>(Long.MAX_VALUE, s), false);
    }

从方法参数上来看,它接受一个函数式接口——Supplier作为参数,这个函数式接口是用来创建对象的接口,你可以将其类比为对象的创建工厂,Stream将从此工厂中创建的对象放入Stream中:

        Stream<String> generate = Stream.generate(() -> "Supplier");

        Stream<Integer> generateInteger = Stream.generate(() -> 123);

我这里是为了方便直接使用Lamdba构造了一个Supplier对象,你也可以直接传入一个Supplier对象,它会通过Supplier接口的get() 方法来构造对象。

2.2 通过集合类库进行创建

相较于上面一种来说,第二种方式更较为常用,我们常常对集合就行Stream流操作而非手动构建一个Stream:

        Stream<Integer> integerStreamList = List.of(1, 2, 3).stream();
        
        Stream<String> stringStreamList = List.of("1", "2", "3").stream();

在Java8中,集合的顶层接口Collection被加入了一个新的接口默认方法——stream(),通过这个方法我们可以方便的对所有集合子类进行创建Stream的操作:

        Stream<Integer> listStream = List.of(1, 2, 3).stream();
        
        Stream<Integer> setStream = Set.of(1, 2, 3).stream();

通过查阅源码,可以发先 stream() 方法本质上还是通过调用一个Stream工具类来创建Stream:

    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

2.3 创建并行流

在以上的示例中所有的Stream都是串行流,在某些场景下,为了最大化压榨多核CPU的性能,我们可以使用并行流,它通过JDK7中引入的fork/join框架来执行并行操作,我们可以通过如下方式创建并行流:

        Stream<Integer> integerParallelStream = Stream.of(1, 2, 3).parallel();

        Stream<String> stringParallelStream = Stream.of("1", "2", "3").parallel();

        Stream<Integer> integerParallelStreamList = List.of(1, 2, 3).parallelStream();

        Stream<String> stringParallelStreamList = List.of("1", "2", "3").parallelStream();

是的,在Stream的静态方法中没有直接创建并行流的方法,我们需要在构造Stream后再调用一次parallel()方法才能创建并行流,因为调用parallel()方法并不会重新创建一个并行流对象,而是在原有的Stream对象上面设置了一个并行参数。

当然,我们还可以看到,Collection接口中可以直接创建并行流,只需要调用与stream() 对应的parallelStream()方法,就像我刚才讲到的,他们之间其实只有参数的不同:

    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

    default Stream<E> parallelStream() {
        return StreamSupport.stream(spliterator(), true);
    }

不过一般情况下我们并不需要用到并行流,在Stream中元素不过千的情况下性能并不会有太大提升,因为将元素分散到不同的CPU进行计算也是有成本的。

并行的好处是充分利用多核CPU的性能,但是使用中往往要对数据进行分割,然后分散到各个CPU上去处理,如果我们使用的数据是数组结构则可以很轻易的进行分割,但是如果是链表结构的数据或者Hash结构的数据则分割起来很明显不如数组结构方便。

所以只有当Stream中元素过万甚至更大时,选用并行流才能带给你更明显的性能提升。

最后,当你有一个并行流的时候,你也可以通过sequential() 将其方便的转换成串行流:

        Stream.of(1, 2, 3).parallel().sequential();

2.4 连接Stream

如果你在两处构造了两个Stream,在使用的时候希望组合在一起使用,可以使用concat():

        Stream<Integer> concat = Stream
                .concat(Stream.of(1, 2, 3), Stream.of(4, 5, 6));

如果是两种不同的泛型流进行组合,自动推断会自动的推断出两种类型相同的父类:

        Stream<Integer> integerStream = Stream.of(1, 2, 3);

        Stream<String> stringStream = Stream.of("1", "2", "3");

        Stream<? extends Serializable> stream = Stream.concat(integerStream, stringStream);

3. Stream转换操作之无状态方法

JavaStream数据处理:延迟执行与不可变

无状态方法:即此方法的执行无需依赖前面方法执行的结果集。

在Stream中无状态的API我们常用的大概有以下三个:

  1. map()方法:此方法的参数是一个Function对象,它可以使你对集合中的元素做自定义操作,并保留操作后的元素。
  2. filter()方法:此方法的参数是一个Predicate对象,Predicate的执行结果是一个Boolean类型,所以此方法只保留返回值为true的元素,正如其名我们可以使用此方法做一些筛选操作。
  3. flatMap()方法:此方法和map()方法一样参数是一个Function对象,但是此Function的返回值要求是一个Stream,该方法可以将多个Stream中的元素聚合在一起进行返回。

先来看看一个map()方法的示例:

        Stream<Integer> integerStreamList = List.of(1, 2, 3).stream();

        Stream<Integer> mapStream = integerStreamList.map(i -> i * 10);

我们拥有一个List,想要对其中的每个元素进行乘10 的操作,就可以采用如上写法,其中的i是对List中元素的变量名, 后面的逻辑则是要对此元素进行的操作,以一种非常简洁明了的方式传入一段代码逻辑执行,这段代码最后会返回一个包含操作结果的新Stream。

这里为了更好的帮助大家理解,我画了一个简图:

JavaStream数据处理:延迟执行与不可变


接下来是filter()方法示例:

        Stream<Integer> integerStreamList = List.of(10, 20, 30).stream();

        Stream<Integer> filterStream = integerStreamList.filter(i -> i >= 20);

在这段代码中会执行i >= 20 这段逻辑,然后将返回值为true的结果保存在一个新的Stream中并返回。

这里我也有一个简单的图示:

JavaStream数据处理:延迟执行与不可变


flatMap() 方法的描述在上文我已经描述过,但是有点过于抽象,我在学习此方法中也是搜索了很多示例才有了较好的理解。

根据官方文档的说法,此方法是为了进行一对多元素的平展操作:

        List<Order> orders = List.of(new Order(), new Order());

        Stream<Item> itemStream = orders.stream()
                .flatMap(order -> order.getItemList().stream());

这里我通过一个订单示例来说明此方法,我们的每个订单中都包含了一个商品List,如果我想要将两个订单中所有商品List组成一个新的商品List,就需要用到flatMap()方法。

在上面的代码示例中可以看到每个订单都返回了一个商品List的Stream,我们在本例中只有两个订单,所以也就是最终会返回两个商品List的Stream,flatMap()方法的作用就是将这两个Stream中元素提取出来然后放到一个新的Stream中。

老规矩,放一个简单的图示来说明:

JavaStream数据处理:延迟执行与不可变

图例中我使用青色代表Stream,在最终的输出中可以看到flatMap()将两个流变成了一个流进行输出,这在某些场景中非常有用,比如我上面的订单例子。


还有一个很不常用的无状态方法peek()

    Stream<T> peek(Consumer<? super T> action);

peek方法接受一个Consumer对象做参数,这是一个无返回值的参数,我们可以通过peek方法做些打印元素之类的操作:

        Stream<Integer> peekStream = integerStreamList.peek(i -> System.out.println(i));

然而如果你不太熟悉的话,不建议使用,某些情况下它并不会生效,比如:

        List.of(1, 2, 3).stream()
                .map(i -> i * 10)
                .peek(System.out::println)
                .count();

API文档上面也注明了此方法是用于Debug,通过我的经验,只有当Stream最终需要重新生产元素时,peek才会执行。

上面的例子中,count只需要返回元素个数,所以peek没有执行,如果换成collect方法就会执行。

或者如果Stream中存在过滤方法如filter方法和match相关方法,它也会执行。

3.1 基础类型Stream

上一节提到了三个Stream中最常用的三个无状态方法,在Stream的无状态方法中还有几个和map()与flatMap()对应的方法,它们分别是:

  1. mapToInt
  2. mapToLong
  3. mapToDouble
  4. flatMapToInt
  5. flatMapToLong
  6. flatMapToDouble

这六个方法首先从方法名中就可以看出来,它们只是在map()或者flatMap()的基础上对返回值进行转换操作,按理说没必要单拎出来做成一个方法,实际上它们的关键在于返回值:

  1. mapToInt返回值为IntStream
  2. mapToLong返回值为LongStream
  3. mapToDouble返回值为DoubleStream
  4. flatMapToInt返回值为IntStream
  5. flatMapToLong返回值为LongStream
  6. flatMapToDouble返回值为DoubleStream

在JDK5中为了使Java更加的面向对象,引入了包装类的概念,八大基础数据类型都对应着一个包装类,这使你在使用基础类型时可以无感的进行自动拆箱/装箱,也就是自动使用包装类的转换方法。

比如,在最前文的示例中,我用了这样一个例子:

        Stream<Integer> integerStream = Stream.of(1, 2, 3);

我在创建Stream中使用了基本数据类型参数,其泛型则被自动包装成了Integer,但是我们有时可能忽略自动拆装箱也是有代价的,如果我们想在使用Stream中忽略这个代价则可以使用Stream中转为基础数据类型设计的Stream:

  1. IntStream:对应 基础数据类型中的int、short、char、boolean
  2. LongStream:对应基础数据类型中的long
  3. DoubleStream:对应基础数据类型中的double和float

在这些接口中都可以和上文的例子一样通过of方法构造Stream,且不会自动拆装箱。

所以上文中提到的那六个方法实际上就是将普通流转换成这种基础类型流,在我们需要的时候可以拥有更高的效率。

基础类型流在API方面拥有Stream一样的API,所以在使用方面只要明白了Stream,基础类型流也都是一样的。

 :IntStream、LongStream和DoubleStream都是接口,但并非继承自Stream接口。

3.2 无状态方法的循环合并

说完无状态的这几个方法我们来看一个前文中的例子:

        List<Integer> list = List.of(1, 2, 3).stream()
                .filter(i -> i > 2)
                .filter(i -> i < 10)
                .filter(i -> i % 2 == 0)
                .collect(toList());

在这个例子中我用了三次filter方法,那么大家觉得Stream会循环三次进行过滤吗?

如果换掉其中一个filter为map,大家觉得会循环几次?

        List<Integer> list = List.of(1, 2, 3).stream()
                .map(i -> i * 10)
                .filter(i -> i < 10)
                .filter(i -> i % 2 == 0)
                .collect(toList());

从我们的直觉来看,需要先使用map方法对所有元素做处理,然后再使用filter方法做过滤,所以需要执行三次循环。

但回顾无状态方法的定义,你可以发现其他这三个条件可以放在一个循环里面做,因为filter只依赖map的计算结果,而不必依赖map执行完后的结果集,所以只要保证先操作map再操作filter,它们就可以在一次循环内完成,这种优化方式被称为循环合并

所有的无状态方法都可以放在同一个循环内执行,它们也可以方便的使用并行流在多个CPU上执行。

4. Stream转换操作之有状态方法

前面说完了无状态方法,有状态方法就比较简单了,只看名字就可以知道它的作用:

方法名方法结果
distinct()元素去重。
sorted()元素排序,重载的两个方法,需要的时候可以传入一个排序对象。
limit(long maxSize)传入一个数字,代表只取前X个元素。
skip(long n)传入一个数字,代表跳过X个元素,取后面的元素。
takeWhile(Predicate predicate)JDK9新增,传入一个断言参数当第一次断言为false时停止,返回前面断言为true的元素。
dropWhile(Predicate predicate)JDK9新增,传入一个断言参数当第一次断言为false时停止,删除前面断言为true的元素。

以上就是所有的有状态方法,它们的方法执行都必须依赖前面方法执行的结果集才能执行,比如排序方法就需要依赖前面方法的结果集才能进行排序。

同时limit方法和takeWhile是两个短路操作方法,这意味效率更高,因为可能内部循环还没有走完时就已经选出了我们想要的元素。

所以有状态的方法不像无状态方法那样可以在一个循环内执行,每个有状态方法都要经历一个单独的内部循环,所以编写代码时的顺序会影响到程序的执行结果以及性能,希望各位读者在开发过程中注意。

5. 总结

本文主要是对Stream做了一个概览,并讲述了Stream的两大特点:

  1. 不可变:不影响原集合,每次调用都返回一个新的Stream。
  2. 延迟执行:在遇到终结操作之前,Stream不会执行。

同时也将Stream的API分成了转换操作和终结操作两类,并讲解了所有常用的转换操作,下一章的主要内容将是终结操作。

在看Stream源码的过程中发现了一个有趣的事情,在ReferencePipeline类中(Stream的实现类),它的方法顺序从上往下正好是:无状态方法 → 有状态方法 → 聚合方法。

好了,学完本篇后,我想大家对Stream的整体已经很清晰了,同时对转换操作的API应该也已经掌握了,毕竟也不多😂,Java8还有很多强大的特性,我们下次接着聊~


同时,本文在写作过程中也参考了以下书籍:

这三本书都非常好,第一本是Java核心技术的作者写的,如果你想全面的了解JDK8的升级可以看这本。

第二本可以说是一个小册子,只有一百多页很短,主要讲了一些函数式的思想。

如果你只能看一本,那么我这里推荐第三本,豆瓣评分高达9.2,内容和质量都当属上乘。

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: