Hello Kafka community, I struggled with this issue for a long time and finally figured out the reason.
*Issue*: MM2 is not emitting checkpoints for my consumer, which is an Apache Flink job using the Kafka source connector. Because of this, I am not able to do the failover. Bi-directional cross-DC replication itself is working fine.

*Reason*: The Flink Kafka connector internally uses the assign() API for its consumer. This is the reason MM2 is not emitting checkpoints for my consumer group. When I tested MM2 with a simple consumer that uses the subscribe() API, it emitted checkpoints, and I was able to do the failover and offset translation using the RemoteClusterUtils class.

Can someone please tell me whether this is a bug in MM2 or expected behaviour? I have already seen a few earlier mails asking about this same issue with the assign() API, but they are unanswered. It would be great if someone could explain this.

Thanks & Regards

On Wed, 5 Aug 2020 at 1:57 AM, Sunny Lohani <sunny.loh...@gmail.com> wrote:

> Hi Ryanne,
>
> I tried setting the groups in mm2.properties, still no luck. And, the
> consumer application is not kafka-console-consumer, so no issues there.
> I noticed in the MM logs that MirrorCheckpointConnector is created but
> without any connector task, not even a single error in the logs though. I
> feel this is the reason that the checkpoints topics are empty. Below are
> some log traces which indicate the same:
>
> [2020-08-05 01:38:12,349] INFO Started MirrorCheckpointConnector with 0
> consumer groups.
> (org.apache.kafka.connect.mirror.MirrorCheckpointConnector:79)
> [2020-08-05 01:38:12,350] INFO Finished creating connector
> MirrorCheckpointConnector (org.apache.kafka.connect.runtime.Worker:273)
> ...
> [2020-08-05 01:46:12,145] INFO [Worker clientId=connect-1, groupId=B-mm2]
> Joined group at generation 8 with protocol version 2 and got assignment:
> Assignment{error=0,
> leader='connect-1-6529239d-299d-4fc9-aea7-80c94f0838ab',
> leaderUrl='NOTUSED/B', offset=20, connectorIds=[MirrorSourceConnector,
> MirrorHeartbeatConnector, MirrorCheckpointConnector],
> taskIds=[MirrorSourceConnector-0, MirrorHeartbeatConnector-0],
> revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1549)
>
> Let me know your thoughts as to why the MirrorCheckpointConnector task is
> not getting created ?
>
> Also, just fyi, I am using KafkaCat to produce messages into the local
> topics of the kafka clusters. I hope this should not create any issues.
>
> --
> Sunny
>
>
> On Tue, Aug 4, 2020 at 9:17 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
>> Sunny, check the groups and groups.blacklist properties. By default, MM2
>> won't replicate consumer groups from kafka-console-consumer, for example.
>> Sometimes that confuses people when testing MM2.
>>
>> Also check the logs to see if there is any reason MirrorCheckpointConnector
>> might be failing to start.
>>
>> Ryanne
>>
>> On Tue, Aug 4, 2020 at 10:27 AM Sunny Lohani <sunny.loh...@gmail.com> wrote:
>>
>> > Hi Ryanne,
>> >
>> > First of all, thanks for a quick revert. Actually, I have a consumer
>> > application consuming messages in cluster A and then I failover the
>> > consumer from A to cluster B. Also, the consumer is subscribed to both
>> > local and remote topics. Let's say the local topic name in both the
>> > clusters A and B is test-topic. Here is a sequence of steps that I am
>> > following before and after the failover:
>> >
>> > 1. Consumer application subscribes to topics test-topic and B.test-topic in
>> > cluster A with group.id "test-consumer"
>> > 2. It consumes some messages from both the topics.
>> > 3. Now, we stop the consumer application and restart it pointing to cluster
>> > B, subscribed to topics test-topic and A.test-topic, with group.id
>> > "test-consumer". I use the RemoteClusterUtils.translateOffsets() here.
>> > 4. The method returns an empty map as well as the checkpoints topic in both
>> > the clusters is empty.
>> >
>> > Let me know if you see anything wrong here.
>> >
>> > Thanks,
>> > Sunny
>> >
>> >
>> > On Tue, Aug 4, 2020 at 8:26 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>> >
>> > > Sunny, is it possible there are no consumer groups? There will be no
>> > > checkpoints, and thus nothing to use for offset translation, if there are
>> > > no upstream consumer groups.
>> > >
>> > > Ryanne
>> > >
>> > > On Tue, Aug 4, 2020, 9:28 AM Sunny Lohani <sunny.loh...@gmail.com> wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I have 2 data centers, each having single node Zookeeper and Kafka cluster.
>> > > > I have a topic (single partition) in both the data center kafka clusters. I
>> > > > am using MM 2.0 as a dedicated cluster for bi-directional replication of
>> > > > the topic as well as using RemoteClusterUtils.translateOffsets() in my
>> > > > application for offset translation during failover. But the method is
>> > > > returning an empty map due to which the consumer is not resuming from
>> > > > proper offsets for local/remote topics.
>> > > >
>> > > > When I investigated further, I found that the checkpoint
>> > > > topics A.checkpoints.internal and B.checkpoints.internal in respective
>> > > > clusters do not have any kafka message. I don't see any errors in the
>> > > > mirror maker console logs. I searched everywhere on the internet but could
>> > > > not get any help.
>> > > > Below is the mm.properties:
>> > > >
>> > > > clusters = A, B
>> > > > A.bootstrap.servers = 10.34.45.113:19092
>> > > > B.bootstrap.servers = 10.34.45.113:29092
>> > > >
>> > > > A->B.enabled = true
>> > > > A->B.topics = .*
>> > > > B->A.enabled = true
>> > > > B->A.topics = .*
>> > > >
>> > > > # Setting replication factor of newly created remote topics
>> > > > replication.factor=1
>> > > >
>> > > > checkpoints.topic.replication.factor=1
>> > > > heartbeats.topic.replication.factor=1
>> > > > offset-syncs.topic.replication.factor=1
>> > > >
>> > > > offset.storage.replication.factor=1
>> > > > status.storage.replication.factor=1
>> > > > config.storage.replication.factor=1
>> > > >
>> > > > sync.topic.acls.enabled = false
>> > > >
>> > > > emit.checkpoints.enabled = true
>> > > > emit.checkpoints.interval.seconds = 5
>> > > > ----
>> > > >
>> > > > Need help on this urgently. Thanks in advance.
>> > > >
>> > > > Thanks & Regards,
>> > > > Sunny Kumar Lohani,
>> > > >
>> > >
>> >
>> >

--
Sunny Kumar Lohani, Sr. SW Engineer, Visa Inc., Bangalore (India)
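
For reference, the mm.properties quoted in this thread does not set the group filters that Ryanne mentions. The fragment below is only a sketch of what setting them might look like, assuming the per-flow prefix form (A->B.) that the file already uses for topics, and assuming the group.id "test-consumer" from the failover steps. The commented-out default is the one I believe Kafka 2.x ships with; treat it as an assumption and check it against your version. As reported above, setting groups alone did not produce checkpoints for the assign()-based consumer, so this shows the configuration being discussed, not a fix.

```properties
# Sketch only: per-flow consumer group filters, added to the mm.properties above.
# "test-consumer" is the group.id used in the failover steps in this thread.
A->B.groups = test-consumer
B->A.groups = test-consumer

# Assumed default exclusion list in Kafka 2.x; groups matching it are skipped.
# (Later releases rename this property to groups.exclude.)
# groups.blacklist = console-consumer-.*, connect-.*, __.*
```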