91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么理解異步編程RxJava

發布時間:2021-11-09 17:07:42 來源:億速云 閱讀:124 作者:柒染 欄目:編程語言

這篇文章給大家介紹怎么理解異步編程RxJava,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

前言

前段時間寫了一篇對協程的一些理解,里面提到了不管是協程還是callback,本質上其實提供的是一種異步無阻塞的編程模式;并且介紹了java中對異步無阻賽這種編程模式的支持,主要提到了Future和CompletableFuture;之后有同學在下面留言提到了RxJava,剛好最近在看微服務設計這本書,里面提到了響應式擴展(Reactive extensions,Rx),而RxJava是Rx在JVM上的實現,所有打算對RxJava進一步了解。

RxJava簡介

RxJava的官網地址:https://github.com/ReactiveX/RxJava,

其中對RxJava進行了一句話描述:RxJava – Reactive Extensions for the JVM – a  library for composing asynchronous and event-based programs using  observable sequences for the Java VM.

大意就是:一個在Java VM上使用可觀測的序列來組成異步的、基于事件的程序的庫。

更詳細的說明在Netflix技術博客的一篇文章中描述了RxJava的主要特點:

  • 易于并發從而更好的利用服務器的能力。

  • 易于有條件的異步執行。

  • 一種更好的方式來避免回調地獄。

  • 一種響應式方法。

與CompletableFuture對比

之前提到CompletableFuture真正的實現了異步的編程模式,一個比較常見的使用場景:

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(耗時函數);Future<Integer> f = future.whenComplete((v, e) -> {System.out.println(v);System.out.println(e);
});System.out.println("other...");

下面用一個簡單的例子來看一下RxJava是如何實現異步的編程模式:

bservable<Long> observable = Observable.just(1, 2)
        .subscribeOn(Schedulers.io()).map(new Func1<Integer, Long>() {@Overridepublic Long call(Integer t) {try {
                    Thread.sleep(1000); //耗時的操作} catch (InterruptedException e) {
                    e.printStackTrace();
                }return (long) (t * 2);
            }
        });
observable.subscribe(new Subscriber<Long>() {@Overridepublic void onCompleted() {
        System.out.println("onCompleted");
    }@Overridepublic void onError(Throwable e) {
        System.out.println("error" + e);
    }@Overridepublic void onNext(Long result) {
        System.out.println("result = " + result);
    }
});
System.out.println("other...");

Func1中以異步的方式執行了一個耗時的操作,Subscriber(觀察者)被訂閱到Observable(被觀察者)中,當耗時操作執行完會回調Subscriber中的onNext方法。

其中的異步方式是在subscribeOn(Schedulers.io())中指定的,Schedulers.io()可以理解為每次執行耗時操作都啟動一個新的線程。

結構上其實和CompletableFuture很像,都是異步的執行一個耗時的操作,然后在有結果的時候主動告訴我結果。那我們還需要RxJava干嘛,不知道你有沒有注意,上面的例子中其實提供2條數據流[1,2],并且處理完任何一個都會主動告訴我,當然這只是它其中的一項功能,RxJava還有很多好用的功能,在下面的內容會進行介紹。

異步觀察者模式

上面這段代碼有沒有發現特別像設計模式中的:觀察者模式;首先提供一個被觀察者Observable,然后把觀察者Subscriber添加到了被觀察者列表中;

RxJava中一共提供了四種角色:Observable、Observer、Subscriber、Subjects

Observables和Subjects是兩個被觀察者,Observers和Subscribers是觀察者;

當然我們也可以查看一下源碼,看一下jdk中的Observer和RxJava的Observer

jdk中的Observer:

public interface Observer {void update(Observable o, Object arg);
}

RxJava的Observer:

public interface Observer<T> {void onCompleted();void onError(Throwable e);void onNext(T t);
}

同時可以發現Subscriber是implements Observer的:

public abstract class Subscriber<T> implements Observer<T>, Subscription

可以發現RxJava中在Observer中引入了2個新的方法:onCompleted()和onError()

onCompleted():即通知觀察者Observable沒有更多的數據,事件隊列完結
onError():在事件處理過程中出異常時,onError()會被觸發,同時隊列自動終止,不允許再有事件發出。

正是因為RxJava提供了同步和異步兩種方式進行事件的處理,個人覺得異步的方式更能體現RxJava的價值,所以這里給他命名為異步觀察者模式

