[ https://issues.apache.org/jira/browse/KAFKA-17186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
George Yang updated KAFKA-17186:
--------------------------------
    Description:
Deploy nodes 1, 2, and 3 in Data Center A, with MM2 service deployed on node 1. Deploy node 1 in Data Center B, with MM2 service also deployed on node 1.

Currently, a service on node 1 in Data Center A acts as a producer sending messages to the `myTest` topic. A service in Data Center B acts as a consumer listening to `A.myTest`.

The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer in Data Center B ceases to receive messages. Even after restarting MM2 in Data Center A, the consumer in Data Center B still does not receive messages until approximately 5 minutes later when a rebalance occurs, at which point it begins receiving messages again.

[Logs From Consumer on Data Center B]
```log
[2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
[2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
[2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2] Successfully joined group with generation Generation{generationId=52, memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
[2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Successfully synced group in generation Generation{generationId=52, memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
[2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Joined group at generation 52 with protocol version 2 and got assignment: Assignment{error=0, leader='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', leaderUrl='NOTUSED', offset=1360, connectorIds=[MirrorCheckpointConnector], taskIds=[MirrorCheckpointConnector-0, MirrorCheckpointConnector-1, MirrorCheckpointConnector-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2580)
[2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Starting connectors and tasks using config offset 1360 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1921)
[2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1950)
[2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
[2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
[2024-07-23 17:29:26,890] INFO [Worker clientId=A->B, groupId=A-mm2] Successfully joined group with generation Generation{generationId=143, memberId='A->B-0d04e6c1-f12a-4121-89af-e9992a167a01', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
[2024-07-23 17:29:26,893] INFO [Worker clientId=A->B, groupId=A-mm2] Successfully synced group in generation Generation{generationId=143, memberId='A->B-0d04e6c1-f12a-4121-89af-e9992a167a01', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
```

[Configuration]
```properties
name=MCS-MM2
clusters = A, B
A.bootstrap.servers = [kafka1]:[port],[kafka2]:[port],[kafka3]:[port]
B.bootstrap.servers = [kafka]:[port]
# enable and configure individual replication flows
A->B.enabled = true
A->B.topics = .*
B->A.enabled = true
B->A.topics = .*
replication.factor=2
tasks.max=3
emit.checkpoints.interval.seconds=5
A.producer.acks=all
A.producer.batch.size=50000
A.consumer.auto.offset.reset=latest
B.consumer.auto.offset.reset=latest
A.consumer.enable.auto.commit=true
B.consumer.enable.auto.commit=true
A.consumer.max.poll.interval.ms=20000
B.consumer.max.poll.interval.ms=20000
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
refresh.topics.enabled=true
refresh.topics.interval.seconds=5
refresh.groups.enabled=true
refresh.groups.interval.seconds=5
```

  was:
Deploy nodes 1, 2, and 3 in Data Center A, with MM2 service deployed on node 1. Deploy node 1 in Data Center B, with MM2 service also deployed on node 1.

Currently, a service on node 1 in Data Center A acts as a producer sending messages to the `myTest` topic. A service in Data Center B acts as a consumer listening to `A.myTest`.

The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer in Data Center B ceases to receive messages. Even after restarting MM2 in Data Center A, the consumer in Data Center B still does not receive messages until approximately 5 minutes later when a rebalance occurs, at which point it begins receiving messages again.

[Logs From Consumer on Data Center B]
```log
[2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
[2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
[2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2] Successfully joined group with generation Generation{generationId=52, memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
[2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Successfully synced group in generation Generation{generationId=52, memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
[2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Joined group at generation 52 with protocol version 2 and got assignment: Assignment{error=0, leader='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', leaderUrl='NOTUSED', offset=1360, connectorIds=[MirrorCheckpointConnector], taskIds=[MirrorCheckpointConnector-0, MirrorCheckpointConnector-1, MirrorCheckpointConnector-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2580)
[2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Starting connectors and tasks using config offset 1360 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1921)
[2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1950)
[2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
[2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
[2024-07-23 17:29:26,890] INFO [Worker clientId=A->B, groupId=A-mm2] Successfully joined group with generation Generation{generationId=143, memberId='A->B-0d04e6c1-f12a-4121-89af-e9992a167a01', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
[2024-07-23 17:29:26,893] INFO [Worker clientId=A->B, groupId=A-mm2] Successfully synced group in generation Generation{generationId=143, memberId='A->B-0d04e6c1-f12a-4121-89af-e9992a167a01', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
```

[Configuration]
```properties
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
name=MCS-MM2
# specify any number of cluster aliases
clusters = A, B
# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
A.bootstrap.servers = [kafka1]:[port],[kafka2]:[port],[kafka3]:[port]
B.bootstrap.servers = [kafka]:[port]
# enable and configure individual replication flows
A->B.enabled = true
A->B.topics = .*
B->A.enabled = true
B->A.topics = .*
# Setting replication factor of newly created remote topics
replication.factor=2
tasks.max=3
emit.checkpoints.interval.seconds=5
A.producer.acks=all
A.producer.batch.size=50000
#A.producer.enable.idempotence=true
#B.producer.enable.idempotence=true
A.consumer.auto.offset.reset=latest
B.consumer.auto.offset.reset=latest
A.consumer.enable.auto.commit=true
B.consumer.enable.auto.commit=true
#B.consumer.auto.offset.reset=latest
#B.consumer.connections.max.idle.ms=300000
#A.consumer.connections.max.idle.ms=300000
A.consumer.max.poll.interval.ms=20000
B.consumer.max.poll.interval.ms=20000
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
refresh.topics.enabled=true
refresh.topics.interval.seconds=5
refresh.groups.enabled=true
refresh.groups.interval.seconds=5
```

> Cannot receive message after stopping Source Mirror Maker 2
> -----------------------------------------------------------
>
> Key: KAFKA-17186
> URL: https://issues.apache.org/jira/browse/KAFKA-17186
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 3.7.1
> Environment: Source Kafka Cluster per Node:
> CPU(s): 32
> Memory: 32G/1.1G free
> Target Kafka Cluster standalone Node:
> CPU(s): 24
> Memory: 30G/909M free
> Kafka Version 3.7
> Mirrormaker Version 3.7.1
> Reporter: George Yang
> Priority: Major
>
> Deploy nodes 1, 2, and 3 in Data Center A, with MM2 service deployed on node
> 1. Deploy node 1 in Data Center B, with MM2 service also deployed on node 1.
> Currently, a service on node 1 in Data Center A acts as a producer sending
> messages to the `myTest` topic. A service in Data Center B acts as a consumer
> listening to `A.myTest`.
> The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer
> in Data Center B ceases to receive messages. Even after restarting MM2 in
> Data Center A, the consumer in Data Center B still does not receive messages
> until approximately 5 minutes later when a rebalance occurs, at which point
> it begins receiving messages again.
>
> [Logs From Consumer on Data Center B]
> ```log
> [2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing
> consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing
> consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing
> consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing
> consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2]
> Rebalance started
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2]
> (Re-)joining group
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
> [2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2]
> Successfully joined group with generation Generation{generationId=52,
> memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'}
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2]
> Successfully synced group in generation Generation{generationId=52,
> memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'}
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Joined
> group at generation 52 with protocol version 2 and got assignment:
> Assignment{error=0, leader='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab',
> leaderUrl='NOTUSED', offset=1360, connectorIds=[MirrorCheckpointConnector],
> taskIds=[MirrorCheckpointConnector-0, MirrorCheckpointConnector-1,
> MirrorCheckpointConnector-2], revokedConnectorIds=[], revokedTaskIds=[],
> delay=0} with rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2580)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Starting
> connectors and tasks using config offset 1360
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1921)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Finished
> starting connectors and tasks
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1950)
> [2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2]
> Rebalance started
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
> [2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2]
> (Re-)joining group
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
> [2024-07-23 17:29:26,890] INFO [Worker clientId=A->B, groupId=A-mm2]
> Successfully joined group with generation Generation{generationId=143,
> memberId='A->B-0d04e6c1-f12a-4121-89af-e9992a167a01', protocol='sessioned'}
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
> [2024-07-23 17:29:26,893] INFO [Worker clientId=A->B, groupId=A-mm2]
> Successfully synced group in generation Generation{generationId=143,
> memberId='A->B-0d04e6c1-f12a-4121-89af-e9992a167a01', protocol='sessioned'}
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
> ```
> [Configuration]
> ```properties
> name=MCS-MM2
> clusters = A, B
> A.bootstrap.servers = [kafka1]:[port],[kafka2]:[port],[kafka3]:[port]
> B.bootstrap.servers = [kafka]:[port]
> # enable and configure individual replication flows
> A->B.enabled = true
> A->B.topics = .*
> B->A.enabled = true
> B->A.topics = .*
> replication.factor=2
> tasks.max=3
> emit.checkpoints.interval.seconds=5
> A.producer.acks=all
> A.producer.batch.size=50000
> A.consumer.auto.offset.reset=latest
> B.consumer.auto.offset.reset=latest
> A.consumer.enable.auto.commit=true
> B.consumer.enable.auto.commit=true
> A.consumer.max.poll.interval.ms=20000
> B.consumer.max.poll.interval.ms=20000
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
> refresh.topics.enabled=true
> refresh.topics.interval.seconds=5
> refresh.groups.enabled=true
> refresh.groups.interval.seconds=5
> ```
>

--
This message was sent by Atlassian Jira (v8.20.10#820010)
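
For anyone trying to reproduce the report, it may help to pull the "Rebalance started" timestamps out of the MM2 worker log and compare them with the moment the consumer resumes. The following is only a minimal sketch, assuming the log lines keep the layout shown in the excerpt from this issue; the helper names `rebalance_starts` and `rebalance_gaps` are hypothetical and not part of Kafka, and the sample lines are copied from the report.

```python
import re
from datetime import datetime

# Assumed layout, taken from the excerpt in this issue:
# [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] Rebalance started (...)
REBALANCE_RE = re.compile(
    r"^\[(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\] INFO "
    r"\[Worker clientId=(?P<client>[^,]+), groupId=(?P<group>[^\]]+)\] "
    r"Rebalance started"
)


def rebalance_starts(lines):
    """Return {groupId: [datetime, ...]} for every 'Rebalance started' line."""
    starts = {}
    for line in lines:
        match = REBALANCE_RE.match(line.strip())
        if match:
            ts = datetime.strptime(match.group("ts"), "%Y-%m-%d %H:%M:%S,%f")
            starts.setdefault(match.group("group"), []).append(ts)
    return starts


def rebalance_gaps(starts):
    """Return {groupId: [seconds between consecutive rebalance starts]}."""
    return {
        group: [(b - a).total_seconds() for a, b in zip(times, times[1:])]
        for group, times in starts.items()
    }


# Three lines copied from the log excerpt in this issue.
SAMPLE = [
    "[2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95)",
    "[2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)",
    "[2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)",
]

if __name__ == "__main__":
    found = rebalance_starts(SAMPLE)
    for group, times in sorted(found.items()):
        print(group, [t.isoformat() for t in times])
    print(rebalance_gaps(found))
```

The excerpt contains a single rebalance per group, so the gap lists stay empty here. Fed a full log that covers the MM2 stop, the restart, and the later rebalance, the gaps would show whether the roughly 5 minute delay lines up with rebalance timing.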