前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >关于RxJava的基础心法解析

关于RxJava的基础心法解析

作者头像
静默加载
发布于 2020-05-29 03:07:07
发布于 2020-05-29 03:07:07
46200
代码可运行
举报
运行总次数:0
代码可运行

前言

我接触Rxjava是在2015年底,已经过去4年的时间了。

2016年学习过一阵子RxJava的操作符也做过一些笔记,我们项目的网络请求框架也替换成了Okhttp+Retrofit,所以使用RxJava做线程间切换就非常好用。

一开始接触RxJava感觉除了线程切换之外很能发现其实际的作用,因为我感觉自己响应式编程的思想,很难实际运用到开发需求当中去。但我身边有一位前辈使用Rxjava非常溜,他一般做需求的时候写的都是流式的代码。

2017年Kotlin语言Google举行的I/O开发者大会上宣布,将Kotlin语言作为安卓开发的一级编程语言,所以自己又看了看了Kotlin语言。

RxJava在我们项目中还是静静的躺着,因为自己懒的思考,懒的在代码结构上做更新,懒的对RxJava做研究。有时候感觉自己就算会了RxJava也不会将其使用在项目当中,因为自己什么业务场景之下使用Rxjava更加方便。

2018就这么有一下没一下的使用RxJava,最近在做需求开发的时候用的RxJava比较多了,一些业务场景也逐渐思考使用响应式编程。思考这样写的好处,以及怎么将之前的代码结构转化为流式结构。

感觉有时候思维观念的转变是一个漫长的过程,但有时候又会很快。凡事都可以熟能生巧,我们使用RxJava多了之后再笨也会思考。之前想不到RxJava的使用场景是因为自己见的、写的代码还不够多。

今天回过头来从代码的角度看看一次RxJava 的基础操作,事件订阅到触发的过程。

这里推荐一篇RxJava的入门的文章 给 Android 开发者的 RxJava 详解

读完本篇文章希望所有读者能明白RxJava的观察者与java的观察者模式有什么不同,以及Rxjava的观察者模式的代码运行过程。至于怎么具体的使用 Rxjava 那么就需要更多学习和实践了。

Java的观察者模式

观察者:Observer

被观察者:Observable

被观察者提供添加(注册)观察者的方法; 被观察者更新的同时可以主动通知注册它观察者更新;

观察者模式面向的需求是:收音机听广播,电台是被观察者,收音机是观察者。收音机调频到广播的波段(注册),广播发送信息(被观察者更新数据,通知所有的观察者)收音机接受信息从而播放声音(观察者数据更新)。

RxJava的观察者模式

可观察者(被观察者):Observalbe

观察者:Observer

订阅操作:subscribe()

订阅:Subscription

订阅者:Subscriber ,实现 ObserverSubscription

发布: OnSubscribe

ObservableSubscriber 通过 subscribe() 方法实现订阅关系,从而 Observable 在被订阅之后就会触发 OnSubscribe.call 进行发布事件来通知 Subscriber

RxJavaObservable.png

分析源代码

我们先看一个简单的打印数据的例子:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Observable.just(1)
    .subscribe(new Subscriber<Object>() {
        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
        }
        @Override
        public void onError(Throwable e) {
            System.out.println("onError");
        }
        @Override
        public void onNext(Object o) {
            System.out.println("onNext");
            System.out.println(Integer.valueOf(o.toString()));
        }
    });

RxJava 的版本已经更新到2了,我们现在还用的是版本1。版本1中1.0和1.3这两个版本用的比较多。但这两个RxJava 版本之前改动不是很大,我们来分析分析最初始的版本,主要看看其中的设计思想啥的~!

谁触发了被观察者

我们进行了 subscribe 之后就会触发 Observable 的执行动作,然后将执行结果传输给订阅它的 Subscriber

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
//无参的subscribe
public final Subscription subscribe() {
    return subscribe(new Subscriber<T>() {
        @Override
        public final void onCompleted() {}
        @Override
        public final void onError(Throwable e) {throw new OnErrorNotImplementedException(e);}
        @Override
        public final void onNext(T args) {}
    });
}
/***部分代码省略***/
//onnext、onerror、oncomplete参数
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
    if (onNext == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }
    if (onError == null) {
        throw new IllegalArgumentException("onError can not be null");
    }
    if (onComplete == null) {
        throw new IllegalArgumentException("onComplete can not be null");
    }
    return subscribe(new Subscriber<T>() {
        @Override
        public final void onCompleted() {onComplete.call();}
        @Override
        public final void onError(Throwable e) {onError.call(e);}
        @Override
        public final void onNext(T args) {onNext.call(args);}
    });
}
//observer参数
public final Subscription subscribe(final Observer<? super T> observer) {
    return subscribe(new Subscriber<T>() {
        @Override
        public void onCompleted() {observer.onCompleted();}
        @Override
        public void onError(Throwable e) {observer.onError(e);}
        @Override
        public void onNext(T t) {observer.onNext(t);}
    });
}
//Subscriber参数
public final Subscription subscribe(Subscriber<? super T> subscriber) {
    if (subscriber == null) {
        throw new IllegalArgumentException("observer can not be null");
    }
    if (onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
    }
    //调用订阅者的start方法
    subscriber.onStart();
    if (!(subscriber instanceof SafeSubscriber)) {
        subscriber = new SafeSubscriber<T>(subscriber);
    }
    try {
        //Observable的OnSubscribe调用call方法
        hook.onSubscribeStart(this, onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        try {
            //调用订阅者的onError方法
            subscriber.onError(hook.onSubscribeError(e));
        } catch (OnErrorNotImplementedException e2) {
            throw e2;
        } catch (Throwable e2) {
            RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
            hook.onSubscribeError(r);
            throw r;
        }
        return Subscriptions.unsubscribed();
    }
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();

class RxJavaObservableExecutionHookDefault extends RxJavaObservableExecutionHook {
    private static RxJavaObservableExecutionHookDefault INSTANCE = new RxJavaObservableExecutionHookDefault();
    public static RxJavaObservableExecutionHook getInstance() {
        return INSTANCE;
    }
}
public abstract class RxJavaObservableExecutionHook {
    public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        // pass-thru by default
        return onSubscribe;
    }
}

RxJavaObservableExecutionHookonSubscribeStart 可以看出 hook.onSubscribeStart(this, onSubscribe).call(subscriber); 实际是 onSubscribe.call(subscirber)

开始你的表演:Observable.OnSubscribe.call

刚才我们了解到通过 subscirbe 可以通知被观察者进行 call 操作。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class Observable<T> {
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
    /***部分代码省略***/
    public final static <T> Observable<T> just(final T value) {
        return ScalarSynchronousObservable.create(value);
    }
}

public final class ScalarSynchronousObservable<T> extends Observable<T> {
    public static final <T> ScalarSynchronousObservable<T> create(T t) {
        return new ScalarSynchronousObservable<T>(t);
    }
    private final T t;
    protected ScalarSynchronousObservable(final T t) {
        super(new OnSubscribe<T>() {
            @Override
            public void call(Subscriber<? super T> s) {
                s.onNext(t);
                s.onCompleted();
            }
        });
        this.t = t;
    }
    public T get() {
        return t;
    }
}

我们最后看到 Observable.just(1) 生成的 Observable 实际是 ScalarSynchronousObservable 实例。 Observable.OnSubscribe 实际上是:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
new OnSubscribe<T>() {
    @Override
    public void call(Subscriber<? super T> s) {
        s.onNext(t);
        s.onCompleted();
    }
}

所以 onSubscribe.call(subscirber) 最终调用的是了 subscirberonNext和onCompleted 方法。

总结

对于Android开发人员开发来说 RxJava 是一个很好用的库,但是需要我们转化平时的对代码结构设计的思想,能很好的去使用到大部分的业务场景之中。只有对 RxJava 有了足够的了解我们才能灵活、熟练的使用。

本篇文章只是一个 RxJava 简单的基础开篇,观察者:Observer订阅操作:subscribe()订阅:Subscription订阅者:Subscriber 以及 ObserverSubscription 的订阅关系,之后我会慢慢的学习和分享关于 RxJava 更多的知识。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
【Android】RxJava的使用(一)基本用法
前言 最近经常看到RxJava这个字眼,也看到很多人在博客中推荐使用RxJava。好奇的我怎么能错过,于是Google了一下,说RxJava好用的和说RxJava难用的都有,于是自己也亲自尝试了一下(感觉不容易上手)。写博客记录下学习的过程,一方面作为巩固,另一方面希望能帮助到初学者。 (第一篇只介绍了RxJava的基本用法,暂时还无法看出RxJava的优势。切莫着急,随着慢慢深入就能体会到RxJava带来的方便了) 参考:给 Android 开发者的 RxJava 详解 (本文部分内容引用自该博客)
Gavin-ZYX
2018/05/18
1.3K0
RxJava简析
rxjava文档地址https://mcxiaoke.gitbooks.io/rxdocs/content/ 这个是中文版的
用户3112896
2020/11/25
7420
源码阅读--RxJava(一)
所有有关RxJava,RxAndroid的介绍性文章在这里贴出一二: http://blog.csdn.net/caroline_wendy/article/details/50444461 http://frodoking.github.io/2015/09/08/reactivex/
提莫队长
2019/02/21
3760
Android:这是一篇 清晰 易懂的Rxjava 入门教程
2、如果读者还没学习过Rxjava 1.0也没关系,因为Rxjava 2.0只是在Rxjava 1.0上增加了一些新特性,本质原理 & 使用基本相同
用户2802329
2018/08/07
8690
Android:这是一篇 清晰 易懂的Rxjava 入门教程
Rxjava源码解析笔记 | 创建Observable 与 Observer/Subscriber 以及之间订阅实现的源码分析
接下来是Rxjava的SDK中subscribe()的传入参数 是Observer时候(observable.subscribe(observer);)的源码:
凌川江雪
2019/06/11
1.6K0
RxJava 详解
我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 。而最近这几个月,我也发现国内越来越多的人开始提及 RxJava 。有人说『RxJava 真是太好用了』,有人说『RxJava 真是太难用了』,另外更多的人表示:我真的百度了也谷歌了,但我还是想问: RxJava 到底是什么?
AWeiLoveAndroid
2018/09/03
1.9K0
RxJava 详解
RxJava2.0你不知道的事(三)
后面三种观察者模式差不多,Maybe/MaybeObserver可以说是Single/SingleObserver和Completable/CompletableObserver的复合体。
开发者技术前线
2020/11/23
6840
RxJava2.0你不知道的事(三)
RxJava
我不想说这些乱七八糟的概念了,实际上是我根本说不清,不过观察者模式和回调机制要知道
spilledyear
2019/01/28
1.2K0
RxJava
RxJava再回首
很早前就看了RxJava,当时就觉得好牛掰,但是公司项目一直没有用起来,知识不用就会忘,前段时间突然要写RxJava,发现已经不会写了。所以今天再回头整理一下RxJava的头绪,一方面给其它想了解RxJava的人提供参考,另一方面也是给自己将来再遗忘时回来翻阅。
大公爵
2018/09/05
8750
RxJava再回首
Rxjava源码解析笔记 | Rxjava基本用法
Rxjava四要素 被观察者 在Rxjava当中, 决定什么时候触发事件, 决定触发什么样的事件; 观察者 决定事件触发的时候将产生什么样的行为; 类似于传统观察者模式, 观察者会随着被观察者的状态变化而发生相应的操作; 订阅 区别于传统观察者模式; 观察者和被观察者需要通过订阅来联系; 通过subscribe()方法完成这个订阅关系; 完成订阅关系后, 即可令被观察者(Observable)在需要的时候, 发出事件来通知观察者(Observer) 事件 区别于传统观察者模式;
凌川江雪
2019/06/05
7210
RxAndroid从零开始学习之一(RxJava的简单Demo)
现在RxJava,RxAndroid似乎很火,很多开发群里面都在讨论。因为本人虽然一直在开发Android,但近两年跟系统内置app和framework层打交道更多,没有想到脱离互联网领域这么点时间,这么多新技术就冒出来了。所以想想很有危机感,感觉脱离群众很久了。那么,来吧。打算用一段时间将这个知识点从零开始学起。
Frank909
2019/01/14
2.4K0
Android 2 新框架 rxjava ,retrifit
Rxjava主要作用就是用来处理异步,当你的业务需要访问数据库,访问网络,或者任何耗时的操作,都可以借助Rxjava来实现。  但是有人说在Android中已经有很多异步操作的API,比如Handler,AsyncTask等,这些都能满足基本的异步操作,为什么还要使用Rxjava呢?  首先我们开看一个例子做个比较:
zhangjiqun
2024/12/16
1470
深入RxJava2 源码解析(一)
ReactiveX 响应式编程库,这是一个程序库,通过使用可观察的事件序列来构成异步和事件驱动的程序。
aoho求索
2019/03/07
1.3K0
Android RxJava:一步步带你源码分析 RxJava
步骤1:创建被观察者(Observable)& 定义需发送的事件 步骤2:创建观察者(Observer) & 定义响应事件的行为 步骤3:通过订阅(subscribe)连接观察者和被观察者
Carson.Ho
2019/02/22
6090
Android RxJava的使用
首语 最近因为项目上线,挤不出时间,已经好久没有更新博客了😛,目前项目也做差不多了,写几篇总结类型的博客,梳理一下。 本文主要对RxJava及常用操作符的使用进行总结,同时对RxJava在Android中几种常见的使用场景进行举例。 简介 RxJava是Reactive Extensions的Java VM实现:该库用于通过使用可观察的序列来组成异步和基于事件的程序。 Rx是Reactive Extensions的缩写的简写,它是一个使用可观察数据流进行异步编程的编程接口,Rx结合了观察者模式、迭代器模
八归少年
2022/06/29
3.1K0
Android RxJava的使用
RxJava系列六(从微观角度解读RxJava源码)
前言 通过前面五个篇幅的介绍,相信大家对RxJava的基本使用以及操作符应该有了一定的认识。但是知其然还要知其所以然;所以从这一章开始我们聊聊源码,分析RxJava的实现原理。本文我们主要从三个方面来分析RxJava的实现: RxJava基本流程分析 操作符原理分析 线程调度原理分析 本章节基于RxJava1.1.9版本的源码 一、RxJava执行流程分析 在RxJava系列2(基本概念及使用介绍)中我们介绍过,一个最基本的RxJava调用是这样的: 示例A Observable.create(new
张磊BARON
2018/04/13
1.6K0
RxJava系列六(从微观角度解读RxJava源码)
Rxjava 2.x 源码系列 - 基础框架分析
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/gdutxiaoxu/article/details/80501816
程序员徐公
2018/09/18
5400
Rxjava 2.x 源码系列 - 基础框架分析
这是一篇清晰易懂的 Rxjava 入门教程
原文作者:Carson_Ho 原文地址:http://www.jianshu.com/p/a406b94f3188 特别声明:本文为Carson_Ho原创并授权发布,未经原作者允许请勿转载,转载请联系
非著名程序员
2018/02/09
6.9K0
这是一篇清晰易懂的 Rxjava 入门教程
锦囊篇|一文摸懂RxJava
(1)包结构变化RxJava 3 components are located under the io.reactivex.rxjava3 package (RxJava 1 has rx and RxJava 2 is just io.reactivex. This allows version 3 to live side by side with the earlier versions. In addition, the core types of RxJava (Flowable, Observer, etc.) have been moved to io.reactivex.rxjava3.core.为了阅读障碍的朋友们给出我的一份四级水准翻译,有以下的几点变化:
ClericYi
2020/06/23
8390
一篇博客让你了解RxJava
RxJava可以说是2016年最流行的项目之一了,最近也接触了一下RxJava,于是想写一篇博客,希望能通过这篇博客让大家能对其进行了解,本篇博客是基于RxJava2.0,跟RxJava1.0还是有很多不同的
老马的编程之旅
2022/06/22
5540
一篇博客让你了解RxJava
相关推荐
【Android】RxJava的使用(一)基本用法
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验