Bart De Neuter created KAFKA-12468: -------------------------------------- Summary: Initial offsets are copied from source to target cluster Key: KAFKA-12468 URL: https://issues.apache.org/jira/browse/KAFKA-12468 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 2.7.0 Reporter: Bart De Neuter
We have an active-passive setup where the 3 connectors from mirror maker 2 (heartbeat, checkpoint and source) are running on a dedicated Kafka connect cluster on the target cluster. Offset syncing is enabled as specified by KIP-545. But when activated, it seems the offsets from the source cluster are initially copied to the target cluster without translation. This causes a negative lag for all synced consumer groups. Only when we reset the offsets for each topic/partition on the target cluster and produce a record on the topic/partition in the source, the sync starts working correctly. I would expect that the consumer groups are synced but that the current offsets of the source cluster are not copied to the target cluster. This is the configuration we are currently using: Heartbeat connector {code:xml} { "name": "mm2-mirror-heartbeat", "config": { "name": "mm2-mirror-heartbeat", "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", "source.cluster.alias": "eventador", "target.cluster.alias": "msk", "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>", "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>", "topics": ".*", "groups": ".*", "tasks.max": "1", "replication.policy.class": "CustomReplicationPolicy", "sync.group.offsets.enabled": "true", "sync.group.offsets.interval.seconds": "5", "emit.checkpoints.enabled": "true", "emit.checkpoints.interval.seconds": "30", "emit.heartbeats.interval.seconds": "30", "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter" } } {code} Checkpoint connector: {code:xml} { "name": "mm2-mirror-checkpoint", "config": { "name": "mm2-mirror-checkpoint", "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", "source.cluster.alias": "eventador", "target.cluster.alias": "msk", "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>", "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>", "topics": ".*", "groups": ".*", "tasks.max": "40", "replication.policy.class": "CustomReplicationPolicy", "sync.group.offsets.enabled": "true", "sync.group.offsets.interval.seconds": "5", "emit.checkpoints.enabled": "true", "emit.checkpoints.interval.seconds": "30", "emit.heartbeats.interval.seconds": "30", "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter" } } {code} Source connector: {code:xml} { "name": "mm2-mirror-source", "config": { "name": "mm2-mirror-source", "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "source.cluster.alias": "eventador", "target.cluster.alias": "msk", "source.cluster.bootstrap.servers": "<SOURCE_CLUSTER>", "target.cluster.bootstrap.servers": "<TARGET_CLUSTER>", "topics": ".*", "groups": ".*", "tasks.max": "40", "replication.policy.class": "CustomReplicationPolicy", "sync.group.offsets.enabled": "true", "sync.group.offsets.interval.seconds": "5", "emit.checkpoints.enabled": "true", "emit.checkpoints.interval.seconds": "30", "emit.heartbeats.interval.seconds": "30", "key.converter": " org.apache.kafka.connect.converters.ByteArrayConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter" } } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)