91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

ApacheFlink中Flink數據流編程是怎樣的

發布時間:2021-09-14 10:41:50 來源:億速云 閱讀:95 作者:柒染 欄目:大數據

這期內容當中小編將會給大家帶來有關ApacheFlink中Flink數據流編程是怎樣的,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

數據源可以通過StreamExecutionEnvironment.addSource(sourceFunction)方式來創建,Flink也提供了一些內置的數據源方便使用,例如readTextFile(path) readFile(),當然,也可以寫一個自定義的數據源(可以通過實現SourceFunction方法,但是無法并行執行。或者實現可以并行實現的接口ParallelSourceFunction或者繼承RichParallelSourceFunction)

入門

首先做一個簡單入門,建立一個DataStreamSourceApp

Scala

object DataStreamSourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    socketFunction(env)
        env.execute("DataStreamSourceApp")
  }

  def socketFunction(env: StreamExecutionEnvironment): Unit = {
    val data=env.socketTextStream("192.168.152.45", 9999)
    data.print()
  }
}

這個方法將會從socket中讀取數據,因此我們需要在192.168.152.45中開啟服務:

nc -lk 9999

然后運行DataStreamSourceApp,在服務器上輸入:

iie4bu@swarm-manager:~$ nc -lk 9999
apache
flink
spark

在控制臺中也會輸出:

3> apache
4> flink
1> spark

前面的 341表示的是并行度。可以通過設置setParallelism來操作:

data.print().setParallelism(1)

Java

public class JavaDataStreamSourceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        socketFunction(environment);
        environment.execute("JavaDataStreamSourceApp");
    }
    public static void socketFunction(StreamExecutionEnvironment executionEnvironment){
        DataStreamSource<String> data = executionEnvironment.socketTextStream("192.168.152.45", 9999);
        data.print().setParallelism(1);
    }
}

自定義添加數據源方式

Scala

實現SourceFunction接口

這種方式不能并行處理。

新建一個自定義數據源

class CustomNonParallelSourceFunction extends SourceFunction[Long]{

  var count=1L
  var isRunning = true


  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning){
      ctx.collect(count)
      count+=1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

這個方法首先定義一個初始值count=1L,然后執行的run方法,方法主要是輸出count,并且執行加一操作,當執行cancel方法時結束。調用方法如下:

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //    socketFunction(env)
    nonParallelSourceFunction(env)
    env.execute("DataStreamSourceApp")
  }

  def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data=env.addSource(new CustomNonParallelSourceFunction())
    data.print()
  }

輸出結果就是控制臺一直輸出count值。

無法設置并行度,除非設置并行度是1.

val data=env.addSource(new CustomNonParallelSourceFunction()).setParallelism(3)

那么控制臺報錯:

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
	at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)
	at com.vincent.course05.DataStreamSourceApp$.nonParallelSourceFunction(DataStreamSourceApp.scala:16)
	at com.vincent.course05.DataStreamSourceApp$.main(DataStreamSourceApp.scala:11)
	at com.vincent.course05.DataStreamSourceApp.main(DataStreamSourceApp.scala)

繼承ParallelSourceFunction方法

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

class CustomParallelSourceFunction extends ParallelSourceFunction[Long]{

  var isRunning = true
  var count = 1L


  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while(isRunning){
      ctx.collect(count)
      count+=1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning=false
  }
}

方法的功能跟上面是一樣的。main方法如下:

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //    socketFunction(env)
//    nonParallelSourceFunction(env)
    parallelSourceFunction(env)


    env.execute("DataStreamSourceApp")
  }

  def parallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data=env.addSource(new CustomParallelSourceFunction()).setParallelism(3)
    data.print()
  }

可以設置并行度3,輸出結果如下:

2> 1
1> 1
2> 1
2> 2
3> 2
3> 2
3> 3
4> 3
4> 3

繼承RichParallelSourceFunction方法

