RxJava 基础知识
RxJava创建操作符
注: 使用Rxjava之前需要添加依赖
dependencies {
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
implementation 'io.reactivex.rxjava2:rxjava:2.2.0'
implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'
// 注:RxJava2 与 RxJava1 不能共存,即依赖不能同时存在
}
复制代码
操作符 | 作用 |
---|---|
create() | 使用一个函数从头创建一个Observable |
just() | 将一个或多个对象转换成发射这个或这些对象的一个Observable |
from() | 将一个Iterable,一个Future或者一个数组转换成一个Observable |
defer() | 只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable |
range() | 创建一个发射指定范围的整数序列的Observable |
interval() | 创建一个按照给定的时间间隔发射整数序列的Observable |
timer() | 创建一个在给定的延迟之后发射单个数据的Observable |
empty() | 创建一个什么都不做直接通知完成的Observable |
error() | 创建一个什么都不做直接通知错误的Observable |
never() | 创建一个不发射任何数据的Observable |
一. Create
使用一个函数从头开始创建一个Observable
RxJava建议我们在传递create方法的函数时,先检查一下观察者的isDisposed状态,以便在没有观察者的时候,让我们的Observable停止发射数据,防止运行昂贵的运算
- 实际应用
fun testCreate() {
Observable.create<Int> {
//判断Observable的isDisposed状态
if (!it.isDisposed) {
it.onNext(1)
it.onNext(2)
it.onNext(3)
it.onComplete()
}
}.subscribeBy(
onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
onComplete = { Log.e("TAG", "onComplete") }
)
}
复制代码
执行结果
onNext: 1
onNext: 2
onNext: 3
onComplete
复制代码
二. Just
创建一个发射指定值的Observable
just 可以接受一至十个参数,返回一个按参数列表顺序发射这些数据的Observable
- 实际应用
fun testJust(){
Observable.just(1,2,3,4,5,6,7,8,9,10)
.subscribeBy(
onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
onComplete = { Log.e("TAG", "onComplete") }
)
}
复制代码
执行结果
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onNext: 7
onNext: 8
onNext: 9
onNext: 10
onComplete
复制代码
如果在just()中传入null,则会抛出一个空指针异常
三. From
将其他种类的对象和数据类型转换为Observable
- 实际应用
fun testFrom(){
val list = arrayListOf<Int>(1,2,3,4,5)
list.toObservable()
.subscribeBy(
onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
onComplete = { Log.e("TAG", "onComplete") }
)
}
复制代码
执行结果
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onComplete
复制代码
对于Future,它会发射Future.get()方法返回的单个数据
class MyCallable : Callable<String>{
override fun call(): String {
Log.e("TAG","模拟一些耗时操作...")
Thread.sleep(5000)
return "OK"
}
}
fun testFromFuture(){
val executorService = Executors.newSingleThreadExecutor();
val future = executorService.submit(MyCallable())
Observable.fromFuture(future)
.subscribeBy(
onNext = {Log.e("TAG","onNext: $it")}
)
}
复制代码
执行结果
模拟一些耗时操作...
onNext: OK
复制代码
from方法有一个可接受两个可选参数的版本,分别指定超时时长和时间单位.如果过了指定的时长,Future还没有返回,Observable就会发射错误通知并终止
fun testFromFuture(){
val executorService = Executors.newSingleThreadExecutor();
val future = executorService.submit(MyCallable())
Observable.fromFuture(future,3,TimeUnit.SECONDS)
.subscribeBy(
onNext = {Log.e("TAG","onNext: $it")}
)
}
复制代码
执行结果
模拟一些耗时操作...
AndroidRuntime: FATAL EXCEPTION: main
io.reactivex.exceptions.OnErrorNotImplementedException
复制代码
四. Repeat
创建一个发射特定数据重复多次的Observable
repeat 不是创建一个Observable,而是重复发射原始Observable的数据序列,这个序列可以是无限的,也可以通过repeat(n)指定重复次数
fun testRepeat(){
Observable.just("hello")
//.repeat() //无限
.repeat(3) //循环3次
.subscribeBy(
onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
onComplete = { Log.e("TAG", "onComplete") },
onError = { it.printStackTrace() }
)
}
复制代码
执行结果
onNext: hello
hello
onComplete
复制代码
在RxJava2.x中还有两个repeat相关的操作符: repeatWhen 和 repeatUntil
1. repeatWhen
repeatWhen 不是缓存和重复原始Observable的数据序列,而是根据指定的条件重新订阅和发布原来的Observable
fun testRepeatWhen(){
Observable.range(0,5)
.repeatWhen {
Observable.timer(10,TimeUnit.SECONDS)
}
.subscribeBy(
onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
onComplete = { Log.e("TAG", "onComplete") },
onError = { it.printStackTrace() }
)
}
复制代码
执行结果
09-06 05:55:14.901 22472-22472/com.mufeng.rxjavademo E/TAG: onNext: 0
onNext: 1
onNext: 2
onNext: 3
onNext: 4
09-06 05:55:24.903 22472-22505/com.mufeng.rxjavademo E/TAG: onNext: 0
09-06 05:55:24.904 22472-22505/com.mufeng.rxjavademo E/TAG: onNext: 1
onNext: 2
onNext: 3
onNext: 4
09-06 05:55:24.911 22472-22505/com.mufeng.rxjavademo E/TAG: onComplete
复制代码
2. repeatUntil
repeatUntil是RxJava2.x新增的操作符,表示直到某个条件就不在重复发射数据
fun testRepeatUntil(){
val time = System.currentTimeMillis();
Observable.just("hello")
.repeatUntil {
System.currentTimeMillis() - time > 5000
}
.subscribeBy(
onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
onComplete = { Log.e("TAG", "onComplete") },
onError = { it.printStackTrace() }
)
}
复制代码
执行结果
09-06 06:02:15.220 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.552 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.552 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.568 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.569 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.578 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.579 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.579 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.579 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.580 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.580 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.581 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.581 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.583 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.584 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.609 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.609 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:18.384 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:18.384 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:20.177 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:20.178 22728-22728/com.mufeng.rxjavademo E/TAG: onComplete
复制代码
五. Empty/Never/Error
1. empty()
创建一个观察者对象,只发送onComplete事件
Observable.empty<Int>()
.subscribeBy(
onNext = { Log.e("TAG","接受onNext事件==$it")},
onError = { Log.e("TAG","响应Error事件: ${it.localizedMessage}")},
onComplete = { Log.e("TAG","响应Complete事件")}
)
复制代码
2. error()
该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
可自定义异常
Observable.error<Int>(Throwable("未知异常"))
.subscribeBy(onError = { Log.e("TAG","响应Error事件: ${it.localizedMessage}")})
复制代码
3. never()
不发送任何事件
Observable.never<Int>()
.subscribeBy(
onNext = { Log.e("TAG","接受onNext事件==$it")},
onError = { Log.e("TAG","响应Error事件: ${it.localizedMessage}")},
onComplete = { Log.e("TAG","响应Complete事件")}
)
复制代码
六. Defer
直到有观察者订阅时,才动态创建Observable 并为每个观察者创建一个全新的Observable
每次订阅时, 都会得到一个刚创建的最新的Observable对象, 可以确保Observable对象里面的数据是最新的
- 常用操作符
- RxJava 2․x: defer
- RxKotlin: defer
//第一次赋值
var i = 100
val observable = Observable.defer {
Observable.just(i)
}
//第二次复制
i = 200
//进行订阅, 此时才会去用defer()操作符去创建Observable对象
observable.subscribeBy(
onNext = { Log.e("TAG", "接受onNext事件==$it") },
onError = { Log.e("TAG", "响应Error事件: ${it.localizedMessage}") },
onComplete = { Log.e("TAG", "响应Complete事件") }
)
复制代码
七. Timer
创建一个Observable, 它在一个给定的延迟后发射一个特殊的值
Observable.timer(2, TimeUnit.SECONDS)
.subscribeBy(
onNext = { Log.e("TAG", "接受onNext事件==$it") },
onError = { Log.e("TAG", "响应Error事件: ${it.localizedMessage}") },
onComplete = { Log.e("TAG", "响应Complete事件") }
)
复制代码
八. Interval
按照一个固定时间间隔发射整数序列的Observable
- 常用操作符
- RxJava 2.X: interval
- RxKotlin: interval
/**
* 第一个参数: 第一次延迟时间
* 第二个参数: 后续发送事件的间隔时间
* 第三个参数: 时间单位
*/
Observable.interval(2,1,TimeUnit.SECONDS)
.subscribeBy(
onNext = { Log.e("TAG", "接受onNext事件==$it") },
onError = { Log.e("TAG", "响应Error事件: ${it.localizedMessage}") },
onComplete = { Log.e("TAG", "响应Complete事件") }
)
复制代码
九. Range
连续发送一个事件序列,可以指定范围
/**
* range 发送整数序列,没有延迟
* 第一个参数: 事件序列的起始点
* 第二个参数: 事件数量
* 如果设置成负数则会直接抛出异常
*/
Observable.range(3,10)
.subscribeBy(
onNext = { Log.e("TAG", "接受onNext事件==$it") },
onError = { Log.e("TAG", "响应Error事件: ${it.localizedMessage}") },
onComplete = { Log.e("TAG", "响应Complete事件") }
)
/**
* intervalRange: 发送整数序列,可以设置延迟
* 第一个参数: 事件序列起始点
* 第二个参数: 事件数量
* 第三个参数: 事件时间间隔
* 第四个参数: 事件单位
*/
Observable.intervalRange(3,10,2,1,TimeUnit.SECONDS)
.subscribeBy(
onNext = { Log.e("TAG", "接受onNext事件==$it") },
onError = { Log.e("TAG", "响应Error事件: ${it.localizedMessage}") },
onComplete = { Log.e("TAG", "响应Complete事件") }
)
复制代码
以上是Observable创建操作符的使用,这些操作符不仅在Observable可以使用,在Flowable等也可以使用
Demo地址: RxJavaDemo