Hello Yaroslav,

I am using Flink 1.17.1, so I assume this isn’t fixed.
Regards,
Yogesh

On Thu, 6 Jul 2023 at 10:06 PM, Yaroslav Tkachenko <yaros...@goldsky.com> wrote:

> Hi Yogesh,
>
> Multiple kafka sources are supported. This warning only indicates that
> multiple consumers won't be able to register JMX metrics. There are several
> bugs reported about this, but I believe it should be fixed for consumers in
> the newer Flink versions (1.14+).
>
> On Wed, Jul 5, 2023 at 9:32 PM Yogesh Rao <yog...@gmail.com> wrote:
>
>> Hi,
>>
>> Wanted to know if multiple kafka sources are supported in a data pipeline
>> within flink.
>>
>> I am looking at a scenario where data transformation and enrichment needs
>> to be done when a message from both the sources is received based on a
>> common identifier.
>>
>> I coded the logic and it looks to be working however I see a warning
>> trace as below, which makes me believe perhaps it's not designed to be
>> supported, I understand its just JMX registration which has failed and does
>> not have any effect on the actual execution of business logic.
>>
>> 10:22:54,876 WARN  org.apache.kafka.common.utils.AppInfoParser [] - Error registering AppInfo mbean
>> javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=xyz-processing642706448-5
>>     at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
>>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
>>     at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]
>>     at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[kafka-clients-3.2.3.jar:?]
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:816) ~[kafka-clients-3.2.3.jar:?]
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[kafka-clients-3.2.3.jar:?]
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:647) ~[kafka-clients-3.2.3.jar:?]
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:627) ~[kafka-clients-3.2.3.jar:?]
>>     at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.<init>(KafkaPartitionSplitReader.java:88) ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>>     at org.apache.flink.connector.kafka.source.KafkaSource.lambda$createReader$1(KafkaSource.java:160) ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.createSplitFetcher(SplitFetcherManager.java:196) ~[flink-connector-base-1.17.1.jar:1.17.1]
>>     at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:107) ~[flink-connector-base-1.17.1.jar:1.17.1]
>>
>> I looked at the implementation as well and found not enough attributes
>> were used to make the mbean id unique.
>>
>> Please let me know if this is a bug, I can raise a JIRA and perhaps even
>> contribute towards its fix.
>>
>> Regards,
>> -Yogesh
>>
>
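
For reference, the warning in the trace above can be reproduced with the JDK alone. The sketch below is not Kafka or Flink code: the `demo.consumer` domain, the `AppInfo` bean and the `register` helper are made up for illustration. It only shows that registering a second MBean under an ObjectName that is already taken raises InstanceAlreadyExistsException, and that a distinct id avoids it. The assumption is that two consumers in the same JVM end up with the same id in their app-info ObjectName.

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

final class MBeanCollisionDemo {

    // Standard MBean convention: the management interface is named <Class>MBean.
    public interface AppInfoMBean {
        String getClientId();
    }

    // Hypothetical stand-in for the per-consumer app-info bean.
    public static final class AppInfo implements AppInfoMBean {
        private final String clientId;

        public AppInfo(String clientId) {
            this.clientId = clientId;
        }

        @Override
        public String getClientId() {
            return clientId;
        }
    }

    // Returns true if the bean was registered, false if the name was already taken.
    static boolean register(MBeanServer server, String clientId) {
        try {
            // The id is the only attribute that distinguishes one bean from another.
            ObjectName name = new ObjectName("demo.consumer:type=app-info,id=" + clientId);
            server.registerMBean(new AppInfo(clientId), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            // Same situation as the WARN in the trace: the name exists, registration is skipped.
            return false;
        } catch (JMException e) {
            throw new IllegalStateException("Unexpected JMX failure", e);
        }
    }

    public static void main(String[] args) {
        // A private MBeanServer keeps the demo away from the platform server.
        MBeanServer server = MBeanServerFactory.newMBeanServer();
        System.out.println(register(server, "xyz-processing-5"));   // true
        System.out.println(register(server, "xyz-processing-5"));   // false, duplicate name
        System.out.println(register(server, "xyz-enrichment-5"));   // true, distinct id
    }
}
```

If the id really is derived from the consumer's client.id, giving each Kafka source its own client id prefix should keep the names apart, but I have not verified that against 1.17.1.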