Hi everyone, I'd like to propose adding client.id.prefix to the KafkaSink to mirror the functionality provided by the KafkaSource.
Setting client.id is very important when running workloads with many different Kafka clients: it helps with identifying clients and enforcing quotas. Due to implementation details, you can't simply set client.id for a KafkaSource or KafkaSink in Flink. You can find more context in https://issues.apache.org/jira/browse/FLINK-8093, which was opened in 2017 and is still unresolved. I don't see any reason to treat KafkaSink differently: if we can define a client.id.prefix for the KafkaSource, we should be able to define one for the KafkaSink as well. I've created an issue, https://issues.apache.org/jira/browse/FLINK-28842, and a PR, https://github.com/apache/flink/pull/20475, to resolve this.
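
To make the idea concrete, here is a rough, standalone sketch of how a prefix could be turned into a distinct client.id for each parallel sink subtask. It is not the code from the PR: the class name, the method name, and the "<prefix>-<subtaskId>" format are my assumptions for illustration only, and it uses plain java.util.Properties rather than any Flink API.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only; not part of Flink or of the PR.
final class ClientIdPrefixSketch {

    // Property name proposed in this thread, mirroring the KafkaSource option.
    static final String CLIENT_ID_PREFIX = "client.id.prefix";

    // Returns a copy of the user properties in which client.id is derived from
    // the prefix and the subtask index. The "<prefix>-<subtaskId>" format is an
    // assumption; the real connector may build the id differently.
    static Properties forSubtask(Properties userProps, int subtaskId) {
        Properties resolved = new Properties();
        resolved.putAll(userProps);
        String prefix = userProps.getProperty(CLIENT_ID_PREFIX);
        if (prefix != null) {
            resolved.setProperty("client.id", prefix + "-" + subtaskId);
            // The prefix is not a Kafka client setting, so drop it from the copy.
            resolved.remove(CLIENT_ID_PREFIX);
        }
        return resolved;
    }
}
```

With "client.id.prefix" set to, say, "orders-sink", subtask 0 and subtask 1 would end up with different client.id values, which is what identification and quotas need. If no prefix is set, the properties are passed through unchanged.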