首页 Android正文

Retrofit+RxJava网络请求详解实战

yuange Android 2019-01-08 791 0 Retrofit+RxJava

前言

Retrofit是目前主流的网络请求框架,功能强大,操作便捷。
RxJava是实现异步操作的库。可在线程间快速切换,同时提供许多操作符,使一些复杂的操作代码变得清晰有条理。
两者结合使用后,使得网络请求更加简洁,尤其在嵌套请求等特殊场景大有作为。

本文侧重于介绍Retrofit网络请求,以及它是如何结合RxJava使用的。还没了解过RxJava的建议先到上面贴出的参考地址学习,以便更好明白两者结合的过程。

文章篇幅较长,因为希望尽可能涵盖常用、实用的模块。

demo以及文章中的RxJava部分,已从1.x更新到2.x。


介绍

下面通过配置,请求,异常处理,生命周期管理,失败重试,监听进度,封装,混淆这几个部分来介绍。

1. 配置

1.1 添加依赖

//Rxjava
compile 'io.reactivex.rxjava2:rxjava:2.1.6'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
//Retrofit
compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile 'com.squareup.retrofit2:converter-gson:2.3.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile 'com.squareup.okhttp3:logging-interceptor:3.8.0'

1.2 开启Log日志

OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
//启用Log日志
HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
okHttpClientBuilder.addInterceptor(loggingInterceptor);

开启后,则可以在Log日志中看到网络请求相关的信息了,如请求地址,请求状态码,返回的结果等。

Log日志截图

1.3 开启Gson转换

Retrofit.Builder retrofitBuilder = new Retrofit.Builder();
//配置转化库,采用Gson
retrofitBuilder.addConverterFactory(GsonConverterFactory.create());

开启后,会自动把请求返回的结果(json字符串)自动转化成与其结构相符的实体。

1.4 采用Rxjava

Retrofit.Builder retrofitBuilder = new Retrofit.Builder();
//配置回调库,采用RxJava
retrofitBuilder.addCallAdapterFactory(RxJava2CallAdapterFactory.create());

1.5 设置基础请求路径BaseUrl

Retrofit.Builder retrofitBuilder = new Retrofit.Builder();
//服务器地址,基础请求路径,最好以"/"结尾
retrofitBuilder.baseUrl("https://api.douban.com/");

1.6 设置请求超时

OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
//设置请求超时时长为15秒
okHttpClientBuilder.connectTimeout(15, TimeUnit.SECONDS);

1.7 设置缓存

Interceptor cacheIntercepter=new Interceptor() {
     @Override
     public Response intercept(Chain chain) throws IOException {
         //对request的设置用来指定有网/无网下所走的方式
         //对response的设置用来指定有网/无网下的缓存时长

         Request request = chain.request();
         if (!NetworkUtil.isNetWorkAvailable(mContext)) {
             //无网络下强制使用缓存,无论缓存是否过期,此时该请求实际上不会被发送出去。
             //有网络时则根据缓存时长来决定是否发出请求
             request = request.newBuilder()
             .cacheControl(CacheControl.FORCE_CACHE).build();
         }

         Response response = chain.proceed(request);
         if (NetworkUtil.isNetWorkAvailable(mContext)) {
             //有网络情况下,超过1分钟,则重新请求,否则直接使用缓存数据
             int maxAge = 60; //缓存一分钟
             String cacheControl = "public,max-age=" + maxAge;
             //当然如果你想在有网络的情况下都直接走网络,那么只需要
             //将其超时时间maxAge设为0即可
              return response.newBuilder()
              .header("Cache-Control",cacheControl)
              .removeHeader("Pragma").build();
         } else {
             //无网络时直接取缓存数据,该缓存数据保存1周
             int maxStale = 60 * 60 * 24 * 7 * 1;  //1周
             return response.newBuilder()
             .header("Cache-Control", "public,only-if-cached,max-stale=" + maxStale)
             .removeHeader("Pragma").build();
         }

     }
 };
 
File cacheFile = new File(mContext.getExternalCacheDir(), "HttpCache");//缓存地址
Cache cache = new Cache(cacheFile, 1024 * 1024 * 50); //大小50Mb

//设置缓存方式、时长、地址
OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
okHttpClientBuilder.addNetworkInterceptor(cacheIntercepter);
okHttpClientBuilder.addInterceptor(cacheIntercepter);
okHttpClientBuilder.cache(cache);

1.8 设置header

可统一设置

Interceptor headerInterceptor = new Interceptor() {
     @Override
     public Response intercept(Chain chain) throws IOException {
         Request originalRequest = chain.request();
         Request.Builder builder = originalRequest.newBuilder();
         //设置具体的header内容
         builder.header("timestamp", System.currentTimeMillis() + "");

         Request.Builder requestBuilder = 
         builder.method(originalRequest.method(), originalRequest.body());
         Request request = requestBuilder.build();
         return chain.proceed(request);
     }
 };
//设置统一的header
OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
okHttpClientBuilder.addInterceptor(getHeaderInterceptor());

也可在请求方法中单独设置

@Headers("Cache-Control: max-age=120")
@GET("请求地址")
Observable<HttpResult> getInfo();

或者

@GET("请求地址")
Observable<HttpResult> getInfo(@Header("token") String token);

1.9 设置https访问

