响应式Spring Data
开发基于响应式流的应用,就像是在搭建数据流流动的管道,从而异步的数据能够顺畅流过每个环节。前边的例子主要聚焦于应用层,然而绝大多数系统免不了要与数据库进行交互,所以我们也需要响应式的持久层API和支持异步的数据库驱动。就像从自来水厂到家里水龙头这个管道中,如果任何一个环节发生了阻塞,那就可能造成整体吞吐量的下降。
各个数据库都开始陆续推出异步驱动,目前Spring Data支持的可以进行响应式数据访问的数据库有MongoDB、Redis、Apache Cassandra和CouchDB。今天我们用MongoDB来写一个响应式demo。
我们这个例子很简单,就是关于User的增删改查,以及基于注解的服务端推送。
使用lombok
org.projectlombok
lombok
1.16.20
编写User
@Data // 生成无参构造方法/getter/setter/hashCode/equals/toString
@AllArgsConstructor // 生成所有参数构造方法
@NoArgsConstructor // @AllArgsConstructor会导致@Data不生成无参构造方法,需要手动添加@NoArgsConstructor,如果没有无参构造方法,可能会导致比如com.fasterxml.jackson在序列化处理时报错
public class User {
private String id;
private String username;
private String phone;
private String email;
private String name;
private Date birthday;
}
增加Spring Data的依赖
在POM中增加Spring Data Reactive Mongo的依赖:
org.springframework.boot
spring-boot-starter-data-mongodb-reactive
MongoDB是文档型的NoSQL数据库,因此,我们使用@Document注解User类:
@Data
@AllArgsConstructor
@Document
public class User {
@Id
private String id; // 注解属性id为ID
@Indexed(unique = true) // 注解属性username为索引,并且不能重复
private String username;
...
配置数据源
<code># MONGODB (MongoProperties)/<code>
<code>spring.data.mongodb.authentication-database= # Authentication database name./<code>
<code>spring.data.mongodb.database=test # Database name./<code>
<code>spring.data.mongodb.field-naming-strategy= # Fully qualified name of the FieldNamingStrategy to use./<code>
<code>spring.data.mongodb.grid-fs-database= # GridFS database name./<code>
<code>spring.data.mongodb.host=localhost # Mongo server host. Cannot be set with uri./<code>
<code>spring.data.mongodb.password= # Login password of the mongo server. Cannot be set with uri./<code>
<code>spring.data.mongodb.port=27017 # Mongo server port. Cannot be set with uri./<code>
<code>spring.data.mongodb.repositories.enabled=true # Enable Mongo repositories./<code>
<code>spring.data.mongodb.uri=mongodb://localhost/test # Mongo database URI. Cannot be set with host, port and credentials./<code>
<code>spring.data.mongodb.username= # Login user of the mongo server. Cannot be set with uri./<code>
增加DAO层repository
非响应式Spring Data的CrudReposity对应的,响应式的Spring Data也提供了相应的Repository库:ReactiveCrudReposity,当然,我们也可以使用它的子接口ReactiveMongoRepository。
我们增加UserRepository:
<code>public interface UserRepository extends ReactiveCrudRepository { // 1/<code>
<code> Mono findByUsername(String username); // 2/<code>
<code> Mono deleteByUsername(String username);/<code>
<code> }/<code>
Service层
由于业务逻辑几乎为零,只是简单调用了DAO层就不在累赘。
Controller层
<code>@RestController/<code>
<code> @RequestMapping("/user")/<code>
<code> public class UserController {/<code>
<code> @Autowired/<code>
<code> private UserService userService;/<code>
<code> @PostMapping("")/<code>
<code> public Mono save(User user) {/<code>
<code> return this.userService.save(user);/<code>
<code> }/<code>
启动应用测试一下
由于涉及到POST和DELETE方法的请求,建议用支持RESTful的client来测试。
stream+json
看到这里细心的朋友可能会有点嘀咕,怎么看是不是异步的呢?毕竟查询全部的时候,结果都用中括号括起来了,这和原来返回List的效果似乎没多大区别。假设一下查询100个数据,如果是异步的话,以我们对“异步响应式流”的印象似乎应该是一个一个至少是一批一批的到达客户端的嘛。我们加个延迟验证一下:
@GetMapping("")
public Flux findAll() {
return this.userService.findAll().delayElements(Duration.ofSeconds(1));
}
每个元素都延迟1秒,现在我们在数据库里弄三条记录,然后请求查询全部的那个URL,发现并不是像/times一样一秒一个地出来,而是3秒之后一块儿出来的。果然如此,这一点都不响应式啊!
与/times类似,我们也加一个MediaType,不过由于这里返回的是JSON,因此不能使用TEXT_EVENT_STREAM,而是使用APPLICATION_STREAM_JSON,即application/stream+json格式。
@GetMapping(value = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux findAll() {
return this.userService.findAll().delayElements(Duration.ofSe
conds(2));
}
produces后边的值应该是application/stream+json字符串,因此用APPLICATION_STREAM_JSON_VALUE。
重启服务再次请求,发现三个user是一秒一个的速度出来的,中括号也没有了,而是一个一个独立的JSON值构成的json stream
总结:
如果有Spring Data开发经验的话,切换到Spring Data Reactive的难度并不高。跟Spring WebFlux类似:原来返回User的话,那现在就返回Mono;原来返回List的话,那现在就返回Flux。
对于稍微复杂的业务逻辑或一些必要的异常处理,比如上边的save方法,请一定采用响应式的编程方式来定义,从而一切都是异步非阻塞的。如下图所示,从HttpServer(如Netty或Servlet3.1以上的Servlet容器)到ServerAdapter(Spring WebFlux框架提供的针对不同server的适配器),到我们编写的Controller和DAO,以及异步数据库驱动,构成了一个完整的异步非阻塞的管道,里边流动的就是响应式流。