您好,登錄後才能下訂單哦!
今天就跟大家聊聊如何進行flink中的kafka源碼分析。可能很多人對此都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家讀完這篇文章可以有所收穫。
最近一直在弄flink sql相關的東西,第一階段的目標是解決kafka的消費和寫入的問題。不過也有些同學並不是很了解,今天我們來詳細分析一下包的繼承層次。
flink源碼如下:
public class KafkaTableSourceFactory implements StreamTableSourceFactory<Row>{ private ConcurrentHashMap<String, KafkaTableSource> kafkaTableSources = new ConcurrentHashMap<>(); @Override public Map<String, String> requiredContext() { Map<String, String> context = new HashMap<>(); context.put(CONNECTOR_TYPE(), KafkaConnectorDescriptor.CONNECTOR_TYPE); context.put(CONNECTOR_PROPERTY_VERSION(),String.valueOf(KafkaConnectorDescriptor.CONNECTOR_PROPERTY_VERSION)); return context; } @Override public List<String> supportedProperties() { List<String> properties = new ArrayList<>(); properties.add(KafkaConnectorDescriptor.DATABASE_KEY); properties.add(KafkaConnectorDescriptor.TABLE_KEY); return properties; } @Override public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) { //避免頻繁的觸發 是否需要加緩存 KafkaTableSource kafkaTableSource; String dataBase = properties.get(KafkaConnectorDescriptor.DATABASE_KEY); String table = properties.get(KafkaConnectorDescriptor.TABLE_KEY); if (!kafkaTableSources.containsKey(dataBase + table)) { Kafka08UDMPBTableSource.Builder builder = new Kafka08UDMPBTableSource.Builder(); kafkaTableSource = builder .cluster(dataBase) .subject(table) .build(); kafkaTableSources.put(dataBase + table,kafkaTableSource); } else { kafkaTableSource = kafkaTableSources.get(dataBase + table); } return kafkaTableSource; } }
/**
 * Kafka 0.8 table source whose rows are decoded by a
 * `PBRowDeserializationSchema` built from `typeInformation`, `paramMap` and
 * `entryClass`.
 *
 * The consumer it creates always starts from the earliest offset.
 *
 * NOTE(review): "PB" presumably stands for protobuf -- confirm against
 * `PBRowDeserializationSchema`.
 *
 * @param topic           Kafka topic handed to the parent table source
 * @param properties      Kafka client properties handed to the parent table source
 * @param schema          table schema handed to the parent table source
 * @param typeInformation row type information passed to the deserialization schema
 * @param paramMap        parameters passed to the deserialization schema
 * @param entryClass      class name passed to the deserialization schema
 */
class Kafka08PBTableSource protected (
    topic: String,
    properties: Properties,
    schema: TableSchema,
    typeInformation: TypeInformation[Row],
    paramMap: util.LinkedHashMap[String, AnyRef],
    entryClass: String)
  extends KafkaTableSource(
    schema,
    topic,
    properties,
    new PBRowDeserializationSchema(typeInformation, paramMap, entryClass)) {

  /**
   * Builds a `FlinkKafkaConsumer08` for the given topic and configures it to
   * read from the earliest offset. The table source's own startup mode is set
   * to `StartupMode.EARLIEST` as well.
   */
  override def createKafkaConsumer(
      topic: String,
      properties: Properties,
      deserializationSchema: DeserializationSchema[Row]): FlinkKafkaConsumerBase[Row] = {
    this.setStartupMode(StartupMode.EARLIEST)
    val earliestConsumer = new FlinkKafkaConsumer08[Row](topic, deserializationSchema, properties)
    earliestConsumer.setStartFromEarliest()
  }
}
下面是用戶自定義的kafka的sink類:
/**
 * User-defined Kafka 0.8 table sink that writes rows with a caller-supplied
 * serialization schema and a fixed set of field names and types.
 *
 * When no partitioner is supplied, a `FlinkFixedPartitioner` is used.
 *
 * @param topic               Kafka topic to write to
 * @param properties          Kafka client properties
 * @param partitioner         optional partitioner
 * @param paramMap            parameters carried along to copies of this sink;
 *                            not otherwise read in this class
 * @param serializationSchema schema used to serialize each row
 * @param fieldNames          names of the table fields
 * @param fieldTypes          types of the table fields
 */
class Kafka08UDMPBTableSink(
    topic: String,
    properties: Properties,
    partitioner: Optional[FlinkKafkaPartitioner[Row]],
    paramMap: util.LinkedHashMap[String, AnyRef],
    serializationSchema: SerializationSchema[Row],
    fieldNames: Array[String],
    fieldTypes: Array[TypeInformation[_]])
  extends KafkaTableSink(
    topic,
    properties,
    partitioner.orElse(new FlinkFixedPartitioner[Row])) {

  /** Creates the Kafka 0.8 producer, using the fixed partitioner if none is given. */
  override def createKafkaProducer(
      topic: String,
      properties: Properties,
      serializationSchema: SerializationSchema[Row],
      partitioner: Optional[FlinkKafkaPartitioner[Row]]): SinkFunction[Row] = {
    val effectivePartitioner: FlinkKafkaPartitioner[Row] =
      partitioner.orElse(new FlinkFixedPartitioner[Row])
    new FlinkKafkaProducer08[Row](topic, serializationSchema, properties, effectivePartitioner)
  }

  /** Always returns the schema given at construction; `rowSchema` is not consulted. */
  override def createSerializationSchema(rowSchema: RowTypeInfo) =
    serializationSchema

  /** Creates a new sink from the same constructor arguments as this one. */
  override def createCopy =
    new Kafka08UDMPBTableSink(
      topic,
      properties,
      this.partitioner,
      paramMap,
      serializationSchema,
      fieldNames,
      fieldTypes)

  /**
   * Configures the sink with this sink's own field names and types.
   *
   * NOTE(review): the `fieldNames` / `fieldTypes` arguments are ignored in
   * favour of `this.fieldNames` / `this.fieldTypes`; this looks deliberate --
   * confirm.
   */
  override def configure(
      fieldNames: Array[String],
      fieldTypes: Array[TypeInformation[_]]): KafkaTableSink = {
    super.configure(this.fieldNames, this.fieldTypes)
  }

  /** Returns the names of the table fields. */
  override def getFieldNames: Array[String] = this.fieldNames

  /** Returns the types of the table fields. */
  override def getFieldTypes: Array[TypeInformation[_]] = this.fieldTypes

  /** Attaches a Kafka producer sink to `dataStream` and gives it a runtime name. */
  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
    dataStream
      .addSink(createKafkaProducer(topic, properties, serializationSchema, partitioner))
      .name(TableConnectorUtil.generateRuntimeName(this.getClass, fieldNames))
  }
}
/**
 * Schema declared as both a {@link SerializationSchema} and a
 * {@link DeserializationSchema} for {@link Row}.
 *
 * <p>NOTE(review): only the fields are visible here; the interface methods are
 * not shown and are presumably defined elsewhere or omitted from this excerpt.
 */
