Project author: qyxxjd

Project description: RxJava 2.X Retrofit OkHttp demo
Language: Java
Project URL: git://github.com/qyxxjd/RxJava2Demo.git
Created: 2016-12-26T11:30:50Z
Project community: https://github.com/qyxxjd/RxJava2Demo


Selected changes when upgrading from RxJava 1.X to RxJava 2.X

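| Item | RxJava 1.X | RxJava 2.X |
| --- | --- | --- |
| Package name | rx.xxx | io.reactivex.xxx |
| Reactive Streams specification | 1.X predates the Reactive Streams specification and supports it only partially | Fully supported |
| Backpressure | Incomplete backpressure support | Observable is designed without backpressure support; the new Flowable supports backpressure |
| null values | Supported | No longer supported; passing a null value throws a NullPointerException |
| Schedulers | Schedulers.immediate(), Schedulers.trampoline(), Schedulers.computation(), Schedulers.newThread(), Schedulers.io(), Schedulers.from(executor), AndroidSchedulers.mainThread() | Schedulers.immediate() removed; Schedulers.single() added; the rest unchanged |
| Single | Behaves like Observable, but emits only a single onSuccess or onError | Redesigned following the Reactive Streams specification; follows the protocol onSubscribe (onSuccess/onError) |
| Completable | Behaves like Observable, but signals only overall success or failure | Redesigned following the Reactive Streams specification; follows the protocol onSubscribe (onComplete/onError) |
| Maybe | Not available | New in 2.X. Behaves like Observable and may produce one item, an error, or nothing at all. It can be thought of as a method that returns a nullable value: unless it throws, it always returns, but the result may or may not be empty. Designed following the Reactive Streams specification; follows the protocol onSubscribe (onSuccess/onError/onComplete) |
| Flowable | Not available | New in 2.X. Behaves like Observable, designed following the Reactive Streams specification, and supports backpressure |
| Subject | AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject, UnicastSubject | 2.X keeps the existing behavior of these Subjects and adds AsyncProcessor, BehaviorProcessor, PublishProcessor, ReplayProcessor, and UnicastProcessor, which support backpressure |
| Subscription | Subscription | Renamed to Disposable because the name conflicts with the Subscription interface of Reactive Streams |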
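
The onSubscribe-first protocol and the backpressure entry in the table can be illustrated without RxJava. The following is a minimal, single-threaded sketch that assumes JDK 9 or later and uses the java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces. RangePublisher and BackpressureSketch are hypothetical names introduced here; this is not how Flowable is implemented, only an illustration of a publisher that emits no more items than the subscriber has requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureSketch {

    /** Hypothetical publisher: emits count integers from start, never more than requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int start;
        private final int count;

        RangePublisher(int start, int count) {
            this.start = start;
            this.count = count;
        }

        @Override
        public void subscribe(final Flow.Subscriber<? super Integer> subscriber) {
            // onSubscribe must be the first signal the subscriber receives.
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = start;
                private long demand;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("n must be positive"));
                        return;
                    }
                    // Accumulate demand, capping at Long.MAX_VALUE to avoid overflow.
                    demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
                    if (emitting) {
                        return; // re-entrant call from onNext: the outer loop will drain it
                    }
                    emitting = true;
                    while (demand > 0 && !done && next < start + count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next == start + count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes with a consumer that requests two items at a time and records every signal. */
    static List<String> run() {
        final List<String> events = new ArrayList<>();
        new RangePublisher(1, 5).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int receivedInBatch;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                events.add("onSubscribe");
                s.request(2);
            }

            @Override
            public void onNext(Integer item) {
                events.add("onNext:" + item);
                if (++receivedInBatch == 2) {
                    receivedInBatch = 0;
                    subscription.request(2);
                }
            }

            @Override
            public void onError(Throwable t) {
                events.add("onError");
            }

            @Override
            public void onComplete() {
                events.add("onComplete");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext:1, onNext:2, onNext:3, onNext:4, onNext:5, onComplete]
        System.out.println(run());
    }
}
```

In RxJava 2.X itself, Flowable plays the publisher role and handles the demand bookkeeping internally, while Observable skips it entirely.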

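A simple RxJava 2.X + Retrofit + OkHttp example:

1. Declare the interface

    import io.reactivex.Observable;
    import okhttp3.MultipartBody;
    import okhttp3.RequestBody;
    import retrofit2.http.Multipart;
    import retrofit2.http.POST;
    import retrofit2.http.Part;

    public interface FaceApi {
        @Multipart
        @POST("facepp/v3/compare")
        Observable<String> compare(@Part("api_key") RequestBody apiKey,
                                   @Part("api_secret") RequestBody apiSecret,
                                   @Part MultipartBody.Part... files);
    }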

2. Initialize the API

    private void initApi() {
        // OkHttp client with full request/response body logging and a shared timeout value
        OkHttpClient okHttpClient = new OkHttpClient.Builder()
                .addNetworkInterceptor(new HttpLoggingInterceptor().setLevel(HttpLoggingInterceptor.Level.BODY))
                .retryOnConnectionFailure(true)
                .connectTimeout(CONNECT_TIMEOUT_TIME, TimeUnit.SECONDS)
                .writeTimeout(CONNECT_TIMEOUT_TIME, TimeUnit.SECONDS)
                .readTimeout(CONNECT_TIMEOUT_TIME, TimeUnit.SECONDS)
                .build();
        // Map snake_case JSON keys to camelCase Java fields
        Gson gson = new GsonBuilder()
                .setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES)
                .create();
        // The RxJava 2 call adapter lets Retrofit interfaces return Observable
        mFaceApi = new Retrofit.Builder().baseUrl(PrivateConstant.FACE_URL_PREFIX)
                .client(okHttpClient)
                .addConverterFactory(GsonConverterFactory.create(gson))
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build()
                .create(FaceApi.class);
    }

3. Make the network request

    /**
     * Tests the face recognition API.
     *
     * In a real project, steps 1 and 3 would be handled in one central place,
     * so they do not need to be configured for every endpoint.
     *
     * @param imagePath1 path of the first photo to compare
     * @param imagePath2 path of the second photo to compare
     */
    private void testFaceApi(@NonNull String imagePath1, @NonNull String imagePath2) {
        // The private API credentials declared in PrivateConstant must be requested from the official site
        mFaceApi.compare(convert(PrivateConstant.FACE_API_ID),
                         convert(PrivateConstant.FACE_API_SECRET),
                         convert("image_file1", new File(imagePath1)),
                         convert("image_file2", new File(imagePath2)))
                // 1. Encapsulated thread switching
                .compose(RxTransformer.<String>applySchedulers(RxTransformer.Observable.IO_ON_UI))
                // 2. Cancel the request automatically when the current Activity reaches onStop
                .compose(this.<String>bindEvent(ActivityEvent.STOP))
                // 3. Convert the raw data into an object
                .map(DATA_PARSE_FUNCTION)
                .subscribeWith(new DisposableObserver<IdentifyResult>() {
                    @Override
                    public void onNext(IdentifyResult identifyResult) {
                        XLog.d("FaceApi --> " + identifyResult.toString());
                    }
                    @Override
                    public void onError(Throwable e) {
                        XLog.e("FaceApi --> " + e.getMessage());
                    }
                    @Override
                    public void onComplete() {
                        XLog.d("FaceApi --> onComplete");
                    }
                });
    }

Simple RxJava 2.X examples (the full code is in the project repository):

create operator

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override public void subscribe(ObservableEmitter<Integer> emitter)
                throws Exception {
            // Stop emitting as soon as the downstream has disposed
            for (int i = 0; i < 10 && !emitter.isDisposed(); i++) {
                emitter.onNext(i);
            }
            if (!emitter.isDisposed()) {
                emitter.onComplete();
            }
        }
    })
    .subscribeOn(Schedulers.io())
    .unsubscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    // Only three common ways of subscribing are listed here
    .subscribe(OBSERVER); // Option 1
    //.subscribeWith(DISPOSABLE_OBSERVER); // Option 2
    //.subscribe(NEXT_CONSUMER, ERROR_CONSUMER, COMPLETE); // Option 3

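fromArray operator

    // Emits each array element in order, then completes
    Observable.fromArray(1, 2, 3, 4, 5)
            // Wrapping the thread-control code in a transformer keeps the code concise and easier to manage
            .compose(RxTransformer.<Integer>applySchedulers(RxTransformer.Observable.IO))
            .subscribeWith(DISPOSABLE_OBSERVER);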

fromCallable operator

    // Defers the call until subscription and emits its return value
    Observable.fromCallable(new Callable<Integer>() {
        @Override public Integer call() throws Exception {
            return 123;
        }
    })
    .compose(RxTransformer.<Integer>applySchedulers(RxTransformer.Observable.IO))
    .subscribeWith(DISPOSABLE_OBSERVER);

fromIterable operator

    ArrayList<Integer> list = new ArrayList<>();
    list.add(123);
    list.add(456);
    list.add(789);
    // Emits each element of the Iterable in iteration order, then completes
    Observable.fromIterable(list)
            .compose(RxTransformer.<Integer>applySchedulers(RxTransformer.Observable.IO))
            .subscribeWith(DISPOSABLE_OBSERVER);

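fromPublisher operator

    Observable.fromPublisher(new Publisher<Integer>() {
        @Override public void subscribe(Subscriber<? super Integer> s) {
            // Reactive Streams requires onSubscribe to be the first signal.
            // This simplified Subscription ignores the requested amount, which is tolerable
            // here only because Observable.fromPublisher requests an unbounded amount.
            s.onSubscribe(new Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            s.onNext(6);
            s.onNext(7);
            s.onNext(8);
            s.onNext(9);
            s.onComplete();
        }
    })
    .compose(RxTransformer.<Integer>applySchedulers(RxTransformer.Observable.IO))
    .subscribeWith(DISPOSABLE_OBSERVER);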

just operator

    // Emits the given items in order, then completes
    Observable.just(1, 2, 3, 4, 5, 6)
            .compose(RxTransformer.<Integer>applySchedulers(RxTransformer.Observable.IO))
            .subscribeWith(DISPOSABLE_OBSERVER);

range operator

    // Emits 60 consecutive integers starting at 100 (100 to 159), then completes
    Observable.range(100, 60)
            .compose(RxTransformer.<Integer>applySchedulers(RxTransformer.Observable.IO))
            .subscribeWith(DISPOSABLE_OBSERVER);

timer operator

    // Emits a single 0L after the delay, then completes
    Observable.timer(10, TimeUnit.MILLISECONDS)
            .compose(RxTransformer.<Long>applySchedulers(RxTransformer.Observable.COMPUTATION))
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    XLog.d("Task started after a 10 ms delay");
                }
            });

interval operator

    // Emits 0, 1, 2, ... once per second until the subscription is disposed
    Observable.interval(1, TimeUnit.SECONDS)
            .compose(RxTransformer.<Long>applySchedulers(RxTransformer.Observable.COMPUTATION))
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    XLog.d("Periodic task fired (every 1 second)");
                }
            });
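
Because interval never completes on its own, the Disposable returned by subscribe normally has to be kept and disposed at some point. The sketch below mimics that lifecycle with JDK classes only: a periodic task produces an increasing counter, and cancelling its handle plays the role of dispose(). It is an analogy rather than RxJava code; IntervalSketch and intervalTake are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class IntervalSketch {

    /** Collects the first `take` ticks of a periodic counter, then cancels the periodic task. */
    static List<Long> intervalTake(long periodMillis, int take) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        final List<Long> ticks = Collections.synchronizedList(new ArrayList<Long>());
        final CountDownLatch latch = new CountDownLatch(take);
        final AtomicLong counter = new AtomicLong();

        // Like interval: the first tick arrives after one period, and the values start at 0.
        ScheduledFuture<?> handle = executor.scheduleAtFixedRate(() -> {
            if (latch.getCount() > 0) {
                ticks.add(counter.getAndIncrement());
                latch.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

        try {
            latch.await(); // block until enough ticks have been recorded
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        } finally {
            handle.cancel(false); // the analogue of disposable.dispose()
            executor.shutdown();
        }
        return new ArrayList<>(ticks);
    }

    public static void main(String[] args) {
        System.out.println(intervalTake(10, 3)); // prints [0, 1, 2]
    }
}
```

Without the cancel call, the periodic task would keep running indefinitely, which is the same leak an undisposed interval subscription causes.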

Variables used in the examples above

    private static final Observer<Integer> OBSERVER = new Observer<Integer>() {
        @Override public void onSubscribe(Disposable d) {
            XLog.d("onSubscribe");
        }
        @Override public void onNext(Integer value) {
            XLog.d("onNext:" + value);
        }
        @Override public void onError(Throwable e) {
            XLog.e("onError:" + e.getMessage());
        }
        @Override public void onComplete() {
            XLog.d("onComplete");
        }
    };

    // Note: a DisposableObserver instance can be subscribed only once. It is shared here
    // for brevity; real code should create a new instance for each subscription.
    private static final DisposableObserver<Integer> DISPOSABLE_OBSERVER
            = new DisposableObserver<Integer>() {
        @Override public void onNext(Integer value) {
            XLog.d("onNext:" + value);
        }
        @Override public void onError(Throwable e) {
            XLog.e("onError:" + e.getMessage());
        }
        @Override public void onComplete() {
            XLog.d("onComplete");
        }
    };

    // Callbacks for the three-argument subscribe(onNext, onError, onComplete) overload
    private static final Consumer<Integer> NEXT_CONSUMER = new Consumer<Integer>() {
        @Override public void accept(Integer integer) throws Exception {
            XLog.d("onNext:" + integer);
        }
    };
    private static final Consumer<Throwable> ERROR_CONSUMER = new Consumer<Throwable>() {
        @Override public void accept(Throwable throwable) throws Exception {
            XLog.e("onError:" + throwable.getMessage());
        }
    };
    private static final Action COMPLETE = new Action() {
        @Override public void run() throws Exception {
            XLog.d("onComplete");
        }
    };