Hi Darius,

I'm afraid your use case is not directly supported. The workaround you describe (overriding both 'open' and 'initializeState') is fine to use, and I don't know of a better way. In both the legacy and the unified implementations, the consumer/producer configs are only meant to be set statically.
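To illustrate the workaround, here is a minimal sketch in plain Java. It does not use the Flink classes: LifecycleSink is a hypothetical stand-in for a connector that creates its producer in 'initializeState' before 'open' is called, and LocalTlsSink, the Supplier of local properties, and the keystore paths are all made up for the example. The only point is that the local config is loaded idempotently from both hooks, so whichever one runs first sees the task manager's settings.

```java
import java.util.Properties;
import java.util.function.Supplier;

// Hypothetical stand-in for a connector whose lifecycle calls
// initializeState() (which creates the producer) before open().
abstract class LifecycleSink {
    protected final Properties producerConfig;
    // Snapshot of the config the producer was created with (for inspection only).
    Properties lastProducerConfig;

    LifecycleSink(Properties producerConfig) {
        this.producerConfig = producerConfig;
    }

    void initializeState() {
        createProducer();
    }

    void open() {
        // Nothing to do in this model.
    }

    void createProducer() {
        lastProducerConfig = (Properties) producerConfig.clone();
    }
}

// Applies node-local settings exactly once, from whichever hook runs first.
class LocalTlsSink extends LifecycleSink {
    private final Supplier<Properties> localConfig;
    // transient: in a real connector the instance is serialized and shipped
    // to the task managers, and this flag must start out false there.
    private transient boolean localConfigApplied;

    LocalTlsSink(Properties baseConfig, Supplier<Properties> localConfig) {
        super(baseConfig);
        this.localConfig = localConfig;
    }

    private void applyLocalConfigOnce() {
        if (localConfigApplied) {
            return;
        }
        // Local values override whatever was set at construction time.
        producerConfig.putAll(localConfig.get());
        localConfigApplied = true;
    }

    @Override
    void initializeState() {
        applyLocalConfigOnce();
        super.initializeState();
    }

    @Override
    void open() {
        applyLocalConfigOnce();
        super.open();
    }
}

class LocalConfigDemo {
    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("ssl.keystore.location", "/jobmanager/keystore.jks");

        LocalTlsSink sink = new LocalTlsSink(base, () -> {
            Properties local = new Properties();
            local.setProperty("ssl.keystore.location", "/taskmanager/keystore.jks");
            return local;
        });

        // Same order as described for the producer: initializeState, then open.
        sink.initializeState();
        sink.open();
        System.out.println(sink.lastProducerConfig.getProperty("ssl.keystore.location"));
    }
}
```

In an actual subclass of the Flink connector, the same guard would sit in your overrides of 'open' and 'initializeState', and the supplier would be replaced by whatever code reads the TLS settings on the task manager.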
For the new unified interfaces, the issue is more pronounced, as you cannot even subclass them. If you'd like to see this feature, please open a ticket.

Perhaps you could also reconsider whether this configuration really needs to be set dynamically. Would it be enough to set different properties on the jobmanager and the taskmanagers?

On Fri, Nov 19, 2021 at 11:51 PM Darius Kasad <dka...@gmail.com> wrote:

> I've overridden v1.11.3 FlinkKafkaConsumer's 'open' method in order to set
> TLS configuration for kafka from the task manager node where the kafka
> consumer is running (TLS configuration differs between job manager and each
> task manager in our environment, which is why we use 'open' vs. setting
> configuration on construct).
>
> However, when I do the same for FlinkKafkaProducer, I've noticed that
> 'initializeState', which creates a kafka producer, is called prior to
> calling 'open'. This causes an exception since a kafka producer is created
> with the wrong configuration (i.e. the config from the job manager where
> FlinkKafkaProducer was constructed).
>
> What is the appropriate way to set up configuration for a
> FlinkKafkaProducer, allowing me to read config from the taskManager node
> where the producer is executing? I can override both 'open' and
> 'initializeState' to set up config; this solution works, but is there a
> better alternative (e.g. 'createProducer', etc.)? What about v1.4.x
> KafkaSource/KafkaSink?
>
> Thanks,
> Darius