91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

flink將數據錄入數據庫

發布時間:2021-08-31 18:40:56 來源:億速云 閱讀:211 作者:chen 欄目:移動開發

本篇內容介紹了“flink將數據錄入數據庫”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

//主類
package flink.streaming
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.CheckpointingMode
object StreamingTest {
  def main(args: Array[String]): Unit = {
    val kafkaProps = new Properties()
    //kafka的一些屬性
    kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092")
    //所在的消費組
    kafkaProps.setProperty("group.id", "group2")
    //獲取當前的執行環境
    val evn = StreamExecutionEnvironment.getExecutionEnvironment
    //evn.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //kafka的consumer,test1是要消費的topic
    val kafkaSource = new FlinkKafkaConsumer[String]("test1",new SimpleStringSchema,kafkaProps)
    //kafkaSource.assignTimestampsAndWatermarks(assigner)
    //設置從最新的offset開始消費
    //kafkaSource.setStartFromGroupOffsets()
    kafkaSource.setStartFromLatest()
    //自動提交offset
    kafkaSource.setCommitOffsetsOnCheckpoints(true)
    
    //flink的checkpoint的時間間隔
    //evn.enableCheckpointing(2000)
    //添加consumer
    val stream = evn.addSource(kafkaSource)
    evn.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)
    //stream.setParallelism(3)
    val text = stream.flatMap{ _.toLowerCase().split(" ")filter { _.nonEmpty} }
          .map{(_,1)}
          .keyBy(0)
          .timeWindow(Time.seconds(5))
          .sum(1)
          .map(x=>{(x._1,(new Integer(x._2)))})
     //text.print()
     //啟動執行    
     
     text.addSink(new Ssinks())
     
    evn.execute("kafkawd")  
    
  }
}
//自定義sink
package flink.streaming
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.DriverManager
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.configuration.Configuration
class Ssinks extends RichSinkFunction[(String,Integer)]{
  
      var conn:Connection=_;
      var pres:PreparedStatement = _;
      var username = "root";
      var password = "123456";
      var dburl = "jdbc:mysql://192.168.6.132:3306/hgs?useUnicode=true&characterEncoding=utf-8&useSSL=false";
      var sql = "insert into words(word,count) values(?,?)";
  override def invoke(value:(String, Integer) ) {
    
    pres.setString(1, value._1);
		pres.setInt(2,value._2);
		pres.executeUpdate();
		System.out.println("values :" +value._1+"--"+value._2);
  }
  
  override def open( parameters:Configuration) {
		Class.forName("com.mysql.jdbc.Driver");
		conn = DriverManager.getConnection(dburl, username, password);
		pres = conn.prepareStatement(sql);
		super.close()
	}
  
	override def close() {
	  pres.close();
	  conn.close();
	}
}

“flink將數據錄入數據庫”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

武安市| 南城县| 鞍山市| 阿坝县| 南郑县| 德州市| 乳源| 什邡市| 进贤县| 阿拉尔市| 宣威市| 桦南县| 三台县| 台北市| 浮梁县| 通山县| 米泉市| 丰台区| 星子县| 黎川县| 淅川县| 新田县| 弋阳县| 贵港市| 郸城县| 浦县| 华安县| 新巴尔虎右旗| 静海县| 中宁县| 波密县| 鹤峰县| 壶关县| 武功县| 富阳市| 怀安县| 宜州市| 桓仁| 广元市| 三江| 乌拉特前旗|