现在不少服务器接口采用了https的形式,所以有时就需要设置https访问。
下面列举“客户端内置证书”时的配置方法,其他方式请参考 http://blog.csdn.net/dd864140130/article/details/52625666

//设置https访问(验证证书,请把服务器给的证书文件放在R.raw文件夹下)
okHttpClientBuilder.sslSocketFactory(getSSLSocketFactory(mContext, new int[]{R.raw.tomcat}));
okHttpClientBuilder.hostnameVerifier(org.apache.http.conn.ssl.SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);

getSSLSocketFactory()方法如下:

//设置https证书
protected static SSLSocketFactory getSSLSocketFactory(Context context, int[] certificates) {

    if (context == null) {
        throw new NullPointerException("context == null");
    }

    //CertificateFactory用来证书生成
    CertificateFactory certificateFactory;
    try {
        certificateFactory = CertificateFactory.getInstance("X.509");
        //Create a KeyStore containing our trusted CAs
        KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
        keyStore.load(null, null);

        for (int i = 0; i < certificates.length; i++) {
            //读取本地证书
            InputStream is = context.getResources().openRawResource(certificates[i]);
            keyStore.setCertificateEntry(String.valueOf(i), certificateFactory
            .generateCertificate(is));

            if (is != null) {
                is.close();
            }
        }
        
        //Create a TrustManager that trusts the CAs in our keyStore
        TrustManagerFactory trustManagerFactory = TrustManagerFactory
        .getInstance(TrustManagerFactory.getDefaultAlgorithm());
        trustManagerFactory.init(keyStore);

        //Create an SSLContext that uses our TrustManager
        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(null, trustManagerFactory.getTrustManagers(), new SecureRandom());
        return sslContext.getSocketFactory();

    } catch (Exception e) {

    }
    return null;
}

1.10 综合前面的配置

OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
//设置请求超时时长
okHttpClientBuilder.connectTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
//启用Log日志
okHttpClientBuilder.addInterceptor(getHttpLoggingInterceptor());
//设置缓存方式、时长、地址
okHttpClientBuilder.addNetworkInterceptor(getCacheInterceptor());
okHttpClientBuilder.addInterceptor(getCacheInterceptor());
okHttpClientBuilder.cache(getCache());
//设置https访问(验证证书)
okHttpClientBuilder.sslSocketFactory(getSSLSocketFactory(mContext, new int[]{R.raw.tomcat}));
okHttpClientBuilder.hostnameVerifier(org.apache.http.conn.ssl.SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
//设置统一的header
okHttpClientBuilder.addInterceptor(getHeaderInterceptor());

Retrofit retrofit = new Retrofit.Builder()
           //服务器地址
           .baseUrl(UrlConstants.HOST_SITE_HTTPS)
           //配置转化库,采用Gson
           .addConverterFactory(GsonConverterFactory.create())
           //配置回调库,采用RxJava
           .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
           //设置OKHttpClient为网络客户端
           .client(okHttpClientBuilder.build()).build();

配置后得到的retrofit变量用于后面发起请求。

2. 请求

2.1 创建API接口

定义一个接口,在其中添加具体的网络请求方法。
请求方法的格式大致如下:

@其他声明
@请求方式("请求地址")
Observable<请求返回的实体> 请求方法名(请求参数);

或者

@其他声明
@请求方式
Observable<请求返回的实体> 请求方法名(@Url String 请求地址,请求参数);

第一种格式中的请求地址,填写基础请求路径baseUrl后续的部分即可,当然填写完整地址也是可以的。
第二种格式中的请求地址,需填写完整的地址。



下面列举Get请求Post请求文件上传文件下载的接口定义。
其中HttpResult是自定义的、与后台返回的json数据结构相符的实体。

  • Get请求

请求参数逐个传入

@GET("v2/movie/in_theaters")
Observable<HttpResult> getPlayingMovie(@Query("start") int start, @Query("count") int count);

请求参数一次性传入(通过Map来存放key-value)

@GET("v2/movie/in_theaters")
Observable<HttpResult> getPlayingMovie(@QueryMap Map<String, String> map);

以上两种方式,请求参数是以“?key=vale%key=value...”方式拼接到地址后面的,假如你需要的是以"/value"的方式拼接到地址后面(restful模式?),那么可以通过@Path注解来实现:

@GET("v2/movie/in_theaters/{start}/{count}")
Observable<HttpResult> getPlayingMovie(@Path("start") int start, @Path("count") int count);
  • Post请求

请求参数逐个传入

@FormUrlEncoded
@POST("请求地址")
Observable<HttpResult> getInfo(@Field("token") String token, @Field("id") int id);

请求参数一次性传入(通过Map来存放参数名和参数值)

@FormUrlEncoded
@POST("请求地址")
Observable<HttpResult> getInfo(@FieldMap Map<String, String> map);
  • 上传文本+文件

1)上传单个文本和单个文件

@Multipart
@POST("请求地址")
Observable<HttpResult> upLoadTextAndFile(@Part("textKey") RequestBody textBody, 
                @Part("fileKey\"; filename=\"test.png") RequestBody fileBody);

第一个参数用于传文本,

--- @Part("textKey")中的"textKey"为文本参数的参数名。

--- RequestBody textBody为文本参数的参数值,生成方式如下:
RequestBody textBody = RequestBody.create(MediaType.parse("text/plain"), text);

第二个参数用于传文件,

