91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spring響應式編程實例分析

發布時間:2022-07-15 13:51:02 來源:億速云 閱讀:149 作者:iii 欄目:開發技術

今天小編給大家分享一下Spring響應式編程實例分析的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

1. 前言

了解響應式編程,首先我們需要了解函數式操作和Stream的操作,下面我們簡單的復習一下嘍。

1.1 常用函數式編程

函數式接口中

我們先來回顧一下Java中的函數式接口。常見的有以下幾種

  • Consumer 一個輸入,無輸出

  • Supplier  無輸入,有輸出

  • Function<T,R>  輸入T,輸出R

  • BiFunction<T,U,R> 輸入T,U 輸出R

  • Predicate  有輸入,輸出boolean類型

上面的簡單函數式接口示例如下:

Consumer consumer = (i)-> System.out.println("this is " + i);
consumer.accept("consumer");

Supplier supplier  = () -> "this is supplier";
System.out.println(supplier.get());

Function<Integer,Integer> function = (i) -> i*i;
System.out.println(function.apply(8));

BiFunction<Integer,Integer,String> biFunction = (i,j)-> i+"*"+j+"="+i*j;
System.out.println(biFunction.apply(8,8));

Predicate<Integer> predicate = (i) -> i.intValue()>3;
System.out.println(predicate.test(5));

其執行結果如下:

this is consumer
this is supplier
64
8*8=64
true

1.2 Stream操作

對Stream進行操作,主要有幾個關鍵點:

  • 生成流

  • 流的中間操作其中中間操作可以有多個,中間操作會返回一個新的流(如 map ,filter,sorted等),然后交給下一個流方法使用。

  • 流的終結操作終結操作只有一個。終結操作執行后,流就到了終止狀態,無法被操作 (如forEach,toArray , findFirst 等)。

創建流的示例:

String[] strArray = {"ss","ss","","sdffg"};

Arrays.stream(strArray).forEach(System.out::println);
Arrays.asList(strArray).stream().forEach(System.out::println);
Stream.of(strArray).forEach(System.out::println);
Stream.iterate(1,(i) -> i+1).limit(10).forEach(System.out::println);
Stream.generate(() -> new Random().nextInt(10)).limit(10).forEach(System.out::println);

簡單的流處理示例:

String[] strArray1 = {"ss","ss","","sdffg","bca-de","fff"};
String collect = Stream.of(strArray1)
        .filter(i -> !i.isEmpty())//過濾空字符串
        .sorted() //排序
        .limit(1) //只取第一個元素
        .map(i -> i.replace("-", ""))//替換 "-"
        .flatMap(i -> Stream.of(i.split("")))//將字符拆成字符數組
        .sorted() //排序
        .collect(Collectors.joining());//將字符拼接組合到一起
System.out.println(collect);//最后輸出abcde

2. Java響應式編程

響應式編程會用到一個發布者和一個訂閱者,然后通過訂閱關系完成數據流的傳輸。訂閱關系中可以處理一些背壓問題,即調節消費者與生產者之間的供需平衡,讓整個程序達到最大效率。

Spring響應式編程實例分析

Java9中java.util.concurrent.Flow接口提供響應式流編程類似的功能。

下面我們實現一個基于Java 響應式編程的示例:

其中有三個簡單步驟:

  • 建立生產者

  • 構建消費者

  • 消費者訂閱生產者

  • 生產者生產內容

SubmissionPublisher publisher = new SubmissionPublisher<>();//建立生產者
Flow.Subscriber subscriber = new Flow.Subscriber() {...};//建立消費者 (其中的實現放到下面)
publisher.subscribe(subscriber);//訂閱關系
for (int i = 0; i < 10; i++) {
 publisher.submit("test reactive java : " +i); //生產者生產內容
}

消費者全部代碼如下:

Flow.Subscriber subscriber = new Flow.Subscriber() {
    Flow.Subscription subscription;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("Subscription establish first ");
        this.subscription = subscription;
        this.subscription.request(1);
    }
    @Override
    public void onNext(Object item) {
        subscription.request(10);
        System.out.println("receive :  "+ item);
    }
    @Override
    public void onError(Throwable throwable) {
        System.out.println(" onError ");
    }
    @Override
    public void onComplete() {
        System.out.println(" onComplete ");
    }
};

其中onSubscribe方法表示建立訂閱關系

onNext接受數據,并請求生產者的數據。

onError,onComplete則是error或者完成之后的處理方法。

帶有中間處理器的響應式流

Reactive Stream 通常會基于如下的模型:

Spring響應式編程實例分析

下面我們實現一個帶有中間處理功能的響應式模型:

下面的Processor 既有發布者,又有訂閱者:

