bseenu commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r417002126



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##########
@@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) {
             Checkpoint.unwrapGroup(record.sourcePartition()),
             System.currentTimeMillis() - record.timestamp());
     }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();
+
+        for (String group : consumerGroups) {
+            try {
+                if (consumerGroupsDesc.get(group) == null) {
+                    // if consumerGroupsDesc does not contain this group, it should be the new consumer
+                    // group created at source cluster and its offsets should be sync-ed to target
+                    newConsumerGroup.add(group);
+                    continue;
+                }
+                ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
+                // sync offset to the target cluster only if the state of current consumer group is idle or dead
+                ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
+                if (consumerGroupState.equals(ConsumerGroupState.EMPTY) || consumerGroupState.equals(ConsumerGroupState.DEAD)) {
+                    idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
+                        .partitionsToOffsetAndMetadata().get().entrySet());
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
+            }
+        }
+    }
+
+    Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
+
+        // first, sync offsets for the idle consumers at target
+        for (Map.Entry<String, Set<Map.Entry<TopicPartition, OffsetAndMetadata>>> group : idleConsumerGroupsOffset.entrySet()) {
+            String consumerGroupId = group.getKey();
+            // for each idle consumer at target, read the checkpoints (converted upstream offset)
+            // from the pre-populated map
+            Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId);
+
+            if (convertedUpstreamOffset == null) continue;
+
+            Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
+            for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) {

Review comment:
       Yes




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to