好了,下面正式介紹RxJava的那些靈活的操作符,這里僅僅是簡單的介紹和簡單的實例,具體用在什么場景下,會在以后的文章中介紹

Maven引入

<dependency><groupId>io.reactivex</groupId><artifactId>rxjava</artifactId><version>1.2.4</version></dependency>

創建Observable

1.create()創建一個Observable,并為它定義事件觸發規則

Observable<Integer> observable = Observable
            .create(new Observable.OnSubscribe<Integer>() {@Overridepublic void call(Subscriber<? super Integer> observer) {for (int i = 0; i < 5; i++) {
                        observer.onNext(i);
                    }
                    observer.onCompleted();
                }
            });
observable.subscribe(new Observer<Integer>() {...});

2.from()可以從一個列表中創建一個Observable,Observable將發射出列表中的每一個元素

List<Integer> items = new ArrayList<Integer>();for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items);
observable.subscribe(new Observer<Integer>() {...});

3.just()將傳入的參數依次發送出來

Observable<Integer> observable = Observable.just(1, 2, 3);
observable.subscribe(new Observer<Integer>() {...});

過濾Observable

1.filter()來過濾我們觀測序列中不想要的值

List<Integer> items = new ArrayList<Integer>();for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).filter(new Func1<Integer, Boolean>() {@Overridepublic Boolean call(Integer t) {return t == 1;
            }
        });
observable.subscribe(new Observer<Integer>() {...});

2.take()和taskLast()分別取前幾個元素和后幾個元素

List<Integer> items = new ArrayList<Integer>();for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).take(3);
observable.subscribe(new Observer<Integer>() {...});
Observable<Integer> observable = Observable.from(items).takeLast(2);

3.distinct()和distinctUntilChanged()

distinct()過濾掉重復的值

List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(10);
items.add(10);
Observable<Integer> observable = Observable.from(items).distinct();
observable.subscribe(new Observer<Integer>() {...});

distinctUntilChanged()列發射一個不同于之前的一個新值時讓我們得到通知

List<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(100);
items.add(100);
items.add(200);
Observable<Integer> observable = Observable.from(items).distinctUntilChanged();
observable.subscribe(new Observer<Integer>() {...});

4.first()和last()分別取***個元素和***一個元素

List<Integer> items = new ArrayList<Integer>();for (int i = 0; i < 5; i++) {
    items.add(i);
}// Observable<Integer> observable = Observable.from(items).first();Observable<Integer> observable = Observable.from(items).last();
observable.subscribe(new Observer<Integer>() {...});

5.skip()和skipLast()分別從前或者后跳過幾個元素

List<Integer> items = new ArrayList<Integer>();for (int i = 0; i < 5; i++) {
    items.add(i);
}// Observable<Integer> observable = Observable.from(items).skip(2);Observable<Integer> observable = Observable.from(items).skipLast(2);
observable.subscribe(new Observer<Integer>() {...});

6.elementAt()取第幾個元素進行發射

List<Integer> items = new ArrayList<Integer>();for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).elementAt(2);
observable.subscribe(new Observer<Integer>() {...});

7.sample()指定發射間隔進行發射

List<Integer> items = new ArrayList<Integer>();for (int i = 0; i < 50000; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).sample(1,TimeUnit.MICROSECONDS);
observable.subscribe(new Observer<Integer>() {...});

8.timeout()設定的時間間隔內如果沒有得到一個值則發射一個錯誤

List<Integer> items = new ArrayList<Integer>();for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).timeout(1,TimeUnit.MICROSECONDS);
observable.subscribe(new Observer<Integer>() {...onError()...});

9.debounce()在一個指定的時間間隔過去了仍舊沒有發射一個,那么它將發射***的那個

List<Integer> items = new ArrayList<Integer>();for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).debounce(1,TimeUnit.MICROSECONDS);
observable.subscribe(new Observer<Integer>() {...});

轉換Observable

1.map()接收一個指定的Func對象然后將它應用到每一個由Observable發射的值上

List<Integer> items = new ArrayList<Integer>();for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).map(new Func1<Integer, Integer>() {@Overridepublic Integer call(Integer t) {return t * 2;
            }
        });
observable.subscribe(new Observer<Integer>() {...});

2.flatMap()函數提供一種鋪平序列的方式,然后合并這些Observables發射的數據

final Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(3));List<Integer> items = new ArrayList<Integer>();for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).flatMap(new Func1<Integer, Observable<? extends Integer>>() {
            @Overridepublic Observable<? extends Integer> call(Integer t) {List<Integer> items = new ArrayList<Integer>();
                items.add(t);
                items.add(99999);return Observable.from(items).subscribeOn(scheduler);
            }
        });
observable.subscribe(new Observer<Integer>() {...});

重要的一點提示是關于合并部分:它允許交叉。這意味著flatMap()不能夠保證在最終生成的Observable中源Observables確切的發射
順序。

3.concatMap()函數解決了flatMap()的交叉問題,提供了一種能夠把發射的值連續在一起的鋪平函數,而不是合并它們。
示例代碼同上,將flatMap替換為concatMap,輸出的結果來看是有序的

4.switchMap()和flatMap()很像,除了一點:每當源Observable發射一個新的數據項(Observable)時,它將取消訂閱并停止監視之前那個數據項產生的Observable,并開始監視當前發射的這一個。
示例代碼同上,將flatMap替換為switchMap,輸出的結果只剩***一個值

5.scan()是一個累積函數,對原始Observable發射的每一項數據都應用一個函數,計算出函數的結果值,并將該值填充回可觀測序列,等待和下一次發射的數據一起使用。

List<Integer> items = new ArrayList<Integer>();for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Integer> observable = Observable.from(items).scan(new Func2<Integer, Integer, Integer>() {

            @Override             public Integer call(Integer t1, Integer t2) {
                System.out.println(t1 + "+" + t2);return t1 + t2;
            }
        });
observable.subscribe(new Observer<Integer>() {...});

6.groupBy()來分組元素

List<Integer> items = new ArrayList<Integer>();for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<GroupedObservable<Integer, Integer>> observable = Observable
                .from(items).groupBy(new Func1<Integer, Integer>() {@Overridepublic Integer call(Integer t) {return t % 3;
                    }
                });
