Hi Kenan,

After studying the source code and searching for related information, I think this is caused by a duplicate client_id [1]. You can check whether any other job consuming this topic uses the same group_id. Flink uses the group_id to assemble the client_id [2], so if another job is already running with the same group_id, the duplicate client_id will be detected on the Kafka side.
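By the way, in the snippet you quoted below, all five sources share the same KAFKA_GROUP, so the client IDs can also collide between the sources of a single job. One possible workaround is to give each source its own client ID prefix via KafkaSourceBuilder#setClientIdPrefix. A rough, untested sketch reusing the property names from your snippet (the "-wifi" suffix is just an illustrative value):

    KafkaSource<String> sourceStationsPeriodic = KafkaSource.<String>builder()
            .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
            .setTopics(parameter.get(KAFKA_TOPIC_READ_WIFI))
            .setGroupId(parameter.get(KAFKA_GROUP))
            // Distinct prefix per source, so the "<prefix>-enumerator-admin-client"
            // MBean names registered by each source no longer clash.
            .setClientIdPrefix(parameter.get(KAFKA_GROUP) + "-wifi")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

If you do the same for the other four sources (each with a different prefix), the warning about the AppInfo MBean should go away.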
[1] https://stackoverflow.com/questions/40880832/instancealreadyexistsexception-coming-from-kafka-consumer
[2] https://github.com/apache/flink-connector-kafka/blob/79ae2d70499f81ce489911956c675354657dd44f/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L466

Best,
Ron

Kenan Kılıçtepe <kkilict...@gmail.com> wrote on Tue, Jul 25, 2023 at 21:48:
>
> Any help is appreciated about the exception below.
> Also my KafkaSource code is below. The parallelism is 16 for this task.
>
> KafkaSource<String> sourceStationsPeriodic = KafkaSource.<String>builder()
>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>         .setTopics(parameter.get(KAFKA_TOPIC_READ_WIFI))
>         .setGroupId(parameter.get(KAFKA_GROUP))
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
> // Our Kafka Source
> KafkaSource<String> sourceStationsWifiInterface = KafkaSource.<String>builder()
>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>         .setTopics(parameter.get(KAFKA_TOPIC_READ_WIFI_INTERFACE))
>         .setGroupId(parameter.get(KAFKA_GROUP))
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
> KafkaSource<String> sourceTwinMessage = KafkaSource.<String>builder()
>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>         .setTopics(parameter.get(KAFKA_TOPIC_READ_TWIN_MESSAGE))
>         .setGroupId(parameter.get(KAFKA_GROUP))
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
> KafkaSource<String> sourceStationsOnDemand = KafkaSource.<String>builder()
>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>         .setTopics(parameter.get(KAFKA_TOPIC_READ_STATIONS_ON_DEMAND))
>         .setGroupId(parameter.get(KAFKA_GROUP))
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
> KafkaSource<String> sourceDeviceInfo = KafkaSource.<String>builder()
>         .setBootstrapServers(parameter.get(KAFKA_SOURCE_STATIONS_BOOTLOADER_PROPERTY))
>         .setTopics(parameter.get(KAFKA_TOPIC_READ_DEVICE_INFO))
>         .setGroupId(parameter.get(KAFKA_GROUP))
>         .setStartingOffsets(OffsetsInitializer.latest())
>         .setValueOnlyDeserializer(new SimpleStringSchema())
>         .build();
>
> 2023-07-23 07:06:24,927 WARN  org.apache.kafka.common.utils.AppInfoParser [] - Error registering AppInfo mbean
> javax.management.InstanceAlreadyExistsException: kafka.admin.client:type=app-info,id=wifialgogroup1-enumerator-admin-client
>         at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
>         at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
>         at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]
>         at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>         at org.apache.kafka.clients.admin.KafkaAdminClient.<init>(KafkaAdminClient.java:597) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>         at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:539) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>         at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:478) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>         at org.apache.kafka.clients.admin.Admin.create(Admin.java:133) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>         at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>         at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getKafkaAdminClient(KafkaSourceEnumerator.java:410) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>         at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.start(KafkaSourceEnumerator.java:151) ~[blob_p-7f823076a9b41619d082270330273927b5c89588-6161d6df252e036e35c93e64446c8834:?]
>         at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$1(SourceCoordinator.java:225) ~[flink-dist-1.16.0.jar:1.16.0]
>         at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:449) ~[flink-dist-1.16.0.jar:1.16.0]
>         at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-dist-1.16.0.jar:1.16.0]
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
>         at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
>         at java.lang.Thread.run(Thread.java:829) [?:?]
>