RxJava

嗯,今天看RxJava又发现自己有好多东西都不懂,慢慢了解吧९(།︵།)و

ReactiveX(简称Rx)

Reactive官网

Reactive翻译文档

一下是截取的一些要点:

Rx是一个使用可观察数据流进行异步编程的编程接口,结合了观察者模式、迭代器模式和函数式编程的特点。

微软给的定义是:Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流, 用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Observables + LINQ + Schedulers。

Rx模式

使用观察者模式

  • 创建:Rx可以方便的创建事件流和数据流
  • 组合:Rx使用查询式的操作符组合和变换数据流
  • 监听:Rx可以订阅任何可观察的数据流并执行操作

简化代码

  • 函数式风格:对可观察的数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态
  • 简化代码:Rx的操作符通通常可以将复杂的难题简化为很少的几行代码
  • 异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
  • 轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题

使用Observable的优势

Rx扩展了观察者模式用于支持数据和事件序列,添加了一些操作符,它让你可以声明式的组合这些序列,而无需关注底层的实现:如线程、同步、线程安全、并发数据结构和非阻塞IO。

Observable通过使用最佳的方式访问异步数据序列填补了这个间隙

单个数据 多个数据
同步 T getData() Iterable getData()
异步 Future getData() Observable getData()

Rx的Observable模型让你可以像使用集合数据一样操作异步事件流,对异步事件流使用各种简单、可组合的操作。

  • Observable可组合

对于单层的异步操作来说,Java中Future对象的处理方式是非常简单有效的,但是一旦涉及到嵌套,它们就开始变得异常繁琐和复杂。使用Future很难很好的组合带条件的异步执行流程(考虑到运行时各种潜在的问题,甚至可以说是不可能的),当然,要想实现还是可以做到的,但是非常困难,或许你可以用Future.get(),但这样做,异步执行的优势就完全没有了。从另一方面说,Rx的Observable一开始就是为组合异步数据流准备的。

  • Observable更灵活

Rx的Observable不仅支持处理单独的标量值(就像Future可以做的),也支持数据序列,甚至是无穷的数据流。Observable是一个抽象概念,适用于任何场景。Observable拥有它的近亲Iterable的全部优雅与灵活。

Observable是异步的双向push,Iterable是同步的单向pull,对比:

事件 Iterable(pull) Observer(push)
获取数据 T next() onNext(T)
异常处理 throws Exception onError(Exception)
任务完成 !hasNext() onCompleted()
  • Observable无偏见

Rx对于对于并发性或异步性没有任何特殊的偏好,Observable可以用任何方式实现,线程池、事件循环、非阻塞IO、Actor模式,任何满足你的需求的,你擅长或偏好的方式都可以。无论你选择怎样实现它,无论底层实现是阻塞的还是非阻塞的,客户端代码将所有与Observable的交互都当做是异步的。使用Rx可以在完全不影响Observable程序库使用者的情况下,彻底的改变Observable的底层实现。

  • 使用回调存在很多问题

回调在不阻塞任何事情的情况下,解决了Future.get()过早阻塞的问题。由于响应结果一旦就绪Callback就会被调用,它们天生就是高效率的。不过,就像使用Future一样,对于单层的异步执行来说,回调很容易使用,对于嵌套的异步组合,它们显得非常笨拙。

  • Rx是一个多语言的实现

Rx在大量的编程语言中都有实现,并尊重实现语言的风格,而且更多的实现正在飞速增加。

  • 响应式编程

Rx提供了一系列的操作符,你可以使用它们来过滤(filter)、选择(select)、变换(transform)、结合(combine)和组合(compose)多个Observable,这些操作符让执行和复合变得非常高效。

你可以把Observable当做Iterable的推送方式的等价物,使用Iterable,消费者从生产者那拉取数据,线程阻塞直至数据准备好。使用Observable,在数据准备好时,生产者将数据推送给消费者。数据可以同步或异步的到达,这种方式更灵活。

下面的例子展示了相似的高阶函数在Iterable和Observable上的应用

