Hi Kenan,

After reading the source code and searching Google for related reports,
I think this is caused by a duplicate client_id [1]. Please check whether
other jobs use the same group_id to consume this topic. Flink uses the
group_id to build the client_id [2], so if another job already uses the
same group_id, the duplicate client_id will be detected on the Kafka
client side.
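
To illustrate the mechanism, here is a minimal sketch in plain Java that
does not use Flink or Kafka. It assumes the admin client id has the form
"<prefix>-enumerator-admin-client" (taken from the id in your log) and that
the prefix defaults to the group_id. The class and method names are
hypothetical, and the "-periodic" style prefixes are made-up examples.

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

public class ClientIdCollisionSketch {

    /** Standard MBean interface; JMX requires the "<ClassName>MBean" naming. */
    public interface AppInfoMBean {
        String getId();
    }

    /** Stand-in for the app-info bean that a Kafka client registers. */
    public static class AppInfo implements AppInfoMBean {
        private final String id;

        public AppInfo(String id) {
            this.id = id;
        }

        @Override
        public String getId() {
            return id;
        }
    }

    /** Models the id seen in the log: "<prefix>-enumerator-admin-client". */
    public static String enumeratorAdminClientId(String clientIdPrefix) {
        return clientIdPrefix + "-enumerator-admin-client";
    }

    /** Registers one bean per client id; returns false if the name is taken. */
    public static boolean register(MBeanServer server, String clientId) {
        try {
            ObjectName name =
                    new ObjectName("kafka.admin.client:type=app-info,id=" + clientId);
            server.registerMBean(new AppInfo(clientId), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            // Same exception type as in the quoted warning.
            return false;
        }  catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Counts how many registrations fail within one (fresh) MBean server. */
    public static int countCollisions(String... clientIdPrefixes) {
        MBeanServer server = MBeanServerFactory.newMBeanServer();
        int collisions = 0;
        for (String prefix : clientIdPrefixes) {
            if (!register(server, enumeratorAdminClientId(prefix))) {
                collisions++;
            }
        }
        return collisions;
    }

    public static void main(String[] args) {
        // Five clients deriving their id from one shared group_id.
        System.out.println(countCollisions(
                "wifialgogroup1", "wifialgogroup1", "wifialgogroup1",
                "wifialgogroup1", "wifialgogroup1"));
        // Distinct prefixes (hypothetical names) do not collide.
        System.out.println(countCollisions(
                "wifialgogroup1-periodic", "wifialgogroup1-wifi-interface",
                "wifialgogroup1-twin-message"));
    }
}
```

In a real job, KafkaSourceBuilder#setClientIdPrefix should let you give each
source its own prefix while keeping the same group_id. Note that the five
sources in your snippet all call setGroupId with the same value, so they
would likely derive the same prefix as well.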

[1]
https://stackoverflow.com/questions/40880832/instancealreadyexistsexception-coming-from-kafka-consumer
[2]
https://github.com/apache/flink-connector-kafka/blob/79ae2d70499f81ce489911956c675354657dd44f/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L466

Best,
Ron

Kenan Kılıçtepe <kkilict...@gmail.com> wrote on Tue, Jul 25, 2023 at 21:48:

>
> Any help with the exception below is appreciated.
> My KafkaSource code is also below. The parallelism is 16 for this task.
>
> KafkaSource<String> sourceStationsPeriodic = KafkaSource.<String>builder()
>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>         .setTopics(parameter.get(KAFKA_TOPIC_READ_WIFI))
>         .setGroupId(parameter.get(KAFKA_GROUP))
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
> // Our Kafka Source
> KafkaSource<String> sourceStationsWifiInterface = KafkaSource.<String>builder()
>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>         .setTopics(parameter.get(KAFKA_TOPIC_READ_WIFI_INTERFACE))
>         .setGroupId(parameter.get(KAFKA_GROUP))
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
> KafkaSource<String> sourceTwinMessage = KafkaSource.<String>builder()
>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>         .setTopics(parameter.get(KAFKA_TOPIC_READ_TWIN_MESSAGE))
>         .setGroupId(parameter.get(KAFKA_GROUP))
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
> KafkaSource<String> sourceStationsOnDemand = KafkaSource.<String>builder()
>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>         .setTopics(parameter.get(KAFKA_TOPIC_READ_STATIONS_ON_DEMAND))
>         .setGroupId(parameter.get(KAFKA_GROUP))
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
> KafkaSource<String> sourceDeviceInfo = KafkaSource.<String>builder()
>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>         .setTopics(parameter.get(KAFKA_TOPIC_READ_DEVICE_INFO))
>         .setGroupId(parameter.get(KAFKA_GROUP))
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
>
>
> 2023-07-23 07:06:24,927 WARN  org.apache.kafka.common.utils.AppInfoParser [] - Error registering AppInfo mbean
> javax.management.InstanceAlreadyExistsException: kafka.admin.client:type=app-info,id=wifialgogroup1-enumerator-admin-client
>         at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
>         at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]
>         at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>         at org.apache.kafka.clients.admin.KafkaAdminClient.<init>(KafkaAdminClient.java:597) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>         at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:539) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>         at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:478) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>         at org.apache.kafka.clients.admin.Admin.create(Admin.java:133) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>         at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>         at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getKafkaAdminClient(KafkaSourceEnumerator.java:410) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>         at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.start(KafkaSourceEnumerator.java:151) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>         at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$1(SourceCoordinator.java:225) ~[flink-dist-1.16.0.jar:1.16.0]
>         at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:449) ~[flink-dist-1.16.0.jar:1.16.0]
>         at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-dist-1.16.0.jar:1.16.0]
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
>         at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
>         at java.lang.Thread.run(Thread.java:829) [?:?]
>
>
>
