您好,登錄后才能下訂單哦!
本篇內容介紹了“flink將數據錄入數據庫”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
// Main class: Flink streaming word-count job.
package flink.streaming

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.CheckpointingMode

/**
 * Streaming word-count job: consumes plain-text lines from Kafka topic
 * "test1", splits them into lowercase words, counts each word inside a
 * 5-second tumbling processing-time window, and writes the resulting
 * (word, count) pairs to MySQL through the custom [[Ssinks]] sink.
 */
object StreamingTest {

  def main(args: Array[String]): Unit = {
    // Kafka consumer configuration: broker address and consumer group.
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092")
    kafkaProps.setProperty("group.id", "group2")

    // Obtain the streaming execution environment for this job.
    val evn = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer on topic "test1" with plain-string deserialization.
    val kafkaSource = new FlinkKafkaConsumer[String]("test1", new SimpleStringSchema, kafkaProps)
    // Always start from the latest offset, ignoring committed group offsets.
    kafkaSource.setStartFromLatest()
    // Commit offsets back to Kafka whenever a checkpoint completes, so the
    // consumer group's position reflects successfully processed data.
    kafkaSource.setCommitOffsetsOnCheckpoints(true)

    val stream = evn.addSource(kafkaSource)
    // Checkpoint every 2 seconds with exactly-once semantics; checkpointing
    // must be enabled for the offset-commit-on-checkpoint behavior above.
    evn.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)

    val text = stream
      .flatMap(_.toLowerCase().split(" ").filter(_.nonEmpty))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
      // Box the count as java.lang.Integer for the JDBC sink. Integer.valueOf
      // replaces the deprecated `new Integer(...)` constructor (deprecated
      // since Java 9) and reuses cached instances for small values.
      .map(x => (x._1, Integer.valueOf(x._2)))

    // Write windowed counts to MySQL via the custom sink.
    text.addSink(new Ssinks())

    evn.execute("kafkawd")
  }
}
// Custom sink: writes (word, count) records to MySQL over JDBC.
package flink.streaming

import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

/**
 * JDBC sink that inserts each (word, count) record into the MySQL table
 * `words`.
 *
 * Lifecycle: `open` is called once per parallel subtask and creates the
 * connection plus a reusable prepared statement; `invoke` performs one
 * INSERT per record; `close` releases the JDBC resources.
 */
class Ssinks extends RichSinkFunction[(String, Integer)] {

  // JDBC resources, initialized in open() and released in close().
  var conn: Connection = _
  var pres: PreparedStatement = _

  // Connection settings. NOTE(review): credentials are hard-coded here;
  // consider moving them to job configuration.
  var username = "root"
  var password = "123456"
  var dburl = "jdbc:mysql://192.168.6.132:3306/hgs?useUnicode=true&characterEncoding=utf-8&useSSL=false"
  var sql = "insert into words(word,count) values(?,?)"

  /**
   * Opens the JDBC connection and prepares the INSERT statement.
   *
   * BUG FIX: the original implementation called `super.close()` here, which
   * runs the parent teardown during startup; it must call
   * `super.open(parameters)` instead.
   */
  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection(dburl, username, password)
    pres = conn.prepareStatement(sql)
    super.open(parameters)
  }

  /** Inserts one (word, count) record; parameterized statement avoids SQL injection. */
  override def invoke(value: (String, Integer)): Unit = {
    pres.setString(1, value._1)
    pres.setInt(2, value._2)
    pres.executeUpdate()
    System.out.println("values :" + value._1 + "--" + value._2)
  }

  /** Releases JDBC resources; null-guarded in case open() failed partway. */
  override def close(): Unit = {
    if (pres != null) pres.close()
    if (conn != null) conn.close()
  }
}
“flink將數據錄入數據庫”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。