Hi, Danny.

When the problem occurs, can you use a flame graph to confirm whether the loop in this code is what keeps the operator busy? I'm not very familiar with the Kafka connector, so I can't give you an accurate answer. I think Hang Ruan is an expert in this area :).

Hi, Ruan Hang. Could you take a look at this strange situation?

--
Best!
Xuyang

At 2024-03-10 16:49:16, "Daniel Peled" <daniel.peled.w...@gmail.com> wrote:

Hello,

I am sorry for addressing you personally. I tried sending this request to the user group and got no response. If you can't help me, please let me know, and please tell me who can help us.

The problem is as follows:

We have noticed that when we add a new Kafka sink operator to the graph and start from the last savepoint, the operator is 100% busy for several minutes, and sometimes even for half an hour to an hour!

The problematic code seems to be the following for-loop in the getTransactionalProducer() method:

org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer

    private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
        checkState(
                checkpointId > lastCheckpointId,
                "Expected %s > %s",
                checkpointId,
                lastCheckpointId);
        FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
        // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
        // this loop ensures that all gaps are filled with initialized (empty) transactions
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            String transactionalId =
                    TransactionalIdFactory.buildTransactionalId(
                            transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
            producer = getOrCreateTransactionalProducer(transactionalId);
        }
        this.lastCheckpointId = checkpointId;
        assert producer != null;
        LOG.info("Created new transactional producer {}", producer.getTransactionalId());
        return producer;
    }

Since we added a new sink operator, its lastCheckpointId is 1. If, for example, the checkpointId is 20000, the loop will be executed roughly 20000 times!

We have several questions:
1. Is this behaviour expected?
2. Are we doing something wrong?
3. Is there a way to avoid this behaviour?

Best Regards,
Danny
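
For reference, the iteration count described in the quoted message can be reproduced with a small standalone sketch. It does not use Flink or Kafka; it only copies the loop bounds from the quoted getTransactionalProducer() method. The class GapFillSketch, its methods, and the "prefix-subtask-checkpoint" id layout are hypothetical names and assumptions made for illustration, not the connector's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone sketch of the gap-filling loop bounds; no Flink or Kafka involved. */
public class GapFillSketch {

    // Hypothetical stand-in for TransactionalIdFactory.buildTransactionalId.
    // The "prefix-subtask-checkpoint" layout is an assumption for illustration.
    static String buildTransactionalId(String prefix, int subtaskId, long checkpointOffset) {
        return prefix + "-" + subtaskId + "-" + checkpointOffset;
    }

    // Same bounds as the quoted loop: one id for every checkpoint in
    // the range (lastCheckpointId, checkpointId].
    static List<String> idsToInitialize(
            String prefix, int subtaskId, long lastCheckpointId, long checkpointId) {
        if (checkpointId <= lastCheckpointId) {
            throw new IllegalStateException(
                    "Expected " + checkpointId + " > " + lastCheckpointId);
        }
        List<String> ids = new ArrayList<>();
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            ids.add(buildTransactionalId(prefix, subtaskId, id));
        }
        return ids;
    }

    // Number of loop iterations, i.e. how many producers would be requested.
    static long countGapFillIterations(long lastCheckpointId, long checkpointId) {
        return idsToInitialize("demo-prefix", 0, lastCheckpointId, checkpointId).size();
    }

    public static void main(String[] args) {
        // Values taken from the example in the quoted message.
        List<String> ids = idsToInitialize("demo-prefix", 0, 1L, 20000L);
        System.out.println("producers requested: " + ids.size());
        System.out.println("first id: " + ids.get(0));
        System.out.println("last id:  " + ids.get(ids.size() - 1));
    }
}
```

With lastCheckpointId = 1 and checkpointId = 20000 the loop body runs 19999 times. In the real writer each iteration calls getOrCreateTransactionalProducer, so if every call has to initialize a new transactional producer, the total time would grow linearly with the checkpoint gap. A flame graph taken while the operator is busy should show whether that is where the time goes.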