91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink的函數有哪些

發布時間:2021-12-28 17:43:33 來源:億速云 閱讀:138 作者:小新 欄目:大數據

這篇文章主要介紹了Flink的函數有哪些,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

1. Map: 將數據流中的數據進行一個轉化,形成一個新的數據流,消費一個元素,并且產生一個元素

具體代碼實現

package com.wudl.core;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @version v1.0
 * @ProjectName Flinklearning
 * @ClassName WordMap
 * @Description TODO map 算子實例
 * @Date 2020/10/29 10:15
 */

public class WordMap {

    /**
     * @param args
     * Map 函數的用法
     * 映射:將數據流中的數據進行一個轉化,形成一個新的數據流,消費一個元素,并且產生一個元素
     *參數: Lambda 表達式或者,new MapFunction實現類
     * 返回值:DataStream
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(1);
        env.socketTextStream("10.204.125.140", 8899)
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String s) throws Exception {
                        String[] split = s.split(",");
                        return split[0] + "---" + split[1];
                    }
                }).print();

        env.execute();


    }
}

2. FlatMap:

將數據流中的整體拆分成一個 一個 的個體使用, 消費一個元素并產生零到多個元素
package com.wudl.core;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.List;

/**
 * @version v1.0
 * @ProjectName Flinklearning
 * @ClassName TransformFlatMap
 * @Description TODO FlatMap
 *
 * FlatMap: 是一種扁平的映射,將數據流中的整體拆分成為一個個的個體使用, 消費后的元素產生零到多個元素
 *
 *
 *
 * @Author wudl
 * @Date 2020/10/29 10:46
 *
 *
 * 函數 FlatMap
 * 將數據流中的整體拆分成一個 一個 的個體使用, 消費一個元素并產生零到多個元素
 * 參數: lambda 表達式或者是FlatFunction的實現類
 * 返回值:DataStream
 *
 *
 *
 */

public class TransformFlatMap {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
//        DataStreamSource<List<Integer>> listDs = env.fromCollection(Arrays.asList(
//                Arrays.asList(1, 2, 3),
//                Arrays.asList(3, 4, 5),
//                Arrays.asList(8,9,0)
//        ));


//        listDs.flatMap(new FlatMapFunction<List<Integer>, Integer>() {
//            @Override
//            public void flatMap(List<Integer> list, Collector<Integer> collector) throws Exception {
//
//                for (Integer number : list) {
//                    collector.collect(number + 100);
//                }
//
//            }
//        }).print();

        DataStreamSource<String> strDs = env.socketTextStream("10.204.125.140", 8899);
        strDs.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] split = s.split(",");
                collector.collect(split[0]+split[1]);
            }
        }).print();

        env.execute();

    }

}

第三種:Filter  對數據流的過濾根據指定的規則將滿足條件的(true) 的數據保留, 不瞞住條件的(false) 將丟棄

package com.wudl.core;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @version v1.0
 * @ProjectName Flinklearning
 * @ClassName TransformFilter
 * @Description TODO 流的過濾
 * @Date 2020/11/5 10:26
 */

public class TransformFilter {


    /**
     * 函數中Filter 中過濾
     * 過濾:根據指定的規則將滿足條件的(true) 的數據保留, 不瞞住條件的(false)  將丟棄
     * 返回值:DataStream
     */
    public static void main(String[] args) throws Exception {

        //1.獲取上下文的環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.設置并行度
        env.setParallelism(1);
        //3.獲取數據流
        DataStreamSource<String> SourceDs = env.socketTextStream("10.204.125.140", 8899);
        //4. 過濾數據流
        DataStream<String> filter = SourceDs.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                String[] split = value.split(",");
                return split[1].length() > 3;
            }
        });
        filter.print();
        env.execute();

    }


}

感謝你能夠認真閱讀完這篇文章,希望小編分享的“Flink的函數有哪些”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

兰坪| 揭东县| 印江| 克什克腾旗| 建平县| 屯留县| 大渡口区| 博白县| 鸡泽县| 海口市| 探索| 通海县| 克拉玛依市| 新干县| 荣成市| 会宁县| 十堰市| 宁化县| 商河县| 南康市| 嘉兴市| 焦作市| 太仓市| 沈阳市| 友谊县| 巩留县| 云梦县| 鄯善县| 上蔡县| 屏东市| 屏南县| 华蓥市| 木里| 开原市| 遂平县| 略阳县| 永嘉县| 六安市| 建始县| 驻马店市| 红河县|