--- @Part("fileKey"; filename="test.png")
其中的"fileKey"为文件参数的参数名(由服务器后台提供)
其中的"test.png"一般是指你希望保存在服务器的文件名字,传入File.getName()即可

--- RequestBody fileBody为文件参数的参数值,生成方法如下:
RequestBody fileBody = RequestBody.create(MediaType.parse("image/png"), file);


(这里文件类型以png图片为例,所以MediaType为"image/png",
不同文件类型对应不同的type,具体请参考http://tool.oschina.net/commons

2)上传多个文本和多个文件(通过Map来传入)

@Multipart
@POST("")
Observable<HttpResult> upLoadTextAndFiles(@PartMap Map<String, RequestBody> textBodyMap, @PartMap Map<String, RequestBody> fileBodyMap);

第一个参数用于传文本,

Map的key为String,内容请参考上方“上传文本和单个文件”中@Part()里的值。
Map的value值为RequestBody,内容请参考上方“上传文本和单个文件”中RequestBody的生成。

第二个参数用于传文件,

Map的key为String,内容请参考上方“上传文本和单个文件”中@Part()里的值。
Map的value值为RequestBody,内容请参考上方“上传文本和单个文件”中RequestBody的生成。

3)另外补充多一种上传方式(2018/07/16),以上传多个文本和多个文件为例

@POST("")
Observable<HttpResult> upLoadTextAndFiles(@Body MultipartBody multipartBody);

MultipartBody 的生成方式如下:

MultipartBody.Builder builder = new MultipartBody.Builder();
//文本部分
builder.addFormDataPart("fromType", "1");
builder.addFormDataPart("content", "意见反馈内容");
builder.addFormDataPart("phone", "17700000066");

//文件部分
RequestBody requestBody = RequestBody.create(MediaType.parse("image/jpg"), file);
builder.addFormDataPart("image", file.getName(), requestBody); // “image”为文件参数的参数名(由服务器后台提供)

builder.setType(MultipartBody.FORM);
MultipartBody multipartBody = builder.build();
  • 下载文件

//下载大文件时,请加上@Streaming,否则容易出现IO异常
@Streaming
@GET("请求地址")
Observable<ResponseBody> downloadFile();
//ResponseBody是Retrofit提供的返回实体,要下载的文件数据将包含在其中

(目前使用@Streaming进行下载的话,需添加Log拦截器(且LEVEL为BODY)才不会报错,但是网上又说添加Log拦截器后进行下载容易OOM,
所以这一块还很懵,具体原因也不清楚,有知道的朋友可以告诉下我)

2.2 发起请求

完成前面说的的配置和请求接口的定义后,就可以发起请求了。

//构建Retrofit类
Retrofit retrofit = new Retrofit.Builder()
            //服务器地址
             .baseUrl("https://api.douban.com/")
             //配置转化库,采用Gson
             .addConverterFactory(GsonConverterFactory.create())
             //配置回调库,采用RxJava
             .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
             //设置OKHttpClient为网络客户端
             .client(okHttpClientBuilder.build()).build();

//获取API接口
mApiService = retrofit.create(ApiService.class);
//调用之前定义好的请求方法,得到Observable
Observable observable = mApiService.xxx();

普通请求、上传请求:

//通过Observable发起请求
observable
.subscribeOn(Schedulers.io())//指定网络请求在io后台线程中进行
.observeOn(AndroidSchedulers.mainThread())//指定observer回调在UI主线程中进行
.subscribe(observer);//发起请求,请求的结果会回调到订阅者observer中

下载请求:

//通过Observable发起请求
observable
.subscribeOn(Schedulers.io()) //指定网络请求在io后台线程中进行
.observeOn(Schedulers.io()) //指定doOnNext的操作在io后台线程进行
.doOnNext(new Consumer<ResponseBody>() {
           //doOnNext里的方法执行完毕,observer里的onNext、onError等方法才会执行。
           @Override
           public void accept(ResponseBody body) throws Exception {
                 //下载文件,保存到本地
                 //通过body.byteStream()可以得到输入流,然后就是常规的IO读写保存了。
                 ...
           }
})
.observeOn(AndroidSchedulers.mainThread()) //指定observer回调在UI主线程中进行
.subscribe(observer); //发起请求,请求的结果先回调到doOnNext进行处理,再回调到observer中

3. 异常处理

使用Retrofit+RxJava发起请求后,如果请求失败,会回调observer中的onError方法,该方法的参数为Throwable,并没能反馈更直接清楚的异常信息给我们,所以有必要对Throwable异常进行处理转换。

//observer封装类中的代码
@Override
public void onError(Throwable throwable) {
      if (throwable instanceof Exception) {
         onError(ThrowableHandler.handleThrowable(throwable));
      } else {
        onError(new HttpThrowable(HttpThrowable.UNKNOWN,"未知错误",throwable));
      }
}

//应用中具体实现的是下面这个onError方法
public abstract void onError(HttpThrowable httpThrowable);
public class ThrowableHandler {
    ....

