提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
本文介绍项目开发中使用到rxjava的情形,以及详细的代码。
RxJava是一个基于事件流实现异步操作的框架,其作用是实现异步操作,类似于Android中的AsyncTask。它是在Java虚拟机(JVM)上使用可观测的序列来构建异步的、基于事件的程序。RxJava结合了观察者模式,迭代器模式和函数式的精华,最早由Netflix公司用于减少REST调用次数,后迁移到Java平台,并得到了广泛的应用。
RxJava的一些主要特点包括支持Java 8 Lambda表达式,支持异步和同步编程,具有单一依赖关系,以及简洁、优雅的代码风格。此外,RxJava还解决了“回调地狱”问题,异步处理不再需要回调一层套一层,而是用链式调用的方式完成不同线程的回调。
对于Android开发者来说,RxJava在开发过程中常与RxAndroid一同使用,RxAndroid是针对RxJava在Android平台上使用的响应式扩展组件。然而,尽管RxJava带来了编程上的便利,但其复杂性也使得一些开发者对其持有保留态度。
。
代码如下(示例):
implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
需求:刚进入页面就进行连接(异步返回结果:失败、成功、连接中),点击按钮的时候,有几种状态: 1、连接失败--重新开始连接
? ? ? ? 1.1 连接成功 --调阅读的方法
? ? ? ? 1.2 连接失败 --UI进行提示失败
2、连接中
? ? ? ? ?2.1 连接成功 --调阅读的方法
? ? ? ? 2.2 连接失败 --UI进行提示失败
3、连接成功 --调阅读的方法
?
class MainActivity : AppCompatActivity() {
private val subHandle = SubHandle()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
subHandle.mConsumer = null
subHandle.connect().subscribe()
}
fun btRead(view: View) {
subHandle.handleStatus {
subHandle.read()
}
}
/**
* 刚进入页面就进行连接,点击按钮的时候,有几种状态:
* 1、连接失败--重新开始连接,
* 1.1 连接成功 --调阅读的方法
* 1.2 连接失败 --UI进行提示失败
* 2、连接中
* 2.1 连接成功 --调阅读的方法
* 2.2 连接失败 --UI进行提示失败
* 3、连接成功 --调阅读的方法
*/
class SubHandle {
var mConsumer: ((Int) -> Unit)? = null
private var status = AtomicInteger(-1) // 0连接失败 1正在连接中 2连接成功
private var disposable: Disposable? = null
fun connect(): Observable<Int> {
status.set(1)
Log.e("TAG", "=连接=")
return Observable.interval(5, TimeUnit.SECONDS)
.take(1)
.map {
val random = Random(System.currentTimeMillis())
val randomNumber = random.nextInt(3) // 生成一个0到2之间的随机整数
Log.e("TAG", "==funA输出$randomNumber")
randomNumber
}
.subscribeOn(Schedulers.io())
.doOnNext {
if (it == 2) {
status.set(2)
mConsumer?.invoke(status.get())
} else {
status.set(0)
Log.e("TAG", "连接阅读器失败,给UI提示")
}
}
}
fun handleStatus(consumer: (Int) -> Unit) {
mConsumer = consumer
when (status.get()) {
0 -> {
Log.e("TAG", "连接失败过,正重试连接")
disposable?.dispose()
disposable = connect().subscribe()
}
1 -> Log.e("TAG", "正在连接")
2 -> mConsumer?.invoke(status.get())
}
}
fun read() {
Log.e("TAG", "开始阅读")
}
}
}
class MainActivity : AppCompatActivity() {
private var canRead = false
private var connectStatus = 0 //1 代表 SUCC, 2 代表 FAIL, 0 代表 CONNECTING
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
connect()
}
private fun connect() {
Log.e("TAG", "=连接=")
Thread(Runnable {
Thread.sleep(5000) // 休眠5秒钟
Observable.just(randomStatus())
.doOnNext { connectStatus = it }
.filter {
Log.e("TAG", "it状态" + it)
it == 1 && canRead
}
.subscribeOn(Schedulers.io())
.doOnNext { read() }
.subscribe()
}).start()
}
fun btRead(view: View) {
canRead = true
Log.e("TAG", "点击按钮" + connectStatus)
when (connectStatus) {
1 -> read() // 1 代表 SUCC
2 -> connect() // 2 代表 FAIL
else -> {}
}
}
private fun read() {
Log.e("TAG", "开始阅读")
}
private fun randomStatus(): Int {
val random = Random(System.currentTimeMillis())
return random.nextInt(3) //生成一个0到2之间的随机整数
}
}
class MainActivity : AppCompatActivity() {
private val compositeDisposable = CompositeDisposable()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
}
@SuppressLint("CheckResult")
fun btRead(view: View) {
Log.e("TAG", "jjjjjj")
showFirstDialog()
.flatMap { showSecondDialog() }
.flatMap { showThirdDialog() }.subscribe({
Log.e("TAG", "3个弹窗都选了确定")
}, { error ->
Log.e("TAG", "点击了取消$error")
})
}
private fun showFirstDialog(): Observable<Unit> {
return Observable.create<Unit> { emitter ->
val dialog = AlertDialog.Builder(this)
.setMessage("第一个弹窗")
.setPositiveButton("确定") { _, _ ->
emitter.onNext(Unit) // 发送事件,表示点击了确定按钮
}
.setNegativeButton("取消") { _, _ ->
emitter.onError(Throwable("1取消")) // 发送错误事件,表示点击了取消按钮
}
.setOnCancelListener {
emitter.onError(Throwable("1取消")) // 发送错误事件,表示点击了返回键
}
.create()
dialog.show()
emitter.setCancellable { dialog.dismiss() } // 在取消订阅时关闭弹窗
}
}
private fun showSecondDialog(): Observable<Unit> {
return Observable.create<Unit> { emitter ->
val dialog = AlertDialog.Builder(this)
.setMessage("第二个弹窗")
.setPositiveButton("确定") { _, _ ->
emitter.onNext(Unit)
}
.setNegativeButton("取消") { _, _ ->
emitter.onError(Throwable("2取消"))
}
.setOnCancelListener {
emitter.onError(Throwable("2取消"))
}
.create()
dialog.show()
emitter.setCancellable { dialog.dismiss() }
}
}
private fun showThirdDialog(): Observable<Unit> {
return Observable.create<Unit> { emitter ->
val dialog = AlertDialog.Builder(this)
.setMessage("第三个弹窗")
.setPositiveButton("确定") { _, _ ->
emitter.onNext(Unit)
}
.setNegativeButton("取消") { _, _ ->
emitter.onError(Throwable("3取消"))
}
.setOnCancelListener {
emitter.onError(Throwable("3取消"))
}
.create()
dialog.show()
emitter.setCancellable { dialog.dismiss() }
}
}
}
RxJava是一个基于Java语言的Reactive Extensions库,它用于实现异步编程和流式处理,通过将事件和数据流以数据序列的形式进行处理,提高了代码的可读性和可维护性。