class CustomRichParallelSourceFunction extends RichParallelSourceFunction[Long] {
  var isRunning = true
  var count = 1L


  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //    socketFunction(env)
    //    nonParallelSourceFunction(env)
//    parallelSourceFunction(env)
    richParallelSourceFunction(env)

    env.execute("DataStreamSourceApp")
  }

  def richParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomRichParallelSourceFunction()).setParallelism(3)
    data.print()
  }

Java

實現SourceFunction接口

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class JavaCustomNonParallelSourceFunction implements SourceFunction<Long> {

    boolean isRunning = true;
    long count = 1;

    @Override
    public void run(SourceFunction.SourceContext ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count+=1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning=false;
    }
}
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//        socketFunction(environment);
        nonParallelSourceFunction(environment);
        environment.execute("JavaDataStreamSourceApp");

    }

    public static void nonParallelSourceFunction(StreamExecutionEnvironment executionEnvironment){
        DataStreamSource data = executionEnvironment.addSource(new JavaCustomNonParallelSourceFunction());
        data.print().setParallelism(1);
    }

當設置并行度時:

        DataStreamSource data = executionEnvironment.addSource(new JavaCustomNonParallelSourceFunction()).setParallelism(2);

那么報錯異常:

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
	at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)
	at com.vincent.course05.JavaDataStreamSourceApp.nonParallelSourceFunction(JavaDataStreamSourceApp.java:16)
	at com.vincent.course05.JavaDataStreamSourceApp.main(JavaDataStreamSourceApp.java:10)

實現ParallelSourceFunction接口

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

public class JavaCustomParallelSourceFunction implements ParallelSourceFunction<Long> {

    boolean isRunning = true;
    long count = 1;

    @Override
    public void run(SourceContext ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count+=1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning=false;
    }
}
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//        socketFunction(environment);
//        nonParallelSourceFunction(environment);
        parallelSourceFunction(environment);

        environment.execute("JavaDataStreamSourceApp");
    }

    public static void parallelSourceFunction(StreamExecutionEnvironment executionEnvironment){
        DataStreamSource data = executionEnvironment.addSource(new JavaCustomParallelSourceFunction()).setParallelism(2);
        data.print().setParallelism(1);
    }

可以設置并行度,輸出結果:

1
1
2
2
3
3
4
4
5
5

繼承抽象類RichParallelSourceFunction

public class JavaCustomRichParallelSourceFunction extends RichParallelSourceFunction<Long> {

    boolean isRunning = true;
    long count = 1;

    @Override
    public void run(SourceContext ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count+=1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning=false;
    }
}
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//        socketFunction(environment);
//        nonParallelSourceFunction(environment);
//        parallelSourceFunction(environment);
        richpParallelSourceFunction(environment);
        environment.execute("JavaDataStreamSourceApp");
    }

    public static void richpParallelSourceFunction(StreamExecutionEnvironment executionEnvironment){
        DataStreamSource data = executionEnvironment.addSource(new JavaCustomRichParallelSourceFunction()).setParallelism(2);
        data.print().setParallelism(1);
    }

輸出結果:

1
1
2
2
3
3
4
4
5
5
6
6

SourceFunction  ParallelSourceFunction  RichParallelSourceFunction類之間的關系

上述就是小編為大家分享的ApacheFlink中Flink數據流編程是怎樣的了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

岢岚县| 阿拉善盟| 南皮县| 鄂州市| 牟定县| 楚雄市| 秭归县| 泰和县| 磐安县| 时尚| 巴塘县| 密山市| 吴忠市| 东阿县| 彰化市| 平塘县| 富裕县| 红原县| 岢岚县| 万安县| 崇左市| 宝坻区| 绥化市| 老河口市| 白银市| 长葛市| 堆龙德庆县| 那曲县| 沾益县| 中宁县| 宜君县| 东莞市| 郎溪县| 抚松县| 伊宁县| 宁远县| 富裕县| 宣汉县| 宜城市| 班玛县| 镇平县|