[ https://issues.apache.org/jira/browse/FLINK-27318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17530623#comment-17530623 ]
Arvid Heise commented on FLINK-27318:
-------------------------------------

This is exactly what I meant: you don't restart a task, you start a new one, so for all intents and purposes you should use a new transaction prefix.

Imagine you run the same application twice at the same time on different input topics. You'd naturally pick two different prefixes so that the transactions don't interfere. Your use case implies running the same application twice, just at different times; I'd still naturally choose different transaction prefixes to prevent the transactions from interfering. For example, some transactions under the old prefix may linger and need to be canceled (which is the whole reason for the algorithm that is causing you issues).

Just for clarification: Kafka offers no way to check how many transactions are open on a given topic, or which ones, so we need to go to such lengths to cancel all transactions; otherwise you wouldn't see any data in your output until all transactions time out. I'd argue that our approach is faster in most settings. Usually we can shortcut this whole algorithm by using the last successful transaction id, but we don't have that information if we are not resuming from a checkpoint. Again, Kafka has no transaction API to speak of. Kafka assumes that you use the same transaction id for the whole application, which kind of works for Kafka Streams but doesn't play nicely with Flink: in the worst case, we couldn't write any data during a checkpoint at all.

But just to double-check: do you delete the output topic before restarting? That seems natural to me, and I would have assumed that the related transaction data is deleted on the Kafka side, but I wouldn't be surprised if it isn't and we are seeing the exhibited issue.
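To make the "pick a different prefix per run" advice concrete, here is a minimal sketch of why distinct prefixes keep transactional ids disjoint. It assumes a `prefix-subtaskId-counter` naming scheme, which roughly mirrors how the KafkaSink derives transactional ids; the class and helper names are hypothetical and for illustration only.

```java
public class TxnIdPrefixSketch {
    // Hypothetical helper; assumes transactional ids are derived as
    // "<prefix>-<subtaskId>-<counter>" (the exact scheme is an assumption).
    static String buildTransactionalId(String prefix, int subtaskId, long counter) {
        return prefix + "-" + subtaskId + "-" + counter;
    }

    public static void main(String[] args) {
        // Same subtask and counter, but run-specific prefixes: the resulting
        // ids are disjoint, so lingering transactions from run 1 can never be
        // confused with (or block) transactions of run 2.
        String run1 = buildTransactionalId("my-app-run-1", 0, 42);
        String run2 = buildTransactionalId("my-app-run-2", 0, 42);
        System.out.println(run1); // my-app-run-1-0-42
        System.out.println(run2); // my-app-run-2-0-42
        System.out.println(run1.equals(run2)); // false
    }
}
```

In the actual job this corresponds to passing a run-specific value to `setTransactionalIdPrefix(...)` on the `KafkaSink` builder.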
> KafkaSink: when initializing transactions, it takes too long to get a producerId with epoch=0
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-27318
>                 URL: https://issues.apache.org/jira/browse/FLINK-27318
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.4
>            Reporter: Zhengqi Zhang
>            Priority: Major
>         Attachments: image-2022-04-20-17-34-48-207.png, image-2022-04-20-17-59-27-397.png, image-2022-05-01-16-50-27-264.png
>
> As we can see, the new KafkaSink aborts all transactions that were created by a subtask in a previous run, and only returns once it gets a producerId that was unused before (epoch=0). This can take a long time, especially if the task has been started and cancelled many times before. In my tests, it even took {*}10 minutes{*}. Is there a better way to solve this problem, or should we {*}do what FlinkKafkaProducer did{*}?
> !image-2022-04-20-17-59-27-397.png|width=534,height=256!
> !image-2022-04-20-17-34-48-207.png|width=556,height=412!

--
This message was sent by Atlassian Jira
(v8.20.7#820007)