Rxjava2 笔记
Rxjava中常用的类
根据文档中的描述
RxJava 2 features several base classes you can discover operators on:
io.reactivex.Flowable : 0..N flows, supporting Reactive-Streams and backpressure //可以连续发射0-N个事件,并且背压
io.reactivex.Observable: 0..N flows, no backpressure //可以连续发射0-N个事件,但是不支持背压
io.reactivex.Single: a flow of exactly 1 item or an error //只能发射一个事件或者抛出异常
io.reactivex.Completable: a flow without items but only a completion or error signal //只能onComplete()或抛出异常不能发出事件
io.reactivex.Maybe: a flow with no items, exactly one item or an error //最多可以发出一个事件或抛出异常
1.Flowable,Observable,Single,Completeable的用法
它们的使用方法都差不多,只是要实现的方法不一样,或者内部可以调用的方法不一样而已.
Flowable
1. 事件发送者由Observable->Flowable,接受者由Observe->Subscriber
2. 创建Flowable对象时,需要指定背压策略,如 BackpressureStrategy.ERROR
3. 在onSubscribe()方法中需要调用subscription.request(long n)方法,不然不会处理事件的。
4. 默认Flowable的bufferSize为128个事件,超过128会出现MissingBackpressureException
Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(@NonNull FlowableEmitter e) throws Exception {
for (int i = 0; ; i++) {
e.onNext(i);
}
}
}, BackpressureStrategy.ERROR)
.subscribe(new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
Log.i("", "---->onNext:" + o.toString());
}
@Override
public void onError(Throwable t) {
Log.i("", "---->onError:" + t);
}
@Override
public void onComplete() {
Log.i("", "---->onComplete:");
}
});
Single
按照文档上将的
io.reactivex.Single: a flow of exactly 1 item or an error 这个类只能发送一个事件或者一个异常.
Single.create(new SingleOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull SingleEmitter<Integer> e) throws Exception {
e.onSuccess(1);
e.onSuccess(2);//没有执行
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i("","---------->Single:"+integer);
}
});
Observable
1. 延时2秒跳转页面
Observable.timer(2000,TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
startActivity(new Intent(SplashActivity.this,MainActivity.class));
overridePendingTransition(0, android.R.anim.fade_out);
finish();
}
});
2. 被观察者(Observable)通过ObservableEmitter对象发送消息,观察者通过accept()方法接受消息并作出响应
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i("--------","-------------->integer:"+integer);
}
});
2. 常用的操作符
Map操作符 : 可以把上游发送的事件按照指定的函数去变化
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("10");
e.onNext("11");
e.onNext("12");
e.onComplete();
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(@NonNull String s) throws Exception {
int convertResult=Integer.parseInt(s);
return convertResult;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i("","---------->map操作符:"+integer);
}
});
flatMap,concatMap操作符
1.它们两个使用方法一致
2.flatMap不保证事件的顺序
3.concatMap会严格按照上游发送数据的顺序发送.
4.上游发送的每一个事件,否会通过flatMap转换成新的事件,然后再发送
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
List<String> list=new ArrayList<String>();
for (int i = 0; i < 3; i++) {
list.add("flat map 操作符-->"+integer);
}
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("",s);
}
});
zip操作符
1. 可以从两个或多个Observable中各取出一个事件进行组合,并且一个事件只能被使用一次, 组合的顺序是严格按照事件发送的顺利 来进行的。
2. 最终下游收到的事件数量 是和上游中发送事件最少的那一根水管的事件数量相同.
3. 注意不同Observable可以在不同线程。
Observable observableInt= Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable observableStr= Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("A");
e.onNext("B");
e.onNext("C");
e.onNext("D");
e.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable.zip(observableInt, observableStr, new BiFunction() {
@Override
public Object apply(@NonNull Object o, @NonNull Object o2) throws Exception {
return o.toString()+"-"+o2.toString();
}
}).subscribe(new Consumer() {
@Override
public void accept(Object o) throws Exception {
Log.i("","---------Zip:"+o.toString());
}
});
filter,sapmle操作符
1. 都是过滤操作符
2. sample是没隔指定的时间,从上游取出一个事件交由下游处理
3. filter需要自己定义过滤逻辑