嗯,今天看RxJava又发现自己有好多东西都不懂,慢慢了解吧९(།︵།)و
ReactiveX(简称Rx)
一下是截取的一些要点:
Rx是一个使用可观察数据流进行异步编程的编程接口,结合了观察者模式、迭代器模式和函数式编程的特点。
微软给的定义是:Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流, 用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Observables + LINQ + Schedulers。
Rx模式
使用观察者模式
- 创建:Rx可以方便的创建事件流和数据流
- 组合:Rx使用查询式的操作符组合和变换数据流
- 监听:Rx可以订阅任何可观察的数据流并执行操作
简化代码
- 函数式风格:对可观察的数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态
- 简化代码:Rx的操作符通通常可以将复杂的难题简化为很少的几行代码
- 异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
- 轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题
使用Observable的优势
Rx扩展了观察者模式用于支持数据和事件序列,添加了一些操作符,它让你可以声明式的组合这些序列,而无需关注底层的实现:如线程、同步、线程安全、并发数据结构和非阻塞IO。
Observable通过使用最佳的方式访问异步数据序列填补了这个间隙
| 单个数据 | 多个数据 | |
|---|---|---|
| 同步 | T getData() | Iterable |
| 异步 | Future |
Observable |
Rx的Observable模型让你可以像使用集合数据一样操作异步事件流,对异步事件流使用各种简单、可组合的操作。
- Observable可组合
对于单层的异步操作来说,Java中Future对象的处理方式是非常简单有效的,但是一旦涉及到嵌套,它们就开始变得异常繁琐和复杂。使用Future很难很好的组合带条件的异步执行流程(考虑到运行时各种潜在的问题,甚至可以说是不可能的),当然,要想实现还是可以做到的,但是非常困难,或许你可以用Future.get(),但这样做,异步执行的优势就完全没有了。从另一方面说,Rx的Observable一开始就是为组合异步数据流准备的。
- Observable更灵活
Rx的Observable不仅支持处理单独的标量值(就像Future可以做的),也支持数据序列,甚至是无穷的数据流。Observable是一个抽象概念,适用于任何场景。Observable拥有它的近亲Iterable的全部优雅与灵活。
Observable是异步的双向push,Iterable是同步的单向pull,对比:
| 事件 | Iterable(pull) | Observer(push) |
|---|---|---|
| 获取数据 | T next() | onNext(T) |
| 异常处理 | throws Exception | onError(Exception) |
| 任务完成 | !hasNext() | onCompleted() |
- Observable无偏见
Rx对于对于并发性或异步性没有任何特殊的偏好,Observable可以用任何方式实现,线程池、事件循环、非阻塞IO、Actor模式,任何满足你的需求的,你擅长或偏好的方式都可以。无论你选择怎样实现它,无论底层实现是阻塞的还是非阻塞的,客户端代码将所有与Observable的交互都当做是异步的。使用Rx可以在完全不影响Observable程序库使用者的情况下,彻底的改变Observable的底层实现。
- 使用回调存在很多问题
回调在不阻塞任何事情的情况下,解决了Future.get()过早阻塞的问题。由于响应结果一旦就绪Callback就会被调用,它们天生就是高效率的。不过,就像使用Future一样,对于单层的异步执行来说,回调很容易使用,对于嵌套的异步组合,它们显得非常笨拙。
- Rx是一个多语言的实现
Rx在大量的编程语言中都有实现,并尊重实现语言的风格,而且更多的实现正在飞速增加。
- 响应式编程
Rx提供了一系列的操作符,你可以使用它们来过滤(filter)、选择(select)、变换(transform)、结合(combine)和组合(compose)多个Observable,这些操作符让执行和复合变得非常高效。
你可以把Observable当做Iterable的推送方式的等价物,使用Iterable,消费者从生产者那拉取数据,线程阻塞直至数据准备好。使用Observable,在数据准备好时,生产者将数据推送给消费者。数据可以同步或异步的到达,这种方式更灵活。
下面的例子展示了相似的高阶函数在Iterable和Observable上的应用
|
|
Observable类型给GOF的观察者模式添加了两种缺少的语义,这样就和Iterable类型中可用的操作一致了:
1、生产者可以发信号给消费者,通知它没有更多数据可用了(对于Iterable,一个for循环正常完成表示没有数据了;对于Observable,就是调用观察者的onCompleted方法)
2、生产者可以发信号给消费者,通知它遇到了一个错误(对于Iterable,迭代过程中发生错误会抛出异常;对于Observable,就是调用观察者(Observer)的onError方法)
有了这两种功能,Rx就能使Observable与Iterable保持一致了,唯一的不同是数据流的方向。任何对Iterable的操作,你都可以对Observable使用。
好,了解完一系列相关的东西,接下来该进入正题了(一天差不多也过去了)……(๑°⌓°๑) ,明天再继续,默默吐槽一下我的渣效率(´ཀ`」 ∠)
RxJava的观察者模式

onNext():相当于onClick()/onEventonCompleted():事件队列完结。RxJava不仅单独处理事件,还会将其视作一个队列,当不再会有onNext()发出时,需触发onCompleted()作为标志。onError():事件队列异常。当事件处理过程中发生异常时,会触发onError(),同时队列终止。在同一事件队列中,
onCompleted()与onError()有且只有一个,且在事件队列的最末。
RxJava的基本实现
0、导入依赖
|
|
1、创建Observer
|
|
除了Observer接口之外,RxJava还内置了一个实现了Observer的抽象类Subscriber,Subscriber对Observer进行了一些扩展:
onStart():这是一个可选的方法,默认情况下实现为空。该方法在subscribe刚开始,事件未发送之前调用,用于一些准备工作(如数据的清零或重置)。如果对准备工作有线程要求时不适用,在指定的线程可使用doOnSubscribe()方法。unsubscribe():该方法用于取消订阅,在调用该方法后,Subscriber不再接收事件。一般在调用之前,先用isUnsubscribed判断一下状态。最好在不用的时候调用unsubscribe()解除引用关系,因为在subscribe()后,Observable会持有Subscriber的引用,这个引用如果不及时释放,会有内存泄漏的危险
实际上,在 RxJava 的subscribe()过程中,Observer也总是会先被转换成一个Subscriber再使用。
代码:
|
|
2、创建Observable
- 基本方法
create()
|
|
- 将传入的参数依次发送出来:
just(T...)
|
|
- 将传入的数组或
Iterable拆分成具体对象后依次发送出来:from(T[])/from(Iterable<? extend T>)
|
|
List
|
|
3、Subscribe(订阅)
|
|
除了subscribe(Observer)和subscribe(Subscriber),subscribe()还支持不完整定义的回调,RxJava会自动根据定义创建出Subscriber 如下:
|
|
RxJava 提供了多个ActionX形式的接口 (例如Action0,Action1) ,它们可以被用以包装不同的无返回值的方法。
嗯,在满课和抽空看RxJava中一天又过去了,感觉还是一知半解,明天继续 ⁽⁽٩(๑˃̶͈̀ ˂̶͈́)۶⁾⁾
Action
上文已经提到了Action,但是……还是不太懂,所以,继续吧 ->
常用
Action0:只有一个call()方法,无参无返回值。可将Action0视作一个将onCompleted()的内容打包的包装对象,它将自己作为一个参数传入subscribe()以实现不完整定义的回调。Action1:只有一个call(T param)方法,无返回值,有一个参数。可将Action1视作一个将onNext(obj)和onError(error)的内容打包来传入subscribe()的包装对象,
使用Action的前后对比
|
|
|
|
解析
|
|
map、flatMap转换
RxJava提供了对事件序列进行变换的支持。
map
举个例子:
|
|
注意:
Func1和Action1相似,它们的区别在于Func1包装的是有返回值的方法。map()可以进行多次转换。map()实现的是一对一的转化,要实现一对多的转化用flatMap()。
flatMap
举个例子:
|
|
原理:
1、使用传入的事件对象(students)创建一个Observable对象。
2、激活Observable,使其开始发送事件。
3、每一个创建出来的Observable发送的事件,都被汇入同一个Observable,这个Observable将事件统一交给Subscriber的回调方法。
Scheduler
在不指定线程的情况下,RxJava遵循的是线程不变的原则。如果需要切换线程,则需要用到Scheduler。
API
Schedulers.immediate():默认的Scheduler,在当前线程运行。Schedulers.newThread():启用新线程,并在新线程执行操作。Schedulers.io():I/O操作(读写文件、读写数据库、网络信息交互等)使用的Scheduler。行为模式与newThread()相似,区别在于io()内部实现使用的线程池无数量上限,可以重用空闲的线程,多数情况下io()比newThread()更有效率。除此之外,计算工作不要放在io()中,以避免创建不必要的线程。Schedulers.computation():CPU密集型计算(不会被I/O等操作限制性能)时使用的Scheduler。使用固定的线程池,大小为CPU核数。同样,不要将I/O操作放在computation()中,以避免CPU的浪费。AndroidScheduler.mainThread():Android专用,制定操作在主线程中运行。
方法
subscribeOn():事件产生的线程。指定subscribe()发生时所处的线程。observeOn():事件消费的线程。指定Subscriber运行时所在的线程。
举个例子:
|
|
相关链接
差不多基本的了解就是上面那些了,然后接下来就要开始实践了。
以上内容是学习的这些文档:
还有的上文已经列出,就不加以赘述了。
除此之外,还有一篇写得很棒的对Subscription解读的文章:关于RxJava1中的Subscription的一些误解,RxJava2也是参考的该作者的系列文章学习的。