
【Java-webflux】Spring5新特性之webflux反应式编程-Project Reactor
概念
第一次接触反应式编程可以先看一下反应式宣言,这里有一些名词和概念解释。 简单来说,反应式编程满足以下特质:
- 即时响应性:只要建立连接, 系统就会及时地做出响应。
- 回弹性:系统在出现失败时依然保持即时响应性。
- 弹性:系统在不断变化的工作负载之下依然保持即时响应性,也就是说可以弹性伸缩。
- 消息驱动:反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。
一、现在-流行的SpringMVC所遇到的困境
经典SpringMVC案例:
如上图所示,在
controller
中会有一些外部资源调用过程阻塞线程,导致响应延迟,如果阻塞时间长就可能导致用户流失。
二、过渡-Servlet3.0异步请求原理
1.Servlet同步请求问题
- 假设同步场景:
平均访问量:2000/s
请求耗时:250ms
tomcat
最大线程数配置:2000/s * 0.25s = 500
此时,500个线程就可以满足。但如果访问量突增,请求进来就只能排队,这样就会导致响应延迟,用户流失。
2.Servlet异步如何解决
Filter
/Servlet
在生成响应之前可能要等待一些资源的响应以完成请求处理,比如一个jdbc的查询,或者远程服务rpc调用。在Servlet
中阻塞等待是一个低效的操作,这将导致受限系统资源的急剧紧张,比如线程,连接数。
Servlet3.0引入了异步处理请求的能力,使得线程可以不用阻塞等待,提早返回到容器,从而执行更多的请求任务。把耗时的任务提交给另一个异步线程去执行,以及产生响应。
- 异步场景: 业务平均访问量:5000/s 同步请求的耗时:250ms 异步请求的耗时:20ms 理论同步场景下tomcat最大线程数配置:5000/s * 0.25s = 1250 但异步时这里tomcat最大线程数我们可以使用100,此时我们使用Servlet异步线程,将业务逻辑放到一个其他线程池中的线程去执行,不阻塞tomcat处理请求。同时可以根据不同业务使用不同线程池,从而达到对指定业务限流的效果。
三、未来-WebFlux反应式模型
这里自然离不开经典的Spring反应式模型对比
- Spring MVC
- 构建于 Servlet API 之上
- 同步阻塞 I/O 模型, 认为应用汇阻塞当前线程,所以一个 Request 对应一个 Thread,需要有一个含有大量线程的线程池
- Spring WebFlux
- 构建于 Reactive Streams Adapters 之上
- 异步非阻塞 I/O 模型,认为应用不会阻塞当前线程,所以只是需要一个包含少数固定线程数的线程池 (event loop workers) 来处理请求
- Spring反应式模型中,完全支持了全链路的流式传输,从而实现【浏览器-中间件-程序-缓存/数据库】的反应式编程。
四、从0到1快速实战Webflux
一些Webflux的基础API科普:
webflux中一般用到的api都是Reactor
中的概念,符合Reactive Streams规范
- Reactor 的主要模块 Reactor 框架主要有两个主要的模块:reactor-core 和 reactor-ipc。前者主要负责 Reactive Programming 相关的核心 API 的实现,后者负责高性能网络通信的实现,目前是基于 Netty 实现的。
- Reactor 的主要类
在 Reactor 中,经常使用的类并不是很多,主要有以下两个:
- Mono 实现了
org.reactivestreams.Publisher
接口,代表0到1个元素的发布者。 - Flux 同样实现了
org.reactivestreams.Publisher
接口,代表0到N个元素的发布者。
- Mono 实现了
- 可能会使用到的类
- Scheduler 表示背后驱动反应式流的调度器,通常由各种线程池实现。
++Publisher/Flux和Mono++
由于响应流的特点,我们不能再返回一个简单的POJO
对象来表示结果了。必须返回一个类似Java
中的Future的概念,在有结果可用时通知消费者进行消费响应。
Reactive Streams规范中这种被定义为Publisher
Publisher
是一个可以提供0-N个序列元素的提供者,并根据其订阅者Subscriber<? super T>
的需求推送元素。
一个Publisher
可以支持多个订阅者,并可以根据订阅者的逻辑进行推送序列元素,当没有订阅时发布者什么也不做。
Publisher
提供了subscribe
方法,允许消费者在有结果可用时进行消费。
如果没有消费者Publisher
不会做任何事情,他根据消费情况进行响应。
Publisher
可能返回零或者多个,甚至可能是无限的,为了更加清晰表示期待的结果就引入了两个实现模型Mono
和Flux
。
当然想充分了解webflux还需要熟悉NIO模型、Reactor模式、lambda与函数式编程等知识,这里就不逐一详解了
1.接口实现webflux的两种方式
-
传统SpringMVC注解方式实现
-
基于函数式的的WebFlux开发 其中有两个核心接口:
-
HandlerFunction
:相当于Controller
中的具体处理方法,输入为请求,输出封装在Mono
中的响应 -
RouterFunction
:相当于RequestMapping
,将url映射到具体的HandlerFunction
,输入为请求,输出封装在Mono
中的HandlerFunction
-
组合使用的话就是如下示例: 先写接口实现:
@Component
public class UserHandler {
public Mono<ServerResponse> getUser(ServerRequest request) {
Optional<String> userId = request.queryParam("userId");
return userId.map(id ->
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(Mono.just(AuthInfo.builder().id(id).appName("lichong.work").build()), AuthInfo.class))
.orElseGet(() ->
ServerResponse.badRequest().contentType(MediaType.TEXT_PLAIN).body(Mono.just("userId is empty"), String.class));
}
}
再写对应路由:
@Configuration
public class UserRouterCfg {
public RouterFunction<ServerResponse> userRouterFunctions(UserHandler userHandler) {
return RouterFunctions.route().GET("/user", userHandler::getUser).build();
}
}
官网文档有更多示例可供参考:API文档,在RouterFunction
中还可以增加一些过滤器,都是函数式的流式编程实现,自己尝试一下就可以很快理解了,这里我简单写了一个示例参考:
@Configuration
public class UserRouterCfg {
public RouterFunction<ServerResponse> userRouterFunctions(UserHandler userHandler) {
return RouterFunctions.route().filter((((serverRequest, handlerFunction) -> {
ServerRequest.Headers headers = serverRequest.headers();
List<String> tokenHeader = headers.header("token");
if (tokenHeader.isEmpty()) {
return ServerResponse.status(HttpStatus.FORBIDDEN).build();
}
return handlerFunction.handle(serverRequest);
}))).GET("/user", userHandler::getUser).build();
}
}
2.自定义全局异常
3. webflux和关系型数据库整合
// TODO
五、Project Reactor项目
// TODO