    public static HttpThrowable handleThrowable(Throwable throwable) {
        if (throwable instanceof HttpException) {
            return new HttpThrowable(HttpThrowable.HTTP_ERROR, "网络(协议)异常", throwable);
        } else if (throwable instanceof JsonParseException || throwable instanceof JSONException || throwable instanceof ParseException) {
            return new HttpThrowable(HttpThrowable.PARSE_ERROR, "数据解析异常", throwable);
        } else if (throwable instanceof UnknownHostException) {
            return new HttpThrowable(HttpThrowable.NO_NET_ERROR, "网络连接失败,请稍后重试", throwable);
        } else if (throwable instanceof SocketTimeoutException) {
            return new HttpThrowable(HttpThrowable.TIME_OUT_ERROR, "连接超时", throwable);
        } else if (throwable instanceof ConnectException) {
            return new HttpThrowable(HttpThrowable.CONNECT_ERROR, "连接异常", throwable);
        } else if (throwable instanceof javax.net.ssl.SSLHandshakeException) {
            return new HttpThrowable(HttpThrowable.SSL_ERROR, "证书验证失败", throwable);
        } else {
            return new HttpThrowable(HttpThrowable.UNKNOWN, throwable.getMessage(), throwable);
        }
    }
}
public class HttpThrowable extends Exception {
    public int errorType;
    public String message;
    public Throwable throwable;

    /**
     * 未知错误
     */
    public static final int UNKNOWN = 1000;
    /**
     * 解析错误
     */
    public static final int PARSE_ERROR = 1001;
    /**
     * 连接错误
     */
    public static final int CONNECT_ERROR = 1002;
    /**
     * DNS解析失败(无网络)
     */
    public static final int NO_NET_ERROR = 1003;
    /**
     * 连接超时错误
     */
    public static final int TIME_OUT_ERROR = 1004;
    /**
     * 网络(协议)错误
     */
    public static final int HTTP_ERROR = 1005;
    /**
     * 证书错误
     */
    public static final int SSL_ERROR = 1006;

    public HttpThrowable(int errorType, String message, Throwable throwable) {
        super(throwable);
        this.errorType = errorType;
        this.message = message;
        this.throwable = throwable;
    }
}

处理后得到ResponeThrowable,里面包含了异常码code异常描述信息message,这样就可以方便地知道请求失败的原因了。

4. 生命周期管理

4.1 意义

如果页面发起了网络请求并且在请求结果返回前就已经销毁了,那么我们应该在它销毁时把相关的请求终止。一方面是为了停止无意义的请求,另一方面是为了避免可能带来的内存泄漏。

强大的RxJava可以帮助我们实现这一需求。下面通过 takeUntil、PublishSubject、综合两者进行控制 三个部分来讲解如何实现。

4.2 takeUntil

RxJava中提供了许多操作符,这里我们需要使用takeUntil操作符。

ObservableA.takeUntil(ObservableB) 的作用是:
监视ObservableB,当它发射内容时,则停止ObservableA的发射并将其终止。

下面通过示意图和示例代码来加深了解,参考自https://zhuanlan.zhihu.com/p/21966621

示意图:


takeUntil

示例代码:

//下面的Observable.interval( x, TimeUnit.MILLISECONDS) 表示每隔x毫秒发射一个long类型数字,数字从0开始,每次递增1
Observable<Long> observableA = Observable.interval(300, TimeUnit.MILLISECONDS);
Observable<Long> observableB = Observable.interval(800, TimeUnit.MILLISECONDS);

observableA.takeUntil(observableB)
        .subscribe(new Observer<Long>() {
            
            //...onComplete...
            //...onError...
            
            @Override
            public void onNext(Long aLong) {
                System.out.println(aLong);
            }
        });
输出结果为
0
1
  • 示例代码大意:
    ObservableA每隔300ms发射一个数字(并打印出发射的数字),ObservableB每隔800ms发射一个数字。
    由于ObservableB在800ms时发射了内容,终止了ObservableA的发射,所以ObservableA最后只能发射0,1两个数字。

因此,我们可以利用takeUntil这一特性,让ObservableA负责网络请求,让ObservableB负责在页面销毁时发射事件,从而终止ObservableA(网络请求)。

4.3 PublishSubject

上面提到了需要一个ObservableB来负责在页面销毁时发射事件,PublishSubject就能充当这一角色。

阅读PublishSubject的源码可以发现,它既可充当Observable,拥有subscribe()等方法;也可充当Observer(Subscriber),拥有onNext(),onError等方法。

它的特点是进行subscribe()订阅后,并不立即发射事件,而是允许我们在认为合适的时机通过调用onNext(),onError(),onCompleted()来发射事件。

所以,我们需在Activity或Fragment的生命周期onDestroy()中通过PublishSubject来发射事件

//一般以下代码写在Activity或Fragment的基类中。

PublishSubject<LifeCycleEvent> lifecycleSubject = PublishSubject.create();

//用于提供lifecycleSubject到RetrofitUtil中。
public PublishSubject<LifeCycleEvent> getLifeSubject() {
    return lifecycleSubject;
}

//一般是在onDestroy()时发射事件终止请求,当然你也可以根据需求在生命周期的其他状态中发射。
@Override
protected void onDestroy() {
    //publishSubject发射事件
    lifecycleSubject.onNext(LifeCycleEvent.DESTROY);
    super.onDestroy();
 }

4.4 进行控制

了解 takeUntil 和 PublishSubject 后,就可以综合两者来实现生命周期的控制了。

//省略Retrofit和ApiService的构造过程
...
...
//得到负责网络请求的Observable
Observable observableNet= mApiService.getCommingMovie(count);

