三默网为您带来有关“RxJava2.X 学习笔记 -- 创建操作符”的文章内容,供您阅读参考。

RxJava2.X 学习笔记 -- 创建操作符

2023-01-21 19:52:57

RxJava 基础知识

RxJava创建操作符

注: 使用Rxjava之前需要添加依赖

dependencies {
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
    implementation 'io.reactivex.rxjava2:rxjava:2.2.0'
    implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'
    // 注:RxJava2 与 RxJava1 不能共存,即依赖不能同时存在
}
复制代码
操作符 作用
create() 使用一个函数从头创建一个Observable
just() 将一个或多个对象转换成发射这个或这些对象的一个Observable
from() 将一个Iterable,一个Future或者一个数组转换成一个Observable
defer() 只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable
range() 创建一个发射指定范围的整数序列的Observable
interval() 创建一个按照给定的时间间隔发射整数序列的Observable
timer() 创建一个在给定的延迟之后发射单个数据的Observable
empty() 创建一个什么都不做直接通知完成的Observable
error() 创建一个什么都不做直接通知错误的Observable
never() 创建一个不发射任何数据的Observable

一. Create

使用一个函数从头开始创建一个Observable

官方文档: reactivex.io/documentati…

RxJava建议我们在传递create方法的函数时,先检查一下观察者的isDisposed状态,以便在没有观察者的时候,让我们的Observable停止发射数据,防止运行昂贵的运算

  • 实际应用
fun testCreate() {
            Observable.create<Int> {
                //判断Observable的isDisposed状态
                if (!it.isDisposed) {
                    it.onNext(1)
                    it.onNext(2)
                    it.onNext(3)
                    it.onComplete()
                }
            }.subscribeBy(
                    onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
                    onComplete = { Log.e("TAG", "onComplete") }
            )
        }

复制代码

执行结果

    onNext: 1
    onNext: 2
    onNext: 3
    onComplete
复制代码

二. Just

创建一个发射指定值的Observable

官方文档: reactivex.io/documentati…

just 可以接受一至十个参数,返回一个按参数列表顺序发射这些数据的Observable

  • 实际应用
fun testJust(){
            Observable.just(1,2,3,4,5,6,7,8,9,10)
                    .subscribeBy(
                            onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
                            onComplete = { Log.e("TAG", "onComplete") }
                    )
        }
复制代码

执行结果

    onNext: 1
    onNext: 2
    onNext: 3
    onNext: 4
    onNext: 5
    onNext: 6
    onNext: 7
    onNext: 8
    onNext: 9
    onNext: 10
    onComplete
复制代码

如果在just()中传入null,则会抛出一个空指针异常

三. From

将其他种类的对象和数据类型转换为Observable

  • 实际应用
fun testFrom(){
            val list = arrayListOf<Int>(1,2,3,4,5)
            list.toObservable()
                    .subscribeBy(
                            onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
                            onComplete = { Log.e("TAG", "onComplete") }
                    )
        }

复制代码

执行结果

    onNext: 1
    onNext: 2
    onNext: 3
    onNext: 4
    onNext: 5
    onComplete
复制代码

对于Future,它会发射Future.get()方法返回的单个数据

class MyCallable : Callable<String>{
        override fun call(): String {
            Log.e("TAG","模拟一些耗时操作...")
            Thread.sleep(5000)
            return "OK"
        }
    }
    
fun testFromFuture(){
            val executorService = Executors.newSingleThreadExecutor();
            val future = executorService.submit(MyCallable())
            Observable.fromFuture(future)
                    .subscribeBy(
                            onNext = {Log.e("TAG","onNext: $it")}
                    )
        }
复制代码

执行结果

 模拟一些耗时操作...
 onNext: OK
复制代码

from方法有一个可接受两个可选参数的版本,分别指定超时时长和时间单位.如果过了指定的时长,Future还没有返回,Observable就会发射错误通知并终止

fun testFromFuture(){
            val executorService = Executors.newSingleThreadExecutor();
            val future = executorService.submit(MyCallable())
            Observable.fromFuture(future,3,TimeUnit.SECONDS)
                    .subscribeBy(
                            onNext = {Log.e("TAG","onNext: $it")}
                    )
        }
复制代码

执行结果

模拟一些耗时操作...
AndroidRuntime: FATAL EXCEPTION: main
    
io.reactivex.exceptions.OnErrorNotImplementedException
复制代码

四. Repeat

创建一个发射特定数据重复多次的Observable

repeat 不是创建一个Observable,而是重复发射原始Observable的数据序列,这个序列可以是无限的,也可以通过repeat(n)指定重复次数

fun testRepeat(){
            Observable.just("hello")
                    //.repeat() //无限
                    .repeat(3)  //循环3次
                    .subscribeBy(
                            onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
                            onComplete = { Log.e("TAG", "onComplete") },
                            onError = { it.printStackTrace() }
                    )
        }
复制代码

执行结果

 onNext: hello
 hello
 onComplete
复制代码

在RxJava2.x中还有两个repeat相关的操作符: repeatWhen 和 repeatUntil

1. repeatWhen

repeatWhen 不是缓存和重复原始Observable的数据序列,而是根据指定的条件重新订阅和发布原来的Observable

fun testRepeatWhen(){
            Observable.range(0,5)
                    .repeatWhen {
                        Observable.timer(10,TimeUnit.SECONDS)
                    }
                    .subscribeBy(
                            onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
                            onComplete = { Log.e("TAG", "onComplete") },
                            onError = { it.printStackTrace() }
                    )
        }
复制代码

执行结果

