[ https://issues.apache.org/jira/browse/FLINK-22452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392905#comment-17392905 ]
Nico Kruber commented on FLINK-22452: ------------------------------------- Thanks for taking care of this. Looking at the commits, however, I do not see any additions to our docs that describe the problem and the fix that is now available by setting the prefix manually. Should this be added to https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/kafka/ ? > Support specifying custom transactional.id prefix in FlinkKafkaProducer > ----------------------------------------------------------------------- > > Key: FLINK-22452 > URL: https://issues.apache.org/jira/browse/FLINK-22452 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka > Affects Versions: 1.12.2 > Reporter: Wenhao Ji > Assignee: Wenhao Ji > Priority: Major > Labels: pull-request-available > Fix For: 1.14.0 > > > Currently, the "transactional.id"s of the Kafka producers in > FlinkKafkaProducer are generated based on the task name. This mechanism has > some limitations: > * It will exceed Kafka's limitation if the task name is too long. (resolved > in FLINK-17691) > * They will very likely clash each other if the job topologies are similar. > (discussed in FLINK-11654) > * Only certain "transactional.id" may be authorized by [Prefixed > ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls] > on the target Kafka cluster. > Besides, the spring community has introduced the > [setTransactionIdPrefix|https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/DefaultKafkaProducerFactory.html#setTransactionIdPrefix(java.lang.String)] > method to their Kafka client. > Therefore, I think it will be necessary to have this feature in the Flink > Kafka connector. > > As discussed in FLINK-11654, the possible solution will be, > * either introduce an additional method called > setTransactionalIdPrefix(String) in the FlinkKafkaProducer, > * or use the existing "transactional.id" properties as the prefix. > And the behavior of the "transactional.id" generation will be > * keep the behavior as it was if absent, > * use the one if present as the prefix for the TransactionalIdsGenerator. -- This message was sent by Atlassian Jira (v8.3.4#803005)