satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r642095305



##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+public class ConsumerManager implements Closeable {

Review comment:
       I will add that.

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from remote log metadata 
topic ({@link 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintain the state of the remote log segment metadata. It 
gives an API to add or remove
+ * for what topic partition's metadata should be consumed by this instance 
using
+ * {{@link #addAssignmentsForPartitions(Set)}} and {@link 
#removeAssignmentsForPartitions(Set)} respectively.
+ * <p>
+ * When a broker is started, controller sends topic partitions that this 
broker is leader or follower for and the
+ * partitions to be deleted. This class receives those notifications with
+ * {@link #addAssignmentsForPartitions(Set)} and {@link 
#removeAssignmentsForPartitions(Set)} assigns consumer for the
+ * respective remote log metadata partitions by using {@link 
RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any leadership changes later are called through the same API. We will 
remove the partitions that ard deleted from
+ * this broker which are received through {@link 
#removeAssignmentsForPartitions(Set)}.
+ * <p>
+ * After receiving these events it invokes {@link 
RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains in-memory representation of the state of {@link 
RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+    private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
+
+    private static final long POLL_INTERVAL_MS = 30L;
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+    private volatile boolean close = false;
+    private volatile boolean assignPartitions = false;
+
+    private final Object assignPartitionsLock = new Object();
+
+    // Remote log metadata topic partitions that consumer is assigned to.
+    private volatile Set<Integer> assignedMetaPartitions = 
Collections.emptySet();
+
+    // User topic partitions that this broker is a leader/follower for.
+    private Set<TopicIdPartition> assignedTopicPartitions = 
Collections.emptySet();
+
+    // Map of remote log metadata topic partition to target end offsets to be 
consumed.
+    private final Map<Integer, Long> partitionToTargetEndOffsets = new 
ConcurrentHashMap<>();
+
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+
+    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
+                        RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+                        RemoteLogMetadataTopicPartitioner topicPartitioner) {
+        this.consumer = consumer;
+        this.remotePartitionMetadataEventHandler = 
remotePartitionMetadataEventHandler;
+        this.topicPartitioner = topicPartitioner;
+    }
+
+    @Override
+    public void run() {
+        log.info("Started Consumer task thread.");
+        try {
+            while (!close) {
+                Set<Integer> assignedMetaPartitionsSnapshot = 
maybeWaitForPartitionsAssignment();
+
+                if (!assignedMetaPartitionsSnapshot.isEmpty()) {
+                    executeReassignment(assignedMetaPartitionsSnapshot);
+                }
+
+                log.info("Polling consumer to receive remote log metadata 
topic records");
+                ConsumerRecords<byte[], byte[]> consumerRecords
+                        = consumer.poll(Duration.ofSeconds(POLL_INTERVAL_MS));
+                for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+                    handleRemoteLogMetadata(serde.deserialize(record.value()));
+                    partitionToConsumedOffsets.put(record.partition(), 
record.offset());
+                }
+
+                // Check whether messages are received till end offsets or not 
for the assigned metadata partitions.
+                if (!partitionToTargetEndOffsets.isEmpty()) {
+                    for (Map.Entry<Integer, Long> entry : 
partitionToTargetEndOffsets.entrySet()) {
+                        final Long offset = 
partitionToConsumedOffsets.getOrDefault(entry.getKey(), 0L);
+                        if (offset >= entry.getValue()) {
+                            partitionToTargetEndOffsets.remove(entry.getKey());
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("Error occurred in consumer task, close:[{}]", close, e);
+        }
+
+        closeConsumer();
+        log.info("Exiting from consumer task thread");
+    }
+
+    private void closeConsumer() {
+        log.info("Closing the consumer instance");
+        if (consumer != null) {
+            try {
+                consumer.close(Duration.ofSeconds(30));
+            } catch (Exception e) {
+                log.error("Error encountered while closing the consumer", e);
+            }
+        }
+    }
+
+    private Set<Integer> maybeWaitForPartitionsAssignment() {
+        Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet();
+        synchronized (assignPartitionsLock) {
+            while (assignedMetaPartitions.isEmpty()) {
+                // If no partitions are assigned, wait until they are assigned.
+                log.info("Waiting for assigned remote log metadata 
partitions..");
+                try {
+                    assignPartitionsLock.wait();
+                } catch (InterruptedException e) {
+                    throw new KafkaException(e);
+                }
+            }
+
+            if (assignPartitions) {
+                assignedMetaPartitionsSnapshot = new 
HashSet<>(assignedMetaPartitions);
+                assignPartitions = false;
+            }
+        }
+        return assignedMetaPartitionsSnapshot;
+    }
+
+    private void handleRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) {
+        
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+    }
+
+    private void executeReassignment(Set<Integer> 
assignedMetaPartitionsSnapshot) {
+        Set<TopicPartition> assignedMetaTopicPartitions = 
assignedMetaPartitionsSnapshot.stream()
+                .map(partitionNum -> new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
+                .collect(Collectors.toSet());
+        log.info("Reassigning partitions to consumer task [{}]", 
assignedMetaTopicPartitions);
+        consumer.assign(assignedMetaTopicPartitions);
+
+        log.debug("Fetching end offsets to consumer task [{}]", 
assignedMetaTopicPartitions);
+        Map<TopicPartition, Long> endOffsets;
+        while (true) {
+            try {
+                endOffsets = consumer.endOffsets(assignedMetaTopicPartitions, 
Duration.ofSeconds(30));
+                break;
+            } catch (Exception e) {
+                // ignore exception
+                log.debug("Error encountered in fetching end offsets", e);
+            }
+        }
+        log.debug("Fetched end offsets to consumer task [{}]", endOffsets);
+
+        for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
+            if (entry.getValue() > 0) {
+                partitionToTargetEndOffsets.put(entry.getKey().partition(), 
entry.getValue());
+            }
+        }
+    }
+
+    public void addAssignmentsForPartitions(Set<TopicIdPartition> 
updatedPartitions) {
+        updateAssignmentsForPartitions(updatedPartitions, 
Collections.emptySet());
+    }
+
+    public void removeAssignmentsForPartitions(Set<TopicIdPartition> 
partitions) {
+        updateAssignmentsForPartitions(Collections.emptySet(), partitions);
+    }
+
+    private void updateAssignmentsForPartitions(Set<TopicIdPartition> 
addedPartitions,
+                                                Set<TopicIdPartition> 
removedPartitions) {
+        log.info("Updating assignments for addedPartitions: {} and 
removedPartition: {}", addedPartitions, removedPartitions);
+        ensureNotClosed();
+
+        Objects.requireNonNull(addedPartitions, "addedPartitions must not be 
null");
+        Objects.requireNonNull(removedPartitions, "removedPartitions must not 
be null");
+
+        if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) {
+            return;
+        }
+
+        synchronized (assignPartitionsLock) {
+            Set<TopicIdPartition> updatedReassignedPartitions = new 
HashSet<>(assignedTopicPartitions);
+            updatedReassignedPartitions.addAll(addedPartitions);
+            updatedReassignedPartitions.removeAll(removedPartitions);
+            Set<Integer> updatedAssignedMetaPartitions = new HashSet<>();

Review comment:
       Are you saying it should be initialized with 
`updatedAssignedMetaPartitions = new HashSet<>(assignedMetaPartitions);`? That 
is not needed, as we are recomputing that set based on 
`updatedReassignedPartitions`.
   

##########
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from remote log metadata 
topic ({@link 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintain the state of the remote log segment metadata. It 
gives an API to add or remove
+ * for what topic partition's metadata should be consumed by this instance 
using
+ * {{@link #addAssignmentsForPartitions(Set)}} and {@link 
#removeAssignmentsForPartitions(Set)} respectively.
+ * <p>
+ * When a broker is started, controller sends topic partitions that this 
broker is leader or follower for and the
+ * partitions to be deleted. This class receives those notifications with
+ * {@link #addAssignmentsForPartitions(Set)} and {@link 
#removeAssignmentsForPartitions(Set)} assigns consumer for the
+ * respective remote log metadata partitions by using {@link 
RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any leadership changes later are called through the same API. We will 
remove the partitions that ard deleted from
+ * this broker which are received through {@link 
#removeAssignmentsForPartitions(Set)}.
+ * <p>
+ * After receiving these events it invokes {@link 
RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains in-memory representation of the state of {@link 
RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+    private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
+
+    private static final long POLL_INTERVAL_MS = 30L;
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+    private volatile boolean close = false;
+    private volatile boolean assignPartitions = false;
+
+    private final Object assignPartitionsLock = new Object();
+
+    // Remote log metadata topic partitions that consumer is assigned to.
+    private volatile Set<Integer> assignedMetaPartitions = 
Collections.emptySet();
+
+    // User topic partitions that this broker is a leader/follower for.
+    private Set<TopicIdPartition> assignedTopicPartitions = 
Collections.emptySet();
+
+    // Map of remote log metadata topic partition to target end offsets to be 
consumed.
+    private final Map<Integer, Long> partitionToTargetEndOffsets = new 
ConcurrentHashMap<>();
+
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+
+    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
+                        RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
+                        RemoteLogMetadataTopicPartitioner topicPartitioner) {
+        this.consumer = consumer;
+        this.remotePartitionMetadataEventHandler = 
remotePartitionMetadataEventHandler;
+        this.topicPartitioner = topicPartitioner;
+    }
+
+    @Override
+    public void run() {
+        log.info("Started Consumer task thread.");
+        try {
+            while (!close) {
+                Set<Integer> assignedMetaPartitionsSnapshot = 
maybeWaitForPartitionsAssignment();
+
+                if (!assignedMetaPartitionsSnapshot.isEmpty()) {
+                    executeReassignment(assignedMetaPartitionsSnapshot);
+                }
+
+                log.info("Polling consumer to receive remote log metadata 
topic records");
+                ConsumerRecords<byte[], byte[]> consumerRecords
+                        = consumer.poll(Duration.ofSeconds(POLL_INTERVAL_MS));
+                for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+                    handleRemoteLogMetadata(serde.deserialize(record.value()));
+                    partitionToConsumedOffsets.put(record.partition(), 
record.offset());
+                }
+
+                // Check whether messages are received till end offsets or not 
for the assigned metadata partitions.
+                if (!partitionToTargetEndOffsets.isEmpty()) {
+                    for (Map.Entry<Integer, Long> entry : 
partitionToTargetEndOffsets.entrySet()) {
+                        final Long offset = 
partitionToConsumedOffsets.getOrDefault(entry.getKey(), 0L);
+                        if (offset >= entry.getValue()) {
+                            partitionToTargetEndOffsets.remove(entry.getKey());
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("Error occurred in consumer task, close:[{}]", close, e);
+        }
+
+        closeConsumer();
+        log.info("Exiting from consumer task thread");
+    }
+
+    private void closeConsumer() {
+        log.info("Closing the consumer instance");
+        if (consumer != null) {
+            try {
+                consumer.close(Duration.ofSeconds(30));
+            } catch (Exception e) {
+                log.error("Error encountered while closing the consumer", e);
+            }
+        }
+    }
+
+    private Set<Integer> maybeWaitForPartitionsAssignment() {
+        Set<Integer> assignedMetaPartitionsSnapshot = Collections.emptySet();
+        synchronized (assignPartitionsLock) {
+            while (assignedMetaPartitions.isEmpty()) {
+                // If no partitions are assigned, wait until they are assigned.
+                log.info("Waiting for assigned remote log metadata 
partitions..");
+                try {
+                    assignPartitionsLock.wait();
+                } catch (InterruptedException e) {
+                    throw new KafkaException(e);
+                }
+            }
+
+            if (assignPartitions) {
+                assignedMetaPartitionsSnapshot = new 
HashSet<>(assignedMetaPartitions);
+                assignPartitions = false;
+            }
+        }
+        return assignedMetaPartitionsSnapshot;
+    }
+
+    private void handleRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) {
+        
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+    }
+
+    private void executeReassignment(Set<Integer> 
assignedMetaPartitionsSnapshot) {
+        Set<TopicPartition> assignedMetaTopicPartitions = 
assignedMetaPartitionsSnapshot.stream()
+                .map(partitionNum -> new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
+                .collect(Collectors.toSet());
+        log.info("Reassigning partitions to consumer task [{}]", 
assignedMetaTopicPartitions);
+        consumer.assign(assignedMetaTopicPartitions);
+
+        log.debug("Fetching end offsets to consumer task [{}]", 
assignedMetaTopicPartitions);
+        Map<TopicPartition, Long> endOffsets;
+        while (true) {
+            try {
+                endOffsets = consumer.endOffsets(assignedMetaTopicPartitions, 
Duration.ofSeconds(30));
+                break;
+            } catch (Exception e) {
+                // ignore exception
+                log.debug("Error encountered in fetching end offsets", e);
+            }
+        }
+        log.debug("Fetched end offsets to consumer task [{}]", endOffsets);
+
+        for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
+            if (entry.getValue() > 0) {
+                partitionToTargetEndOffsets.put(entry.getKey().partition(), 
entry.getValue());
+            }
+        }
+    }
+
+    public void addAssignmentsForPartitions(Set<TopicIdPartition> 
updatedPartitions) {
+        updateAssignmentsForPartitions(updatedPartitions, 
Collections.emptySet());
+    }
+
+    public void removeAssignmentsForPartitions(Set<TopicIdPartition> 
partitions) {
+        updateAssignmentsForPartitions(Collections.emptySet(), partitions);
+    }
+
+    private void updateAssignmentsForPartitions(Set<TopicIdPartition> 
addedPartitions,
+                                                Set<TopicIdPartition> 
removedPartitions) {
+        log.info("Updating assignments for addedPartitions: {} and 
removedPartition: {}", addedPartitions, removedPartitions);
+        ensureNotClosed();
+
+        Objects.requireNonNull(addedPartitions, "addedPartitions must not be 
null");
+        Objects.requireNonNull(removedPartitions, "removedPartitions must not 
be null");
+
+        if (addedPartitions.isEmpty() && removedPartitions.isEmpty()) {
+            return;
+        }
+
+        synchronized (assignPartitionsLock) {
+            Set<TopicIdPartition> updatedReassignedPartitions = new 
HashSet<>(assignedTopicPartitions);
+            updatedReassignedPartitions.addAll(addedPartitions);
+            updatedReassignedPartitions.removeAll(removedPartitions);
+            Set<Integer> updatedAssignedMetaPartitions = new HashSet<>();
+            for (TopicIdPartition tp : updatedReassignedPartitions) {
+                
updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
+            }
+
+            if (!updatedAssignedMetaPartitions.equals(assignedMetaPartitions)) 
{

Review comment:
       Nice catch. I will update the code. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to