observable.subscribe(new Observer<GroupedObservable<Integer, Integer>>() {@Overridepublic void onNext(final GroupedObservable<Integer, Integer> t) {
            t.subscribe(new Action1<Integer>() {@Overridepublic void call(Integer value) {
                    System.out.println("key:" + t.getKey()+ ", value:" + value);
                }
            });

});

7.buffer()函數將源Observable變換一個新的Observable,這個新的Observable每次發射一組列表值而不是一個一個發射。

List<Integer> items = new ArrayList<Integer>();for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<List<Integer>> observable = Observable.from(items).buffer(2);
observable.subscribe(new Observer<List<Integer>>() {...});

8.window()函數和 buffer()很像,但是它發射的是Observable而不是列表

List<Integer> items = new ArrayList<Integer>();for (int i = 0; i < 5; i++) {
    items.add(i);
}
Observable<Observable<Integer>> observable = Observable.from(items).window(2);
observable.subscribe(new Observer<Observable<Integer>>() {@Overridepublic void onNext(Observable<Integer> t) {
        t.subscribe(new Action1<Integer>() {@Overridepublic void call(Integer t) {
                System.out.println("this Action1 = " + this+ ",result = " + t);
            }
        });//onCompleted和onError});

9.cast()它將源Observable中的每一項數據都轉換為新的類型,把它變成了不同的Class

List<Father> items = new ArrayList<Father>();
items.add(new Son());
items.add(new Son());
items.add(new Father());
items.add(new Father());
Observable<Son> observable = Observable.from(items).cast(Son.class);
observable.subscribe(new Observer<Son>() {...});class Father {
}class Son extends Father {
}

組合Observables

1.merge()方法將幫助你把兩個甚至更多的Observables合并到他們發射的數據項里

List<Integer> items1 = new ArrayList<Integer>();for (int i = 0; i < 5; i++) {
    items1.add(i);
}
List<Integer> items2 = new ArrayList<Integer>();for (int i = 5; i < 10; i++) {
    items2.add(i);
}
Observable<Integer> observable1 = Observable.from(items1);
Observable<Integer> observable2 = Observable.from(items2);
Observable<Integer> observableMerge = Observable.merge(observable1,observable2);
observable.subscribe(new Observer<Integer>() {...});

2.zip()合并兩個或者多個Observables發射出的數據項,根據指定的函數 Func* 變換它們,并發射一個新值

List<Integer> items1 = new ArrayList<Integer>();for (int i = 0; i < 5; i++) {
    items1.add(i);
}
List<Integer> items2 = new ArrayList<Integer>();for (int i = 5; i < 10; i++) {
    items2.add(i);
}
Observable<Integer> observable1 = Observable.from(items1);
Observable<Integer> observable2 = Observable.from(items2);
Observable<Integer> observableZip = Observable.zip(observable1,
        observable2, new Func2<Integer, Integer, Integer>() {
            @Override             public Integer call(Integer t1, Integer t2) {return t1 * t2;
            }
        });
observable.subscribe(new Observer<Integer>() {...});

3.combineLatest()把兩個Observable產生的結果進行合并,這兩個Observable中任意一個Observable產生的結果,都和另一個Observable***產生的結果,按照一定的規則進行合并。

Observable<Long> observable1 = Observable.interval(1000,TimeUnit.MILLISECONDS);
Observable<Long> observable2 = Observable.interval(1000,TimeUnit.MILLISECONDS);
Observable.combineLatest(observable1, observable2,new Func2<Long, Long, Long>() {@Overridepublic Long call(Long t1, Long t2) {
                System.out.println("t1 = " + t1 + ",t2 = " + t2);return t1 + t2;
            }
        }).subscribe(new Observer<Long>() {...});
Thread.sleep(100000);

4.join()類似combineLatest(),但是join操作符可以控制每個Observable產生結果的生命周期,在每個結果的生命周期內,可以與另一個Observable產生的結果按照一定的規則進行合并

Observable<Long> observable1 = Observable.interval(1000,
                TimeUnit.MILLISECONDS);
        Observable<Long> observable2 = Observable.interval(1000,
                TimeUnit.MILLISECONDS);
        observable1.join(observable2, new Func1<Long, Observable<Long>>() {@Overridepublic Observable<Long> call(Long t) {
                System.out.println("left=" + t);return Observable.just(t).delay(1000, TimeUnit.MILLISECONDS);
            }
        }, new Func1<Long, Observable<Long>>() {@Overridepublic Observable<Long> call(Long t) {
                System.out.println("right=" + t);return Observable.just(t).delay(1000, TimeUnit.MILLISECONDS);
            }
        }, new Func2<Long, Long, Long>() {@Overridepublic Long call(Long t1, Long t2) {return t1 + t2;
            }
        }).subscribe(new Observer<Long>() {@Overridepublic void onCompleted() {
                System.out.println("Observable  completed");
            }@Overridepublic void onError(Throwable e) {
                System.out.println("Oh,no!  Something   wrong   happened!");
            }@Overridepublic void onNext(Long t) {
                System.out.println("[result=]" + t);
            }
        });

        Thread.sleep(100000);

5.switchOnNext()把一組Observable轉換成一個Observable,對于這組Observable中的每一個Observable所產生的結果,如果在同一個時間內存在兩個或多個Observable提交的結果,只取***一個Observable提交的結果給訂閱者

Observable<Observable<Long>> observable = Observable.interval(2, TimeUnit.SECONDS)
        .map(new Func1<Long, Observable<Long>>() {@Overridepublic Observable<Long> call(Long aLong) {return Observable.interval(1, TimeUnit.MILLISECONDS).take(5);
            }
        }).take(2);

Observable.switchOnNext(observable).subscribe(new Observer<Long>() {...});
Thread.sleep(1000000);

6.startWith()在Observable開始發射他們的數據之前,startWith()通過傳遞一個參數來先發射一個數據序列

Observable.just(1000, 2000).startWith(1, 2).subscribe(new Observer<Integer>() {...})

主要對rxjava進行了簡單的介紹,從異步編程這個角度對rxjava進行了分析;并且針對Observable的過濾,轉換,組合的API進行了簡單的介紹,當然我們更關心的是rxjava有哪些應用場景。

關于怎么理解異步編程RxJava就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

嘉鱼县| 博湖县| 内乡县| 枞阳县| 武平县| 汝南县| 伊金霍洛旗| 商都县| 隆尧县| 柏乡县| 祥云县| 云阳县| 始兴县| 新河县| 那坡县| 中西区| 上栗县| 宁乡县| 昂仁县| 台东市| 玉环县| 潍坊市| 屏山县| 邢台县| 邢台市| 兴安盟| 汉沽区| 博野县| 凉山| 水城县| 阳西县| 石棉县| 洪湖市| 陆河县| 黑山县| 盈江县| 乐业县| 固安县| 新田县| 赞皇县| 竹溪县|