RxJava2.0
参考:
RxJava 只看这一篇文章就够了 (上)
RxJava 只看这一篇文章就够了 (中)
RxJava 只看这一篇文章就够了 (下)
导入
implementation 'io.reactivex.rxjava2:rxjava:2.1.14'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
- 要素:
-
Observable的构造(静态方法):
- 一、构造操作符
- 二、转换操作符
- 三、组合操作符
-
四、功能操作符
- 1. delay (1, TimeUnit.SECOND)
- 2. doOnEach (Consumer/Observer)
- 3. doOnSubscribe (Consumer)
- 4. doOnNext (Consumer)
- 5. doAfterNext (Consumer)
- 6. doOnComplete (Action)
- 7. doOnError (Consumer)
- 8. doOnDispose (Action)
- 9. doOnLifecycle (Customer, Action)
- 10. doOnTerminate (Action)
- 11. doAfterTerminate (Action)
- 12. doFinally (Action)
- 13. onErrorReturn (Function)
- 14. onErrorResumeNext (Function/Observable)
- 15. onExceptionResumeNext (Observable)
- 16. retry (/2/Predicate/2,Predicate/BiPredicate)
- 17. retryUntil (BooleanSupplier)
- 18. retryWhen (Function)
- 19. repeat (/2)
- 20. repeatWhen (Function)
- 21. subscribeOn (线程)
- 22. observeOn (线程)
- 五、过滤操作符
- 六、条件操作符
- 注意
- 实践
要素:
Observable
Observer
subscribe
Disposable subscribe = observable.subscribe(observer);
Observable的构造(静态方法):
一、构造操作符
create、just、fromArray、fromIterable、fromCallable、fromFuture、defer、timer、range、rangeLong、interval、intervalRange、empty、never、error
1. create
onNext传递,onComplete结束传递
2. just(object ...) 最多10个
3. fromArray(object...) 无限个对象
for (int i = 10; i < 15; i++) {
list.add(i);
}
Observable.fromIterable(list)...
5. fromCallable(new Callable<Integer>() {...})
有一个返回值,即不是直接传入object而已
如:
Observable. fromCallable(new Callable<List<Integer>>() {
@Override
public List<Integer> call() throws Exception {
List<Integer> list = new ArrayList<>();
for (int i = 10; i < 15; i++) {
list.add(i);
}
return list;
}
})
如上对比fromIterable,相当于把list的创建过程写进Callable的call方法中
6. ? fromFuture(future)
FutureTask future= new FutureTask(...)
增加可操作Callable的cancel等方法,通过get等获取Callable的返回值
7. defer
直到被观察者被订阅后才会创建被观察者
8. timer
指定时间后发送一个`0L`
9. range(int start, int count)
从start开始,+1 count次
10. rangeLong
同上 `range(int start, int count)`
从start开始,+1 count次
11. interval
从0开始发送 +1
12. intervalRange
同上 限定了发送的个数
13. empty()
直接发送 onComplete() 事件
14. never()
不发送任何事件
15. error()
发送 onError() 事件
二、转换操作符
map、flatMap、concatMap、filter、buffer、groupBy、scan、window
1. map
转换数据类型
2. flatMap
将一个Observable转换为另一种Observable
输出无序(需要在新Observable添加delay才能看出),与concatMap
区分
Observable.just(1, 2, 3)
.flatMap(new Function<Integer, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Integer integer) throws Exception {
List<String> list = new ArrayList<>();
list.add("hello" + integer);
return Observable.fromIterable(list).delay(0, TimeUnit.MILLISECONDS);
}
})
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
输出:hello1
hello2
hello3
}
});
3. concatMap
同上,只是`concatMap`为有序输出,`flatMap`无序输出
4. buffer(count, skip)
按长度分割为集合
Observable.just(1,2,3,4,5)
.buffer(3, 2)
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> list) throws Exception {
输出:[1, 2, 3]
[3, 4, 5]
[5]
}
});
5. groupBy
将发送的数据进行分组,每个分组都会返回一个Observable
Observable.just(1, 2, 3, 4, 5, 6)
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) throws Exception {
return integer % 3; //①
}
})
.subscribe(new Consumer<GroupedObservable<Integer, Integer>>() {
@Override
public void accept(final GroupedObservable<Integer, Integer> stringIntegerGroupedObservable) throws Exception {
Lcat.print("----------A---------", stringIntegerGroupedObservable.getKey());
stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
if (stringIntegerGroupedObservable.getKey() == 2) { //stringIntegerGroupedObservable.getKey()即可获取①处的值
输出余数为2的值:2
5
}
}
});
}
});
6. window
按照实际划分窗口,将数据发送给不同的 Observable
Observable.just(1, 2, 3, 4, 5)
.window(2)
.subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> integerObservable) throws Exception {
Lcat.print("-------------------");
integerObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
输出:-------------------
1
2
-------------------
3
4
-------------------
5
}
});
}
});
三、组合操作符
concat、concatArray、concatArrayDelayError、merge、mergeArray、mergeArrayDelayError、zip、combineLatest、combineLatestDelayError、reduce、collect、startWith、startWithArray、count
1. concat
处理最多4个Observable,前一个结束才读下一个,可以应用为,先读缓存,再请求网络; 相当于串联
调用onNext
和onComplete
标记
Observable.concat(Observable.just(1, 2).delay(1,TimeUnit.SECONDS), Observable.just("a"))
.subscribe(new Consumer<Serializable>() {
@Override
public void accept(Serializable serializable) throws Exception {
输出:
(1s后)
1
2
a
}
});
2. concatArray
同上,可以多于4个Observable
3. merge
同上,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送。相当于并联
如上例,输出:
a
(1s后)
1
2
4. mergeArray
同上
5. concatArrayDelayError
6. mergeArrayDelayError
以上关于concat和merge的方法都是处理多个Observable的,发送onError之后都会终断继续发送事件。所以用`concatArrayDelayError`和`mergeArrayDelayError`可以防止,使所有事件发送完成再发送onError事件
7. zip
依次合并几个事件,一对一,二对二,发送次数为次数少的个数,多的部分则不处理
Observable.zip(Observable.just(1, 3), Observable.just("a", "b", "c", "d"), new BiFunction<Integer, String, Object>() {
@Override
public Object apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
输出:1a
3b
}
});
8. combineLatest
与zip类似,但是其与时间有关,只会和与其最近发送的事件结合
9. combineLatestDelayError
同上,延迟发送onError事件
10. reduce
每次用一个方法处理一个值,可以有一个 seed 作为初始值
Observable.just(1, 2, 3, 4)
.reduce(8, new BiFunction<Integer, Integer, Integer>() { //初始值为8,无则为0
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
输出:18
(即:8+1=9 9+2=11 11+3=14 14+4=18)
}
});
11. scan
同上,区别是会输出过程
如上例,输出:
8
9
11
14
18
12. collect
将数据收集到数据结构当中,如集合等
Observable.just(1, 2, 10)
.collect(new Callable<List<Integer>>() {
@Override
public List<Integer> call() throws Exception {
List<Integer> list = new ArrayList<>();
list.add(333);
return list;
}
}, new BiConsumer<List<Integer>, Integer>() {
@Override
public void accept(List<Integer> list, Integer integer) throws Exception {
list.add(integer);
}
}).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> list) throws Exception {
Lcat.print("list", list); // [333, 1, 2, 10]
}
});
13. startWith
14. startWithArray
在发送事件之前追加事件,`startWith()` 追加一个事件,`startWithArray()` 可以追加多个事件
15. count
返回被观察者发送事件的数量
Observable.just(1, 2, "65")
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Lcat.print("事件个数为:", aLong); // 事件个数为:3
}
});
四、功能操作符
delay、doOnEach、doOnSubscribe、doOnNext、doAfterNext、doOnComplete、doOnError、doOnDispose、doOnLifecycle、doOnTerminate、doAfterTerminate、doFinally
1. delay (1, TimeUnit.SECOND)
延迟一段时间发送事件
2. doOnEach (Consumer/Observer)
每发送一件个事件之前都会先回调这个方法,即onNext
、onError
、onComplete
方法之前都会先走doOnEach
方法(onSubscribe
之前不会);
3. doOnSubscribe (Consumer)
onSubscribe()
之前都会回调该方法;
4. doOnNext (Consumer)
onNext
方法之前都会先走该方法;(接收到数据之前的操作,如保存数据到数据库等)
5. doAfterNext (Consumer)
onNext
方法之后都会先走该方法;
6. doOnComplete (Action)
onComplete()
之前都会回调这个方法;
7. doOnError (Consumer)
onError()
之前都会回调这个方法;
8. doOnDispose (Action)
调用 Disposable 的 dispose()
之后回调该方法;
9. doOnLifecycle (Customer, Action)
在回调 onSubscribe 之前回调该方法的第一个参数的回调方法,可以使用该回调方法决定是否取消订阅。第二个参数的回调方法的作用与 doOnDispose() 是一样的
10. doOnTerminate (Action)
在 onError 或者 onComplete 发送之前回调;
11. doAfterTerminate (Action)
在onError 或者 onComplete 发送之后回调;
12. doFinally (Action)
所有事件发送完毕之后回调该方法。
13. onErrorReturn (Function)
执行完onError方法之后,后续的onNext不会被执行,但是会执行一次onErrorReturn回调的方法,作为最后一个onNext的值。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onError(new NullPointerException());
e.onNext(2);
e.onNext(3);
}
}).onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
Lcat.print("e", throwable);
return 404;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Lcat.print("i", integer); // 输出: 1,404 不会输出2和3
}
});
14. onErrorResumeNext (Function/Observable)
在执行onError之后,可以继续发送该方法内的事件,new Error()或 new Exception();
15. onExceptionResumeNext (Observable)
同上,但是只能捕捉Exception,只能是new Exception();
16. retry (/2/Predicate/2,Predicate/BiPredicate)
出现错误事件,则会重新发送**所有**事件;
17. retryUntil (BooleanSupplier)
出现错误事件之后,可以通过此方法判断是否继续发送事件。
18. retryWhen (Function)
如果返回的被观察者发送 Error 事件则之前的被观察者不会继续发送事件,如果发送正常事件则之前的被观察者会继续不断重试发送事件。
19. repeat (/2)
重复发送事件(执行完所有再重复执行)
20. repeatWhen (Function)
返回一个新的被观察者设定一定逻辑来决定是否重复发送事件;
21. subscribeOn (线程)
指定被观察者的线程(多次调用此方法,只有第一次有效)
22. observeOn (线程)
指定观察者的线程(每指定一次就会生效一次)
Schedulers.computation( ) 用于使用计算任务,如事件循环和回调处理
Schedulers.immediate( ) 当前线程
Schedulers.io( ) 用于 IO 密集型任务,如果异步阻塞 IO 操作。
Schedulers.newThread( ) 创建一个新的线程
AndroidSchedulers.mainThread() Android 的 UI 线程,用于操作 UI。
五、过滤操作符
filter、ofType、skip、distinct、distinctUntilChanged、take、debounce、firstElement、lastElement、elementAt、elementAtOrError
1. filter
过滤(实现方法一目了然)
2. ofType
.ofType(Integer.class) //过滤不是int类型的数据`
可以过滤不符合该类型事件
3. distinct
去重
4. distinctUntilChanged
去除连续重复的事件
5. skip(count)
跳过前count个的输出
6. take(count)
只取前count个
7. last
取最后一个
8. debounce(timeout, unit)
去除发送频率过快的项(去除发送频率在timeout内的项)
9. firstElement
取事件序列的第一个元素
10. lastElement
取事件序列的最后一个元素
11. elementAt
指定取出事件序列中事件(index超出范围则不输出)
12. elementAtOrError
同上,超出范围会报异常崩溃
六、条件操作符
1. all (new Predicate<>{...})
判断事件序列是否**全部**满足某个事件
2. takeWhile (new Predicate<>{...})
设置条件,满足条件时就会发送该数据,反之则不发送
3. skipWhile (new Predicate<>(){...})
设置条件,满足条件时不发送该数据,反之则发送。(与`takeWhile`相反)
4. takeUntil (new Predicate<>(){...}
满足条件的下一次事件不会发送,即不满足条件的数据中,只发送第一条不满足的数据;
5. skipUntil(Observable)
当 skipUntil() 中的 Observable 发送事件了,原来的 Observable 才会发送事件给观察者。(skipUntil() 里的 Observable 并不会发送事件给观察者);
6. sequenceEqual (Observable1, Observable2)
判断两个observable发送的事件是否完全相同;
7. contains (object)
判断事件序列中是否含有某个元素;
8. isEmpty ()
判断事件序列是否为空;
9. amb (List<Observable>)
只发送最先发送事件的 Observable 中的事件, 其余Observable的事件被丢弃;
10. defaultIfEmpty (Object)
如果没emitter.onNext() 就执行了onComplete,则默认发送该Object对象。
注意
-
在Activity销毁的时候,rx的操作不会结束
所以可能需要在onDestroy中做出判断:@Override protected void onDestroy() { super.onDestroy(); if (mSubscribe != null && !mSubscribe.isDisposed()) { mSubscribe.dispose(); } }
实践
- 通过create创建网络请求的
observable1
- 通过create创建缓存读取的
observable2
- 通过concat先读缓存再读取网络
Observable.concat(observable1, observable2)
- 调用
subscribeOn(Schedulers.io())
在子线程处理 - 调用
observeOn(AndroidSchedulers.mainThread())
在主线程显示数据 - 调用
doOnNext
保存数据 - 调用
map
将json转换为实体类 - 调用
subscribe
完毕