Hi all,

This should be fixed. Are you setting a different `client.id.prefix` for each KafkaSource? See these two sections of the Kafka connector docs:

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/kafka/#additional-properties
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/kafka/#kafka-consumer-metrics
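To illustrate why distinct prefixes matter, here is a minimal JDK-only sketch of the collision reported in the quoted trace. It does not use Flink or Kafka. It assumes the consumer's client id has the form `<prefix>-<subtask index>`, which is inferred from the id `xyz-processing642706448-5` in the warning, and it reuses the `kafka.consumer:type=app-info,id=...` name pattern from that same warning. The class and method names are hypothetical, and `javax.management.timer.Timer` is only a placeholder for the object that Kafka registers.

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import javax.management.timer.Timer;

// Hypothetical demo class: shows how two registrations under the same
// app-info id collide, and how distinct prefixes avoid the collision.
class AppInfoMBeanCollisionDemo {

    // Assumption: client id = "<prefix>-<subtask index>", as the id in the warning suggests.
    static String clientId(String prefix, int subtaskIndex) {
        return prefix + "-" + subtaskIndex;
    }

    // A private MBean server, so the demo does not touch the platform server.
    static MBeanServer newServer() {
        return MBeanServerFactory.newMBeanServer();
    }

    // Returns true if the name was free, false if it was already registered.
    static boolean tryRegister(MBeanServer server, String clientId) {
        try {
            ObjectName name = new ObjectName("kafka.consumer:type=app-info,id=" + clientId);
            // Timer is a ready-made standard MBean from the JDK, used here as a placeholder.
            server.registerMBean(new Timer(), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            // Same exception type as in the quoted stack trace.
            return false;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MBeanServer server = newServer();
        // Two sources sharing one prefix on the same subtask: the second one collides.
        System.out.println(tryRegister(server, clientId("shared-prefix", 5)));
        System.out.println(tryRegister(server, clientId("shared-prefix", 5)));
        // One prefix per source: both registrations succeed.
        System.out.println(tryRegister(server, clientId("orders-source", 5)));
        System.out.println(tryRegister(server, clientId("customers-source", 5)));
    }
}
```

In a real job the equivalent step would be to give each KafkaSource its own `client.id.prefix`, as described in the docs linked above.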
Best,
Mason

On Thu, Jul 6, 2023 at 9:43 AM Yogesh Rao <yog...@gmail.com> wrote:

> Hello Yaroslav,
>
> I am using Flink 1.17.1, so I am assuming that this isn’t fixed.
>
> Regards,
> Yogesh
>
> On Thu, 6 Jul 2023 at 10:06 PM, Yaroslav Tkachenko <yaros...@goldsky.com>
> wrote:
>
>> Hi Yogesh,
>>
>> Multiple Kafka sources are supported. This warning only indicates that
>> multiple consumers won't be able to register JMX metrics. There are several
>> bugs reported about this, but I believe it should be fixed for consumers in
>> the newer Flink versions (1.14+).
>>
>> On Wed, Jul 5, 2023 at 9:32 PM Yogesh Rao <yog...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I wanted to know if multiple Kafka sources are supported in a data
>>> pipeline within Flink.
>>>
>>> I am looking at a scenario where data transformation and enrichment
>>> need to be done once a message has been received from both sources,
>>> matched on a common identifier.
>>>
>>> I coded the logic and it appears to work; however, I see the warning
>>> trace below, which makes me think this may not be a supported setup. I
>>> understand it's just the JMX registration that has failed and that it
>>> does not affect the actual execution of the business logic.
>>>
>>> 10:22:54,876 WARN  org.apache.kafka.common.utils.AppInfoParser [] - Error registering AppInfo mbean
>>> javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=xyz-processing642706448-5
>>>     at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
>>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
>>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
>>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
>>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
>>>     at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]
>>>     at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[kafka-clients-3.2.3.jar:?]
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:816) ~[kafka-clients-3.2.3.jar:?]
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[kafka-clients-3.2.3.jar:?]
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:647) ~[kafka-clients-3.2.3.jar:?]
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:627) ~[kafka-clients-3.2.3.jar:?]
>>>     at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.<init>(KafkaPartitionSplitReader.java:88) ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>>>     at org.apache.flink.connector.kafka.source.KafkaSource.lambda$createReader$1(KafkaSource.java:160) ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>>>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.createSplitFetcher(SplitFetcherManager.java:196) ~[flink-connector-base-1.17.1.jar:1.17.1]
>>>     at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:107) ~[flink-connector-base-1.17.1.jar:1.17.1]
>>>
>>> I looked at the implementation as well and found that not enough
>>> attributes are used to make the MBean id unique.
>>>
>>> Please let me know if this is a bug. I can raise a JIRA and perhaps even
>>> contribute towards its fix.
>>>
>>> Regards,
>>> -Yogesh