//得到负责在页面销毁时发射事件的Observable
Observable<LifeCycleEvent> observableLife = 
     lifecycleSubject.filter(new Predicate<LifeCycleEvent>() {
             @Override
             public boolean test(LifeCycleEvent lifeCycleEvent) throws Exception {
             //当生命周期为DESTROY状态时,发射事件
             return lifeCycleEvent.equals(LifeCycleEvent.DESTROY);
           }
     }).take(1);

//通过takeUntil将两个Observable联系在一起,实现生命周期的控制
observableNet.takeUntil(observableLife)
             .subscribeOn(Schedulers.io())//设置网络请求在io后台线程中进行
             .observeOn(AndroidSchedulers.mainThread())//设置请求后的回调在UI主线程中进行
             .subscribe(observer);//发起请求,请求的回调结果会传到订阅者observer中

还有其他方式可以实现生命周期的控制,具体实现可到以下地址查看:
http://www.jianshu.com/p/d62962243c33
http://mp.weixin.qq.com/s/eedFDMIQe30rQmryLeif_Q

5.失败重试机制

有时候用户的网络比较不稳定,出现了请求失败的情况。这时我们不一定就要直接反馈用户请求失败,而可以在失败后尝试重新请求,说不定这时网络恢复稳定请求成功了呢?! 这样或许可以提高用户体验。
下面介绍如何设置某个请求在失败后自动进行重试,以及设置重试的次数延迟重试的时间

先上代码:

Observable observableNet= mApiService.getCommingMovie(count);

observableNet.retryWhen(new RetryFunction(3,3))//加入失败重试机制(失败后延迟3秒开始重试,重试3次)

.takeUntil(observableLife)//生命周期控制
.subscribeOn(Schedulers.io())//设置网络请求在io后台线程中进行
.observeOn(AndroidSchedulers.mainThread())//设置请求后的回调在UI主线程中进行
.subscribe(observer);//发起请求
//请求失败重试机制
public static class RetryFunction implements Function<Observable<Throwable>, ObservableSource<?>> {

        private int retryDelaySeconds;//延迟重试的时间
        private int retryCount;//记录当前重试次数
        private int retryCountMax;//最大重试次数

        public RetryFunction(int retryDelaySeconds, int retryCountMax) {
            this.retryDelaySeconds = retryDelaySeconds;
            this.retryCountMax = retryCountMax;
        }

        @Override
        public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {

            //方案一:使用全局变量来控制重试次数,重试3次后不再重试,通过代码显式回调onError结束请求
            return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Throwable throwable) throws Exception {
                    //如果失败的原因是UnknownHostException(DNS解析失败,当前无网络),则没必要重试,直接回调error结束请求即可
                    if (throwable instanceof UnknownHostException) {
                        return Observable.error(throwable);
                    }

                    //没超过最大重试次数的话则进行重试
                    if (++retryCount <= retryCountMax) {
                        //延迟retryDelaySeconds后开始重试
                        return Observable.timer(retryDelaySeconds, TimeUnit.SECONDS);
                    }

                    return Observable.error(throwable);
                }
            });


            //方案二:使用zip控制重试次数,重试3次后不再重试(会隐式回调onComplete结束请求,但我需要的是回调onError,所以没采用方案一)
//            return Observable.zip(throwableObservable,Observable.range(1, retryCountMax),new BiFunction<Throwable, Integer, Throwable>() {
//                @Override
//                public Throwable apply(Throwable throwable, Integer integer) throws Exception {
//                    LogUtil.e("ljy",""+integer);
//                    return throwable;
//                }
//            }).flatMap(new Function<Throwable, ObservableSource<?>>() {
//                @Override
//                public ObservableSource<?> apply(Throwable throwable) throws Exception {
//                    if (throwable instanceof UnknownHostException) {
//                        return Observable.error(throwable);
//                    }
//                    return Observable.timer(retryDelaySeconds, TimeUnit.SECONDS);
//                }
//            });

        }
}

分析:

  1. 通过observableNet.retryWhen(new RetryFunction(3,3))加入失败重试机制,其参数RetryFunction中的apply方法会返回一个Observable,后面就称它为ObservableRetry吧。
    加入后,当网络请求失败时,并不会直接回调observer中的onError,而是会先将失败异常throwable作为ObservableRetry的事件源。如果ObservableRetry通过onNext发射了事件,则触发重新请求,而如果ObservableRetry发射了onError/onComplete通知,则该请求正式结束。因此可以我们对apply方法中的throwableObservable进行改造,然后返回一个合适的ObservableRetry来实现自己想要的重试效果。

  2. 代码中对throwableObservable进行了flatMap操作,目的是对其事件throwable的类型进行判断。如果为UnknownHostException类型,则表示无网络DNS解析失败,这时就没必要进行重试(都没网络还重试啥呀),直接通过Observable.error(throwable)结束该次请求。

  3. 然后通过全局变量 retryCount 和 retryCountMax 来控制重试的次数。重试retryCountMax次之后如果还是失败,那就通过Observable.error(throwable)放弃重试并结束请求。

  4. 代码中还有个方案二,与方案一的区别在于使用zip操作符来控制重试的次数。
    了解过zip的应该知道其产生的ObservableZip发射的事件总量,与组合成员中事件量少的一致。所以我们通过Observable.range(start, count)发射有限的事件,如range(1, 3)只发射"1","2","3"三个事件,从而限制了ObservableZip最终发射的事件总量不大于3,即重试的次数不超过3次。当超过3次的时候,它会隐式地调用onComplete来结束该次请求(方案一是通过显式地调用onError来结束请求,而我需要在observer的onError中反馈给用户请求失败,所以选择了方案一)

