Spring WebFlux-快速上门(三)

响应式Spring Data

开发基于响应式流的应用,就像是在搭建数据流流动的管道,从而异步的数据能够顺畅流过每个环节。前边的例子主要聚焦于应用层,然而绝大多数系统免不了要与数据库进行交互,所以我们也需要响应式的持久层API和支持异步的数据库驱动。就像从自来水厂到家里水龙头这个管道中,如果任何一个环节发生了阻塞,那就可能造成整体吞吐量的下降。

各个数据库都开始陆续推出异步驱动,目前Spring Data支持的可以进行响应式数据访问的数据库有MongoDB、Redis、Apache Cassandra和CouchDB。今天我们用MongoDB来写一个响应式demo。

我们这个例子很简单,就是关于User的增删改查,以及基于注解的服务端推送。

使用lombok

org.projectlombok

lombok

1.16.20

编写User

@Data // 生成无参构造方法/getter/setter/hashCode/equals/toString

@AllArgsConstructor // 生成所有参数构造方法

@NoArgsConstructor // @AllArgsConstructor会导致@Data不生成无参构造方法,需要手动添加@NoArgsConstructor,如果没有无参构造方法,可能会导致比如com.fasterxml.jackson在序列化处理时报错

public class User {

private String id;

private String username;

private String phone;

private String email;

private String name;

private Date birthday;

}

增加Spring Data的依赖

在POM中增加Spring Data Reactive Mongo的依赖:

org.springframework.boot

spring-boot-starter-data-mongodb-reactive

MongoDB是文档型的NoSQL数据库,因此,我们使用@Document注解User类:

@Data

@AllArgsConstructor

@Document

public class User {

@Id

private String id; // 注解属性id为ID

@Indexed(unique = true) // 注解属性username为索引,并且不能重复

private String username;

...

配置数据源

<code># MONGODB (MongoProperties)/<code>
<code>spring.data.mongodb.authentication-database= # Authentication database name./<code>
<code>spring.data.mongodb.database=test # Database name./<code>
<code>spring.data.mongodb.field-naming-strategy= # Fully qualified name of the FieldNamingStrategy to use./<code>
<code>spring.data.mongodb.grid-fs-database= # GridFS database name./<code>
<code>spring.data.mongodb.host=localhost # Mongo server host. Cannot be set with uri./<code>
<code>spring.data.mongodb.password= # Login password of the mongo server. Cannot be set with uri./<code>
<code>spring.data.mongodb.port=27017 # Mongo server port. Cannot be set with uri./<code>
<code>spring.data.mongodb.repositories.enabled=true # Enable Mongo repositories./<code>
<code>spring.data.mongodb.uri=mongodb://localhost/test # Mongo database URI. Cannot be set with host, port and credentials./<code>
<code>spring.data.mongodb.username= # Login user of the mongo server. Cannot be set with uri./<code>

增加DAO层repository

非响应式Spring Data的CrudReposity对应的,响应式的Spring Data也提供了相应的Repository库:ReactiveCrudReposity,当然,我们也可以使用它的子接口ReactiveMongoRepository。

我们增加UserRepository:

<code>public interface UserRepository extends ReactiveCrudRepository {  // 1/<code>
<code>        Mono findByUsername(String username);     // 2/<code>
<code>        Mono deleteByUsername(String username);/<code>
<code>    }/<code>
  • 同样的,ReactiveCrudRepository的泛型分别是User和ID的类型;
  • ReactiveCrudRepository已经提供了基本的增删改查的方法,根据业务需要,我们增加四个方法
  • Service层

    由于业务逻辑几乎为零,只是简单调用了DAO层就不在累赘。

    Controller层

    <code>@RestController/<code>
    <code>    @RequestMapping("/user")/<code>
    <code>    public class UserController {/<code>
    <code>        @Autowired/<code>
    <code>        private UserService userService;/<code>
    <code>        @PostMapping("")/<code>
    <code>        public Mono save(User user) {/<code>
    <code>            return this.userService.save(user);/<code>
    <code>        }/<code>

    启动应用测试一下

    由于涉及到POST和DELETE方法的请求,建议用支持RESTful的client来测试。


    stream+json

    看到这里细心的朋友可能会有点嘀咕,怎么看是不是异步的呢?毕竟查询全部的时候,结果都用中括号括起来了,这和原来返回List的效果似乎没多大区别。假设一下查询100个数据,如果是异步的话,以我们对“异步响应式流”的印象似乎应该是一个一个至少是一批一批的到达客户端的嘛。我们加个延迟验证一下:

    @GetMapping("")

    public Flux findAll() {

    return this.userService.findAll().delayElements(Duration.ofSeconds(1));

    }

    每个元素都延迟1秒,现在我们在数据库里弄三条记录,然后请求查询全部的那个URL,发现并不是像/times一样一秒一个地出来,而是3秒之后一块儿出来的。果然如此,这一点都不响应式啊!

    与/times类似,我们也加一个MediaType,不过由于这里返回的是JSON,因此不能使用TEXT_EVENT_STREAM,而是使用APPLICATION_STREAM_JSON,即application/stream+json格式。

    @GetMapping(value = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)

    public Flux findAll() {

    return this.userService.findAll().delayElements(Duration.ofSe

    conds(2));

    }

    produces后边的值应该是application/stream+json字符串,因此用APPLICATION_STREAM_JSON_VALUE。

    重启服务再次请求,发现三个user是一秒一个的速度出来的,中括号也没有了,而是一个一个独立的JSON值构成的json stream


    总结:

    如果有Spring Data开发经验的话,切换到Spring Data Reactive的难度并不高。跟Spring WebFlux类似:原来返回User的话,那现在就返回Mono;原来返回List的话,那现在就返回Flux。

    对于稍微复杂的业务逻辑或一些必要的异常处理,比如上边的save方法,请一定采用响应式的编程方式来定义,从而一切都是异步非阻塞的。如下图所示,从HttpServer(如Netty或Servlet3.1以上的Servlet容器)到ServerAdapter(Spring WebFlux框架提供的针对不同server的适配器),到我们编写的Controller和DAO,以及异步数据库驱动,构成了一个完整的异步非阻塞的管道,里边流动的就是响应式流。


    Spring WebFlux-快速上门(三)


    分享到:


    相關文章: