单个网络请求数据并更新UI
这个比较简单,整个流程大致是:
通过
Obsrvable.create
方法,调用OkHttp
网络请求通过
map
方法结合gson
,将response
转换为bean
类通过
onNext
,解析bean
中数据,并进行数据库存储调度线程
通过
subscribe
,根据请求成功或异常来更新UI
Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception { Request.Builder builder = new Request.Builder() .url("url") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); } }).map(new Function<Response, Bean>() { @Override public Bean apply(@NonNull Response response) throws Exception { //Gson } }).doOnNext(new Consumer<Bean>() { @Override public void accept(@NonNull Bean bean) throws Exception { //saveData } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Bean>() { @Override public void accept(@NonNull Bean bean) throws Exception { //refresh UI } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { //get ERROR } });
多个网络请求依次依赖
这里主要是依赖于flatMap
关键字,FlatMap
可以将一个发射数据的Observable
变换为多个Observables
,然后将它们发射的数据合并后放进一个单独的Observable
。
利用这个特性,我们可以将Observable
转成另一个Observable
Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception { Request.Builder builder = new Request.Builder() .url("url") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); } }).map(new Function<Response, FirstBean>() { @Override public FirstBean apply(@NonNull Response response) throws Exception { //Gson } }).flatMap(new Function<FirstBean, ObservableSource<Response>>() { @Override public ObservableSource<Response> apply(@NonNull FirstBean bean) throws Exception { final String s = bean.getData(); return Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception { Request.Builder builder = new Request.Builder() .url("url/" + s) .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); } }); } }).map(new Function<Response, SecondBean>() { @Override public SecondBean apply(@NonNull Response response) throws Exception { //Gson } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<SecondBean>() { @Override public void accept(@NonNull SecondBean secondBean) throws Exception { //refresh UI } });
先读取缓存数据并展示UI再获取网络数据刷新UI
这里需要依赖另一个操作符:Concat
concat
可以做到不交错的发射两个或多个Observable
的发射物,并且只有前一个Observable
终止(onComleted
)才会订阅下一个Obervable
利用这个特性,我们就可以依次的读取缓存数据展示UI,然后再获取网络数据刷新UI
首先创建一个从cache获取数据的observable
再创建一个从网络获取数据的Observable(可以通过map等方法转换数据类型)
通过concat方法将多个observable结合起来
通过subscribe订阅每一个observable
Observable<List<String>> cache = Observable.create(new ObservableOnSubscribe<List<String>>() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { CacheManager manager = CacheManager.getInstance(); List<String> data = manager.query(); e.onNext(data); //一定要有onComplete,不然不会执行第二个Observale e.onComplete(); } }); Observable<List<String>> network = Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { Request.Builder builder = new Request.Builder() .url("url") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); e.onComplete(); } }).map(new Function<Response, List<String>>() { @Override public List<String> apply(@NonNull Response response) throws Exception { //解析数据 } });//两个observable的泛型应该保持一致Observable.concat(cache, network) .subscribeOn(Schedulers.io()) .subscribe(new Consumer<List<String>>() { @Override public void accept(@NonNull List<String> strings) throws Exception { //refresh ui } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { //get error } });
获取网络数据前先读取缓存
其实和上面的那种类似,只需要稍微修改一下逻辑即可:
当缓存的Observable
获取到数据时,只执行onNext
,获取不到则只执行onComplete
Observable<String> cache = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { CacheManager manager = CacheManager.getInstance(); String data = manager.queryForPosition(0); if (data != null) { e.onNext(data); } else { //调用onComplete之后会执行下一个Observable //如果缓存为空,那么直接结束,进行网络请求 e.onComplete(); } } }); Observable<String> network = Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { Request.Builder builder = new Request.Builder() .url("url") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); e.onComplete(); } }).map(new Function<Response, String>() { @Override public String apply(@NonNull Response response) throws Exception { //解析数据 } });//两个observable的泛型应该保持一致Observable.concat(cache, network) .subscribeOn(Schedulers.io()) .subscribe(new Consumer<String>() { @Override public void accept(@NonNull String strings) throws Exception { //refresh ui } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { //get error } });
当然,有的时候我们的缓存可能还会分为memory
和disk
,无差,只需要多写一个Observable
然后一样通过concat
合并即可。
结合多个接口的数据再更新UI
这个时候就需要靠zip
方法啦,zip
方法可以将多个Observable
的数据结合为一个数据源再发射出去。
Observable<FirstBean> firstRequest = Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { Request.Builder builder = new Request.Builder() .url("firstUrl") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); e.onComplete(); } }).map(new Function<Response, FirstBean>() { @Override public FirstBean apply(@NonNull Response response) throws Exception { //解析数据 } }); Observable<SecondBean> secondRequest = Observable.create(new ObservableOnSubscribe<Response>() { @Override public void subscribe(@NonNull ObservableEmitter e) throws Exception { Request.Builder builder = new Request.Builder() .url("secondUrl") .get(); Request request = builder.build(); Call call = new OkHttpClient().newCall(request); Response response = call.execute(); e.onNext(response); e.onComplete(); } }).map(new Function<Response, SecondBean>() { @Override public SecondBean apply(@NonNull Response response) throws Exception { //解析数据 } }); Observable.zip(firstRequest, secondRequest, new BiFunction<FirstBean, SecondBean, WholeBean>() { @Override public WholeBean apply(@NonNull FirstBean firstBean, @NonNull SecondBean secondBean) throws Exception { //结合数据为一体 } }) .subscribeOn(Schedulers.io()) .subscribe(new Consumer<WholeBean>() { @Override public void accept(@NonNull WholeBean strings) throws Exception { //refresh ui } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { //get error } });
当然,如果你的两个api返回的是相同类型的数据,那么可以直接使用merge
将数据合并,而不需要实现回调。
减少频繁的网络请求
设想一种场景:点击一次button就进行一次网络请求,或者当输入框数据变化时进行网络请求,那么这样就会在一下子产生大量的网络请求,但实际上又没有必要,这个时候就可以通过debounce
方法来处理,debounce
操作符会过滤掉发射速率过快的数据项:
为了方便处理点击事件
和Observable
的关系,我们引入RxBinding处理:
RxView.clicks(mButton) .debounce(2, TimeUnit.SECONDS) .subscribe(new Consumer<Object>() { @Override public void accept(@NonNull Object o) throws Exception { // refresh ui } }, new Consumer<Throwable>() { @Override public void accept(@NonNull Throwable throwable) throws Exception { // get error } });
评论