6.监听进度

这里只讲下实现步骤思路,代码太多就不放上来了,大家可以直接看DevRing/Demo里的代码,基本参考自JessYan的ProgressManager库

6.1 上传进度

  1. 自定义请求实体,继承RequestBody重写其几个必要的方法。
    其中监听上传进度主要是重写其writeTo(BufferedSink sink)方法,从该方法中获取数据总量以及已写入请求实体的数据量,在这里通过回调传递相关进度。

  2. 自定义拦截器,实现Interceptor的intercept(Chain chain)方法。
    通过该方法将第1步定义的请求实体应用到请求中。

  3. 添加拦截器到OkHttpClient中。
    builder.addNetworkInterceptor(progressInterceptor);

6.2 下载进度

思路和上传进度差不多

  1. 自定义响应实体,继承ResponseBody重写其几个必要的方法。
    其中监听下载进度主要是重写其source(Source source)方法,从该方法中获取数据总量以及已写入响应实体的数据量,在这里通过回调传递相关进度。

  2. 自定义拦截器,实现Interceptor的intercept(Chain chain)方法。
    通过该方法将第1步定义的响应实体应用到请求中。

  3. 添加拦截器到OkHttpClient中。
    builder.addNetworkInterceptor(progressInterceptor);

7. 封装

(2018.3.27:Demo已对封装这一块做了新的调整,详情请看demo,但封装的思路还是和下文差不多的)

封装分为 初始化配置、统一转换、请求结果封装、请求回调(Observer)封装 四个部分进行。

7.1 初始化配置

public Retrofit initRetrofit() {
   
        OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
        //设置请求超时时长
        okHttpClientBuilder.connectTimeout(DEFAULT_TIMEOUT, TimeUnit.SECONDS);
        //启用Log日志
        okHttpClientBuilder.addInterceptor(getHttpLoggingInterceptor());
        //设置缓存方式、时长、地址
        okHttpClientBuilder.addNetworkInterceptor(getCacheInterceptor());
        okHttpClientBuilder.addInterceptor(getCacheInterceptor());
        okHttpClientBuilder.cache(getCache());
        //设置https访问(验证证书)
        okHttpClientBuilder.hostnameVerifier
        (org.apache.http.conn.ssl.SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
        //设置统一的header
        okHttpClientBuilder.addInterceptor(getHeaderInterceptor());

        Retrofit retrofit = new Retrofit.Builder()
                //服务器地址
                .baseUrl(UrlConstants.HOST_SITE_HTTPS)
                //配置转化库,采用Gson
                .addConverterFactory(GsonConverterFactory.create())
                //配置回调库,采用RxJava
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                //设置OKHttpClient为网络客户端
                .client(okHttpClientBuilder.build()).build();    
   
        return retrofit ;
}

7.2 统一转换

由于每次请求都要进行线程切换以及生命周期的控制,频繁地调用以下代码

observable.takeUntil(lifecycleObservable)
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread());

因此可以使用compose方法对Observable进行统一的转换

//RetrofitUtil中的方法

/**
* 对observable进行统一转换,并发起请求
*
* @param observable         被订阅者
* @param observer           订阅者
* @param event              生命周期中的某一个状态,比如传入DESTROY,则表示在进入destroy状态时  
*                           lifecycleSubject会发射一个事件从而终止请求
* @param lifecycleSubject   生命周期事件发射者
*/
public static void composeToSubscribe(Observable observable, Observer observer, LifeCycleEvent event, PublishSubject<LifeCycleEvent> lifecycleSubject) {

    observable.compose(getTransformer(event, lifecycleSubject)).subscribe(observer);
}


/**
 * 获取统一转换用的Transformer
 *
 * @param event               生命周期中的某一个状态,比如传入DESTROY,则表示在进入destroy状态时
 *                            lifecycleSubject会发射一个事件从而终止请求
 * @param lifecycleSubject    生命周期事件发射者
 */
public static <T> ObservableTransformer<T, T> getTransformer(final LifeCycleEvent event, final PublishSubject<LifeCycleEvent> lifecycleSubject) {
    return new ObservableTransformer() {
        @Override
        public ObservableSource apply(Observable upstream) {

            //当lifecycleObservable发射事件时,终止操作。
            //统一在请求时切入io线程,回调后进入ui线程
            //加入失败重试机制(延迟3秒开始重试,重试3次)
            return upstream
                .takeUntil(getLifeCycleObservable(event, lifecycleSubject))
                .retryWhen(new RetryFunction(3,3))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
        }
    };
}

7.3 封装请求结果

服务器返回的请求结果,一般分为三个部分:请求结果的状态值,请求结果的描述,返回的数据内容。

{
   "status" : 1,
   "message" : "success",
   "data":{
         "name":"小明",
          "sex": 0,
          "age": 10
   }
}

其中status和message的类型是固定的,而data的类型不确定,所以data可以采用泛型表示

豆瓣接口返回的结构比较特殊,并不是上面所说的那三部分。实际结构根据服务器后台给的来定

//与请求结果结构相符的实体类
public class HttpResult<T> {

    private int count;//请求的数量
    private int start;//请求的起始页码
    private int total;//得到的数据总数
    private String title;//请求结果的描述
    private T subjects;//返回的数据内容,类型不确定,使用泛型T表示

