Hey everyone! I'm currently investigating the way Flink configures client.id
values for the Kafka consumer and I'd appreciate some help.

*Old FlinkKafkaConsumer*

It doesn't look like client.id receives any special treatment when using the
deprecated FlinkKafkaConsumer: if client.id is not specified, the default
Kafka consumer behaviour is used, which relies on the provided group.id and
a sequence number (based on an AtomicInteger), e.g. in Kafka 2.8:
https://github.com/apache/kafka/blob/2.8/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L587-L599

This approach allows users to:
- Fully control the way client.id is configured, if needed
- Fall back to the default Kafka consumer logic, which guarantees unique
client.id values in the same JVM, even if we have multiple sources with the
same group.id
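
To make the fallback concrete, here is a minimal, dependency-free sketch of that
logic as I read it in the linked Kafka 2.8 code. The class and method names are
hypothetical, the "consumer-<group.id>-<n>" format and the counter starting at 1
are my reading of that code, and the group.instance.id branch is left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class DefaultClientIdSketch {
    // Assumption: a single JVM-wide counter, similar to the one in ConsumerConfig.
    private static final AtomicInteger SEQUENCE = new AtomicInteger(1);

    /** Returns the user-supplied client.id, or generates one from group.id and a counter. */
    public static String resolveClientId(String userClientId, String groupId) {
        if (userClientId != null && !userClientId.isEmpty()) {
            return userClientId; // the user keeps full control
        }
        // Fallback: the counter keeps generated ids unique within this JVM.
        return "consumer-" + groupId + "-" + SEQUENCE.getAndIncrement();
    }

    public static void main(String[] args) {
        // Two consumers sharing a group.id still get different client.id values.
        System.out.println(resolveClientId(null, "my-group"));
        System.out.println(resolveClientId(null, "my-group"));
        // An explicit client.id is passed through untouched.
        System.out.println(resolveClientId("custom-id", "my-group"));
    }
}
```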

*New KafkaSource*

Now, the new KafkaSource introduces a new client.id.prefix option. If this
option is not specified, the group.id value is used instead. If client.id
itself is specified, the value is still overridden by Flink (basically ignored).

When Flink overrides client.id, it uses the client.id.prefix and a subtask
id. So the final client.id value may not be unique if the same group.id is
used for multiple sources (which is common practice). This can result
in many "javax.management.InstanceAlreadyExistsException" exceptions when
the underlying Kafka consumer tries to register its own metrics with JMX.
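
Here is a small sketch of the collision, without any Flink or Kafka dependency.
The "<prefix>-<subtaskId>" format is my assumption about how the two parts are
joined, and the class, method and source names are made up for illustration. It
also assumes that subtasks of both sources end up in the same JVM.

```java
import java.util.HashSet;
import java.util.Set;

public class PrefixCollisionSketch {
    /** Assumed shape of the overridden client.id: prefix plus subtask id. */
    public static String flinkStyleClientId(String clientIdPrefix, int subtaskId) {
        return clientIdPrefix + "-" + subtaskId;
    }

    public static void main(String[] args) {
        // No client.id.prefix is set, so both sources fall back to the shared group.id.
        String sharedGroupId = "my-group";
        String[] sources = {"orders-source", "payments-source"}; // hypothetical names
        Set<String> seen = new HashSet<>();

        for (String source : sources) {
            for (int subtask = 0; subtask < 2; subtask++) {
                String clientId = flinkStyleClientId(sharedGroupId, subtask);
                // add() returns false when the id is already taken in this JVM.
                boolean unique = seen.add(clientId);
                System.out.println(source + " subtask " + subtask + " -> " + clientId
                        + (unique ? "" : " (duplicate)"));
            }
        }
    }
}
```

Every subtask of the second source reuses an id already taken by the first one,
which is the situation where the JMX registration fails.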

Question: why does Flink need to override client.id and use a subtask id as
a part of it? This forces users to specify a different group.id /
client.id.prefix per Kafka source, which can explode the number of
generated metrics. I'd love to learn about the reasons that led to this
decision. In my opinion, Flink should still allow users to pass the
client.id they choose, and I'm wondering whether I should open a PR for that.

Thank you!
