This article shows how to read Kafka messages, together with their offsetRanges, in a Spark Streaming job. The examples are short and self-contained; hopefully you will find something useful in them.
When reading messages from Kafka topic(s) in a Spark Streaming job, we sometimes need to record the offsetRange of each batch of messages as it is read. Both of the snippets below (Code 1 and Code 2) achieve this correctly, and they are equivalent.
Code 1 (correct):
-----------------------
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
);
messages.foreachRDD(
        new Function<JavaPairRDD<String, String>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, String> rdd) throws Exception {
                // rdd here is still the KafkaRDD handed out by the direct stream,
                // so the cast to HasOffsetRanges succeeds.
                OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                JavaRDD<String> valueRDD = rdd.values();
                long msgNum = processEachRDD(valueRDD, outputFolderPath, definedDuration);
                if (msgNum > 0 && zkPathRoot != null) {
                    writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
                }
                return null;
            }
        });
Code 2 (correct):
-----------------------
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
);
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
JavaDStream<String> lines = messages.transformToPair(
        new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
            @Override
            public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
                // The first transformation still receives the KafkaRDD, so the
                // cast succeeds; stash the offsets in a driver-side reference.
                OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                offsetRanges.set(offsets);
                return rdd;
            }
        }).map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
        if (msgNum > 0 && zkPathRoot != null) {
            OffsetRange[] offsets = offsetRanges.get();
            writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
        }
        return null;
    }
});
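
Both versions delegate the actual bookkeeping to writeOffsetToZookeeper (and the batch processing to processEachRDD), helpers whose implementations are not shown here. Below is a minimal sketch of what the write helper might look like, assuming zkClient is an org.I0Itec.zkclient.ZkClient configured with a string serializer; the znode layout and the helper itself are just the conventions used in the snippets above, not any fixed API.

Helper sketch (hypothetical):
-----------------------
import org.I0Itec.zkclient.ZkClient;
import org.apache.spark.streaming.kafka.OffsetRange;

public class OffsetStore {
    // Persist one znode per topic/partition under zkPathRoot, holding the
    // offset the next run should start from (untilOffset is the exclusive
    // upper bound of the batch that was just processed successfully).
    public static void writeOffsetToZookeeper(ZkClient zkClient, String zkPathRoot,
                                              OffsetRange[] offsets) {
        for (OffsetRange o : offsets) {
            String path = zkPathRoot + "/" + o.topic() + "/" + o.partition();
            if (!zkClient.exists(path)) {
                zkClient.createPersistent(path, true); // create parent znodes too
            }
            zkClient.writeData(path, String.valueOf(o.untilOffset()));
        }
    }
}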
Note, however, that the two snippets below (Code 3 and Code 4) are wrong: both throw java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges. Only the RDD produced directly by the direct stream is a KafkaRDD, which implements HasOffsetRanges; once a transformation such as values() or map() has run, you are holding a MapPartitionsRDD, and the offset information can no longer be recovered by casting.
Code 3 (wrong):
-----------------------
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
);
messages.transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
    @Override
    public JavaRDD<String> call(JavaPairRDD<String, String> rdd) throws Exception {
        return rdd.values();
    }
}).foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        // rdd is now the MapPartitionsRDD produced by values(), not the
        // original KafkaRDD, so this cast throws the ClassCastException
        // quoted above.
        OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
        if (msgNum > 0 && zkPathRoot != null) {
            writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
        }
        return null;
    }
});
Code 4 (wrong):
-----------------------
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
);
messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
}).foreachRDD(new Function<JavaRDD<String>, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd) throws Exception {
        // map() has already turned the KafkaRDD into a MapPartitionsRDD,
        // so this cast throws the same ClassCastException.
        OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
        if (msgNum > 0 && zkPathRoot != null) {
            writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
        }
        return null;
    }
});
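
The point of persisting the offsets is, of course, to resume from them when the job restarts. For completeness, here is a hedged sketch of the read side using the fromOffsets overload of KafkaUtils.createDirectStream from the same 0.8 integration; readOffsetFromZookeeper is a hypothetical counterpart of the write helper above, and TopicAndPartition / MessageAndMetadata come from kafka.common and kafka.message respectively.

Resume sketch (hypothetical):
-----------------------
// Read back the Map<TopicAndPartition, Long> saved by writeOffsetToZookeeper;
// readOffsetFromZookeeper is assumed here, not part of any library.
Map<TopicAndPartition, Long> fromOffsets =
        readOffsetFromZookeeper(zkClient, zkPathRoot, topicsSet);
JavaInputDStream<String> resumed = KafkaUtils.createDirectStream(
        jssc,
        String.class,        // key class
        String.class,        // value class
        StringDecoder.class, // key decoder
        StringDecoder.class, // value decoder
        String.class,        // record class produced by the message handler
        kafkaParams,
        fromOffsets,
        new Function<MessageAndMetadata<String, String>, String>() {
            @Override
            public String call(MessageAndMetadata<String, String> mmd) {
                return mmd.message();
            }
        });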
That, in short, is how to read Kafka messages and their offsetRanges in a Spark Streaming job; hopefully you have picked up something useful from it.