09-06 05:55:14.901 22472-22472/com.mufeng.rxjavademo E/TAG: onNext: 0
    onNext: 1
    onNext: 2
    onNext: 3
    onNext: 4
09-06 05:55:24.903 22472-22505/com.mufeng.rxjavademo E/TAG: onNext: 0
09-06 05:55:24.904 22472-22505/com.mufeng.rxjavademo E/TAG: onNext: 1
    onNext: 2
    onNext: 3
    onNext: 4
09-06 05:55:24.911 22472-22505/com.mufeng.rxjavademo E/TAG: onComplete
复制代码
2. repeatUntil

repeatUntil是RxJava2.x新增的操作符,表示直到某个条件就不在重复发射数据

fun testRepeatUntil(){
            val time = System.currentTimeMillis();
            Observable.just("hello")
                    .repeatUntil {
                        System.currentTimeMillis() - time > 5000
                    }
                    .subscribeBy(
                            onNext = { Log.e("TAG", "onNext: ${it.toString()}") },
                            onComplete = { Log.e("TAG", "onComplete") },
                            onError = { it.printStackTrace() }
                    )
        }
复制代码

执行结果

09-06 06:02:15.220 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.552 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.552 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.568 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.569 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.578 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.579 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.579 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.579 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.580 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.580 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.581 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.581 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.583 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.584 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.609 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:16.609 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:18.384 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:18.384 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:20.177 22728-22728/com.mufeng.rxjavademo E/TAG: onNext: hello
09-06 06:02:20.178 22728-22728/com.mufeng.rxjavademo E/TAG: onComplete

复制代码

五. Empty/Never/Error

1. empty()

创建一个观察者对象,只发送onComplete事件

Observable.empty<Int>()
        .subscribeBy(
                onNext = { Log.e("TAG","接受onNext事件==$it")},
                onError = { Log.e("TAG","响应Error事件: ${it.localizedMessage}")},
                onComplete = { Log.e("TAG","响应Complete事件")}
        )
复制代码

2. error()

该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
可自定义异常

Observable.error<Int>(Throwable("未知异常"))
        .subscribeBy(onError = { Log.e("TAG","响应Error事件: ${it.localizedMessage}")})
复制代码

3. never()

不发送任何事件

Observable.never<Int>()
        .subscribeBy(
                onNext = { Log.e("TAG","接受onNext事件==$it")},
                onError = { Log.e("TAG","响应Error事件: ${it.localizedMessage}")},
                onComplete = { Log.e("TAG","响应Complete事件")}
        )
复制代码

六. Defer

直到有观察者订阅时,才动态创建Observable 并为每个观察者创建一个全新的Observable

每次订阅时, 都会得到一个刚创建的最新的Observable对象, 可以确保Observable对象里面的数据是最新的

  • 常用操作符
  1. RxJava 2․x: defer
  2. RxKotlin: defer
//第一次赋值
var i = 100
val observable = Observable.defer {
    Observable.just(i)
}
//第二次复制
i = 200
            
//进行订阅, 此时才会去用defer()操作符去创建Observable对象
observable.subscribeBy(
        onNext = { Log.e("TAG", "接受onNext事件==$it") },
        onError = { Log.e("TAG", "响应Error事件: ${it.localizedMessage}") },
        onComplete = { Log.e("TAG", "响应Complete事件") }
)
复制代码

七. Timer

创建一个Observable, 它在一个给定的延迟后发射一个特殊的值

Observable.timer(2, TimeUnit.SECONDS)
        .subscribeBy(
                onNext = { Log.e("TAG", "接受onNext事件==$it") },
                onError = { Log.e("TAG", "响应Error事件: ${it.localizedMessage}") },
                onComplete = { Log.e("TAG", "响应Complete事件") }
        )
复制代码

八. Interval

按照一个固定时间间隔发射整数序列的Observable

  • 常用操作符
  1. RxJava 2.X: interval
  2. RxKotlin: interval
/**
 * 第一个参数: 第一次延迟时间
 * 第二个参数: 后续发送事件的间隔时间
 * 第三个参数: 时间单位
 */
Observable.interval(2,1,TimeUnit.SECONDS)
        .subscribeBy(
                onNext = { Log.e("TAG", "接受onNext事件==$it") },
                onError = { Log.e("TAG", "响应Error事件: ${it.localizedMessage}") },
                onComplete = { Log.e("TAG", "响应Complete事件") }
        )
复制代码

九. Range

连续发送一个事件序列,可以指定范围

/**
 * range 发送整数序列,没有延迟
 * 第一个参数: 事件序列的起始点
 * 第二个参数: 事件数量
 * 如果设置成负数则会直接抛出异常
 */
Observable.range(3,10)
        .subscribeBy(
                onNext = { Log.e("TAG", "接受onNext事件==$it") },
                onError = { Log.e("TAG", "响应Error事件: ${it.localizedMessage}") },
                onComplete = { Log.e("TAG", "响应Complete事件") }
        )
        
/**
 * intervalRange: 发送整数序列,可以设置延迟
 * 第一个参数: 事件序列起始点
 * 第二个参数: 事件数量
 * 第三个参数: 事件时间间隔
 * 第四个参数: 事件单位
 */
Observable.intervalRange(3,10,2,1,TimeUnit.SECONDS)
        .subscribeBy(
                onNext = { Log.e("TAG", "接受onNext事件==$it") },
                onError = { Log.e("TAG", "响应Error事件: ${it.localizedMessage}") },
                onComplete = { Log.e("TAG", "响应Complete事件") }
        )
复制代码
以上是Observable创建操作符的使用,这些操作符不仅在Observable可以使用,在Flowable等也可以使用
Demo地址: RxJavaDemo