在ReactiveX中,观察者(observer)订阅(subscribes)了一个事件源(Observable)。 然后,该观察者对事件源发出(emit)的任何事件或事件序列作出响应。 这种模式有利于并发操作,因为它不需要阻塞的等待事件源发出事件,而是以观察者的形式创建一个哨兵,随时准备在事件源发出事件的时候做出相应的响应。
此页面解释了反应模式是什么以及事件源和观察者是什么(以及观察者如何订阅事件源)。 其他页面显示了如何使用 各种事件源的操作指令 将事件源链接在一起并更改其行为。
在许多软件编程任务中,开发人员或多或少希望编写的指令是按照编写的顺序逐个执行并逐步完成。但是在ReactiveX中,许多指令可以并行执行,其结果紧接着按随机顺序由“观察者”处理。以事件源的形式定义一个校验和转换数据的机制, 然后观察者去订阅它, 当事件源按先前定义得机制行动并发出事件时观察者捕获并相应,而不是通过调用方法。
这种方式的优点是,当有一堆不依赖于彼此的任务时,可以同时执行所有任务,而不是等到每个任务完成后再执行下一个。 这样整个捆绑任务的完成时间只需要与捆绑中最长的任务一样长。
这里有许多术语用于描述这种异步编程和设计模型。 本文将使用下列术语:一个观察者(observer)订阅(subscribes)了一个事件源(Observable)。 一个事件源发出(emits) 事件(items)或通知(notifications)它的观察者通过调用观察者的方法。
在其他文档或内容下, “观察者(observer)”也叫做“订阅者(subscriber)”、“监视器(watcher)”或“反应器(reactor)” 这种模型通常被称为“反应器模式(reactor pattern)”。
此页面使用类Groovy的伪代码作为示例,但是在许多语言中都有ReactiveX实现。
在一个普通的方法调用流程如下,而在ReactiveX通常上是异步和并行调用方法:
伪代码如下所示:
// 进行调用,将其返回值赋给`returnVal` returnVal = someMethod(itsParameters); // 用returnVal做一些有用的事情
在异步模型中流程如下:
伪代码如下所示:
// 定义但是不执行onNext方法 // (在这个例子种, 观察者非常简单并且只有一个onNext方法) def myOnNext = { it -> do something useful with it }; // 定义但是不执行事件源 def myObservable = someObservable(itsParameters); // 观察者订阅事件源,并运行事件源 myObservable.subscribe(myOnNext); // 继续执行必要的代码
通过subscribe
方法关联观察者和事件源。一个观察者应该实现以下方法的某个自集:
onNext
onError
onNext
或onCompleted
。此方法将事件源抛出的错误为参数onCompleted
onNext
没有发生任何错误,则在调用最后一次之后调用此方法。
根据事件源合约(the Observable contract)中的条款, 事件源可以调用onNext
零次或多次,然后可以随后调用onCompleted
或onError
其中一个, 这将是它的最后一次调用。 按照合约, 在本文档中,调用onNext
通常称为事件的“发出(emissions)”,而调用onCompleted
或onError
称为“通知(notifications)”。
更完整的subscribe
调用如下所示:
def myOnNext = { item -> /* 对事件进行处理 */ }; def myError = { throwable -> /* 对失败的调用作出合理的处理 */ }; def myComplete = { /* 在最终进行清理工作 */ }; def myObservable = someMethod(itsParameters); myObservable.subscribe(myOnNext, myError, myComplete); // 继续执行必要的代码
在一些ReactiveX实现中,有一个专门的观察者接口Subscriber
,它实现了一个unsubscribe
方法。可以调用此方法来让观察者取消订阅。然后事件源可以(如果他们没有其他感兴趣的观察者)选择停止生成要发出的新事件。
此取消订阅的结果将通过适用于观察者订阅的事件源的链运算,这将导致链中的每个链接停止发出事件。然而,这并不能保证立即发生,即使在没有观察者处理这些发出之后,事件源也有可能产生并尝试发出事件一段时间。
ReactiveX的每种语言特定实现都有自己的命名怪癖。虽然实现之间存在许多共性,但没有规范的命名标准。
此外,这些名称中的一些在其他情境中具有不同的含义,或者在特定实现语言的常用用语中看起来很尴尬。
例如存在命名模式onEvent
(例如onNext
,
onCompleted
, onError
). 在某些上下文中,这样的名称将表示注册了哪个事件处理的方法。但是在ReactiveX指的是事件处理方法本身。
事件源何时开始发出其事件序列?这取决于事件源。 “热”事件源可以在创建后立即开始发出事件,因此任何后来订阅该事件源的观察者都可以开始在中间某处观察事件序列。 “冷”事件源需要观察者在开始发出事件之前订阅它,因此这样的观察者保证从一开始就看到整个事件序列。
在ReactiveX的一些实现中,还存在称为“可连接(Connectable)”事件源的东西。这样的事件源在调用Connect方法之前不会开始发出事件,无论是否有任何观察者都订阅了它。
事件源和观察者只是ReactiveX的开始。它们本身只不过是标准观察者模式的轻微扩展,更适合处理一系列事件而不是简单的回调。
真正的威力在于“反应性扩展(reactive extensions)” (书面称为“ReactiveX”)——能通过操作符来转换,组合,操作和处理事件源发出的事件序列。
这些Rx操作符提供了一种基于回调提供的效率优势以声明方式构造连续异步调用的方法。重要的是它们消除了通常异步系统中嵌套回调处理程序的缺点。
本文档提供各种操作符的信息及其用法示例在以下页面:
Create
, Defer
, Empty
/Never
/Throw
,
From
, Interval
, Just
, Range
, Repeat
,
Start
, and Timer
Buffer
, FlatMap
, GroupBy
, Map
, Scan
, and
Window
Debounce
, Distinct
, ElementAt
, Filter
,
First
, IgnoreElements
, Last
, Sample
,
Skip
, SkipLast
, Take
, and TakeLast
And
/Then
/When
, CombineLatest
, Join
,
Merge
, StartWith
, Switch
, and Zip
Catch
and Retry
Delay
, Do
, Materialize
/Dematerialize
,
ObserveOn
, Serialize
, Subscribe
, SubscribeOn
,
TimeInterval
, Timeout
, Timestamp
, and Using
All
, Amb
, Contains
, DefaultIfEmpty
,
SequenceEqual
, SkipUntil
, SkipWhile
, TakeUntil
,
and TakeWhile
Average
, Concat
, Count
, Max
, Min
,
Reduce
, and Sum
To
Connect
, Publish
, RefCount
, and Replay
这些页面包含有关某些操作符的信息,这些操作符不属于ReactiveX的核心,而是在一个或多个特定于语言的实现和/或可选模块中实现。
大多数操作符都在事件源上操作并返回一个事件源。这使我们可以在链中一个接一个地应用这些运算符。链中的每个操作符都会处理由前一个操作符的操作产生的事件源。
区别其他模式,如Builder模式,该模式使用特定类的各种方法通过方法的操作修改该对象,对同一类的项进行操作。这些模式允许以类似的方式链接方法。但是在Builder模式中,方法在链中出现的顺序通常并不重要,事件源的操作符顺序很重要。
一条链上的操作符不是单独的在原始的事件源上操作的,是依次在调用链被调用,每个操作符操作的事件源都由前一个操作符生成。