Hello all, I am setting up MM2 (MirrorMaker 2) to replicate messages, consumer groups, and consumer offsets from a->b, and I believe all three are being replicated. My MM2 properties file is as follows:
```
# specify any number of cluster aliases
clusters = a,b
b.group.id=mm2-request

# replication settings
tasks.max = 24
replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy

a.max.poll.records = 20000
#a.receive.buffer.bytes = 33554432
#a.send.buffer.bytes = 33554432
#a.max.partition.fetch.bytes = 33554432
#a.message.max.bytes = 37755000
a.compression.type = gzip
#a.max.request.size = 26214400
#a.buffer.memory = 524288000
a.batch.size = 524288

b.max.poll.records = 20000
#b.receive.buffer.bytes = 33554432
#b.send.buffer.bytes = 33554432
#b.max.partition.fetch.bytes = 33554432
#b.message.max.bytes = 37755000
b.compression.type = gzip
#b.max.request.size = 26214400
#b.buffer.memory = 524288000
b.batch.size = 524288

a.bootstrap.servers = aaa.aws.confluent.cloud:9092
a.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username= "aaa" password="aaa";
a.sasl.mechanism = PLAIN
a.security.protocol = SASL_SSL
a.ssl.endpoint.identification.algroithm = https

b.bootstrap.servers = bbb.aws.confluent.cloud:9092
b.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username= "bbb" password="bbb";
b.sasl.mechanism = PLAIN
b.security.protocol = SASL_SSL
b.ssl.endpoint.identification.algroithm = https

# enable and configure individual replication flows
a->b.enabled = true

# regex which defines which topics gets replicated. For eg "foo-.*"
# a->b.topics = .*
a->b.topics = order-cmd-request-01

# topic exclusion
topics.blacklist = .*[\\-\\.]internal, .*\\.replica

# group to replicate
groups = .*

# group exclusion
# groups.blacklist = console-consumer-.*

sync.topic.acls.enabled = false
sync.topic.configs.enabled = true
refresh.topics.enabled = false
refresh.topics.interval.seconds = 600

checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
offset-syncs.topic.replication.factor = 3

offset.storage.replication.factor = 3
status.storage.replication.factor = 3
config.storage.replication.factor = 3

refresh.groups.enabled = true
refresh.groups.interval.seconds = 600
a->b.sync.group.offsets.enabled = true
sync.group.offsets.interval.seconds = 5
emit.checkpoints.interval.seconds = 5
emit.heartbeats.interval.seconds = 5
offset.translate.method = simple

# sync acl
# sync.topic.acls.enabled = false

# enable heartbeat
emit.heartbeats.enabled = true
emit.checkpoints.enabled = true

# Setting replication factor of newly created remote topics
replication.factor = 3

############################# Internal Topic Settings #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
offset-syncs.topic.replication.factor = 3

# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offset.storage.replication.factor = 3
status.storage.replication.factor = 3
config.storage.replication.factor = 3
```

I'm able to see the messages and such on the 'b' cluster.
I then shut down the API that was pointing to the 'a' cluster, repointed it to the 'b' cluster, and redeployed it. On startup, the API throws these errors:

```
{"@timestamp":"2024-01-25T20:49:32.758Z", "log.level": "WARN", "message":"Error registering AppInfo mbean", "ecs.version": "1.2.0","process.thread.name": "main","log.logger":"org.apache.kafka.common.utils.AppInfoParser","service.name":"prod-usf-order-integration-debug-api","error.type": "javax.management.InstanceAlreadyExistsException","error.message": "kafka.consumer:type=app-info,id=consumer-usfcom4-cg-order-cdc-debug-order_requests-01-usfcom4-cg-order-cdc-debug-order_requests-012c78995a-5-0","error.stack_trace":"javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-usfcom4-cg-order-cdc-debug-order_requests-01-usfcom4-cg-order-cdc-debug-order_requests-012c78995a-5-0\n\tat java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)\n\tat java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)\n\tat java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)\n\tat java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)\n\tat java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)\n\tat java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)\n\tat org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)\n\tat org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:358)\n\tat org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:326)\n\tat org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:302)\n\tat org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:269)\n\tat org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:243)\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:639)\n\tat org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:305)\n\tat org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)\n\tat org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204)\n\tat org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)\n\tat org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)\n\tat org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)\n\tat org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)\n\tat org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54)\n\tat org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356)\n\tat java.base/java.lang.Iterable.forEach(Iterable.java:75)\n\tat org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155)\n\tat org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123)\n\tat org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:934)\n\tat org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:585)\n\tat org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:144)\n\tat org.springframework.boot.SpringApplication.refresh(SpringApplication.java:767)\n\tat org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759)\n\tat org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:426)\n\tat org.springframework.boot.SpringApplication.run(SpringApplication.java:326)\n\tat org.springframework.boot.SpringApplication.run(SpringApplication.java:1311)\n\tat org.springframework.boot.SpringApplication.run(SpringApplication.java:1300)\n\tat com.usfoods.panamax.integration.order.PanamaxOrderIntegrationApiApplication.main(PanamaxOrderIntegrationApiApplication.java:40)\n"}
{"@timestamp":"2024-01-25T20:49:34.079Z", "log.level":"ERROR", "message":"[Consumer instanceId=usfcom4-cg-order-cdc-debug-order_requests-012c78995a-5-0, clientId=consumer-usfcom4-cg-order-cdc-debug-order_requests-01-usfcom4-cg-order-cdc-debug-order_requests-012c78995a-5-0, groupId=usfcom4-cg-order-cdc-debug-order_requests-01] Attempt to join group with generation Optional[usfcom4-cg-order-cdc-debug-order_requests-012c78995a-5-0] failed because the group instance id Generation{generationId=-1, memberId='', protocol='null'} has been fenced by another instance", "ecs.version": "1.2.0","process.thread.name": "org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1", "log.logger": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator","service.name":"prod-usf-order-integration-debug-api"}
{"@timestamp":"2024-01-25T20:49:34.080Z", "log.level": "INFO", "message":"[Consumer instanceId=usfcom4-cg-order-cdc-debug-order_requests-012c78995a-5-0, clientId=consumer-usfcom4-cg-order-cdc-debug-order_requests-01-usfcom4-cg-order-cdc-debug-order_requests-012c78995a-5-0, groupId=usfcom4-cg-order-cdc-debug-order_requests-01] Join group failed with org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id.", "ecs.version": "1.2.0","process.thread.name": "org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1", "log.logger": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator","service.name":"prod-usf-order-integration-debug-api"}
{"@timestamp":"2024-01-25T20:49:34.081Z", "log.level":"ERROR", "message":"'group.instance.id' has been fenced", "ecs.version": "1.2.0","process.thread.name": "org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1", "log.logger": "org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer","service.name":"prod-usf-order-integration-debug-api","error.type": "org.apache.kafka.common.errors.FencedInstanceIdException","error.message":"The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id.", "error.stack_trace":"org.apache.kafka.common.errors.FencedInstanceIdException: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id.\n"}
{"@timestamp":"2024-01-25T20:49:34.084Z", "log.level":"ERROR", "message":"Fatal consumer exception; stopping container", "ecs.version": "1.2.0","process.thread.name": "org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1", "log.logger": "org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer","service.name":"prod-usf-order-integration-debug-api"}
```

I also ran a describe
on all the groups on the target side, and this is the result:

    ./kafka-consumer-groups.sh --bootstrap-server bbb.us-west-2.aws.confluent.cloud:9092 --command-config ../config/cc_poc_prod.properties --describe --all-groups --stat

    Consumer group 'usfcom4-cg-order-cdc-debug-order_requests-01' has no active members.

    GROUP                                         COORDINATOR (ID)                                                    ASSIGNMENT-STRATEGY  STATE  #MEMBERS
    usfcom4-cg-order-cdc-debug-order_requests-01  e-1d55.usw2-az2.dom4gj8jzp6.us-west-2.aws.confluent.cloud:9092 (2)                       Empty  0

    Consumer group 'usfcom4-cg-order-cdc-order_requests-01' has no active members.

    GROUP                                         COORDINATOR (ID)                                                    ASSIGNMENT-STRATEGY  STATE  #MEMBERS
    usfcom4-cg-order-cdc-order_requests-01        e-1ed0.usw2-az2.dom4gj8jzp6.us-west-2.aws.confluent.cloud:9092 (0)                       Empty  0

I'm curious why, after shutting down the API, repointing it to the 'b' cluster, and starting it back up, I get an InstanceAlreadyExistsException when it runs against the 'b' cluster. At this point, MM2 is still running. I believe I should be able to keep MM2 running while the debug API works off the 'b' cluster; is that correct?

Any help would be appreciated.

Thanks,
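
P.S. Reading the stack trace again, the first WARN appears to be raised by the JVM's own MBean server (`JmxMBeanServer.registerMBean`, called from `AppInfoParser.registerAppInfo`) rather than by the broker, so it may be unrelated to the `FencedInstanceIdException` that follows. Below is a minimal sketch of that behaviour using only the JDK, with no Kafka involved. The class name `DuplicateMBeanDemo`, the `demo.consumer` domain, and the client id are all made up for illustration; the only thing taken from the log is the `type=app-info,id=<client id>` naming pattern.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DuplicateMBeanDemo {

    // Standard MBean convention: the interface is the class name plus "MBean".
    public interface AppInfoMBean {
        String getClientId();
    }

    // Hypothetical stand-in for the per-client info bean seen in the log.
    public static class AppInfo implements AppInfoMBean {
        private final String clientId;

        public AppInfo(String clientId) {
            this.clientId = clientId;
        }

        @Override
        public String getClientId() {
            return clientId;
        }
    }

    // Registers two beans under the same name in this JVM and reports
    // whether the second attempt was rejected as a duplicate.
    public static boolean secondRegistrationCollides(String clientId) throws JMException {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Assumes clientId contains no characters that need quoting in an ObjectName.
        ObjectName name = new ObjectName("demo.consumer:type=app-info,id=" + clientId);

        server.registerMBean(new AppInfo(clientId), name);
        try {
            server.registerMBean(new AppInfo(clientId), name);
            return false;
        } catch (InstanceAlreadyExistsException e) {
            return true;
        } finally {
            // Clean up so the method can be called again with the same id.
            server.unregisterMBean(name);
        }
    }

    public static void main(String[] args) throws JMException {
        boolean collided = secondRegistrationCollides("consumer-demo-group-1");
        System.out.println("second registration collided: " + collided);
    }
}
```

If this reading is right, the WARN would only mean that two consumers inside the same API process ended up with the same `client.id`; it would not explain on its own why the broker fences the static member, which is the part I still need help with.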