前言
反应式编程是一种可以替代命令式编程的编程范式。这种可替代性存在的原因在于反应式编程解决了命令式编程中的一些限制。理解这些限制,有助于你更好地理解反应式编程模型的优点
反应式流规范
- 对比 Java 中的流
Java的流和反应式流Java的流和反应式流之间有很多相似之处。首先,它们的名字中都有流(Stream)这个词。
它们还提供了用于处理数据的函数式API。事实上,正如你稍后将会在介绍Reactor时看到的那样,它们甚至可以共享许多相同的操作。
Java的流通常都是同步的,并且只能处理有限的数据集。从本质上来说,它们只是使用函数来对集合进行迭代的一种方式。
反应式流支持异步处理任意大小的数据集,同样也包括无限数据集。只要数据就绪,它们就能实时地处理数据,并且能够通过回压来避免压垮数据的消费者。
- 反应式流规范
反应式流规范可以总结为4个接口:Publisher、Subscriber、Subscription和Processor。
Publisher负责生成数据,并将数据发送给Subscription(每个Subscriber对应一个Subscription)。
Publisher接口声明了一个方法subscribe(),Subscriber可以通过该方法向Publisher发起订阅。
1 |
|
初识Reactor
Reactor项目是反应式流规范的一个实现,提供了一组用于组装反应式流的函数式API。
反应式编程要求我们采取和命令式编程不一样的思维方式。此时我们不会再描述每一步要进行的步骤,反应式编程意味着要构建数据将要流经的管道。当数据流经管道时,可以对它们进行某种形式的修改或者使用。
命令式编程:
1 | String a = "Apple"; |
反应式编程:
1 | Mono.just("Apple") |
helloAPPLE!
14:36:38.685 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
helloAPPLE!
1 |
|
<!--reactor core-->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<!--reactor test-->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.2.6.RELEASE</version>
<scope>compile</scope>
</dependency>
1 |
|
Flux<String> fruitFlux = Flux.just("Apple","Orange");
fruitFlux.subscribe(System.out::println);
1 | 这里传递给subscribe()方法的lambda表达式实际上是一个java.util.Consumer,用来创建反应式流的Subscriber。在调用subscribe()之后,数据会开始流动。在这个例子中,没有中间操作,所以数据从Flux直接流向订阅者 |
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.verifyComplete();
1 |
|
List<String> list = Lists.newArrayList();
list.add("Apple");
list.add("Orange");
Flux<String> stringFlux = Flux.fromIterable(list);
1 | ![](https://tva1.sinaimg.cn/large/007S8ZIlgy1gglweh3r7uj322k0u0ay8.jpg) |
Flux
1 | 通过interval()方法创建的Flux会从0开始发布值,并且后续的条目依次递增。此外,因为interval()方法没有指定最大值,所以它可能会永远运行。我们也可以使用take()方法将结果限制为前5个条目 |
Flux
Flux
fruitFluxA.mergeWith(fruitFluxB).subscribe(System.out::println);
1 |
com.ckj.superlearn.superlearn.base.ReactorStrategy
16:03:07.343 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
Apple
Orange
Banana
watermelon
Process finished with exit code 0
1 | ![](https://tva1.sinaimg.cn/large/007S8ZIlgy1gglzay93rfj30xs0esafw.jpg) |
Flux
Flux
Flux
allFlux.subscribe(x-> System.out.println(“allFlux:”+x));
Flux<Tuple2<String, String>> zip = Flux.zip(fruitFluxA, fruitFluxB);
zip.subscribe(x-> System.out.println(“zip:”+x));
Thread.sleep(1000);
1 |
|
/com.ckj.superlearn.superlearn.base.ReactorStrategy
16:49:44.543 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
allFlux:Apple
allFlux:Orange
allFlux:Banana
zip:[Apple,Banana]
allFlux:watermelon
zip:[Orange,watermelon]
Process finished with exit code 0
1 | * 3 转换和过滤反应式流 |
Flux
fruitFluxA.subscribe(x->{
System.out.println(x);
});
1 |
/com.ckj.superlearn.superlearn.base.ReactorStrategy
17:05:00.141 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
Banana
watermelon
Process finished with exit code 0
1 |
|
Flux
fruitFluxA.subscribe(x->{
System.out.println(x);
});
1 |
com.ckj.superlearn.superlearn.base.ReactorStrategy
17:20:59.483 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
Apple
Orange
Process finished with exit code 0
1 | filter()的过滤效果 |
Flux
fruitFluxA.filter(x->x.equals(“Apple”)).subscribe(x->{
System.out.println(x);
});
1 |
com.ckj.superlearn.superlearn.base.ReactorStrategy
17:24:03.242 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
Apple
Process finished with exit code 0
1 |
|
Flux
“watermelon”, “Apple”, “Orange”, “Banana”, “watermelon”, “Apple”, “Orange”, “Banana”, “watermelon”);
fruitFluxA.flatMap(Mono::just).map(String::toUpperCase).subscribeOn(Schedulers.parallel());
![](https://tva1.sinaimg.cn/large/007S8ZIlgy1ggm1rjb3naj31580kgguh.jpg)
使用flatMap()和subscribeOn()的好处是:我们可以在多个并行线程之间拆分工作,从而增加流的吞吐量。因为工作是并行完成的,无法保证哪项工作首先完成,所以结果Flux中数据项的发布顺序是未知的
> 原创不易,如果觉得有点用的话,请毫不留情点个赞,转发一下,这将是我持续输出优质文章的最强动力。