Ryanne, thank you! Now it is clear why offsets on rc behave like this.
Tolya

On 5 March 2019, at 20:05, Ryanne Dolan <ryannedo...@gmail.com> wrote:

Tolya,

You mentioned that you are replicating "with internal topics", so I'd expect the __consumer_offsets topic in the target cluster to include (at least) the same records as the source cluster. MirrorMaker does not translate offsets, so the downstream commits will be wrong if you try to replicate __consumer_offsets like that.

Re why kafka-consumer-groups is reporting different information: I suspect that the downstream __consumer_offsets topic does not have the correct number of partitions. If __consumer_offsets was created by MirrorMaker during replication, it would have been created with the cluster's default number of partitions, which is not the same as offsets.topic.num.partitions. In this case, the semantic partitioning will be broken, and kafka-consumer-groups (or indeed KafkaConsumer) will be confused.

Re why kafka-console-consumer is showing slightly different records: I believe there must be a string serde somewhere in your pipeline. It appears that the offset record value has been toString'd.

> let consumers read from a specific time (not an offset number). Should it work?

Yes, that is the current best practice, though there are many reasons why this is less than ideal.

Ryanne

On Tue, Mar 5, 2019 at 10:07 AM Anatoliy Soldatov <aksolda...@avito.ru.invalid> wrote:

Hello, Ryanne, and thank you for your answer!

I am using idempotent producers. And you are right, I started replication after a few days, and some of the source data had already been deleted (because of retention) by that time.

Still, I cannot understand the logic behind kafka-consumer-groups. With the console consumer I can see that the __consumer_offsets topic on both the source and the destination cluster contains the same data (more or less). OffsetMetadata and commit time are identical for some randomly picked events from this topic. However, kafka-consumer-groups shows really different output for the same consumer group on the source and destination clusters. As in my previous letter, it shows roughly a 560000 current offset and a 560000 end offset on the source cluster, but a 370000 current offset and a 480000 end offset on the destination cluster. The console consumer, however, shows that the __consumer_offsets topic has events with offsets around 560000 for this group on both source and destination. And kafka-consumer-groups shows only one partition for the group on the destination cluster, although there are 9 partitions on both clusters.

Also, as far as I know, the partitioner class should be different for regular topics and for the __consumer_offsets topic (different hash keys). Is that correct? If so, how does the MirrorMaker producer handle it?

I have an idea for failover: let consumers read from a specific time (not an offset number). Should it work?

Also, I think MM2 is a nice idea and I am waiting for it!

Tolya

On 5 March 2019, at 18:08, Ryanne Dolan <ryannedo...@gmail.com> wrote:

Tolya,

That is the expected behavior. Offsets are not consistent between mirrored clusters. Kafka allows duplicate records ("at least once"), which means the downstream offsets will tend to creep higher than those in the source partitions. For example, if a producer sends a record but doesn't receive an ACK within a timeout, it may resend the same record again. But the record may have actually been received by the broker, so now the broker sees the same record twice.
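To make that resend scenario concrete, here is a minimal sketch of a plain at-least-once Java producer (the broker address, topic and record contents are placeholders, not taken from this thread): if the ACK for a send is lost, the configured retries make the client send the record again, and the broker can end up appending it twice.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AtLeastOnceProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "clusterA:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // At-least-once settings: wait for acknowledgement and retry on failure.
        // If an ACK is lost in transit, the retry re-sends a record the broker
        // may already have appended, so the downstream log grows by one extra record.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        // Setting enable.idempotence=true would let the broker discard such
        // duplicate resends (see the next paragraph of this reply).
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("some_topic", "some-key", "some-value"));
        } // close() flushes any pending sends
    }
}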
You can use an idempotent producer to prevent duplicates, and transactions for exactly-once replication, but even so, there is no guarantee the offsets are consistent. For example, a source partition doesn't necessarily start at offset zero when you start replicating it.

You are correct that failover will not work as you were expecting. I've solved this problem in KIP-382 with "MirrorMaker 2.0", which is currently implemented in a draft PR here: https://github.com/apache/kafka/pull/6295

MM2 uses a sparse "offset sync" topic to keep track of the mapping between upstream and downstream offsets, and emits checkpoints that consumers can use for failover and failback. This can be automated, e.g. by resetting consumer offsets based on the latest checkpoint from another cluster. The tooling has not been released yet, but the logic is in RemoteClusterUtils (see the sketch at the end of this thread).

Ryanne

On Tue, Mar 5, 2019, 5:06 AM Anatoliy Soldatov <aksolda...@avito.ru.invalid> wrote:

Hello, guys!

I am not sure about the offsets replicated by MirrorMaker. I am replicating data from one Kafka cluster (let's say cluster A, Confluent Kafka 2.0) to another (cluster B, Confluent Kafka 2.1) with internal topics. MirrorMaker lag is somewhere between 1-2k events. I started replication after some time, so some old events had already been removed because of retention. Topics on both clusters have the same number of partitions. ConsumerGroupCommand on cluster A and cluster B shows different results, as below.

cluster A (source):

~> kafka-consumer-groups \
     --bootstrap-server clusterA:9092 \
     --describe \
     --group "some_group" | sort
------------------------------
TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                  HOST      CLIENT-ID
some_topic  0          560498          560498          0    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
some_topic  1          560569          560571          2    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
some_topic  2          560478          560480          2    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
some_topic  3          560528          560530          2    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
some_topic  4          560542          560543          1    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
some_topic  5          560497          560498          1    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
some_topic  6          560484          560484          0    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
some_topic  7          560527          560527          0    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
some_topic  8          560539          560540          1    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama

cluster B (destination):

~> kafka-consumer-groups \
     --bootstrap-server clusterB:9092 \
     --describe \
     --group "some_group" | sort
------------------------------
Consumer group 'some_group' has no active members.

TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID  HOST  CLIENT-ID
some_topic  0          373323          481950          108627  -            -     -

However, offset metadata in the __consumer_offsets topic is the same on both clusters.
cluster A (source):

~> kafka-console-consumer \
     --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
     --bootstrap-server clusterA:9092 \
     --topic __consumer_offsets | grep some_topic
------------------------------
[some_group,some_topic,7]::[OffsetMetadata[561492,NO_METADATA],CommitTime 1551782709369,ExpirationTime 1552387509369]
...

cluster B (destination):

~> kafka-console-consumer \
     --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
     --bootstrap-server clusterB:9092 \
     --topic __consumer_offsets | grep some_topic
------------------------------
[some_group,some_topic,7]::OffsetAndMetadata(offset=561492, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1551782709369, expireTimestamp=Some(1552387509369))
...

Notice that the offsets match the ConsumerGroupCommand output for cluster A, but not for cluster B.

So, here are my questions:

- Why does ConsumerGroupCommand show different results on cluster A and cluster B?
- Why is the consumer lag so high on cluster B?
- Does ConsumerGroupCommand use information from somewhere other than the __consumer_offsets topic?
- Will all my consumers get stuck (because of incompatible offsets) in case of failover?

Kind regards,
Tolya
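On the RemoteClusterUtils mentioned in Ryanne's reply above: the tooling was not released at the time of this thread, so the following is only a hypothetical sketch of checkpoint-based failover, assuming the API is eventually released roughly as described in the draft PR (the class and method, the source-cluster alias "A", and the connection properties are all assumptions, not confirmed by this thread).

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class CheckpointFailoverSketch {
    public static void main(String[] args) throws Exception {
        // Connection properties for the cluster we fail over TO (cluster B),
        // which is where MM2 emits the checkpoints replicated from cluster A.
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "clusterB:9092");

        // Translate the group's last checkpointed offsets from cluster A
        // ("A" is an assumed source-cluster alias) into equivalent offsets
        // on cluster B, using the latest checkpoint.
        Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(props, "A", "some_group", Duration.ofSeconds(30));

        // The returned map can then be used to reset the group's offsets on
        // cluster B (e.g. via the Admin client or KafkaConsumer.seek) before
        // the consumers are restarted there.
        translated.forEach((tp, offset) ->
                System.out.printf("%s -> %d%n", tp, offset.offset()));
    }
}

Whether this can be fully automated, as Ryanne suggests, depends on the final shape of the released tooling.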
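And for the failover idea that Ryanne confirms above as the current best practice, i.e. resuming from a point in time rather than from a committed offset number, here is a minimal sketch using the Java consumer's offsetsForTimes/seek APIs (the thread's consumers actually use the Go sarama client, and the chosen timestamp is an arbitrary example): look up the offsets that correspond to a wall-clock timestamp on the destination cluster and seek there, instead of reusing offsets committed on the source cluster.

import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class TimestampFailoverSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "clusterB:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "some_group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // Pick the point in time to resume from, e.g. the last commit time
        // observed on the source cluster (here: an arbitrary example value).
        long resumeFromMs = Instant.parse("2019-03-05T10:00:00Z").toEpochMilli();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Assign all partitions of the mirrored topic explicitly.
            List<TopicPartition> partitions = consumer.partitionsFor("some_topic").stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);

            // Ask the brokers which offset corresponds to that timestamp in
            // each partition, then seek there.
            Map<TopicPartition, Long> timestamps = new HashMap<>();
            partitions.forEach(tp -> timestamps.put(tp, resumeFromMs));
            Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(timestamps);
            found.forEach((tp, ot) -> {
                if (ot != null) {
                    consumer.seek(tp, ot.offset());
                } else {
                    // No record at or after that timestamp: start from the end.
                    consumer.seekToEnd(Collections.singletonList(tp));
                }
            });
            // Normal poll() consumption resumes from the chosen positions here.
        }
    }
}

Note that resuming by timestamp replays (or skips) whatever sits around the chosen point in time, which is one of the reasons Ryanne calls this approach less than ideal.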