1
2
3
4
5
6
7
8
9
10
11
12
13
// Iterable
getDataFromLocalMemory()
.skip(10)
.take(5)
.map({ s -> return s + " transformed" })
.forEach({ println "next => " + it })
// Observable
getDataFromNetwork()
.skip(10)
.take(5)
.map({ s -> return s + " transformed" })
.subscribe({ println "onNext => " + it })

Observable类型给GOF的观察者模式添加了两种缺少的语义,这样就和Iterable类型中可用的操作一致了:

1、生产者可以发信号给消费者,通知它没有更多数据可用了(对于Iterable,一个for循环正常完成表示没有数据了;对于Observable,就是调用观察者的onCompleted方法)

2、生产者可以发信号给消费者,通知它遇到了一个错误(对于Iterable,迭代过程中发生错误会抛出异常;对于Observable,就是调用观察者(Observer)的onError方法)

有了这两种功能,Rx就能使Observable与Iterable保持一致了,唯一的不同是数据流的方向。任何对Iterable的操作,你都可以对Observable使用。


好,了解完一系列相关的东西,接下来该进入正题了(一天差不多也过去了)……(๑°⌓°๑) ,明天再继续,默默吐槽一下我的渣效率(´ཀ`」 ∠)


RxJava的观察者模式

  • onNext():相当于onClick()onEvent

  • onCompleted():事件队列完结。RxJava不仅单独处理事件,还会将其视作一个队列,当不再会有onNext()发出时,需触发onCompleted()作为标志。

  • onError():事件队列异常。当事件处理过程中发生异常时,会触发onError(),同时队列终止。

  • 在同一事件队列中,onCompleted()onError()有且只有一个,且在事件队列的最末。

RxJava的基本实现

0、导入依赖

1
2
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'

1、创建Observer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 创建一个观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "Completed");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "Error");
}
@Override
public void onNext(String s) {
Log.d(TAG, "s");
}
};

除了Observer接口之外,RxJava还内置了一个实现了Observer的抽象类SubscriberSubscriberObserver进行了一些扩展:

  • onStart():这是一个可选的方法,默认情况下实现为空。该方法在subscribe刚开始,事件未发送之前调用,用于一些准备工作(如数据的清零或重置)。如果对准备工作有线程要求时不适用,在指定的线程可使用doOnSubscribe()方法。

  • unsubscribe():该方法用于取消订阅,在调用该方法后,Subscriber不再接收事件。一般在调用之前,先用isUnsubscribed判断一下状态。最好在不用的时候调用 unsubscribe()解除引用关系,因为在subscribe()后,Observable会持有Subscriber的引用,这个引用如果不及时释放,会有内存泄漏的危险

实际上,在 RxJava 的subscribe()过程中,Observer也总是会先被转换成一个Subscriber再使用。

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "Completed");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "Error");
}
@Override
public void onNext(String s) {
Log.d(TAG, "s");
}
};

2、创建Observable

  • 基本方法create()
1
2
3
4
5
6
7
8
9
// 创建被观察者
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("World");
subscriber.onCompleted();
}
});
  • 将传入的参数依次发送出来:just(T...)
1
Observable observable = Observable.just("Hello", "World");
  • 将传入的数组或Iterable拆分成具体对象后依次发送出来:from(T[])from(Iterable<? extend T>)
1
2
String[] words = {"Hello", "World"};
Observable observable = Observable.from(words);
  • List
1
2
3
4
List<String> list = new ArrayList<String>();
list.add("Hello");
list.add("World");
Observable observable = Observable.from(list);

3、Subscribe(订阅)

1
2
observable.subscribe(observer);
// observable.subscribe(subscriber);

除了subscribe(Observer)subscribe(Subscriber)subscribe()还支持不完整定义的回调,RxJava会自动根据定义创建出Subscriber 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(TAG, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(TAG, "completed");
}
};
observable.subscribe(onNextAction);
observable.subscribe(onNextAction, onErrorAction);
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

RxJava 提供了多个ActionX形式的接口 (例如Action0Action1) ,它们可以被用以包装不同的无返回值的方法。


嗯,在满课和抽空看RxJava中一天又过去了,感觉还是一知半解,明天继续 ⁽⁽٩(๑˃̶͈̀ ˂̶͈́)۶⁾⁾


Action

上文已经提到了Action,但是……还是不太懂,所以,继续吧 ->

常用

  • Action0:只有一个call()方法,无参无返回值。可将Action0视作一个将onCompleted()的内容打包的包装对象,它将自己作为一个参数传入subscribe()以实现不完整定义的回调。

  • Action1:只有一个call(T param)方法,无返回值,有一个参数。可将Action1视作一个将onNext(obj)onError(error)的内容打包来传入subscribe()的包装对象,

使用Action的前后对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.just("Hello", "World")
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i(TAG, s);
}
});
1
2
3
4
5
6
7
Observable.just("Hello", "World")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
}

解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 处理onNext()中的内容
Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
Log.d(TAG, s);
}
};
// 处理onError()中的内容
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
// Error handling
}
};
// 处理onCompleted()中的内容
Action0 onCompletedAction = new Action0() {
@Override
public void call() {
Log.d(TAG, "completed");
}
};
// 使用onNextAction()来定义onNext()
observable.subscribe(onNextAction);
// 使用onNextAction()和onErrorAction()来定义onNext()和onError
observable.subscribe(onNextAction, onErrorAction);
// 使用onNextAction()、onErrorAction()和onCompletedAction()来定义onNext()、onError()和onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

map、flatMap转换

RxJava提供了对事件序列进行变换的支持。

map

举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Observable.just("Hello", "World")
// 将String类型的转化为Integer类型的哈希码
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return s.hashCode();
}
})
// 将转化后得到的Integer类型的哈希码再转化为String类型
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return integer.intValue() + "";
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, s);
}
});

注意:

  • Func1Action1相似,它们的区别在于Func1包装的是有返回值的方法。

  • map()可以进行多次转换。

  • map()实现的是一对一的转化,要实现一对多的转化用flatMap()

flatMap

举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.i(TAG, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);

原理:

1、使用传入的事件对象(students)创建一个Observable对象。

2、激活Observable,使其开始发送事件。

3、每一个创建出来的Observable发送的事件,都被汇入同一个Observable,这个Observable将事件统一交给Subscriber的回调方法。

Scheduler

在不指定线程的情况下,RxJava遵循的是线程不变的原则。如果需要切换线程,则需要用到Scheduler。

API

  • Schedulers.immediate():默认的Scheduler,在当前线程运行。

  • Schedulers.newThread():启用新线程,并在新线程执行操作。

  • Schedulers.io():I/O操作(读写文件、读写数据库、网络信息交互等)使用的Scheduler。行为模式与newThread()相似,区别在于io()内部实现使用的线程池无数量上限,可以重用空闲的线程,多数情况下io()newThread()更有效率。除此之外,计算工作不要放在io()中,以避免创建不必要的线程。

  • Schedulers.computation():CPU密集型计算(不会被I/O等操作限制性能)时使用的Scheduler。使用固定的线程池,大小为CPU核数。同样,不要将I/O操作放在computation()中,以避免CPU的浪费。

  • AndroidScheduler.mainThread():Android专用,制定操作在主线程中运行。

方法

  • subscribeOn() :事件产生的线程。指定subscribe()发生时所处的线程。

  • observeOn():事件消费的线程。指定Subscriber运行时所在的线程。

举个例子:

1
2
3
4
5
6
7
8
9
10
11
Observable.just(1, 2, 3, 4)
// 指定 subscribe() 发生在 IO 线程
.subscribeOn(Schedulers.io())
// 指定 Subscriber 的回调发生在主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});

相关链接

差不多基本的了解就是上面那些了,然后接下来就要开始实践了。

以上内容是学习的这些文档:

Github

给 Android 开发者的 RxJava 详解

还有的上文已经列出,就不加以赘述了。

除此之外,还有一篇写得很棒的对Subscription解读的文章:关于RxJava1中的Subscription的一些误解,RxJava2也是参考的该作者的系列文章学习的。