public class TrackRowDeserializationSchema
        implements SerializationSchema<Row>, DeserializationSchema<Row> {

    private static final long serialVersionUID = -2885556750743978636L;

    /** Type information describing the input type. */
    private TypeInformation<Row> typeInfo;

    // Raw-typed parameter map; key and value types are not visible here.
    private LinkedHashMap paraMap;

    // Presumably the schema and class names for the input and output side;
    // verify against the code that populates them.
    private String inSchema;
    private String outSchema;
    private String inClass;
    private String outClass;
}
/**
 * Table format factory that is declared as both a serialization-schema and a
 * deserialization-schema factory for {@link Row}.
 */
public class TrackRowFormatFactory extends TableFormatFactoryBase<Row>
        implements SerializationSchemaFactory<Row>, DeserializationSchemaFactory<Row> {

    /**
     * Creates the factory with type {@code TrackValidator.FORMAT_TYPE_VALUE},
     * version 1 and schema derivation disabled.
     */
    public TrackRowFormatFactory() {
        this(TrackValidator.FORMAT_TYPE_VALUE, 1, false);
    }

    /**
     * Creates the factory with an explicit type, version and schema-derivation flag,
     * all of which are forwarded to the base class.
     */
    public TrackRowFormatFactory(String type, int version, boolean supportsSchemaDerivation) {
        super(type, version, supportsSchemaDerivation);
    }

    /**
     * Returns the format property keys supported by this factory, as a new
     * mutable list.
     *
     * <p>NOTE(review): {@code FORMAT_TYPE_VALUE} is also used above as the format
     * type; listing it as a property key may be unintended -- confirm.
     */
    @Override
    protected List<String> supportedFormatProperties() {
        final String[] supportedKeys = {
            TrackValidator.FORMAT_IN_SCHEMA,
            TrackValidator.FORMAT_IN_CLASS,
            TrackValidator.FORMAT_OUT_CLASS,
            TrackValidator.FORMAT_OUT_SCHEMA,
            TrackValidator.FORMAT_TYPE_INFORMATION,
            TrackValidator.FORMAT_TYPE_VALUE
        };
        final List<String> properties = new ArrayList<>(supportedKeys.length);
        for (String key : supportedKeys) {
            properties.add(key);
        }
        return properties;
    }
}
看完上述內容,你們對如何進行flink中的kafka源碼分析有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速雲行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯繫站長郵箱:is@yisu.com進行舉報,並提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。