您好,登錄后才能下訂單哦!
1.什么是RxJava
? Rx是Reactive Extensions的簡寫,翻譯為響應的擴展。也就是通過由一方發出信息,另一方響應信息并作出處理的核心框架代碼。
? 該框架由微軟的架構師Erik Meijer領導的團隊開發,并在2012年11月開源。
? Rx庫支持.NET、JavaScript和C++等,現在已經支持幾乎全部的流行編程語言了。
? Rx的大部分語言庫由ReactiveX這個組織負責維護,比較流行的有RxJava/RxJS/Rx.NET,社區網站是 reactivex.io。
? RxJava作為一個流行的框架,其源碼依托在GitHub,除了支持RxJava,針對安卓系統也除了一個支持框架RxAndroid
2.RxJava簡化代碼
一般我們在安卓項目中,如果想從后臺獲取數據并刷新界面,代碼大概如下,下面我們來看一個例子:
new Thread() {@Override
br/>@Override
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {@Override
br/>@Override
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}
}.start();
上面的代碼經過多層嵌套后 可讀性太差了!如果你用了RxJava 可以這樣寫:
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {@Override
br/>@Override
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File, Boolean>() {@Override
br/>@Override
return file.getName().endsWith(".png");
}
})
.map(new Func1<File, Bitmap>() {@Override
br/>@Override
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {@Override
br/>@Override
imageCollectorView.addImage(bitmap);
}
});
這樣寫的好處就是減少層次嵌套 提高了代碼的可讀性,除了簡化代碼,RxJava還可以為每個方法提供特定的運行線程。
3.引入框架
目前RxJava已經升級為2.0版本,但為了能夠更好的理解RxJava,我們可以從1.0版本開始學習。也為了讓我們的安卓項目能夠更好的使用RxJava,可以在項目中引入gradle腳本依賴:
compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.1.6'
現在 我們的項目已經支持RxJava的功能了。
4.響應式的核心
所謂的響應式,無非就是存在這樣的2個部分,一部分負責發送事件/消息,另一部分負責響應事件/消息。
以前如果我們想看新聞,一般需要通過看報紙。比如,你對某個報刊雜志比較感興趣,那么你首先要做3件事:
訂閱(也就是 觀察者&被觀察者之間要相互關聯 以便被觀察的對象一變化 就會馬上通知觀察該事件的對象)
上面示例的演示代碼如下:
//1.創建被觀察者
Observable<String> observable =
Observable.create(new Observable.OnSubscribe<String>() {@Override
br/>@Override
//4.開始發送事件
//事件有3個類型 分別是onNext() onCompleted() onError()
//onCompleted() onError() 一般都是用來通知觀察者 事件發送完畢了,兩者只取其一。
subscriber.onNext("Hello Android !");
subscriber.onNext("Hello Java !");
subscriber.onNext("Hello C !");
subscriber.onCompleted();
}
});
//2.創建觀察者
Subscriber<String> subscriber = new Subscriber<String>() {@Override
br/>@Override
Log.i(TAG, "onCompleted ");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: "+e.getLocalizedMessage());
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: "+s);
}
};
//3.訂閱
observable.subscribe(subscriber);
輸出如下:
com.m520it.rxjava I/IT520: onNext: Hello Android !
com.m520it.rxjava I/IT520: onNext: Hello Java !
com.m520it.rxjava I/IT520: onNext: Hello C !
com.m520it.rxjava I/IT520: onCompleted
代碼運行的原理
? 上面的代碼中,當觀察者subscriber訂閱了被觀察者observable之后,系統會自動回調observable對象內部的call()。
? 在observable的call()方法實體中,發送了如onNext/onCompleted/onError事件后。
? 接著subscriber就能回調到到對應的方法。
5.被觀察者變種
普通的Observable發送需要三個方法onNext, onError, onCompleted,而Single作為Observable的變種,只需要兩個方法:
? onSuccess - Single發射單個的值到這個方法
? onError - 如果無法發射需要的值,Single發射一個Throwable對象到這個方法
Single只會調用這兩個方法中的一個,而且只會調用一次,調用了任何一個方法之后,訂閱關系終止。
final Single<String> single = Single.create(new Single.OnSubscribe<String>() {@Override
br/>@Override
//先調用onNext() 最后調用onCompleted()
//singleSubscriber.onSuccess("Hello Android !");
//只調用onError();
singleSubscriber.onError(new NullPointerException("mock Exception !"));
}
});
Observer<String> observer = new Observer<String>() {@Override
br/>@Override
Log.i(TAG, "onCompleted ");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: "+e.getLocalizedMessage());
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: "+s);
}
};
single.subscribe(observer);
6.觀察者變種
Observer觀察者對象,上面我們用Subscriber對象代替。因為該對象本身就是繼承了Observer。
該對象實現了onNext()&onCompleted()&onError()事件,我們如果對哪個事件比較關心,只需要實現對應的方法即可,代碼如下:
//創建觀察者
Subscriber<String> subscriber = new Subscriber<String>() {@Override
br/>@Override
Log.i(TAG, "onCompleted ");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError: "+e.getLocalizedMessage());
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: "+s);
}
};
//訂閱
observable.subscribe(subscriber);
上面的代碼中,如果你只關心onNext()事件,但卻不得不實現onCompleted()&onError()事件.這樣的代碼就顯得很臃腫。鑒于這種需求,RxJava框架在訂閱方面做了特定的調整,代碼如下:
//為指定的onNext事件創建獨立的接口
Action1<String> onNextAction = new Action1<String>() {@Override
br/>@Override
Log.i(TAG, "call: "+s);
}
};
//訂閱
observable.subscribe(onNextAction);
不知道大家注意到沒有,subscribe()訂閱的不再是觀察者,而是特定的onNext接口對象。類似的函數如下,我們可以根據需要實現對應的訂閱:
public Subscription subscribe(final Observer observer)
public Subscription subscribe(final Action1 onNext)
public Subscription subscribe(final Action1 onNext, Action1 onError)
public Subscription subscribe(final Action1 onNext, Action1 onError, Action0 onCompleted)
這里還有一個forEach函數有類似的功能:
public void forEach(final Action1 onNext)
public void forEach(final Action1 onNext, Action1 onError)
public void forEach(final Action1 onNext, Action1 onError, Action0 onComplete)
##7.Subject變種
上面2節中既介紹了被觀察者變種,又介紹了觀察者變種,這里再介紹一種雌雄同體的對象(既作為被觀察者使用,也可以作為觀察者)。
針對不同的場景一共有四種類型的Subject。他們并不是在所有的實現中全部都存在。
###AsyncSubject
一個AsyncSubject只在原始Observable完成后,發射來自原始Observable的最后一個值。它會把這最后一個值發射給任何后續的觀察者。
以下貼出代碼:
//創建被觀察者final AsyncSubject<String> subject = AsyncSubject.create();//創建觀察者
Subscriber<String> subscriber = new Subscriber<String>() {@Override
br/>@Override
Log.i(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError");
}
@Override
public void onNext(String s) {
Log.i(TAG, "s:" + s);
}
};//訂閱事件
subject.subscribe(subscriber);//被觀察者發出事件 如果調用onCompleted(),onNext()則會打印最后一個事件;如果沒有,onNext()則不打印任何事件。
subject.onNext("Hello Android ");
subject.onNext("Hello Java ");
subject.onCompleted();
輸出:
s:Hello Java onCompleted
然而,如果原始的Observable因為發生了錯誤而終止,AsyncSubject將不會發射任何數據,只是簡單的向前傳遞這個錯誤通知。
上面的觀察者被觀察者代碼相同,現在發出一系列信號,并在最后發出異常 代碼如下:
subject.onNext("Hello Android ");
subject.onNext("Hello Java ");//因為發送了異常 所以onNext()無法被打印
subject.onError(null);
###BehaviorSubject
當觀察者訂閱BehaviorSubject時,他會將訂閱前最后一次發送的事件和訂閱后的所有發送事件都打印出來,如果訂閱前無發送事件,則會默認接收構造器create(T)里面的對象和訂閱后的所有事件,代碼如下:
BehaviorSubject subject=BehaviorSubject.create("NROMAL");
Subscriber subscriber = new Subscriber() {@Override
br/>@Override
Log.i(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError");
}
@Override
public void onNext(Object o) {
Log.i(TAG, "onNext: " + o);
}
};
//subject.onNext("Hello Android !");//subject.onNext("Hello Java !");//subject.onNext("Hello C !");//這里開始訂閱 如果上面的3個注釋沒去掉,則Hello C的事件和訂閱后面的事件生效//如果上面的三個注釋去掉 則打印構造器NORMAL事件生效后和訂閱后面的事件生效
subject.subscribe(subscriber);
subject.onNext("Hello CPP !");
subject.onNext("Hello IOS !");
PublishSubject
PublishSubject只會把在訂閱發生的時間點之后來自原始Observable的數據發射給觀察者。
需要注意的是,PublishSubject可能會一創建完成就立刻開始發射數據,因此這里有一個風險:在Subject被創建后到有觀察者訂閱它之前這個時間段內,一個或多個數據可能會丟失。
代碼如下:
PublishSubject subject= PublishSubject.create();
Action1<String> onNextAction1 = new Action1<String>(){
@Override
public void call(String s) {
Log.i(TAG, "onNextAction1 call: "+s);
}
};
Action1<String> onNextAction2 = new Action1<String>(){
@Override
public void call(String s) {
Log.i(TAG, "onNextAction2 call: "+s);
}
};
subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");
輸出如下:
onNextAction1 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !
ReplaySubject
ReplaySubject會發射所有來自原始Observable的數據給觀察者,無論它們是何時訂閱的。
代碼如下:
ReplaySubject subject= ReplaySubject.create();
Action1<String> onNextAction1 = new Action1<String>(){
@Override
public void call(String s) {
Log.i(TAG, "onNextAction1 call: "+s);
}
};
Action1<String> onNextAction2 = new Action1<String>(){
@Override
public void call(String s) {
Log.i(TAG, "onNextAction2 call: "+s);
}
};
subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");
輸出如下:
onNextAction1 call: Hello Android !
onNextAction1 call: Hello Java !
onNextAction2 call: Hello Android !
onNextAction2 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !
###Subject總結
AsyncSubject無論何時訂閱 只會接收最后一次onNext()事件,如果最后出現異常,則不會打印任何onNext()
BehaviorSubject會從訂閱前最后一次oNext()開始打印直至結束。如果訂閱前無調用onNext(),則調用默認creat(T)傳入的對象。如果異常后才調用,則不打印onNext()
PublishSubject只會打印訂閱后的任何事件。
ReplaySubject無論訂閱在何時都會調用發送的事件。
以上用代碼演示了 RxJava 一些基礎功能是如何實現的,希望能給大家帶來不一樣的啟發。但這只是一個小小的分享,離真正能運用于工程的 Rx 框架還差太遠。這也讓我們明白到,一個健壯的框架,需要考慮太多東西,比如代碼的可拓展性和可讀性,性能優化,可測試性,兼容性,極端情況等等。但有時要想深入理解一個復雜框架的實現原理,就需要剝離這些細節代碼,多關注主干的調用邏輯,化繁為簡。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。