    //getter&setter
    ...
}

7.4 封装请求回调(Observer)

(DevRing中提供了三种封装好的Observer,分别用于普通请求,上传请求(可监听进度),下载请求(可监听进度))

可对Observer封装一层,作用:

  • 在onError中进行统一的异常处理,得到更直接详细的异常信息

  • 在onNext中进行统一操作,如请求回来后,先判断token是否失效,如果失效则直接跳转登录页面

  • 在onNext中对返回的结果进行处理,得到更直接的数据信息

  • 在onSubscribe中进行请求前的操作,注意,onSubscribe是执行在 subscribe() 被调用时的线程,所以如果在onSubscribe里进行UI操作,就要保证subscribe()也是调用在UI线程里。

public abstract class HttpObserver<T> implements Observer<HttpResult<T>> {

    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onComplete() {

    }

    @Override
    public void onError(Throwable e) {
        if (e instanceof Exception) {
            //访问获得对应的Exception
            ExceptionHandler.ResponeThrowable responeThrowable = ExceptionHandler.handleException(e);
            onError(responeThrowable.code, responeThrowable.message);
        } else {
            //将Throwable 和 未知错误的status code返回
            ExceptionHandler.ResponeThrowable responeThrowable = new ExceptionHandler.ResponeThrowable(e, ExceptionHandler.ERROR.UNKNOWN);
            onError(responeThrowable.code, responeThrowable.message);
        }
    }

    @Override
    public void onNext(HttpResult<T> httpResult) {
        //做一些回调后需统一处理的事情
        //如请求回来后,先判断token是否失效
        //如果失效则直接跳转登录页面
        //...

        //如果没失效,则正常回调
        onNext(httpResult.getTitle(), httpResult.getSubjects());
    }

    //具体实现下面两个方法,便可从中得到更直接详细的信息
    public abstract void onNext(String title, T t);
    public abstract void onError(int errType, String errMessage);
}

到此,封装算是结束了,这样使用起来就会便捷很多,整个的使用流程会在下面的“使用”中介绍,也可以查看demo。

8. 混淆

在proguard-rules.pro文件中添加以下内容进行混淆配置

#Retrofit开始
-dontwarn retrofit2.**
-keep class retrofit2.** { *; }
-keepattributes Signature
-keepattributes Exceptions
-dontwarn okio.**
#Retrofit结束


#Rxjava&RxAndroid开始
-dontwarn sun.misc.**
-keepclassmembers class rx.internal.util.unsafe.*ArrayQueue*Field* {
   long producerIndex;
   long consumerIndex;
}
-keepclassmembers class rx.internal.util.unsafe.BaseLinkedQueueProducerNodeRef {
    rx.internal.util.atomic.LinkedQueueNode producerNode;
}
-keepclassmembers class rx.internal.util.unsafe.BaseLinkedQueueConsumerNodeRef {
    rx.internal.util.atomic.LinkedQueueNode consumerNode;
}
#Rxjava&RxAndroid结束

使用

经过前面的配置和封装后,下面演示一下在实际场景的使用。

1. 一般场景

请求正在上映的电影,然后在View层展示

@GET("v2/movie/in_theaters")
Observable<HttpResult<List<MovieRes>>> getPlayingMovie(@Query("count") int count);
//被订阅者(用于发起网络请求)
Observable observable = RetrofitUtil.getApiService().getPlayingMovie(count);

//订阅者(网络请求回调)
HttpObserver<List<MovieRes>> observer = new HttpObserver<List<MovieRes>>() {
            //请求成功回调
            @Override
            public void onNext(String title, List<MovieRes> list) {
                LogUtil.d(TAG,"获取"+title+"成功");
                //通过IView接口将数据回调给View层展示
                if (mIView != null) {
                    mIView.getMovieSuccess(list);
                }
            }

            //请求失败回调
            @Override
            public void onError(int errType, String errMessage) {
                //通过IView接口将数据回调给View层展示
                if (mIView != null) {
                    mIView.getMovieFail(errType, errMessage);
                }
            }
        };

//通过IView接口获取View层的PublishSubject来进行生命周期的控制 
PublishSubject<LifeCycleEvent> lifecycleSubject = mIView.getLifeSubject();

//发起请求
RetrofitUtil.composeToSubscribe(observable, observer, lifecycleSubject);

2. 特殊场景

由于没找到相符的接口,所以demo中没有提供以下代码。就当作提供个思路,请谅解。

2.1 嵌套请求(使用flatMap实现)

场景:先请求token,再根据得到的token请求用户信息,最后在View层展示

@GET("...")
Observable<HttpResult<String>> getToken();

@GET("...")
Observable<HttpResult<UserInfo>> getUserInfo(@Query("token") String token);
//被订阅者(用于发起网络请求)
Observable observable = RetrofitUtil.getApiService().getToken()
    .flatMap(new Function<HttpResult<String>, ObservableSource<HttpResult<UserInfo>>{
    
        @Override
        public ObservableSource<HttpResult<UserInfo>> apply(HttpResult<String> httpResult) throws Exception {
        
          //从httpResult中得到请求来的token,然后再发起用户信息的请求
          return RetrofitUtil.getApiService().getUserInfo(httpResult.getData());
        }
    });

