Post

响应式编程介绍 - Reactor

本文主要介绍响应式编程的概念,与作为响应式编程的实现的Reactor框架的使用。但注意Reactor仍然是一个较为底层的框架,对于响应式的web开发,可以使用Spring WebFlux框架。但其不在本文的讨论范围内。

当我们开发应用程序时,有两种风格的代码我们可以写:命令式和响应式:

同步式(命令式) 的代码是一套串行任务,每次运行一个,完成前一个任务后再完成后一个。数据是批量进行处理的,在前面的任务没有完成批量数据处理前,不能将工作移交到下一个任务。

响应式 的代码很像是真正的报纸订阅的情况。定义一组任务去处理数据,但这些任务可并行运行。每个任务处理这些数据的一个子集,当它处理另外一个子集的时候,把处理完成的数据交给下一个任务。

需要明确的是,世界上没有一个完美的通用式的解决方案. 响应式自然可以实现一些命令式难以达成的目标,但这并不意味着其是万能的。响应式编程的目标是提供一种新的方式去处理数据,而不是替代命令式编程。

理解响应式编程

如果你已经对于Spring集成流,或者对于Java的StreamAPI有一定经验,那么你一般不会有理解障碍,可以直接跳过本节。

概念差异

要理解响应式编程为什么被发明,首先要了解普通同步式(命令式)的代码的局限.

传统的同步式(命令式) 的代码基本上是所有学习计算机编程相关知识时的101,所有代码一行一行地执行一串定义好的任务,每一步都需要获取到结果后才能继续执行下一步。 这种方式的好处是简单易懂,但是其缺点也很明显,就是效率低下。因为每一步都需要等待上一步的结果,所以这种方式的代码很难并行执行,而且很难处理大量的数据。 同时,也少不了阻塞问题需要解决。

对于同步式编码带来的问题,我们可以通过多线程来解决,例如Java语言支持并发编程。在Java程序中启动一个线程还是很容易的。 但是,多线程编程也有自己的问题,例如线程安全问题,线程间通信问题,线程死锁问题等等。 这些问题都是需要解决的。且这些问题都是具有挑战性的问题,随着线程数量的增加,其复杂度也会呈指数级增长。

( 当然所有的异步问题的解决都要面对多线程问题的处理,响应式编程自然也要利用线程池等技术完成异步任务的处理, 但是响应式编程更多的是在编程范式上的一种解决方案,而不是在同步式编程的基础上的一种解决方案。) (更新,具体来说在命令式编程中处理多个请求自然就需要多个线程,Reactor的一些flatmap自然也要多个线程完成并发操作, 但在web响应式编程中,其使用event loop,而不是Multi thread的的方法让一个线程可以处理多个request,具体可以看Spring WebFlux部分的博文)


本文介绍的响应式编程是一种新的编程范式,它不是在同步式编程的基础上的一种解决方案,而是一种新的编程范式。 响应式的主要特点是可以 异步地 不阻塞地 处理大量的(可以是无限的)数据。

应用于一个真实世界的类比就是,将命令式编程看做一个装水的气球,响应式编程看做花园里面的水管。两者都是在炎热的夏天让您的朋友惊喜并沉浸其中的方式。但是它们的执行风格是不同的: 一个水气球一次能携带的它的有效载荷,在撞击的那一刻浸湿了它预定的目标。然而,水球的容量是有限的,如果您想用水泡更多的人(或把同一个人淋得更湿),您唯一的选择就是增加水球的数量。 一根花园水龙带将其有效载荷作为一股水流从水龙头流向喷嘴。花园水龙头接的水带的容量可能是有限的,但在打水仗的过程中水是源源不断的。只要水从水龙头进入软管,就会一直通过软管然后从喷嘴喷出。同一个花园软管是很容易扩展的,您和朋友们可以玩得更尽兴。

你可能会联想到Java中的StreamAPI操作,其实StreamAPI操作就是响应式编程的一种实现。 但是StreamAPI操作也有自己的局限性,其处理的数据集合是有限的,并且往往只是简单地作为语法糖来简化集合迭代的流程, 而不被应用到(也无法应用到)整个应用程序中作为一种通用范式来完成各种复杂的业务逻辑。

换句话说Java的StreamApi很好,但其本身是一个工具,而不是一个更为核心的范式。具体开发响应式应用程序的规范是Reactive Streams。

Reactive Streams 规范

Reactive Streams 是 2013 年底由 Netflix、Lightbend 和 Pivotal(Spring 背后的公司)的工程师发起的一项计划。 响应式流旨在为无阻塞异步 Backpressure 流处理提供一个标准。

其重要定义了四个接口:

  • Publisher
  • Subscriber
  • Subscription
  • Processor

字面意义上就可以很好地理解,Publisher就是发布者,Subscriber就是订阅者,Processor就是两者之间的处理器,Subscription就是订阅关系。 注意具体的消息内容是 经由订阅者操作订阅关系获取的,通过订阅关系可以指定每次获取的消息数量,这就是Backpressure的概念。

