Hi,

Wanted to know if multiple Kafka sources are supported in a single data
pipeline within Flink.

I am looking at a scenario where data transformation and enrichment need
to happen once a message has been received from both sources, correlated
by a common identifier.
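For reference, the shape of what I have is roughly the following (broker,
topic names, and the key-extraction logic are simplified placeholders, not
my actual job): two KafkaSources in one pipeline, connected and keyed by
the common identifier, with a CoProcessFunction doing the correlation.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class TwoKafkaSourcesJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Two independent Kafka sources in the same pipeline.
        KafkaSource<String> sourceA = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")      // placeholder
                .setTopics("topic-a")                    // placeholder
                .setGroupId("xyz-processing")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        KafkaSource<String> sourceB = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")      // placeholder
                .setTopics("topic-b")                    // placeholder
                .setGroupId("xyz-processing")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> a =
                env.fromSource(sourceA, WatermarkStrategy.noWatermarks(), "source-a");
        DataStream<String> b =
                env.fromSource(sourceB, WatermarkStrategy.noWatermarks(), "source-b");

        // Key both streams by the common identifier (here assumed to be the
        // first comma-separated field of the message) and correlate per key.
        a.connect(b)
         .keyBy(v -> v.split(",")[0], v -> v.split(",")[0])
         .process(new CoProcessFunction<String, String, String>() {
             // The real function would keep keyed state to buffer whichever
             // side arrives first until its counterpart shows up; elided here.
             @Override
             public void processElement1(String v, Context ctx,
                                         Collector<String> out) {
                 out.collect("A:" + v);
             }
             @Override
             public void processElement2(String v, Context ctx,
                                         Collector<String> out) {
                 out.collect("B:" + v);
             }
         })
         .print();

        env.execute("two-kafka-sources");
    }
}
```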

I coded the logic and it appears to work; however, I see the warning trace
below, which makes me wonder whether this setup is not designed to be
supported. I understand it is only a failed JMX registration and has no
effect on the actual execution of the business logic.

10:22:54,876 WARN  org.apache.kafka.common.utils.AppInfoParser [] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=xyz-processing642706448-5
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[kafka-clients-3.2.3.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:816) ~[kafka-clients-3.2.3.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[kafka-clients-3.2.3.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:647) ~[kafka-clients-3.2.3.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:627) ~[kafka-clients-3.2.3.jar:?]
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.<init>(KafkaPartitionSplitReader.java:88) ~[flink-connector-kafka-1.17.1.jar:1.17.1]
    at org.apache.flink.connector.kafka.source.KafkaSource.lambda$createReader$1(KafkaSource.java:160) ~[flink-connector-kafka-1.17.1.jar:1.17.1]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.createSplitFetcher(SplitFetcherManager.java:196) ~[flink-connector-base-1.17.1.jar:1.17.1]
    at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:107) ~[flink-connector-base-1.17.1.jar:1.17.1]

I looked at the implementation as well: the MBean id appears to be derived
from the consumer client-id alone, which is not enough to keep it unique
when two Kafka sources run their consumers in the same JVM.
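As a workaround, if I read the builder API right, giving each source a
distinct client-id prefix via KafkaSourceBuilder#setClientIdPrefix should
keep the registered MBean ids from colliding (broker and topic names below
are placeholders):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public class DistinctClientIdPrefixes {
    public static void main(String[] args) {
        // Same consumer group, but a distinct client-id prefix per source,
        // so the kafka.consumer:type=app-info,id=... MBean names differ.
        KafkaSource<String> sourceA = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder
                .setTopics("topic-a")                 // placeholder
                .setGroupId("xyz-processing")
                .setClientIdPrefix("xyz-processing-a") // distinct per source
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        KafkaSource<String> sourceB = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder
                .setTopics("topic-b")                 // placeholder
                .setGroupId("xyz-processing")
                .setClientIdPrefix("xyz-processing-b") // distinct per source
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}
```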

Please let me know if this is a bug; I can raise a JIRA and perhaps even
contribute a fix.

Regards,
-Yogesh
