在 Flink 中,可以使用 filter()
函數對數據進行過濾。filter()
函數接收一個 FilterFunction
類型的參數,該參數定義了過濾條件。
以下是一個簡單的示例,演示如何使用 Flink 對數據流進行過濾:
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class FlinkFilterExample {
public static void main(String[] args) throws Exception {
// 創建 Flink 執行環境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 從 Kafka 讀取數據
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(consumer);
// 定義過濾條件
FilterFunction<String> filterFunction = new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.contains("filter");
}
};
// 使用 filter 函數過濾數據
DataStream<String> filteredStream = stream.filter(filterFunction);
// 將過濾后的數據寫入 Kafka
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties);
filteredStream.addSink(producer);
// 執行 Flink 任務
env.execute("Flink Filter Example");
}
}
在上述示例中,我們首先從 Kafka 讀取數據,然后定義了一個過濾條件,該條件只保留包含 “filter” 的字符串。接下來,我們使用 filter()
函數對數據流進行過濾,并將過濾后的數據寫入 Kafka。最后,我們執行 Flink 任務。