Hi, Xuyang & Daniel. I have checked this part of the code. I think it is expected behavior. As noted in the code comments, this loop makes sure that the transactions before this checkpoint id are re-created.
The situation Daniel mentioned will happen only when all checkpoints between 1 and 20000 fail. If so, we should check why these checkpoints failed. The transactional producer is only used when the DeliveryGuarantee is EXACTLY_ONCE. If another DeliveryGuarantee is acceptable, you could switch to it to skip this code path. I think it is better to check whether many checkpoints have failed, and to use a flame graph to confirm that this code is causing the busyness.

Best,
Hang

Xuyang <xyzhong...@163.com> wrote on Monday, March 11, 2024 at 09:58:
> Hi, Danny.
> When the problem occurs, can you use a flame graph to confirm whether the
> loop in this code is causing the busyness?
> Since I'm not particularly familiar with the Kafka connector, I can't give you
> an accurate reply. I think Hang Ruan is an expert in this field :).
>
> Hi, Ruan Hang. Can you take a look at this strange situation?
>
>
> --
> Best!
> Xuyang
>
>
> On 2024-03-10 16:49:16, "Daniel Peled" <daniel.peled.w...@gmail.com> wrote:
>
> Hello,
>
> I am sorry for addressing you personally.
> I tried sending this request to the user group and got no response.
>
> If you can't help me, please let me know,
> and please tell me who can.
>
> The problem is as follows:
>
> We have noticed that when we add a *new Kafka sink* operator to the
> graph, *and start from the last savepoint*, the operator is 100% busy
> for several minutes and *even half an hour to an hour*!!!
>
> The problematic code seems to be the following for-loop in the
> getTransactionalProducer() method:
>
> org.apache.flink.connector.kafka.sink.KafkaWriter#getTransactionalProducer
>
>     private FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(long checkpointId) {
>         checkState(
>                 checkpointId > lastCheckpointId,
>                 "Expected %s > %s",
>                 checkpointId,
>                 lastCheckpointId);
>         FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
>         // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
>         // this loop ensures that all gaps are filled with initialized (empty) transactions
>         for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
>             String transactionalId =
>                     TransactionalIdFactory.buildTransactionalId(
>                             transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id);
>             producer = getOrCreateTransactionalProducer(transactionalId);
>         }
>         this.lastCheckpointId = checkpointId;
>         assert producer != null;
>         LOG.info("Created new transactional producer {}", producer.getTransactionalId());
>         return producer;
>     }
>
> Since we added a new sink operator, the lastCheckpointId is 1,
> and if, for example, the checkpointId is 20000,
> the loop will be executed 20000 times !!!
>
> We have several questions:
> 1. Is this behaviour expected?
> 2. Are we doing something wrong?
> 3. Is there a way to avoid this behavior?
>
> Best Regards,
> Danny
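To make the cost of the quoted loop concrete, here is a small self-contained sketch (plain Java, no Flink dependency). The `buildId` helper is a hypothetical stand-in for `TransactionalIdFactory.buildTransactionalId`, and the prefix "my-sink" is a placeholder; only the loop shape matches the quoted code:

```java
// Self-contained sketch of the gap-filling loop in getTransactionalProducer().
// buildId is a stand-in for TransactionalIdFactory.buildTransactionalId;
// the real method encodes a configured prefix, the subtask id, and the checkpoint id.
public class TransactionGapDemo {

    static String buildId(String prefix, int subtask, long checkpointId) {
        return prefix + "-" + subtask + "-" + checkpointId;
    }

    public static void main(String[] args) {
        long lastCheckpointId = 1;   // a freshly added sink operator restored from an old savepoint
        long checkpointId = 20000;   // current checkpoint id of the running job

        long lookups = 0;
        String lastTransactionalId = null;
        // Same shape as the loop in KafkaWriter: one producer lookup per gap id.
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            lastTransactionalId = buildId("my-sink", 0, id);
            lookups++;               // in the real code: getOrCreateTransactionalProducer(...)
        }

        System.out.println("producer lookups: " + lookups);                 // prints 19999
        System.out.println("last transactional id: " + lastTransactionalId);
    }
}
```

So a restore at checkpoint 20000 with lastCheckpointId 1 performs nearly 20,000 producer lookups in one call, which is consistent with the busyness Daniel observed.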
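Regarding the suggestion to use a different DeliveryGuarantee: a minimal configuration sketch, assuming the standard KafkaSink builder from flink-connector-kafka. The topic name and broker address are placeholders; this is a configuration fragment, not a complete job:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Sketch: configuring the sink with AT_LEAST_ONCE instead of EXACTLY_ONCE.
// Only EXACTLY_ONCE goes through getTransactionalProducer(), so this
// avoids the gap-filling loop entirely (at the cost of possible duplicates).
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")          // placeholder address
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setTopic("my-topic")        // placeholder topic
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
```

Whether AT_LEAST_ONCE is acceptable depends on whether downstream consumers can tolerate duplicate records after a failure.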