类似于JDBC定义了连结数据库的底层规范,Reactive Streams定义了响应式编程的底层规范。 具体在使用响应式编程的时候,书中使用使用Reactive Streams规范的Reactor来实现响应式编程。当然存在一些其他的实现,例如RxJava,不过他们在使用上的差异不大。

Reactor使用示例

如果是springboot项目,导入reactor-core即可(书中的项目中还导入了reactor-test,以方便地进行测试) ,如果是非springboot项目,还需要导入reactor-bom ,具体步骤就不赘述了。


响应式编程需要我们从与命令式编程完全不同的角度去思考。响应式编程是通过直接建立一个用于数据流通的管道,而不是描述一系列需要进行的步骤。作为数据流通的管道,它可以被改变或以某种方式被使用。 例如,假设您想到利用一个人的名字,把它的所有字母变为大写,然后用它来创建一个问候语,最后将它打印出来。在命令式编程模型中,代码会是这个样子:

1
2
3
4
String name = "Craig";
String capitalName = name.toUpperCase();
String greeting = "Hello, " + capitalName + "!";
System.out.println(greeting);
1
2
3
4
Mono.just("Craig")
    .map(n -> n.toUpperCase())
    .map(cn -> "Hello, " + cn + "!")
    .subscribe(System.out::println);

关于just,map等方法的细节将很快在后一节讲述。

目前需要理解的是Mono这一概念, Mono 是 Reactor 的两个核心类型之一,另一个是 Flux。两者都是响应式流的 Publisher 的实现。Flux 表示零个、一个或多个(可能是无限个)数据项的管道。Mono 特定用于已知的数据返回项不多于一个的响应式类型。

其实差不多这些也就是全部的概念了,使用响应式框架的好处是其封装了 之前我们提到的Reactive Streams规范,使得我们不用关系 Publisher Subscriber Subscription Processor等概念,只需要关心数据的流向即可。

通用的响应式操作

如果要全部列举出响应式编程的操作,那么将要有200多个操作进行介绍,然而所有的操作其实可以分为以下几类:

  • 创建操作
  • 合并(拆分)操作
  • 转换与过滤操作
  • 逻辑操作

Creation

一般在实际场景中,我们的数据流都是从外部获取的,很少会直接使用创建操作来创建数据流。但是在测试的时候,创建操作是非常有用的。

  • just 直接用静态方法创建
  • fromArray 或者 fromIterable 从数组或者集合中创建
  • fromStream 从一个JavaStream中创建
  • range 从一个范围中创建 ,例如range(1, 10) 将会创建一个从1到10的数据流
  • interval 从一个时间间隔中创建,例如interval(Duration.ofSeconds(1)) 将会创建一个每秒钟产生一个数据的数据流

Combination

  • mergeWith 合并两个数据流,但是不保证顺序
  • zip 严格按照 一个A接着一个B的顺序合并两个数据流
  • first 从两个数据流中获取第一个数据流,抛弃另一个数据流, 该方法被弃用,新的方法是firstWithSignal,first的判定为第一个发出信号的数据流

Transformation

  • skip 跳过前n个数据,或者跳过一段时间内的数据
  • take skip的反向操作,只要前n个,或者只要一段时间内的数据
  • filter 不赘述了,自定义的过滤条件
  • distinct 去重
  • map 类似于java8的stream的map操作,但注意map方法是同步执行的
  • flatMap,搭配subscribeOn方法指定一个线程池,可以实现异步的map操作,其本质上是将map操作定义为另外的多个Mono或者Flux,然后再结果合并。所以最终结果的顺序是不确定的。
  • buffer 将Flux流中的数据,按照n个一组的方式进行分组,最终返回的是一个Flux<List>的数据流 为什么要这么做,即把响应式流对象 又 转为了同步式的List? 这是因为其后面往往紧接着另一个flatMap操作,让其他线程的流同时接受List进行处理,List的Size就是bufferSize,这也和其方法名对应上了。
  • collectList 如果buffer不知道参数,其会将整个流中的数据缓存为一个Flux<List> ,但如果你真的有将数据转为List的需求,那么可以使用collectList方法,其会将整个流中的数据缓存为一个Mono ,这个Mono就是一个List了。
  • collectMap ,指定一个 元素 到 key 的Function实现,然后流中的数据转为Map,注意如果key重复,后者会覆写前者

Logic

  • all
  • any 方法内部传递lambda实现,返回boolean类型

总结

  • 响应式编程 让程序运行的逻辑 从一步一步执行预定义的操作。 变成了定义一个数据流,然后让(可以是无数的)数据 流进这个数据流管道,然后流出操作后的数据。
  • Reactor Stream 是响应式编程的一个规范,其定义了四种类型:Publisher、Subscriber、Subscription 和 Transformer(Publisher 和 Subscriber 的组合)。
  • Project Reactor 实现了 Reactive Steam,并将流定义抽象为两种主要类型,Flux 和 Mono,每种类型都提供数百个操作。
  • Spring 利用 Reactor 创建响应式控制器、Repository、REST 客户端和其他响应式框架支持。( 即Reactor还是一个较为底层的框架,Spring在其基础上创建了其他的框架,例如WebFlux用于响应式的web应用开发)
This post is licensed under CC BY 4.0 by the author.