This article analyzes FlinkKafkaConsumer, the Kafka data source for Flink: when Flink opens it, how it fetches records, and how those records reach the downstream operator.
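FlinkKafkaConsumer is a RichFunction (it extends RichParallelSourceFunction through FlinkKafkaConsumerBase), so it has the lifecycle method open(). When does Flink call FlinkKafkaConsumer's open() method?
Before a StreamTask invokes its operators, it runs beforeInvoke(), which initializes the operators in the operator chain and calls their open() methods:
    operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());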
initializeStateAndOpenOperators() loops over the operators, initializing and opening each one:
    protected void initializeStateAndOpenOperators(
            StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
        for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
            StreamOperator<?> operator = operatorWrapper.getStreamOperator();
            operator.initializeState(streamTaskStateInitializer);
            operator.open();
        }
    }
The operator that wraps the Kafka source is StreamSource. Its open() method is:
    public void open() throws Exception {
        super.open();
        FunctionUtils.openFunction(userFunction, new Configuration());
    }
FunctionUtils.openFunction() calls the open() method of the user function, provided that the function is a RichFunction:
    public static void openFunction(Function function, Configuration parameters) throws Exception {
        if (function instanceof RichFunction) {
            RichFunction richFunction = (RichFunction) function;
            richFunction.open(parameters);
        }
    }
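The dispatch above can be tried out without a Flink dependency. The following is a minimal sketch, assuming simplified stand-ins for Function and RichFunction (the nested interfaces here are hypothetical and drop the Configuration parameter); it only illustrates that open() runs for rich functions and is skipped for plain ones.

```java
import java.util.ArrayList;
import java.util.List;

public class OpenFunctionSketch {
    /** Stand-in for Flink's Function marker interface (hypothetical, simplified). */
    interface Function {}

    /** Stand-in for RichFunction: a function with an open() lifecycle hook. */
    interface RichFunction extends Function {
        void open();
    }

    /** Calls open() only when the function is a RichFunction; returns whether it was called. */
    static boolean openFunction(Function function) {
        if (function instanceof RichFunction) {
            ((RichFunction) function).open();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Function plain = new Function() {};
        RichFunction rich = () -> log.add("open");

        System.out.println(openFunction(plain)); // plain function: no lifecycle call
        System.out.println(openFunction(rich));  // rich function: open() is invoked
        System.out.println(log);
    }
}
```

In Flink itself the same check is what lets a source such as FlinkKafkaConsumer run its setup logic in open() while plain lambdas pass through untouched.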
The RuntimeContext that the function sees in open() is set earlier, when the operator is created. Along the call chain StreamTask.beforeInvoke() -> new OperatorChain() -> StreamOperatorFactoryUtil.createOperator(), the OperatorChain constructor creates each StreamOperator through the factory class StreamOperatorFactory. For the Kafka source, the StreamOperatorFactory is SimpleOperatorFactory, whose createStreamOperator() method calls the StreamOperator's setup() method:
    public <T extends StreamOperator<OUT>> T createStreamOperator(
            StreamOperatorParameters<OUT> parameters) {
        if (operator instanceof AbstractStreamOperator) {
            ((AbstractStreamOperator) operator).setProcessingTimeService(processingTimeService);
        }
        if (operator instanceof SetupableStreamOperator) {
            ((SetupableStreamOperator) operator).setup(
                    parameters.getContainingTask(),
                    parameters.getStreamConfig(),
                    parameters.getOutput());
        }
        return (T) operator;
    }
The StreamOperator for the Kafka source is StreamSource, which implements the SetupableStreamOperator interface. Its setup() method is defined in the parent class AbstractUdfStreamOperator:
    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
    }
FunctionUtils.setFunctionRuntimeContext() hands the RuntimeContext to the user function. That RuntimeContext is created in AbstractStreamOperator's setup() method and is a StreamingRuntimeContext:
    this.runtimeContext = new StreamingRuntimeContext(
            environment,
            environment.getAccumulatorRegistry().getUserMap(),
            getMetricGroup(),
            getOperatorID(),
            getProcessingTimeService(),
            null,
            environment.getExternalResourceInfoProvider());
Flink calls FlinkKafkaConsumer's run() method to produce data. run() works as follows:
① It creates a KafkaFetcher to pull the data:
    this.kafkaFetcher = createFetcher(
            sourceContext,
            subscribedPartitionsToStartOffsets,
            watermarkStrategy,
            (StreamingRuntimeContext) getRuntimeContext(),
            offsetCommitMode,
            getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
            useMetrics);
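② KafkaFetcher's runFetchLoop() starts a KafkaConsumerThread that pulls Kafka data in a loop. KafkaConsumerThread polls the records through KafkaConsumer and passes them to the Handover:
    if (records == null) {
        try {
            records = consumer.poll(pollTimeout);
        } catch (WakeupException we) {
            continue;
        }
    }
    try {
        handover.produce(records);
        records = null;
    } catch (Handover.WakeupException e) {
        // fall through the loop and retry the hand-over
    }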
KafkaFetcher takes the fetched Kafka records from the Handover:
    while (running) {
        // this blocks until we get the next records
        // it automatically re-throws exceptions encountered in the consumer thread
        final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
        // get the records for each topic partition
        for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
            List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                    records.records(partition.getKafkaPartitionHandle());
            partitionConsumerRecordsHandler(partitionRecords, partition);
        }
    }
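The hand-over between the two threads can be illustrated with a one-slot exchange. This is a minimal sketch under stated assumptions: the Handover class below is a simplified, hypothetical re-creation that only models the blocking produce()/pollNext() pair; Flink's real Handover additionally forwards exceptions from the consumer thread and supports wake-up and close, which are left out here.

```java
import java.util.ArrayList;
import java.util.List;

public class HandoverSketch {
    /** One-slot exchange between a producing thread and a consuming thread. */
    static final class Handover<T> {
        private final Object lock = new Object();
        private T next; // null means the slot is empty

        /** Blocks while the previous element has not been taken yet. */
        void produce(T element) throws InterruptedException {
            synchronized (lock) {
                while (next != null) {
                    lock.wait();
                }
                next = element;
                lock.notifyAll();
            }
        }

        /** Blocks until an element is available, then empties the slot. */
        T pollNext() throws InterruptedException {
            synchronized (lock) {
                while (next == null) {
                    lock.wait();
                }
                T result = next;
                next = null;
                lock.notifyAll();
                return result;
            }
        }
    }

    /** Hands `batches` batches from a fetching thread to the caller, in order. */
    static List<String> runOnce(int batches) {
        Handover<String> handover = new Handover<>();
        Thread fetchThread = new Thread(() -> {
            try {
                for (int i = 0; i < batches; i++) {
                    handover.produce("batch-" + i); // stands in for the result of a poll
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetchThread.start();
        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < batches; i++) {
                received.add(handover.pollNext());
            }
            fetchThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for records", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runOnce(3));
    }
}
```

Because the slot holds at most one batch, the fetching thread cannot run ahead of the thread that emits records, which is the back-pressure effect the single-element hand-over provides.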
③ The records are sent to the next operator through the Output<StreamRecord<T>> held by the SourceContext:
    public void collect(T element) {
        synchronized (lock) {
            output.collect(reuse.replace(element));
        }
    }
The SourceContext is created in StreamSource's run() method through StreamSourceContexts.getSourceContext(). The Output<StreamRecord<T>> is the return value of OperatorChain's createOutputCollector():
    for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
        @SuppressWarnings("unchecked")
        RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
        allOutputs.add(new Tuple2<>(output, outputEdge));
    }
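With a single output, the result is the RecordWriterOutput itself. With several outputs, it is a wrapping collector: CopyingDirectedOutput or DirectedOutput when output selectors are configured, otherwise a broadcasting collector.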
④ In the single-output case, RecordWriterOutput writes through its RecordWriter member. The RecordWriter is created in StreamTask's createRecordWriterDelegate(). RecordWriterDelegate is a wrapper around RecordWriter that holds the RecordWriter instance or instances:
    public static <OUT> RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>>
            createRecordWriterDelegate(StreamConfig configuration, Environment environment) {
        List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites =
                createRecordWriters(configuration, environment);
        if (recordWrites.size() == 1) {
            return new SingleRecordWriter<>(recordWrites.get(0));
        } else if (recordWrites.size() == 0) {
            return new NonRecordWriter<>();
        } else {
            return new MultipleRecordWriters<>(recordWrites);
        }
    }
    private static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>
            createRecordWriters(StreamConfig configuration, Environment environment) {
        List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters =
                new ArrayList<>();
        List<StreamEdge> outEdgesInOrder =
                configuration.getOutEdgesInOrder(environment.getUserClassLoader());
        for (int i = 0; i < outEdgesInOrder.size(); i++) {
            StreamEdge edge = outEdgesInOrder.get(i);
            recordWriters.add(
                    createRecordWriter(
                            edge,
                            i,
                            environment,
                            environment.getTaskInfo().getTaskName(),
                            edge.getBufferTimeout()));
        }
        return recordWriters;
    }
outEdgesInOrder comes from the List<StreamEdge> outEdges of the StreamNode in the StreamGraph.
When a RecordWriter is built, the isBroadcast() method of the StreamEdge's StreamPartitioner<?> outputPartitioner decides whether it is a BroadcastRecordWriter or a ChannelSelectorRecordWriter:
    public RecordWriter<T> build(ResultPartitionWriter writer) {
        if (selector.isBroadcast()) {
            return new BroadcastRecordWriter<>(writer, timeout, taskName);
        } else {
            return new ChannelSelectorRecordWriter<>(writer, selector, timeout, taskName);
        }
    }
When the user has not set a partitioner, outputPartitioner is chosen by whether the upstream and downstream nodes have the same parallelism:
    if (partitioner == null
            && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
        partitioner = new ForwardPartitioner<Object>();
    } else if (partitioner == null) {
        partitioner = new RebalancePartitioner<Object>();
    }
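To make the effect of that default concrete, here is a small sketch of how the two partitioners pick an output channel. The ChannelSelector interface and both classes are hypothetical simplifications written for this example; in particular, the round-robin selector below starts at channel 0, whereas Flink's RebalancePartitioner is assumed to pick its starting channel differently.

```java
public class PartitionerSketch {
    /** Picks the output channel for the next record (hypothetical, simplified interface). */
    interface ChannelSelector {
        int selectChannel(int numberOfChannels);
    }

    /** Forward: each producer subtask feeds exactly one consumer subtask, always channel 0. */
    static final class Forward implements ChannelSelector {
        @Override
        public int selectChannel(int numberOfChannels) {
            return 0;
        }
    }

    /** Rebalance: cycle through the channels round-robin. */
    static final class Rebalance implements ChannelSelector {
        private int next = -1;

        @Override
        public int selectChannel(int numberOfChannels) {
            next = (next + 1) % numberOfChannels;
            return next;
        }
    }

    /** Default choice when no partitioner was set: equal parallelism means forward. */
    static ChannelSelector choose(int upstreamParallelism, int downstreamParallelism) {
        return upstreamParallelism == downstreamParallelism ? new Forward() : new Rebalance();
    }

    public static void main(String[] args) {
        ChannelSelector selector = choose(1, 3);
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.selectChannel(3));
        }
    }
}
```

Neither selector is a broadcast, so with these defaults the builder shown earlier would produce a ChannelSelectorRecordWriter.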
Both BroadcastRecordWriter and ChannelSelectorRecordWriter ultimately call flush() on their member ResultPartitionWriter targetPartition to emit the data. The ResultPartitionWriter comes from ConsumableNotifyingResultPartitionWriterDecorator's decorate() method, which uses the corresponding ResultPartitionDeploymentDescriptor to decide whether to wrap the writer in a ConsumableNotifyingResultPartitionWriterDecorator or to return the passed-in partitionWriters entry unchanged. ConsumableNotifyingResultPartitionWriterDecorator makes the data available to the next node right away and announces it through ResultPartitionConsumableNotifier:
    public static ResultPartitionWriter[] decorate(
            Collection<ResultPartitionDeploymentDescriptor> descs,
            ResultPartitionWriter[] partitionWriters,
            TaskActions taskActions,
            JobID jobId,
            ResultPartitionConsumableNotifier notifier) {
        ResultPartitionWriter[] consumableNotifyingPartitionWriters =
                new ResultPartitionWriter[partitionWriters.length];
        int counter = 0;
        for (ResultPartitionDeploymentDescriptor desc : descs) {
            if (desc.sendScheduleOrUpdateConsumersMessage()
                    && desc.getPartitionType().isPipelined()) {
                consumableNotifyingPartitionWriters[counter] =
                        new ConsumableNotifyingResultPartitionWriterDecorator(
                                taskActions, jobId, partitionWriters[counter], notifier);
            } else {
                consumableNotifyingPartitionWriters[counter] = partitionWriters[counter];
            }
            counter++;
        }
        return consumableNotifyingPartitionWriters;
    }
partitionWriters are created through NettyShuffleEnvironment's createResultPartitionWriters() -> ResultPartitionFactory's create(). A ResultPartition writes its output through its member ResultSubpartition[] subpartitions, which is populated in ResultPartitionFactory's createSubpartitions():
    private void createSubpartitions(
            ResultPartition partition,
            ResultPartitionType type,
            BoundedBlockingSubpartitionType blockingSubpartitionType,
            ResultSubpartition[] subpartitions) {
        // Create the subpartitions.
        if (type.isBlocking()) {
            initializeBoundedBlockingPartitions(
                    subpartitions,
                    partition,
                    blockingSubpartitionType,
                    networkBufferSize,
                    channelManager);
        } else {
            for (int i = 0; i < subpartitions.length; i++) {
                subpartitions[i] = new PipelinedSubpartition(i, partition);
            }
        }
    }
For streaming jobs, the ResultSubpartition is a PipelinedSubpartition.
The ResultPartitionConsumableNotifier is created in TaskExecutor's associateWithJobManager():
    private JobTable.Connection associateWithJobManager(
            JobTable.Job job,
            ResourceID resourceID,
            JobMasterGateway jobMasterGateway) {
        ...... ......
        ResultPartitionConsumableNotifier resultPartitionConsumableNotifier =
                new RpcResultPartitionConsumableNotifier(
                        jobMasterGateway,
                        getRpcService().getExecutor(),
                        taskManagerConfiguration.getTimeout());
        ...... ......
    }
RpcResultPartitionConsumableNotifier makes a remote call to JobMaster's scheduleOrUpdateConsumers() method, passing the ResultPartitionID partitionId.
JobMaster then notifies the downstream consuming operators through ExecutionGraph's scheduleOrUpdateConsumers().
Two pieces of code matter here:
① From the producing ExecutionVertex's member Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions, look up the producer and consumer information for this partition, which is stored in IntermediateResultPartition:
    void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
        .......
        final IntermediateResultPartition partition =
                resultPartitions.get(partitionId.getPartitionId());
        .......
        if (partition.getIntermediateResult().getResultType().isPipelined()) {
            // Schedule or update receivers of this partition
            execution.scheduleOrUpdateConsumers(partition.getConsumers());
        } else {
            throw new IllegalArgumentException("ScheduleOrUpdateConsumers msg is only valid for"
                    + "pipelined partitions.");
        }
    }
The consumers are read from IntermediateResultPartition as List<List<ExecutionEdge>> allConsumers.
The task to run is taken from the Execution currentExecution of each ExecutionEdge's ExecutionVertex target.
② Execution's sendUpdatePartitionInfoRpcCall() method calls TaskExecutor's updatePartitions() over RPC so that the downstream consumer operator can start reading:
    private void sendUpdatePartitionInfoRpcCall(final Iterable<PartitionInfo> partitionInfos) {
        final LogicalSlot slot = assignedResource;
        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();
            CompletableFuture<Acknowledge> updatePartitionsResultFuture =
                    taskManagerGateway.updatePartitions(attemptId, partitionInfos, rpcTimeout);
            updatePartitionsResultFuture.whenCompleteAsync(
                    (ack, failure) -> {
                        // fail if there was a failure
                        if (failure != null) {
                            fail(new IllegalStateException(
                                    "Update to task [" + getVertexWithAttempt()
                                            + "] on TaskManager " + taskManagerLocation + " failed",
                                    failure));
                        }
                    },
                    getVertex().getExecutionGraph().getJobMasterMainThreadExecutor());
        }
    }
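The two steps, looking up the consumers of a partition and then sending each of them an update, can be modeled in a few lines. This is only a sketch under stated assumptions: the class, the string identifiers, and the message format are hypothetical, and the returned strings stand in for what is an asynchronous RPC in Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConsumerNotificationSketch {
    // partition id -> names of the consumer tasks that read it (hypothetical stand-in
    // for IntermediateResultPartition and its ExecutionEdges)
    private final Map<String, List<String>> consumersByPartition = new HashMap<>();

    void addConsumer(String partitionId, String consumerTask) {
        consumersByPartition.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(consumerTask);
    }

    /** Looks up the consumers of a partition and returns one update message per consumer. */
    List<String> scheduleOrUpdateConsumers(String partitionId) {
        List<String> consumers = consumersByPartition.get(partitionId);
        if (consumers == null) {
            throw new IllegalArgumentException("Unknown partition " + partitionId);
        }
        List<String> messages = new ArrayList<>();
        for (String consumer : consumers) {
            // in Flink this step is an RPC to the TaskExecutor running the consumer
            messages.add("updatePartitions(" + partitionId + ") -> " + consumer);
        }
        return messages;
    }

    public static void main(String[] args) {
        ConsumerNotificationSketch graph = new ConsumerNotificationSketch();
        graph.addConsumer("p0", "map-1");
        graph.addConsumer("p0", "map-2");
        graph.scheduleOrUpdateConsumers("p0").forEach(System.out::println);
    }
}
```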
TaskExecutor's updatePartitions() updates the partition information. If an InputChannel was unknown until now, it is resolved in SingleInputGate's updateInputChannel():
    public void updateInputChannel(
            ResourceID localLocation,
            NettyShuffleDescriptor shuffleDescriptor) throws IOException, InterruptedException {
        synchronized (requestLock) {
            if (closeFuture.isDone()) {
                // There was a race with a task failure/cancel
                return;
            }
            IntermediateResultPartitionID partitionId =
                    shuffleDescriptor.getResultPartitionID().getPartitionId();
            InputChannel current = inputChannels.get(partitionId);
            if (current instanceof UnknownInputChannel) {
                UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
                boolean isLocal = shuffleDescriptor.isLocalTo(localLocation);
                InputChannel newChannel;
                if (isLocal) {
                    newChannel = unknownChannel.toLocalInputChannel();
                } else {
                    RemoteInputChannel remoteInputChannel =
                            unknownChannel.toRemoteInputChannel(shuffleDescriptor.getConnectionId());
                    remoteInputChannel.assignExclusiveSegments();
                    newChannel = remoteInputChannel;
                }
                LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);
                inputChannels.put(partitionId, newChannel);
                channels[current.getChannelIndex()] = newChannel;
                if (requestedPartitionsFlag) {
                    newChannel.requestSubpartition(consumedSubpartitionIndex);
                }
                for (TaskEvent event : pendingEvents) {
                    newChannel.sendTaskEvent(event);
                }
                if (--numberOfUninitializedChannels == 0) {
                    pendingEvents.clear();
                }
            }
        }
    }
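The core decision in that method, turning an unknown channel into a local or a remote one depending on where the producer runs, is sketched here with plain strings and an enum. All names are hypothetical, and the sketch leaves out everything else the real method does (segment assignment, subpartition requests, pending events).

```java
import java.util.HashMap;
import java.util.Map;

public class InputChannelUpdateSketch {
    enum ChannelKind { UNKNOWN, LOCAL, REMOTE }

    private final Map<String, ChannelKind> inputChannels = new HashMap<>();
    private final String localLocation;

    InputChannelUpdateSketch(String localLocation) {
        this.localLocation = localLocation;
    }

    /** Registers a channel whose producer location is not known yet. */
    synchronized void registerUnknown(String partitionId) {
        inputChannels.put(partitionId, ChannelKind.UNKNOWN);
    }

    /**
     * Resolves an unknown channel to LOCAL or REMOTE. Channels that were already
     * resolved are left untouched, mirroring the instanceof check in the original.
     */
    synchronized ChannelKind updateInputChannel(String partitionId, String producerLocation) {
        ChannelKind current = inputChannels.get(partitionId);
        if (current == ChannelKind.UNKNOWN) {
            ChannelKind resolved = producerLocation.equals(localLocation)
                    ? ChannelKind.LOCAL
                    : ChannelKind.REMOTE;
            inputChannels.put(partitionId, resolved);
            return resolved;
        }
        return current;
    }

    public static void main(String[] args) {
        InputChannelUpdateSketch gate = new InputChannelUpdateSketch("tm-1");
        gate.registerUnknown("p0");
        gate.registerUnknown("p1");
        System.out.println(gate.updateInputChannel("p0", "tm-1"));
        System.out.println(gate.updateInputChannel("p1", "tm-2"));
    }
}
```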
Records are first written to the buffer queue ArrayDeque<BufferConsumer> buffers. The reader is then notified through PipelinedSubpartitionView readView's notifyDataAvailable() -> BufferAvailabilityListener availabilityListener's notifyDataAvailable().
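The buffer-then-notify pattern can be sketched as a queue with a listener. This is a simplified model, not Flink's logic: the class and interface names are hypothetical, and the rule used here (notify only when the queue goes from empty to non-empty) is an assumption chosen for illustration; PipelinedSubpartition applies its own, more detailed conditions.

```java
import java.util.ArrayDeque;

public class SubpartitionSketch {
    /** Hypothetical stand-in for BufferAvailabilityListener. */
    interface AvailabilityListener {
        void notifyDataAvailable();
    }

    private final ArrayDeque<String> buffers = new ArrayDeque<>();
    private final AvailabilityListener listener;

    SubpartitionSketch(AvailabilityListener listener) {
        this.listener = listener;
    }

    /** Enqueues a buffer and notifies the listener on an empty -> non-empty transition. */
    void add(String buffer) {
        boolean shouldNotify;
        synchronized (buffers) {
            shouldNotify = buffers.isEmpty();
            buffers.add(buffer);
        }
        // notify outside the lock so the listener cannot block writers
        if (shouldNotify) {
            listener.notifyDataAvailable();
        }
    }

    /** Returns the oldest buffer, or null when nothing is queued. */
    String poll() {
        synchronized (buffers) {
            return buffers.poll();
        }
    }

    public static void main(String[] args) {
        SubpartitionSketch subpartition =
                new SubpartitionSketch(() -> System.out.println("data available"));
        subpartition.add("buffer-1");
        subpartition.add("buffer-2");
        System.out.println(subpartition.poll());
    }
}
```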
① When TaskManagerServices creates the ShuffleEnvironment, the chain NettyShuffleServiceFactory's createNettyShuffleEnvironment() -> new NettyConnectionManager() -> new NettyServer() -> ServerChannelInitializer's initChannel() -> NettyProtocol's getServerChannelHandlers() installs the Netty server-side handler PartitionRequestServerHandler:
    public ChannelHandler[] getServerChannelHandlers() {
        PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
        PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
                partitionProvider,
                taskEventPublisher,
                queueOfPartitionQueues);
        return new ChannelHandler[] {
            messageEncoder,
            new NettyMessage.NettyMessageDecoder(),
            serverHandler,
            queueOfPartitionQueues
        };
    }
② When PartitionRequestServerHandler receives a PartitionRequest message from the client, it creates a CreditBasedSequenceNumberingViewReader and sets it up through requestSubpartitionView() -> ResultPartitionManager's createSubpartitionView() -> ResultPartition's createSubpartitionView().
③ CreditBasedSequenceNumberingViewReader's notifyDataAvailable() method calls PartitionRequestQueue's notifyReaderNonEmpty(), which notifies the downstream operator:
    void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
        // The notification might come from the same thread. For the initial writes this
        // might happen before the reader has set its reference to the view, because
        // creating the queue and the initial notification happen in the same method call.
        // This can be resolved by separating the creation of the view and allowing
        // notifications.
        // TODO This could potentially have a bad performance impact as in the
        // worst case (network consumes faster than the producer) each buffer
        // will trigger a separate event loop task being scheduled.
        ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
    }
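The last line hands the notification to the channel's event loop instead of handling it on the calling thread. A rough sketch of that idea, using a plain single-threaded ExecutorService in place of Netty's event loop (an assumption made to keep the example free of dependencies; the class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EventLoopSketch {
    /**
     * Schedules one task per reader on a single "event loop" thread and returns the
     * readers in the order that thread handled them.
     */
    static List<String> dispatch(List<String> readers) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        try {
            for (String reader : readers) {
                // the caller only enqueues; the actual handling runs on the loop thread
                eventLoop.execute(() -> handled.add(reader));
            }
        } finally {
            eventLoop.shutdown(); // already submitted tasks still run
        }
        try {
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("event loop did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the event loop", e);
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        System.out.println(dispatch(List.of("reader-1", "reader-2", "reader-3")));
    }
}
```

Because a single thread drains the queue, the handlers never run concurrently with each other, at the cost the TODO in the original comment mentions: in the worst case every buffer schedules its own task.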