Hi all,

This should be fixed. Are you setting a different `client.id.prefix` for
each KafkaSource? See this:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/kafka/#additional-properties
and
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/kafka/#kafka-consumer-metrics
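
For illustration, giving each source a distinct `client.id.prefix` is a one-line change per source (the prefix values and the small helper below are just examples, not anything from your job); with Flink's `KafkaSource` you would pass it via `KafkaSourceBuilder#setProperty`. A minimal sketch of the per-source properties:

```java
import java.util.Properties;

public class ClientIdPrefixExample {

    // Build consumer properties for one source with its own client.id.prefix,
    // so each KafkaSource's consumer registers its JMX app-info mbean under a
    // unique id instead of colliding with the other source's consumer.
    static Properties consumerProps(String clientIdPrefix) {
        Properties props = new Properties();
        props.setProperty("client.id.prefix", clientIdPrefix);
        return props;
    }

    public static void main(String[] args) {
        // In a real job this would be, e.g.:
        //   KafkaSource.<T>builder()
        //       .setProperty("client.id.prefix", "orders-source") ...
        // ("orders-source" / "payments-source" are illustrative names.)
        Properties ordersProps = consumerProps("orders-source");
        Properties paymentsProps = consumerProps("payments-source");
        System.out.println(ordersProps.getProperty("client.id.prefix"));
        System.out.println(paymentsProps.getProperty("client.id.prefix"));
    }
}
```

As long as the two prefixes differ, the resulting `kafka.consumer:type=app-info,id=...` mbean names differ as well, which is exactly what the `InstanceAlreadyExistsException` below is complaining about.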

Best,
Mason

On Thu, Jul 6, 2023 at 9:43 AM Yogesh Rao <yog...@gmail.com> wrote:

> Hello Yaroslav,
>
> I am using Flink 1.17.1, so I am assuming this isn't fixed.
>
> Regards,
> Yogesh
>
> On Thu, 6 Jul 2023 at 10:06 PM, Yaroslav Tkachenko <yaros...@goldsky.com>
> wrote:
>
>> Hi Yogesh,
>>
>> Multiple kafka sources are supported. This warning only indicates that
>> multiple consumers won't be able to register JMX metrics. There are several
>> bugs reported about this, but I believe it should be fixed for consumers in
>> the newer Flink versions (1.14+).
>>
>> On Wed, Jul 5, 2023 at 9:32 PM Yogesh Rao <yog...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Wanted to know if multiple Kafka sources are supported in a data
>>> pipeline within Flink.
>>>
>>> I am looking at a scenario where data transformation and enrichment
>>> need to happen once a message has been received from each of the two
>>> sources, joined on a common identifier.
>>>
>>> I coded the logic and it appears to work; however, I see the warning
>>> trace below, which makes me wonder whether this is actually designed to
>>> be supported. I understand it's just the JMX registration that failed,
>>> and that it has no effect on the actual execution of the business logic.
>>>
>>> 10:22:54,876 WARN  org.apache.kafka.common.utils.AppInfoParser [] - Error registering AppInfo mbean
>>> javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=xyz-processing642706448-5
>>> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
>>> at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
>>> at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
>>> at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
>>> at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
>>> at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]
>>> at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[kafka-clients-3.2.3.jar:?]
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.&lt;init&gt;(KafkaConsumer.java:816) ~[kafka-clients-3.2.3.jar:?]
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.&lt;init&gt;(KafkaConsumer.java:666) ~[kafka-clients-3.2.3.jar:?]
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.&lt;init&gt;(KafkaConsumer.java:647) ~[kafka-clients-3.2.3.jar:?]
>>> at org.apache.kafka.clients.consumer.KafkaConsumer.&lt;init&gt;(KafkaConsumer.java:627) ~[kafka-clients-3.2.3.jar:?]
>>> at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.&lt;init&gt;(KafkaPartitionSplitReader.java:88) ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>>> at org.apache.flink.connector.kafka.source.KafkaSource.lambda$createReader$1(KafkaSource.java:160) ~[flink-connector-kafka-1.17.1.jar:1.17.1]
>>> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.createSplitFetcher(SplitFetcherManager.java:196) ~[flink-connector-base-1.17.1.jar:1.17.1]
>>> at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:107) ~[flink-connector-base-1.17.1.jar:1.17.1]
>>>
>>> I looked at the implementation as well and found that the attributes
>>> used are not sufficient to make the mbean id unique.
>>>
>>> Please let me know if this is a bug; I can raise a JIRA and perhaps
>>> even contribute a fix.
>>>
>>> Regards,
>>> -Yogesh
>>>
>>