Hi everyone,

Currently, the "transactional.id"s of the Kafka producers in FlinkKafkaProducer are generated based on the task name. This mechanism has some limitations:
- The ids can exceed Kafka's length limit if the task name is too long (resolved in FLINK-17691).
- They are very likely to clash with each other if job topologies are similar (discussed in FLINK-11654).
- Prefixed ACLs on the target Kafka cluster may authorize only certain "transactional.id"s, but the generated ids cannot be controlled to match them.

I have also noticed that many other open-source Kafka connectors already support specifying a custom prefix at creation time. For instance, the Spring community has introduced the `setTransactionIdPrefix` method in their Kafka client.

I therefore propose this improvement and hope it can be developed and released soon. This is a follow-up to the discussion in FLINK-11654 <https://issues.apache.org/jira/browse/FLINK-11654>, and I have raised FLINK-22452 <https://issues.apache.org/jira/browse/FLINK-22452> to track it.

As discussed, there are two possible solutions:
- either introduce an additional method, `setTransactionalIdPrefix(String)`, in FlinkKafkaProducer (which I prefer),
- or use the existing "transactional.id" property as the prefix.

In either case, the "transactional.id" generation would behave as follows:
- if no prefix is given, keep the current behavior;
- if a prefix is given, use it as the prefix for the TransactionalIdsGenerator.

As Jiangjie Qin suggested in FLINK-11654, we still need a FLIP for this. I would love to work on it and create the FLIP. Could someone grant me (username: zetaplusae <https://cwiki.apache.org/confluence/display/~zetaplusae>) the required permissions on Confluence and also assign the ticket to me?

Thanks,
Wenhao
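P.S. The proposed fallback rule (use the prefix if given, otherwise keep the current behavior) can be sketched in Java roughly as below. Everything here is hypothetical: `TransactionalIdPrefixSketch`, `resolvePrefix`, and `transactionalId` are illustrative names, the `prefix-N` id format is assumed, and the task-name default only stands in for the current generation logic, which may include more than the task name. This is not FlinkKafkaProducer's actual code.

```java
import java.util.Properties;

/**
 * Hypothetical sketch of the proposed prefix handling; not FlinkKafkaProducer code.
 */
public class TransactionalIdPrefixSketch {

    /**
     * Returns the user-supplied prefix when present; otherwise falls back to a
     * task-name-based default (a stand-in for the current behavior).
     */
    static String resolvePrefix(String userPrefix, String taskName) {
        if (userPrefix != null && !userPrefix.isEmpty()) {
            return userPrefix;
        }
        return taskName;
    }

    /** Builds one transactional id from a prefix and a numeric id (format assumed). */
    static String transactionalId(String prefix, long id) {
        return prefix + "-" + id;
    }

    public static void main(String[] args) {
        // Option 1: value passed to the proposed setTransactionalIdPrefix(String).
        String fromSetter = "orders-app";

        // Option 2: value read from the existing "transactional.id" producer property.
        Properties props = new Properties();
        props.setProperty("transactional.id", "payments-app");
        String fromProps = props.getProperty("transactional.id");

        System.out.println(transactionalId(resolvePrefix(fromSetter, "Sink: orders"), 0)); // orders-app-0
        System.out.println(transactionalId(resolvePrefix(fromProps, "Sink: payments"), 1)); // payments-app-1
        // No prefix configured: keep the task-name-based default.
        System.out.println(transactionalId(resolvePrefix(null, "Sink: orders"), 0)); // Sink: orders-0
    }
}
```

Because every generated id would start with the configured prefix, a single prefixed ACL on the Kafka `TransactionalId` resource could in principle cover all of a job's producers, which addresses the ACL limitation mentioned in the list of limitations.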