侧边栏壁纸
博主头像
Ric

Talk less,think more,do more

  • 累计撰写 20 篇文章
  • 累计创建 9 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

【Java-webflux】Spring5新特性之webflux反应式编程-Project Reactor

Ric
Ric
2022-08-22 / 1 评论 / 0 点赞 / 238 阅读 / 1,783 字 / 正在检测是否收录...
温馨提示:
若内容或图片失效,请留言反馈。

概念

第一次接触反应式编程可以先看一下反应式宣言,这里有一些名词和概念解释。
简单来说,反应式编程满足以下特质:

  1. 即时响应性:只要建立连接, 系统就会及时地做出响应。
  2. 回弹性:系统在出现失败时依然保持即时响应性。
  3. 弹性:系统在不断变化的工作负载之下依然保持即时响应性,也就是说可以弹性伸缩。
  4. 消息驱动:反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。

反应式宣言

一、现在-流行的SpringMVC所遇到的困境

经典SpringMVC案例
经典SpringMVC案例
如上图所示,在controller中会有一些外部资源调用过程阻塞线程,导致响应延迟,如果阻塞时间长就可能导致用户流失。

二、过渡-Servlet3.0异步请求原理

1.Servlet同步请求问题

  • 假设同步场景
    平均访问量:2000/s
    请求耗时:250ms
    tomcat最大线程数配置:2000/s * 0.25s = 500

此时,500个线程就可以满足。但如果访问量突增,请求进来就只能排队,这样就会导致响应延迟,用户流失。

2.Servlet异步如何解决

Filter/Servlet在生成响应之前可能要等待一些资源的响应以完成请求处理,比如一个jdbc的查询,或者远程服务rpc调用。在Servlet阻塞等待是一个低效的操作,这将导致受限系统资源的急剧紧张,比如线程,连接数。
Servlet3.0引入了异步处理请求的能力,使得线程可以不用阻塞等待,提早返回到容器,从而执行更多的请求任务。把耗时的任务提交给另一个异步线程去执行,以及产生响应。

  • 异步场景
    业务平均访问量:5000/s
    同步请求的耗时:250ms
    异步请求的耗时:20ms
    理论同步场景下tomcat最大线程数配置:5000/s * 0.25s = 1250
    但异步时这里tomcat最大线程数我们可以使用100,此时我们使用Servlet异步线程,将业务逻辑放到一个其他线程池中的线程去执行,不阻塞tomcat处理请求。同时可以根据不同业务使用不同线程池,从而达到对指定业务限流的效果。

三、未来-WebFlux反应式模型

这里自然离不开经典的Spring反应式模型对比
Spring响应式模型对比

  • Spring MVC
    • 构建于 Servlet API 之上
    • 同步阻塞 I/O 模型, 认为应用汇阻塞当前线程,所以一个 Request 对应一个 Thread,需要有一个含有大量线程的线程池
  • Spring WebFlux
    • 构建于 Reactive Streams Adapters 之上
    • 异步非阻塞 I/O 模型,认为应用不会阻塞当前线程,所以只是需要一个包含少数固定线程数的线程池 (event loop workers) 来处理请求
    • Spring反应式模型中,完全支持了全链路的流式传输,从而实现【浏览器-中间件-程序-缓存/数据库】的反应式编程。

四、从0到1快速实战Webflux

一些Webflux的基础API科普
webflux中一般用到的api都是Reactor中的概念,符合Reactive Streams规范

  • Reactor 的主要模块
    Reactor 框架主要有两个主要的模块:reactor-core 和 reactor-ipc。前者主要负责 Reactive Programming 相关的核心 API 的实现,后者负责高性能网络通信的实现,目前是基于 Netty 实现的。
  • Reactor 的主要类
    在 Reactor 中,经常使用的类并不是很多,主要有以下两个:
    • Mono 实现了org.reactivestreams.Publisher接口,代表0到1个元素的发布者。
    • Flux 同样实现了org.reactivestreams.Publisher接口,代表0到N个元素的发布者。
  • 可能会使用到的类
    • Scheduler 表示背后驱动反应式流的调度器,通常由各种线程池实现。

Publisher/Flux和Mono
由于响应流的特点,我们不能再返回一个简单的POJO对象来表示结果了。必须返回一个类似Java中的Future的概念,在有结果可用时通知消费者进行消费响应。

Reactive Streams规范中这种被定义为Publisher
Publisher是一个可以提供0-N个序列元素的提供者,并根据其订阅者Subscriber<? super T>的需求推送元素。

一个Publisher可以支持多个订阅者,并可以根据订阅者的逻辑进行推送序列元素,当没有订阅时发布者什么也不做

Publisher提供了subscribe方法,允许消费者在有结果可用时进行消费。

如果没有消费者Publisher不会做任何事情,他根据消费情况进行响应。

Publisher可能返回零或者多个,甚至可能是无限的,为了更加清晰表示期待的结果就引入了两个实现模型MonoFlux

当然想充分了解webflux还需要熟悉NIO模型Reactor模式、lambda与函数式编程等知识,这里就不逐一详解了

1.接口实现webflux的两种方式

  • 传统SpringMVC注解方式实现
    传统SpringMVC注解方式实现

  • 基于函数式的的WebFlux开发
    其中有两个核心接口:

    • HandlerFunction:相当于Controller中的具体处理方法,输入为请求,输出封装在Mono中的响应
      HandlerFunction

    • RouterFunction:相当于RequestMapping,将url映射到具体的HandlerFunction,输入为请求,输出封装在Mono中的HandlerFunction
      RouterFunction

组合使用的话就是如下示例:
先写接口实现

@Component
public class UserHandler {
    
    public Mono<ServerResponse> getUser(ServerRequest request) {
        Optional<String> userId = request.queryParam("userId");
        return userId.map(id ->
                        ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(Mono.just(AuthInfo.builder().id(id).appName("lichong.work").build()), AuthInfo.class))
                .orElseGet(() ->
                        ServerResponse.badRequest().contentType(MediaType.TEXT_PLAIN).body(Mono.just("userId is empty"), String.class));
    }
}

再写对应路由

@Configuration
public class UserRouterCfg {

    public RouterFunction<ServerResponse> userRouterFunctions(UserHandler userHandler) {
        return RouterFunctions.route().GET("/user", userHandler::getUser).build();
    }
}

官网文档有更多示例可供参考:API文档,在RouterFunction中还可以增加一些过滤器,都是函数式的流式编程实现,自己尝试一下就可以很快理解了,这里我简单写了一个示例参考:

@Configuration
public class UserRouterCfg {

    public RouterFunction<ServerResponse> userRouterFunctions(UserHandler userHandler) {
        return RouterFunctions.route().filter((((serverRequest, handlerFunction) -> {
            ServerRequest.Headers headers = serverRequest.headers();
            List<String> tokenHeader = headers.header("token");
            if (tokenHeader.isEmpty()) {
                return ServerResponse.status(HttpStatus.FORBIDDEN).build();
            }
            return handlerFunction.handle(serverRequest);
        }))).GET("/user", userHandler::getUser).build();
    }
}

2.自定义全局异常

自定义全局异常

3. webflux和关系型数据库整合

// TODO

五、Project Reactor项目

// TODO

0

评论区