Hello Kafka community,

I struggled with this issue for a long time and finally figured out the
reason.

*Issue*: MM2 is not emitting checkpoints for my consumer, which is an Apache
Flink job using the Kafka source connector. Because of this, I am not able to
do the failover. Bi-directional cross-DC replication itself is working fine.

*Reason*: The Flink Kafka connector internally uses the assign() API for the
consumer, and MM2 does not emit checkpoints for that consumer group. When I
tested MM2 with a simple consumer using the subscribe() API, it emitted
checkpoints, and I was able to do the failover and offset translation using
the RemoteClusterUtils class.
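
To make the difference concrete, here is a minimal sketch of the two
consumer styles and the offset translation call. It is an illustration under
assumptions, not the code of my Flink job or of the Flink connector: the
class name FailoverSketch and the useAssign flag are hypothetical, the
broker address, topic names and group id are the ones from the thread below,
and it needs kafka-clients and connect-mirror-client on the classpath.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

// Hypothetical class name; illustrative only.
public class FailoverSketch {
    public static void main(String[] args) throws Exception {
        // Assumption: cluster B (the failover target) from the mm.properties below.
        String bootstrap = "10.34.45.113:29092";
        String groupId = "test-consumer";
        boolean useAssign = args.length > 0 && args[0].equals("assign");

        // Ask cluster B for the offsets of the group as checkpointed from cluster A.
        // The map is empty when MM2 has emitted no checkpoints for the group.
        Map<String, Object> clientProps = new HashMap<>();
        clientProps.put("bootstrap.servers", bootstrap);
        Map<TopicPartition, OffsetAndMetadata> translated =
            RemoteClusterUtils.translateOffsets(
                clientProps, "A", groupId, Duration.ofSeconds(30));
        System.out.println("Translated offsets: " + translated);

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("group.id", groupId);
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            if (useAssign) {
                // Manual assignment: no group membership, no rebalancing.
                // The consumer picks its partitions itself.
                TopicPartition tp = new TopicPartition("test-topic", 0);
                consumer.assign(Collections.singletonList(tp));
                OffsetAndMetadata start = translated.get(tp);
                if (start != null) {
                    consumer.seek(tp, start.offset());
                }
            } else {
                // Group-managed subscription: the consumer joins the group and
                // the group coordinator assigns partitions to it.
                consumer.subscribe(Arrays.asList("test-topic", "A.test-topic"));
            }
            consumer.poll(Duration.ofSeconds(5))
                .forEach(r -> System.out.println(r.topic() + "@" + r.offset()));
        }
    }
}
```

Running it without arguments uses subscribe(); passing "assign" as the first
argument uses assign() on partition 0 of test-topic.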

Can someone please tell me whether this is a bug in MM2 or expected
behaviour? I have seen a few earlier mails asking about this same issue
with the assign() API, but they are unanswered. It would be great if someone
could explain this.

Thanks & Regards

On Wed, 5 Aug 2020 at 1:57 AM, Sunny Lohani <sunny.loh...@gmail.com> wrote:

> Hi Ryanne,
>
> I tried setting the groups in mm2.properties, still no luck. And, the
> consumer application is not kafka-console-consumer, so no issues there.
> I noticed in the MM logs that MirrorCheckpointConnector is created but
> without any connector task, not even a single error in the logs though. I
> feel this is the reason that the checkpoints topics are empty. Below are
> some log traces which indicate the same:
>
> [2020-08-05 01:38:12,349] INFO Started MirrorCheckpointConnector with 0
> consumer groups.
> (org.apache.kafka.connect.mirror.MirrorCheckpointConnector:79)
> [2020-08-05 01:38:12,350] INFO Finished creating connector
> MirrorCheckpointConnector (org.apache.kafka.connect.runtime.Worker:273)
> ...
> [2020-08-05 01:46:12,145] INFO [Worker clientId=connect-1, groupId=B-mm2]
> Joined group at generation 8 with protocol version 2 and got assignment:
> Assignment{error=0,
> leader='connect-1-6529239d-299d-4fc9-aea7-80c94f0838ab',
> leaderUrl='NOTUSED/B', offset=20, connectorIds=[MirrorSourceConnector,
> MirrorHeartbeatConnector, MirrorCheckpointConnector],
> taskIds=[MirrorSourceConnector-0, MirrorHeartbeatConnector-0],
> revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1549)
>
> Let me know your thoughts as to why the MirrorCheckpointConnector task is
> not getting created ?
>
> Also, just fyi, I am using KafkaCat to produce messages into the local
> topics of the kafka clusters. I hope this should not create any issues.
>
> --
> Sunny
>
>
> On Tue, Aug 4, 2020 at 9:17 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
>> Sunny, check the groups and groups.blacklist properties. By default, MM2
>> won't replicate consumer groups from kafka-console-consumer, for example.
>> Sometimes that confuses people when testing MM2.
>>
>> Also check the logs to see if there is any reason
>> MirrorCheckpointConnector might be failing to start.
>>
>> Ryanne
>>
>> On Tue, Aug 4, 2020 at 10:27 AM Sunny Lohani <sunny.loh...@gmail.com>
>> wrote:
>>
>> > Hi Ryanne,
>> >
>> > First of all, thanks for a quick revert. Actually, I have a consumer
>> > application consuming messages in cluster A and then I failover the
>> > consumer from A to cluster B. Also, the consumer is subscribed to both
>> > local and remote topics. Let's say the local topic name in both the
>> > clusters A and B is test-topic. Here is a sequence of steps that I am
>> > following before and after the failover:
>> >
>> > 1. Consumer application subscribes to topics test-topic and
>> > B.test-topic in cluster A with group.id "test-consumer"
>> > 2. It consumes some messages from both the topics.
>> > 3. Now, we stop the consumer application and restart it pointing to
>> > cluster B, subscribed to topics test-topic and A.test-topic, with
>> > group.id "test-consumer". I use the
>> > RemoteClusterUtils.translateOffsets() here.
>> > 4. The method returns an empty map as well as the checkpoints topic in
>> > both the clusters is empty.
>> >
>> > Let me know if you see anything wrong here.
>> >
>> > Thanks,
>> > Sunny
>> >
>> >
>> > On Tue, Aug 4, 2020 at 8:26 PM Ryanne Dolan <ryannedo...@gmail.com>
>> wrote:
>> >
>> > > Sunny, is it possible there are no consumer groups? There will be no
>> > > checkpoints, and thus nothing to use for offset translation, if
>> > > there are no upstream consumer groups.
>> > >
>> > > Ryanne
>> > >
>> > > On Tue, Aug 4, 2020, 9:28 AM Sunny Lohani <sunny.loh...@gmail.com>
>> > wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I have 2 data centers, each having single node Zookeeper and Kafka
>> > > > cluster. I have a topic (single partition) in both the data center
>> > > > kafka clusters. I am using MM 2.0 as a dedicated cluster for
>> > > > bi-directional replication of the topic as well as using
>> > > > RemoteClusterUtils.translateOffsets() in my application for offset
>> > > > translation during failover. But the method is returning an empty
>> > > > map due to which the consumer is not resuming from proper offsets
>> > > > for local/remote topics.
>> > > >
>> > > > When I investigated further, I found that the checkpoint
>> > > > topics A.checkpoints.internal and B.checkpoints.internal in
>> > > > respective clusters do not have any kafka message. I don't see any
>> > > > errors in the mirror maker console logs. I searched everywhere on
>> > > > the internet but could not get any help. Below is the
>> > > > mm.properties:
>> > > >
>> > > > clusters = A, B
>> > > > A.bootstrap.servers = 10.34.45.113:19092
>> > > > B.bootstrap.servers = 10.34.45.113:29092
>> > > >
>> > > > A->B.enabled = true
>> > > > A->B.topics = .*
>> > > > B->A.enabled = true
>> > > > B->A.topics = .*
>> > > >
>> > > > # Setting replication factor of newly created remote topics
>> > > > replication.factor=1
>> > > >
>> > > > checkpoints.topic.replication.factor=1
>> > > > heartbeats.topic.replication.factor=1
>> > > > offset-syncs.topic.replication.factor=1
>> > > >
>> > > > offset.storage.replication.factor=1
>> > > > status.storage.replication.factor=1
>> > > > config.storage.replication.factor=1
>> > > >
>> > > > sync.topic.acls.enabled = false
>> > > >
>> > > > emit.checkpoints.enabled = true
>> > > > emit.checkpoints.interval.seconds = 5
>> > > > ----
>> > > >
>> > > > Need help on this urgently. Thanks in advance.
>> > > >
>> > > > Thanks & Regards,
>> > > > Sunny Kumar Lohani,
>> > > >
>> > >
>> >
>>
> --
Sunny Kumar Lohani,
Sr. SW Engineer,
Visa Inc.,
Bangalore (India)
