91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行flink中的kafka源碼分析

發布時間:2021-12-15 09:55:04 來源:億速云 閱讀:131 作者:柒染 欄目:云計算

今天就跟大家聊聊有關如何進行flink中的kafka源碼分析,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

最近一直在弄flink sql相關的東西,第一階段的目標是從解決kafka的消費和寫入的問題。不過也有些同學并不是很了解,今天我們來詳細分析一下包的繼承層次。

如何進行flink中的kafka源碼分析

如何進行flink中的kafka源碼分析

如何進行flink中的kafka源碼分析

如何進行flink中的kafka源碼分析

flink源碼如下:

public class KafkaTableSourceFactory implements StreamTableSourceFactory<Row>{

    private ConcurrentHashMap<String, KafkaTableSource> kafkaTableSources = new ConcurrentHashMap<>();

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put(CONNECTOR_TYPE(), KafkaConnectorDescriptor.CONNECTOR_TYPE);
        context.put(CONNECTOR_PROPERTY_VERSION(),String.valueOf(KafkaConnectorDescriptor.CONNECTOR_PROPERTY_VERSION));
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add(KafkaConnectorDescriptor.DATABASE_KEY);
        properties.add(KafkaConnectorDescriptor.TABLE_KEY);
        return properties;
    }

    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        //避免頻繁的觸發 是否需要加緩存
        KafkaTableSource kafkaTableSource;
        String dataBase = properties.get(KafkaConnectorDescriptor.DATABASE_KEY);
        String table = properties.get(KafkaConnectorDescriptor.TABLE_KEY);
        if (!kafkaTableSources.containsKey(dataBase + table)) {
            Kafka08UDMPBTableSource.Builder builder = new Kafka08UDMPBTableSource.Builder();
            kafkaTableSource = builder
                    .cluster(dataBase)
                    .subject(table)
                    .build();
            kafkaTableSources.put(dataBase + table,kafkaTableSource);
        } else {
            kafkaTableSource = kafkaTableSources.get(dataBase + table);
        }
        return kafkaTableSource;
    }

}
class Kafka08PBTableSource protected(topic: String,
                                     properties: Properties,
                                     schema: TableSchema,
                                     typeInformation: TypeInformation[Row],
                                     paramMap: util.LinkedHashMap[String, AnyRef],
                                     entryClass: String)
  extends KafkaTableSource(schema, topic, properties, new PBRowDeserializationSchema(typeInformation, paramMap,entryClass)) {

  override def createKafkaConsumer(topic: String, properties: Properties, deserializationSchema: DeserializationSchema[Row]): FlinkKafkaConsumerBase[Row] = {
    this.setStartupMode(StartupMode.EARLIEST)
    new FlinkKafkaConsumer08(topic, deserializationSchema, properties).setStartFromEarliest()

  }
}

下面用戶自定義的kafka的sink類:

class Kafka08UDMPBTableSink (topic: String,
                              properties: Properties,
                              partitioner: Optional[FlinkKafkaPartitioner[Row]],
                              paramMap: util.LinkedHashMap[String, AnyRef],
                              serializationSchema: SerializationSchema[Row],
                              fieldNames: Array[String],
                              fieldTypes: Array[TypeInformation[_]]
                            ) extends KafkaTableSink(topic, properties, partitioner.orElse(new FlinkFixedPartitioner[Row])) {

  override def createKafkaProducer(topic: String, properties: Properties, serializationSchema: SerializationSchema[Row], partitioner: Optional[FlinkKafkaPartitioner[Row]]): SinkFunction[Row]={
    new FlinkKafkaProducer08[Row](topic, serializationSchema, properties, partitioner.orElse(new FlinkFixedPartitioner[Row]))
  }

  override def createSerializationSchema(rowSchema: RowTypeInfo) = serializationSchema

  override def createCopy = new Kafka08UDMPBTableSink(topic, properties, this.partitioner, paramMap, serializationSchema, fieldNames, fieldTypes)

  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): KafkaTableSink = {
    super.configure(this.fieldNames, this.fieldTypes)
  }

  override def getFieldNames: Array[String]=this.fieldNames

  /** Returns the types of the table fields. */
  override def getFieldTypes: Array[TypeInformation[_]]=this.fieldTypes


  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
    val kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner)
    dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass, fieldNames))
  }

}
public class TrackRowDeserializationSchema implements SerializationSchema<Row>, DeserializationSchema<Row> {
    private static final long serialVersionUID = -2885556750743978636L;

    /** Type information describing the input type. */
    private TypeInformation<Row> typeInfo = null;

    private LinkedHashMap paraMap;

    private String inSchema;
    private String outSchema;
    private String inClass;
    private String outClass;
}
public class TrackRowFormatFactory extends TableFormatFactoryBase<Row>
        implements SerializationSchemaFactory<Row>, DeserializationSchemaFactory<Row> {

    public TrackRowFormatFactory() {
        super(TrackValidator.FORMAT_TYPE_VALUE, 1, false);
    }

    public TrackRowFormatFactory(String type, int version, boolean supportsSchemaDerivation) {
        super(type, version, supportsSchemaDerivation);
    }

    @Override
    protected List<String> supportedFormatProperties() {
        final List<String> properties = new ArrayList<>();
        properties.add(TrackValidator.FORMAT_IN_SCHEMA);
        properties.add(TrackValidator.FORMAT_IN_CLASS);
        properties.add(TrackValidator.FORMAT_OUT_CLASS);
        properties.add(TrackValidator.FORMAT_OUT_SCHEMA);
        properties.add(TrackValidator.FORMAT_TYPE_INFORMATION);
        properties.add(TrackValidator.FORMAT_TYPE_VALUE);
        return properties;
    }
}

看完上述內容,你們對如何進行flink中的kafka源碼分析有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

边坝县| 桑日县| 晋州市| 阿克苏市| 丹棱县| 光泽县| 稻城县| 松桃| 县级市| 张家川| 吉安市| 报价| 八宿县| 嵊泗县| 东乡族自治县| 彭水| 正阳县| 昌江| 闽侯县| 湟源县| 乌兰察布市| 木里| 巴彦淖尔市| 奎屯市| 葫芦岛市| 漯河市| 灵山县| 三穗县| 嘉义县| 乌拉特中旗| 济宁市| 古丈县| 大足县| 卫辉市| 九江市| 株洲市| 高邮市| 忻州市| 盐边县| 崇义县| 陇川县|