Wenhao Ji created FLINK-22452: --------------------------------- Summary: Support specifying custom transactional.id prefix in FlinkKafkaProducer Key: FLINK-22452 URL: https://issues.apache.org/jira/browse/FLINK-22452 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.12.2 Reporter: Wenhao Ji
Currently, the "transactional.id"s of the Kafka producers in FlinkKafkaProducer are generated based on the task name. This mechanism has some limitations: * It will exceed Kafka's limitation if the task name is too long. (resolved in FLINK-17691) * They will very likely clash each other if the job topologies are similar. (discussed in FLINK-11654) * Only certain "transactional.id" may be authorized by [Prefixed ACLs|https://docs.confluent.io/platform/current/kafka/authorization.html#prefixed-acls] on the target Kafka cluster. Besides, the spring community has introduced the [setTransactionIdPrefix|https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/DefaultKafkaProducerFactory.html#setTransactionIdPrefix(java.lang.String)] method to their Kafka client. Therefore, I think it will be necessary to have this feature in the Flink Kafka connector. As discussed in FLINK-11654, the possible solution will be, * either introduce an additional method called setTransactionalIdPrefix(String) in the FlinkKafkaProducer, * or use the existing "transactional.id" properties as the prefix. And the behavior of the "transactional.id" generation will be * keep the behavior as it was if absent, * use the one if present as the prefix for the TransactionalIdsGenerator. -- This message was sent by Atlassian Jira (v8.3.4#803005)