91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

在Spark Streaming job中如何讀取Kafka messages及其offsetRange

發布時間:2021-12-15 11:07:28 來源:億速云 閱讀:104 作者:柒染 欄目:大數據

本篇文章為大家展示了在Spark Streaming job中如何讀取Kafka messages及其offsetRange,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

在Spark Streaming job中讀取Kafka topic(s)中的messages時,有時我們會需要同步記錄下每次讀取的messages的offsetRange。要達到這一目的,下面這兩段代碼(代碼1和代碼2)都是正確的,而且是等價的。

代碼1(正確):

-----------------------

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

messages.foreachRDD(

new Function<JavaPairRDD<String, String>, Void>() {

@Override

public Void call(JavaPairRDD<String, String> rdd) throws Exception {

OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

JavaRDD<String> valueRDD = rdd.values();

long msgNum = processEachRDD(valueRDD, outputFolderPath, definedDuration);

if (msgNum > 0 && zkPathRoot!= null) {

writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);

}

return null;

}

});

代碼2(正確):

-----------------------

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

final AtomicReference<OffsetRange[]> offsetRanges=new AtomicReference();

lines = messages.transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {

@Override

public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {

OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

offsetRanges.set(offsets);

return rdd;

}

}).map(new Function<Tuple2<String, String>, String>() {

@Override

public String call(Tuple2<String, String> tuple2) {

return tuple2._2();

}

});

lines.foreachRDD(new Function<JavaRDD<String>, Void>() {

@Override

public Void call(JavaRDD<String> rdd) throws Exception {

long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);

if (msgNum > 0 && zkPathRoot!= null) {

OffsetRange[] offsets = offsetRanges.get();

writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);

}

return null;

}

});

但是要注意,下面這兩段代碼(代碼3和代碼4)是錯誤的,它們都會拋出一個exception:java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges

代碼3(錯誤):

-----------------------

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

messages.transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {

@Override

public JavaRDD<String> call(JavaPairRDD<String, String> rdd) throws Exception {

return rdd.values();

}

}).foreachRDD(new Function<JavaRDD<String>, Void>() {

@Override

public Void call(JavaRDD<String> rdd) throws Exception {

long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);

if (msgNum > 0 && zkPathRoot!= null) {

OffsetRange[] offsets = offsetRanges.get();

writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);

}

return null;

}

});

代碼4(錯誤):

-----------------------

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);

messages.map(new Function<Tuple2<String, String>, String>() {

@Override

public String call(Tuple2<String, String> tuple2) {

return tuple2._2();

}

}).foreachRDD(new Function<JavaRDD<String>, Void>() {

@Override

public Void call(JavaRDD<String> rdd) throws Exception {

long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);

if (msgNum > 0 && zkPathRoot!= null) {

OffsetRange[] offsets = offsetRanges.get();

writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);

}

return null;

}

});

上述內容就是在Spark Streaming job中如何讀取Kafka messages及其offsetRange,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

桑日县| 天峻县| 乐昌市| 富蕴县| 凉山| 农安县| 安陆市| 油尖旺区| 大连市| 内丘县| 满洲里市| 科尔| 南溪县| 百色市| 甘南县| 关岭| 璧山县| 蓬莱市| 将乐县| 石门县| 山阳县| 武山县| 西贡区| 闻喜县| 新竹市| 麦盖提县| 邢台市| 珲春市| 吴江市| 巴彦县| 阿克陶县| 水城县| 沂水县| 九台市| 巴楚县| 阳朔县| 永修县| 西宁市| 内黄县| 台湾省| 长春市|