//订阅者(网络请求回调)
HttpObserver<UserInfo> observer= new HttpObserver<UserInfo>() {
            //请求成功回调
            @Override
            public void onNext(UserInfo userInfo) {
                //通过IView接口将数据回调给View层展示
                if (mIView != null) {
                    mIView.getUserInfoSuccess(userInfo);
                }
            }

            //请求失败回调
            @Override
            public void onError(int errType, String errMessage) {
                //通过IView接口将数据回调给View层展示
                if (mIView != null) {
                    mIView.getUserInfoFail(errType, errMessage);
                }
            }
        };

//通过IView接口获取View层的PublishSubject来进行生命周期的控制 
PublishSubject<LifeCycleEvent> lifecycleSubject = mIView.getLifeSubject();

//发起请求
RetrofitUtil.composeToSubscribe(observable, observer, lifecycleSubject);

2.2 组合请求返回的结果(使用zip实现)

场景:请求今日最佳男歌手,请求今日最佳女歌手,将男歌手和女歌手进行组合,得到“最佳歌手组合”,最后在View层展示

@GET("...")
Observable<HttpResult<Singer>> getBestSingerMale();

@GET("...")
Observable<HttpResult<Singer>> getBestSingerFemale();
//被订阅者(用于发起网络请求)
Observable observableMale = RetrofitUtil.getApiService().getBestSingerMale();
Observable observableFemale = RetrofitUtil.getApiService().getBestSingerFemale();
Observable observableGroup =
    Observable.zip(observableMale , observableFemale , 
         new BiFunction<HttpResult<Singer>, HttpResult<Singer>, HttpResult<SingerGroup>() {
             @Override
             public HttpResult<SingerGroup> apply(HttpResult<Singer> resultMale, 
                                                 HttpResult<Singer> resultFemale) {
            
             //组合男女歌手
             Singer singerMale = resultMale.getData();
             Singer singerFemale = resultFemale.getData();
             SingerGroup singerGroup = new SingerGroup(singerMale, singerFemale);
             
             HttpResult<SingerGroup> resultGroup = new HttpResult<SingerGroup>();
             resultGroup.setData(singerGroup); 
             
             return resultGroup;
            }
         });

//订阅者(网络请求回调)
HttpObserver<SingerGroup> observer= new HttpObserver<SingerGroup>() {
            //请求成功回调
            @Override
            public void onNext(SingerGroup singerGroup) {
                //通过IView接口将数据回调给View层展示
                if (mIView != null) {
                    mIView.getSingerGroupSuccess(singerGroup);
                }
            }

            //请求失败回调
            @Override
            public void onError(int errType, String errMessage) {
                //通过IView接口将数据回调给View层展示
                if (mIView != null) {
                    mIView.getSingerGroupFail(errType, errMessage);
                }
            }
        };

//通过IView接口获取View层的PublishSubject来进行生命周期的控制    
PublishSubject<LifeCycleEvent> lifecycleSubject = mIView.getLifeSubject();

//发起请求
RetrofitUtil.composeToSubscribe(observableGroup , observer, lifecycleSubject);

实际开发中肯定还有其他的特殊场景,关键是运用好RxJava的操作符。操作符的学习地址已贴在文章顶部。


更新:

已将demo和文章中关于Rxjava的部分从1.x改为2.x
这里贴一下RxJava2与RxJava1的区别总结(随笔记录,仅供参考):

  • RxJava2 按是否可以背压处理,分成Observable和Flowable,Observable的订阅者为Observer,Flowable的订阅者为Subscriber。
    不了解背压的可以看这个系列的5-9篇。

  • 背压处理
    1)上游(Flowable)通过emitter.requested()查看事件容器的剩余空间。下游(Subscriber)通过subscription.request(n)从事件容器中请求并消耗事件(消耗一个事件并不代表事件容器立刻多出一个位置)
    2)四种策略 BUFFER,ERROR,DROP,LATEST
    Buffer:事件容器的空间不限制,非Buffer策略时事件容器大小为128
    ERROR:当事件容器溢出时会报MissingBackpressureException。该策略下,当下游累计消耗完96个事件后,才会给事件容器腾出96个位置。
    DROP: 事件容器装入128个事件后,剩下的将不会装入,当下游累计消耗完128个事件后,才会给事件容器腾出128个位置,这时再取当前时刻发送的事件装入。
    LATEST: 与DROP类似,但它会保证取到最后发射的事件

  • Observable多了几个小伙伴:Single、Completable、Maybe。他们都继承了ObservableSource。
    Single/SingleObserver:只发送/接收onNext和onError,且只发送一次
    Completable/Completable:只发送/接收onComplete和onError
    Maybe:Single与Completable的结合

  • Func1改为Function,Func2..n改为BiFunction。其中的方法call改成了apply。另外对于filter()过滤,其参数为不为Function而是Predicate

  • Action1改为Consumer,Action2改为BiConsumer。其中的方法call改成了accept。

  • Observer/Subscriber的抽象方法中多了一个onSubscribe(Disposable/Subscription),类似1.x的onStart方法,它在subscribe()时调用。其中的参数Disposable/Subscription可以用来取消订阅/查询订阅状态,Subscription还可用于背压中请求消耗事件。

  • 不再能发送null事件,Observable<Void> 不再发射任何值,而是正常结束或者抛出空指针。


评论

在线客服-可直接交谈

您好!有什么需要可以为您服务吗?