[ https://issues.apache.org/jira/browse/KAFKA-17186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

George Yang updated KAFKA-17186:
--------------------------------
    Description: 
Deploy nodes 1, 2, and 3 in Data Center A, with MM2 service deployed on node 1. 
Deploy node 1 in Data Center B, with MM2 service also deployed on node 1. 
Currently, a service on node 1 in Data Center A acts as a producer sending 
messages to the `myTest` topic. A service in Data Center B acts as a consumer 
listening to `A.myTest`. 
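With MM2's default replication policy, a mirrored topic is named by prefixing the source cluster alias and a separator (`.` by default) to the original topic name. That is why the consumer in Data Center B subscribes to `A.myTest` rather than `myTest`. The sketch below reproduces only that naming convention in plain Java; `RemoteTopicName`, `remoteTopic` and `sourceAlias` are hypothetical names, not MM2's API.

```java
public class RemoteTopicName {

    // Hypothetical helper mirroring MM2's default naming convention:
    // remote topic = <source cluster alias> + <separator> + <original topic>.
    static String remoteTopic(String sourceAlias, String separator, String topic) {
        return sourceAlias + separator + topic;
    }

    // Hypothetical inverse: returns the source alias encoded in a remote topic name,
    // or null if the name contains no separator (i.e. it does not look like a remote topic).
    static String sourceAlias(String remoteTopic, String separator) {
        int idx = remoteTopic.indexOf(separator);
        return idx < 0 ? null : remoteTopic.substring(0, idx);
    }

    public static void main(String[] args) {
        // "A" is the source cluster alias from the configuration; "." is the default separator.
        String remote = remoteTopic("A", ".", "myTest");
        System.out.println(remote);                   // A.myTest
        System.out.println(sourceAlias(remote, "."));  // A
    }
}
```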

The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer
in Data Center B stops receiving messages. Even after I restart MM2 in
Data Center A, the consumer in Data Center B still does not receive messages
until approximately 5 minutes later, when a rebalance occurs; only then does it
begin receiving messages again.

 

[Logs From Consumer on Data Center B]

```log
[2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
[2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
[2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2] Successfully joined group with generation Generation{generationId=52, memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
[2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Successfully synced group in generation Generation{generationId=52, memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
[2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Joined group at generation 52 with protocol version 2 and got assignment: Assignment{error=0, leader='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', leaderUrl='NOTUSED', offset=1360, connectorIds=[MirrorCheckpointConnector], taskIds=[MirrorCheckpointConnector-0, MirrorCheckpointConnector-1, MirrorCheckpointConnector-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2580)
[2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Starting connectors and tasks using config offset 1360 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1921)
[2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1950)
[2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
[2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
[2024-07-23 17:29:26,890] INFO [Worker clientId=A->B, groupId=A-mm2] Successfully joined group with generation Generation{generationId=143, memberId='A->B-0d04e6c1-f12a-4121-89af-e9992a167a01', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
[2024-07-23 17:29:26,893] INFO [Worker clientId=A->B, groupId=A-mm2] Successfully synced group in generation Generation{generationId=143, memberId='A->B-0d04e6c1-f12a-4121-89af-e9992a167a01', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
```

[Configuration]

```properties

# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties

name=MCS-MM2
# specify any number of cluster aliases
clusters = A, B

# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
A.bootstrap.servers = [kafka1]:[port],[kafka2]:[port],[kafka3]:[port]
B.bootstrap.servers = [kafka]:[port]

# enable and configure individual replication flows
A->B.enabled = true
A->B.topics = .*

B->A.enabled = true
B->A.topics = .*

# Setting replication factor of newly created remote topics
replication.factor=2

tasks.max=3
emit.checkpoints.interval.seconds=5
A.producer.acks=all
A.producer.batch.size=50000
#A.producer.enable.idempotence=true
#B.producer.enable.idempotence=true

A.consumer.auto.offset.reset=latest
B.consumer.auto.offset.reset=latest
A.consumer.enable.auto.commit=true
B.consumer.enable.auto.commit=true
#B.consumer.auto.offset.reset=latest
#B.consumer.connections.max.idle.ms=300000
#A.consumer.connections.max.idle.ms=300000
A.consumer.max.poll.interval.ms=20000
B.consumer.max.poll.interval.ms=20000

checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1

offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

refresh.topics.enabled=true
refresh.topics.interval.seconds=5

refresh.groups.enabled=true
refresh.groups.interval.seconds=5

```
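In this configuration, the `clusters` list together with the `<source>-><target>.enabled` keys defines the replication flows. Here both `A->B` and `B->A` are enabled, which appears consistent with the two worker groups (`A-mm2` and `B-mm2`) in the log above. The following is a simplified Java sketch of how those keys combine. It assumes a flow counts as enabled only when its `.enabled` key parses as `true`, and it ignores heartbeat-only flows and MM2's other defaults. `EnabledFlows` and `enabledFlows` are hypothetical names, not part of MM2.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class EnabledFlows {

    // Hypothetical helper: returns every "source->target" pair (source != target)
    // whose "<source>-><target>.enabled" key is "true". Missing keys count as false.
    static List<String> enabledFlows(Properties props) {
        List<String> flows = new ArrayList<>();
        String[] aliases = props.getProperty("clusters", "").split(",");
        for (String rawSource : aliases) {
            String source = rawSource.trim();
            for (String rawTarget : aliases) {
                String target = rawTarget.trim();
                if (source.isEmpty() || target.isEmpty() || source.equals(target)) {
                    continue;
                }
                String flow = source + "->" + target;
                String enabled = props.getProperty(flow + ".enabled", "false").trim();
                if (Boolean.parseBoolean(enabled)) {
                    flows.add(flow);
                }
            }
        }
        return flows;
    }

    public static void main(String[] args) throws IOException {
        // Subset of the configuration above; Properties.load accepts "key = value".
        String config = String.join("\n",
                "clusters = A, B",
                "A->B.enabled = true",
                "A->B.topics = .*",
                "B->A.enabled = true",
                "B->A.topics = .*");
        Properties props = new Properties();
        props.load(new StringReader(config));
        System.out.println(enabledFlows(props)); // [A->B, B->A]
    }
}
```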

 

  was:
Deploy nodes 1, 2, and 3 in Data Center A, with MM2 service deployed on node 1. 
Deploy node 1 in Data Center B, with MM2 service also deployed on node 1. 
Currently, a service on node 1 in Data Center A acts as a producer sending 
messages to the `myTest` topic. A service in Data Center B acts as a consumer 
listening to `A.myTest`. 

The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer
in Data Center B stops receiving messages. Even after I restart MM2 in
Data Center A, the consumer in Data Center B still does not receive messages
until approximately 5 minutes later, when a rebalance occurs; only then does it
begin receiving messages again.

 

[Logs From Consumer on Data Center B]

```log
[2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95)
[2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
[2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
[2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2] Successfully joined group with generation Generation{generationId=52, memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
[2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Successfully synced group in generation Generation{generationId=52, memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
[2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Joined group at generation 52 with protocol version 2 and got assignment: Assignment{error=0, leader='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', leaderUrl='NOTUSED', offset=1360, connectorIds=[MirrorCheckpointConnector], taskIds=[MirrorCheckpointConnector-0, MirrorCheckpointConnector-1, MirrorCheckpointConnector-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2580)
[2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Starting connectors and tasks using config offset 1360 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1921)
[2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1950)
[2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
[2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
[2024-07-23 17:29:26,890] INFO [Worker clientId=A->B, groupId=A-mm2] Successfully joined group with generation Generation{generationId=143, memberId='A->B-0d04e6c1-f12a-4121-89af-e9992a167a01', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
[2024-07-23 17:29:26,893] INFO [Worker clientId=A->B, groupId=A-mm2] Successfully synced group in generation Generation{generationId=143, memberId='A->B-0d04e6c1-f12a-4121-89af-e9992a167a01', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
```

[Configuration]

```properties

# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties

name=MCS-MM2
# specify any number of cluster aliases
clusters = A, B

# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
A.bootstrap.servers = [kafka1]:[port],[kafka2]:[port],[kafka3]:[port]
B.bootstrap.servers = [kafka]:[port]

# enable and configure individual replication flows
A->B.enabled = true
A->B.topics = .*

B->A.enabled = true
B->A.topics = .*

# Setting replication factor of newly created remote topics
replication.factor=2

tasks.max=3
emit.checkpoints.interval.seconds=5
A.producer.acks=all
A.producer.batch.size=50000
#A.producer.enable.idempotence=true
#B.producer.enable.idempotence=true

A.consumer.auto.offset.reset=latest
B.consumer.auto.offset.reset=latest
A.consumer.enable.auto.commit=true
B.consumer.enable.auto.commit=true
#B.consumer.auto.offset.reset=latest
#B.consumer.connections.max.idle.ms=300000
#A.consumer.connections.max.idle.ms=300000
A.consumer.max.poll.interval.ms=20000
B.consumer.max.poll.interval.ms=20000
############################# Internal Topic Settings  #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1

# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

# customize as needed
# replication.policy.separator = _
# sync.topic.acls.enabled = false
# emit.heartbeats.interval.seconds = 5

refresh.topics.enabled=true
refresh.topics.interval.seconds=5

refresh.groups.enabled=true
refresh.groups.interval.seconds=5

```

 


> Cannot receive message after stopping Source Mirror Maker 2
> -----------------------------------------------------------
>
>                 Key: KAFKA-17186
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17186
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.7.1
>         Environment: Source Kafka Cluster per Node:
> CPU(s): 32
> Memory: 32G/1.1G free
> Target Kafka Cluster standalone Node:
> CPU(s): 24
> Memory: 30G/909M free
> Kafka Version 3.7
> Mirrormaker Version 3.7.1
>            Reporter: George Yang
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
