Hi everyone,

Currently, the "transactional.id"s of the Kafka producers in
FlinkKafkaProducer are generated based on the task name. This mechanism has
some limitations:

 - The generated ID can exceed Kafka's length limit if the task name is too
long. (resolved in FLINK-17691)
 - The IDs are very likely to clash with each other if the job topologies
are similar. (discussed in FLINK-11654)
 - Only certain "transactional.id" values may be authorized by prefixed ACLs
on the target Kafka cluster, and the generated IDs may not match them.

Besides, I have seen that many other open-source Kafka connectors already
support specifying a custom prefix at creation time. For instance, the
Spring community has introduced the `setTransactionIdPrefix` method in
their Kafka client.

So I propose this improvement and hope it can be developed and released
soon.

This is a follow-up to the discussion in FLINK-11654
<https://issues.apache.org/jira/browse/FLINK-11654>. I have also opened
FLINK-22452 <https://issues.apache.org/jira/browse/FLINK-22452> to track
this issue.

As discussed, there are two possible solutions:
- either introduce an additional method called
`setTransactionalIdPrefix(String)` in the FlinkKafkaProducer (which I
prefer),
- or use the existing "transactional.id" property as the prefix.

The "transactional.id" generation would then behave as follows:
 - if no prefix is set, keep the current behavior;
 - if a prefix is set, use it as the prefix for the TransactionalIdsGenerator.
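
To make the intended behavior concrete, here is a minimal, self-contained
sketch. It is not the actual FlinkKafkaProducer or TransactionalIdsGenerator
code: the class name, the method names, the "taskName-operatorUid" default
and the ID numbering scheme are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only; names and ID layout are assumptions,
// not the real Flink implementation.
final class TransactionalIdPrefixSketch {

    // Returns the user-supplied prefix if present; otherwise falls back to a
    // prefix derived from the task (assumed default: taskName + "-" + operatorUid).
    static String resolvePrefix(String customPrefix, String taskName, String operatorUid) {
        if (customPrefix != null && !customPrefix.isEmpty()) {
            return customPrefix;
        }
        return taskName + "-" + operatorUid;
    }

    // Generates poolSize IDs for one subtask, each of the form "<prefix>-<n>".
    // The numbering keeps the ID ranges of different subtasks disjoint.
    static List<String> generateIds(String prefix, int subtaskIndex, int poolSize) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < poolSize; i++) {
            long n = (long) subtaskIndex * poolSize + i;
            ids.add(prefix + "-" + n);
        }
        return ids;
    }
}
```

With a custom prefix such as "orders-job", every generated ID starts with
"orders-job-", so a single prefixed ACL on the Kafka side could cover all of
them, and two jobs with similar topologies no longer clash as long as their
prefixes differ.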

As Jiangjie Qin suggested in FLINK-11654, we still need a FLIP for this.
I would love to work on this and create the FLIP. Could somebody grant me
(username: zetaplusae
<https://cwiki.apache.org/confluence/display/~zetaplusae>) the necessary
permissions on Confluence and also assign the ticket to me?

Thanks,

Wenhao
