05.24 Java 9 新特性:Reactive Streams

Java 9 新特性:Reactive Streams

Reactive Streams

Reactive Streams是一个使用非阻塞背压机制的异步流处理标准。

back pressure(背压)是其中的关键概念。在异步模式中,消费者订阅生产者,从生产者那里获取数据,需要提供回调方法,当生产者产生新的可用数据后,就调用回调方法。当生产者发送数据的速度大于消费者处理的速度时,消费者就会抢占更多的资源来处理,并且有崩溃的可能。为了防止这种问题,需要一种机制,能让消费者通知生产者:生产速度太快了需要减速,然后生产者可以进行相应调整。这个机制就叫做背压

背压可以分为阻塞非阻塞

阻塞比较简单,例如生产者和消费者运行在同一个线程中,一个执行、另一个阻塞,意味着当消费者执行时,生产者不会发送新的数据。

非阻塞的方式是把 推模式改为了 拉模式,推模式是生产者来决定,生产者尽快的把数据发给消费者,拉模式是消费者来决定,消费者向生产者请求一定数量的数据,生产者会按照这个数量发送,在下次请求到来之前就是等待。

API 中的重要类型

Publisher

生产数据,供订阅者消费,只有一个方法 subscribe(Subscriber)

Subscriber

订阅生产者,接收数据(通过 onNext(T)方法)、错误信息( onError(Throwable)方法)、没有更多数据的信号( onComplete()),在这些动作之前,publisher 会调用 onSubscription(Subscription)

Subscription

是发布者和订阅者之间的连接,订阅者会通过它来请求更多的数据( request(long)),或者中断连接( cancel())。

整体流程

  • 创建一个 Publisher 和一个 Subscriber

  • 通过 Publisher::subscribe 关联订阅者

  • 发布者创建一个 Subscription 然后调用 Subscriber::onSubscription,这样订阅连接就建立起来了

  • 订阅者调用 Subscription::request 请求一定数量的数据

  • 发布者调用

    Subscriber::onNext 向订阅者传递数据,数据量不会超过订阅者指定的数量

  • 当发布者没有更多数据时会调用 Subscriber::onComplete,如果出错就调用 Subscriber::onError

  • 订阅者可以继续请求更多的数据,或者通过 Subscription::cancel 关闭连接

可以看到,订阅者调用 Subscription::request主动请求,这就是对非阻塞背压的实现。


分享到:


相關文章: