chia7712 commented on code in PR #17957:
URL: https://github.com/apache/kafka/pull/17957#discussion_r1869445168


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -389,79 +389,84 @@ public CompletableFuture<Void> maybeInitialize() {
                     .build())
                 .build()
             ).whenComplete((result, exception) -> {
-                if (exception != null) {
-                    log.error("Failed to initialize the share partition: 
{}-{}", groupId, topicIdPartition, exception);
-                    completeInitializationWithException(future, exception);
-                    return;
-                }
+                lock.writeLock().lock();
+                try {
+                    if (exception != null) {
+                        log.error("Failed to initialize the share partition: 
{}-{}", groupId, topicIdPartition, exception);
+                        completeInitializationWithException(future, exception);
+                        return;
+                    }
 
-                if (result == null || result.topicsData() == null || 
result.topicsData().size() != 1) {
-                    log.error("Failed to initialize the share partition: 
{}-{}. Invalid state found: {}.",
-                        groupId, topicIdPartition, result);
-                    completeInitializationWithException(future, new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition)));
-                    return;
-                }
+                    if (result == null || result.topicsData() == null || 
result.topicsData().size() != 1) {
+                        log.error("Failed to initialize the share partition: 
{}-{}. Invalid state found: {}.",
+                            groupId, topicIdPartition, result);
+                        completeInitializationWithException(future, new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition)));
+                        return;
+                    }
 
-                TopicData<PartitionAllData> state = result.topicsData().get(0);
-                if (state.topicId() != topicIdPartition.topicId() || 
state.partitions().size() != 1) {
-                    log.error("Failed to initialize the share partition: 
{}-{}. Invalid topic partition response: {}.",
-                        groupId, topicIdPartition, result);
-                    completeInitializationWithException(future, new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition)));
-                    return;
-                }
+                    TopicData<PartitionAllData> state = 
result.topicsData().get(0);
+                    if (state.topicId() != topicIdPartition.topicId() || 
state.partitions().size() != 1) {
+                        log.error("Failed to initialize the share partition: 
{}-{}. Invalid topic partition response: {}.",
+                            groupId, topicIdPartition, result);
+                        completeInitializationWithException(future, new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition)));
+                        return;
+                    }
 
-                PartitionAllData partitionData = state.partitions().get(0);
-                if (partitionData.partition() != topicIdPartition.partition()) 
{
-                    log.error("Failed to initialize the share partition: 
{}-{}. Invalid partition response: {}.",
-                        groupId, topicIdPartition, partitionData);
-                    completeInitializationWithException(future, new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition)));
-                    return;
-                }
+                    PartitionAllData partitionData = state.partitions().get(0);
+                    if (partitionData.partition() != 
topicIdPartition.partition()) {
+                        log.error("Failed to initialize the share partition: 
{}-{}. Invalid partition response: {}.",
+                            groupId, topicIdPartition, partitionData);
+                        completeInitializationWithException(future, new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition)));
+                        return;
+                    }
 
-                if (partitionData.errorCode() != Errors.NONE.code()) {
-                    KafkaException ex = 
fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
-                    log.error("Failed to initialize the share partition: 
{}-{}. Exception occurred: {}.",
-                        groupId, topicIdPartition, partitionData);
-                    completeInitializationWithException(future, ex);
-                    return;
-                }
+                    if (partitionData.errorCode() != Errors.NONE.code()) {
+                        KafkaException ex = 
fetchPersisterError(partitionData.errorCode(), partitionData.errorMessage());
+                        log.error("Failed to initialize the share partition: 
{}-{}. Exception occurred: {}.",
+                            groupId, topicIdPartition, partitionData);
+                        completeInitializationWithException(future, ex);
+                        return;
+                    }
 
-                try {
-                    startOffset = 
startOffsetDuringInitialization(partitionData.startOffset());
-                } catch (Exception e) {
-                    completeInitializationWithException(future, e);
-                    return;
-                }
-                stateEpoch = partitionData.stateEpoch();
-
-                List<PersisterStateBatch> stateBatches = 
partitionData.stateBatches();
-                for (PersisterStateBatch stateBatch : stateBatches) {
-                    if (stateBatch.firstOffset() < startOffset) {
-                        log.error("Invalid state batch found for the share 
partition: {}-{}. The base offset: {}"
-                                + " is less than the start offset: {}.", 
groupId, topicIdPartition,
-                            stateBatch.firstOffset(), startOffset);
-                        completeInitializationWithException(future, new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition)));
+                    try {
+                        startOffset = 
startOffsetDuringInitialization(partitionData.startOffset());
+                    } catch (Exception e) {
+                        completeInitializationWithException(future, e);
                         return;
                     }
-                    InFlightBatch inFlightBatch = new 
InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(),
-                        stateBatch.lastOffset(), 
RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(), 
null);
-                    cachedState.put(stateBatch.firstOffset(), inFlightBatch);
-                }
-                // Update the endOffset of the partition.
-                if (!cachedState.isEmpty()) {
-                    // If the cachedState is not empty, findNextFetchOffset 
flag is set to true so that any AVAILABLE records
-                    // in the cached state are not missed
-                    findNextFetchOffset.set(true);
-                    endOffset = 
cachedState.lastEntry().getValue().lastOffset();
-                    // In case the persister read state RPC result contains no 
AVAILABLE records, we can update cached state
-                    // and start/end offsets.
-                    maybeUpdateCachedStateAndOffsets();
-                } else {
-                    endOffset = startOffset;
+                    stateEpoch = partitionData.stateEpoch();
+
+                    List<PersisterStateBatch> stateBatches = 
partitionData.stateBatches();
+                    for (PersisterStateBatch stateBatch : stateBatches) {
+                        if (stateBatch.firstOffset() < startOffset) {
+                            log.error("Invalid state batch found for the share 
partition: {}-{}. The base offset: {}"
+                                    + " is less than the start offset: {}.", 
groupId, topicIdPartition,
+                                stateBatch.firstOffset(), startOffset);
+                            completeInitializationWithException(future, new 
IllegalStateException(String.format("Failed to initialize the share partition 
%s-%s", groupId, topicIdPartition)));
+                            return;
+                        }
+                        InFlightBatch inFlightBatch = new 
InFlightBatch(EMPTY_MEMBER_ID, stateBatch.firstOffset(),
+                            stateBatch.lastOffset(), 
RecordState.forId(stateBatch.deliveryState()), stateBatch.deliveryCount(), 
null);
+                        cachedState.put(stateBatch.firstOffset(), 
inFlightBatch);
+                    }
+                    // Update the endOffset of the partition.
+                    if (!cachedState.isEmpty()) {
+                        // If the cachedState is not empty, 
findNextFetchOffset flag is set to true so that any AVAILABLE records
+                        // in the cached state are not missed
+                        findNextFetchOffset.set(true);
+                        endOffset = 
cachedState.lastEntry().getValue().lastOffset();
+                        // In case the persister read state RPC result 
contains no AVAILABLE records, we can update cached state
+                        // and start/end offsets.
+                        maybeUpdateCachedStateAndOffsets();
+                    } else {
+                        endOffset = startOffset;
+                    }
+                    // Set the partition state to Active and complete the 
future.
+                    partitionState = SharePartitionState.ACTIVE;
+                    future.complete(null);

Review Comment:
   @adixitconfluent I haven't observed this scenario in the codebase, so the 
following discussion aims to prevent potential deadlocks in the future.
   
   Let's assume we later chain onto the future completed by the `persister` an 
   action that attempts to complete a delayed fetch. Since the future is now 
   completed inside the locked section, that action runs on the `persister` 
   thread while it still holds the write lock of the share partition, and it 
   then requires the lock for the delayed fetch. Meanwhile, another thread 
   holding the lock for the same delayed fetch needs to acquire the lock for 
   the share partition. Based on my understanding, this situation could lead 
   to a deadlock.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to