Hey everyone! I'm currently investigating the way Flink configures client.id values for the Kafka consumer and I'd appreciate some help.
*Old FlinkKafkaConsumer*

It doesn't look like client.id receives any special treatment when using the deprecated FlinkKafkaConsumer. If client.id is not specified, the default Kafka consumer behaviour applies, which builds the value from the provided group.id and a sequence number (backed by an AtomicInteger), e.g. in Kafka 2.8: https://github.com/apache/kafka/blob/2.8/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L587-L599

This approach allows users to:
- Fully control how client.id is configured, if needed
- Fall back to the default Kafka consumer logic, which guarantees unique client.id values within the same JVM, even if multiple sources share the same group.id

*New KafkaSource*

The new KafkaSource introduces a client.id.prefix option. If this option is not specified, the group.id value is used instead. If client.id itself is specified, Flink still overrides it, so the user-provided value is effectively ignored. When Flink overrides client.id, it combines client.id.prefix with the subtask id. As a result, the final client.id value may not be unique when the same group.id is used for multiple sources (which is common practice). This can produce many "javax.management.InstanceAlreadyExistsException" exceptions when the underlying Kafka consumer tries to register its own metrics with JMX.

Question: why does Flink need to override client.id and include the subtask id in it? This forces users to specify a different group.id / client.id.prefix per Kafka source, which can explode the number of generated metrics. I'd love to learn about the reasons that led to this decision. In my opinion, Flink should still allow users to pass the client.id they choose, and I'm considering opening a PR for that.

Thank you!
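To make the collision concrete, here is a minimal, hypothetical sketch contrasting the two naming schemes described above. It only approximates Kafka's default client.id generation (group.id plus a JVM-wide AtomicInteger sequence) and the prefix-plus-subtask-index scheme described for KafkaSource; it is not the actual Kafka or Flink code, and the class and method names (`ClientIdSketch`, `kafkaDefaultClientId`, `kafkaSourceClientId`, `allUnique`) are invented for illustration. The scenario assumes two sources sharing one group.id, each with parallelism 2, running in the same JVM.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the two client.id naming schemes (not real Kafka/Flink code).
public class ClientIdSketch {

    // Approximates the JVM-wide sequence Kafka uses when client.id is not set.
    private static final AtomicInteger SEQUENCE = new AtomicInteger(1);

    // Kafka default (approximation): group.id plus an ever-increasing sequence number.
    static String kafkaDefaultClientId(String groupId) {
        return "consumer-" + groupId + "-" + SEQUENCE.getAndIncrement();
    }

    // KafkaSource behaviour as described: prefix (defaulting to group.id) plus subtask index.
    static String kafkaSourceClientId(String clientIdPrefix, int subtaskIndex) {
        return clientIdPrefix + "-" + subtaskIndex;
    }

    // Returns true only if no id appears more than once.
    static boolean allUnique(List<String> ids) {
        Set<String> distinct = new HashSet<>(ids);
        return distinct.size() == ids.size();
    }

    public static void main(String[] args) {
        String groupId = "my-group";

        // Two sources (A and B) sharing group.id, each with subtasks 0 and 1.
        List<String> kafkaDefault = List.of(
            kafkaDefaultClientId(groupId), kafkaDefaultClientId(groupId),   // source A
            kafkaDefaultClientId(groupId), kafkaDefaultClientId(groupId));  // source B
        List<String> kafkaSource = List.of(
            kafkaSourceClientId(groupId, 0), kafkaSourceClientId(groupId, 1),   // source A
            kafkaSourceClientId(groupId, 0), kafkaSourceClientId(groupId, 1));  // source B

        System.out.println("Kafka default: " + kafkaDefault + " unique=" + allUnique(kafkaDefault));
        System.out.println("KafkaSource:   " + kafkaSource + " unique=" + allUnique(kafkaSource));
    }
}
```

Under these assumptions the sequence-based ids stay distinct, while the prefix-plus-subtask scheme yields "my-group-0" and "my-group-1" twice each. Duplicate client.id values are what lead to the JMX MBean name clashes mentioned above.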