public class ReactiveProcessor extends SubmissionPublisher implements Flow.Subscriber {
    private Flow.Subscription subscription;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println( Thread.currentThread().getName() +  " Reactive processor establish connection ");
        this.subscription = subscription;
        this.subscription.request(1);
    }

    @Override
    public void onNext(Object item) {
        System.out.println(Thread.currentThread().getName() + " Reactive processor receive data: "+ item);
        this.submit(item.toString().toUpperCase());
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Reactive processor error ");
        throwable.printStackTrace();
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        System.out.println(Thread.currentThread().getName() + " Reactive processor receive data complete ");
    }
}

如上中間處理器訂閱發布者, 同時消費者再訂閱中間處理器。中間處理器也可以調節發布訂閱的生產消費速率。

SubmissionPublisher publisher = new SubmissionPublisher<>(); //創建生產者
ReactiveProcessor reactiveProcessor = new ReactiveProcessor(); // 創建中間處理器
publisher.subscribe(reactiveProcessor); //中間處理器訂閱生產者
Flow.Subscriber subscriber = new Flow.Subscriber() {...}; //創建消費者
reactiveProcessor.subscribe(subscriber); //消費者訂閱中間處理器
for (int i = 0; i < 10; i++) {
    publisher.submit("test reactive java : " +i); //生產者生產數據
}

通過上述生產者-> 中間處理器->消費者, 可以將生產者生產的數據全部變成大寫,然后再發送給最終的消費者。

以上式Java中的reactive 編程示例。Java會不同線程來分別處理消費者與生產者的消息處理

3. Reactor

Reactor中兩個比較關鍵的對象式Flux和Mono, 整個Spring的響應式編程均式基于projectreactor項目。Reactor是響應式編程的依賴,主要是基于JVM構建非阻塞程序。

根據Reactor的介紹,此類響應式編程的的三方庫(Reactor)主要是解決一些JVM經典異步編程中的一些缺點,并且還可以專注于一些新的特性,如下:

  • 可組合性與可讀性 (Composability and readability)

  • 可以使用豐富的運算操作符將數據作為流進行操作

  • 訂閱之前,不會有任何事

  • 背壓特性(Backpressure ),可以理解為消費者可以向生產者發送產出率過高的信號,從而調整生產速率。或者消費者可以選擇一次性拉去一捆數據進行消費。

  • 于并發無關的高度抽象的高級功能

其中有這么一段解釋,可以形象的說明響應式編程。

Reactive的程序可以想象成車間的流水線,reactor既是流水線上的傳送帶,又是處理工作站。原料從一個原始的生產者出發,最終成為產品被推總給消費者。

3.1 Flux & Mono

下面我們介紹一下Flux和Mono。

在Reactor中Flux和Mono均是Publisher,即生產者。兩者也有不同。Flux對象表示0到N個異步的響應序列,而Mono只代表0個(empty)或者1個結果。

Reactor官網上介紹的Flux示意如下:

Spring響應式編程實例分析

Mono示意如下:

Spring響應式編程實例分析

3.2 Flux Mono創建與使用

我們也可以單獨引用其依賴。

使用maven依賴

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId> 
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId> 
        <scope>test</scope>
    </dependency>
</dependencies>

Mono創建

分別創建空Mono和一個包含一個String的Mono,并由消費者消費打印。

Mono.empty().subscribe(System.out::println);
Mono.just("Hello Mono Java North").subscribe(System.out::print);

Flux創建

Flux創建有如下的一些方法,

  • just(通過不定參數創建)

  • range(從某個整數開始,往后的整數數量)

  • fromArray,fromIterable,fromStream,從名稱上就可以看出來,通過數組,迭代器,Stream流創建Flux

下面式一些Java代碼示例

Flux.just(1,2,3,4,5).subscribe(System.out::print);
Flux.range(1,20).subscribe(System.out::print);
Flux.fromArray(new String[]{"a1","a2","a3","a4","a5","a6"}).skip(2).subscribe(System.out::print);
Flux.fromIterable(Arrays.asList(1,2,3,4,5,6,7)).subscribe(System.out::println);
Flux.fromStream(Stream.of(Arrays.asList(1,2,3,4,5,6,7))).subscribe(System.out::print);

我們再舉一個generate的例子

public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)

如上代碼所示,generate需要一個Callable參數,而且是supplier (即沒有輸入值,只有一個輸出)

另一個參數是BiFunction (前面我們也介紹過,需要兩個輸入值,一個輸出值)。BiFunction中的其中一個輸入值是SynchronousSink,下面我們給出一個generate創建Flux的示例。

Flux.generate(
 () -> 0, //提供一個初始狀態值0
 (i, sink) -> {
    sink.next("3*" + i + "=" + 3 * i);//使用初始值去生產一個3的乘法
    if (i > 9) sink.complete();//設置停止條件
    return i + 1;//返回一個新的狀態值,以便在下一次的生產中使用,除非響應序列終止
}).subscribe(System.out::println);

下面我們在看一個Flux嵌套處理示例:

需求:將字符串去空格,并去重,然后排序輸出。

String str = "qa ws ed rf tg yh uj i k ol p za sx dc vf bg hn jm k loi yt ";
Flux.fromArray(str.split(" "))//通過數組創建Flux
    .flatMap(i -> Flux.fromArray(i.split(""))) 
    .distinct() // 去重
    .sort() //排序
    .subscribe(System.out::print); 
    //flatMap與Stream中的flatMap類似,接受Function作為參數,輸入一個值,輸出一個值,此處輸出均為Publisher,

以上就是Flux和Mono的一些簡單介紹,同時Ractor也支持JDK中的FlowPubliser 和FlowSubscriber與 Reactor中的publisher, subscriber的適配等.

4. WebFlux

SpringBoot 2之后支持的Reactive響應式編程。

關于Reactive技術棧和經典的Servlet技術棧對比,Spring官網的這張圖比較清晰。

Spring響應式編程實例分析

Spring響應式編程主要依賴于Reactor第三方庫,即上面講的Flux和Mono的庫。

WebFlux主要有以下幾個要點:

  • 反應式棧web框架

  • 完全異步非阻塞

  • 運行在netty,undertow,Servlet3.1 + 容器

  • 核心反應式庫 Reactor

  • 返回 Flux 或Mono

  • 支持注解和函數編程兩種編程模式

Spring WebFlux示例

下面我們給出幾個SpringBoot 的響應式web示例。

可以去https://start.spring.io/ 新建webflux的項目也可以。

項目中的主要依賴就是spring-boot-starter-webflux

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
  </dependency>

基于注解的WebFlux

以下是一個最簡單的基于注解的WebFlux

@GetMapping("/hello/mono1")
public Mono<String> mono(){
    return Mono.just("Hello Mono -  Java North");
}

@GetMapping("/hello/flux1")
public Flux<String> flux(){
    return Flux.just("Hello Flux","Hello Java North");
}

基于函數式編程的WebFlux

創建RouterFunction,將其注入到Spring中即可。

@Bean
public RouterFunction<ServerResponse> testRoutes1() {
    return RouterFunctions.route().GET("/flux/function", new HandlerFunction<ServerResponse>() {
        @Override
        public Mono<ServerResponse> handle(ServerRequest request) {
            return ServerResponse.ok().bodyValue("hello web flux , Hello Java North");
        }
    }).build();
}

//上面的方法使用函數式編程替換之后如下
@Bean
public RouterFunction<ServerResponse> testRoutes() {
    return RouterFunctions.route().GET("/flux/function",
         request -> ServerResponse.ok()
                    .bodyValue("Hello web flux , Hello Java North")).build();
}

Flux與Mono的響應式編程延遲示例

下面我們編寫一段返回Mono的響應式Web服務。

@GetMapping("/hello/mono")
public Mono<String> stringMono(){
    Mono<String> from = Mono.fromSupplier(() -> {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "Hello, Spring Reactive  date time:"+ LocalDateTime.now();
    });
    System.out.println( "thread : " + Thread.currentThread().getName()+ " ===  " + LocalDateTime.now() +"  ==========Mono function complete==========");
    return from;
}

使用postman請求如下, 5秒鐘后返回數據。后臺卻在5秒中之前已經處理完整個方法。

Spring響應式編程實例分析

后臺打印日志:

Spring響應式編程實例分析

再看一組Flux

@GetMapping(value = "/hello/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> flux1(){
    Flux<String> stringFlux = Flux.fromStream(IntStream.range(1,6).mapToObj(i ->{
        mySleep(1);//表示睡1秒
        return "java north flux" + i + "date time: " +LocalDateTime.now();
    }));
    System.out.println("thread : " + Thread.currentThread().getName()+ " ===  " + LocalDateTime.now() + "  ==========Flux function complete=========");
    return stringFlux;
}

此次使用谷歌瀏覽器請求此服務:

可以發現每隔一秒就會有一條消息被生產出來。

Spring響應式編程實例分析

后臺完成時間同樣是在一開始就完成整個方法:

Spring響應式編程實例分析

通過上述對Flux 與 Mono的例子,可以好好體會一下響應式編程。

以上就是“Spring響應式編程實例分析”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

丹东市| 罗城| 屯门区| 丹寨县| 自治县| 车险| 宁乡县| 涡阳县| 贞丰县| 盐城市| 金坛市| 项城市| 博客| 永丰县| 洪湖市| 波密县| 枣阳市| 景洪市| 普兰县| 静安区| 五莲县| 横峰县| 苏尼特右旗| 收藏| 九江市| 雷波县| 什邡市| 临朐县| 民权县| 灵丘县| 两当县| 广昌县| 涞水县| 剑川县| 玉林市| 赞皇县| 呈贡县| 扶余县| 班玛县| 新巴尔虎右旗| 介休市|