showuon commented on PR #14051:
URL: https://github.com/apache/kafka/pull/14051#issuecomment-1646779495

   @vamossagar12 , sorry for late response. 
   For your suggestion: 
   > Remove the canAddReplicaToIsr call from 
[here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/cluster/Partition.scala#L956C56-L956C56)?
   
   I think we should keep it because it can quickly check it before acquiring 
the lock, and have a short circuit here.
   
   For the test, I just wrote a test to reproduce this NPE issue under 
`PartitionTest.scala`. For your refrence:
   
   
   ```
     @Test
     def testIsReplicaIsrEligibleWithEmptyReplicaMap(): Unit = {
       val mockMetadataCache = mock(classOf[KRaftMetadataCache])
       val partition = new Partition(topicPartition,
         replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
         interBrokerProtocolVersion = interBrokerProtocolVersion,
         localBrokerId = brokerId,
         () => defaultBrokerEpoch(brokerId),
         time,
         alterPartitionListener,
         delayedOperations,
         mockMetadataCache,
         logManager,
         alterPartitionManager)
   
       val spyPartition = spy(partition)
   
       when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, 
ArgumentMatchers.eq(topicPartition)))
         .thenReturn(None)
       val log = logManager.getOrCreateLog(topicPartition, topicId = None)
       seedLogData(log, numRecords = 6, leaderEpoch = 4)
   
       val controllerEpoch = 0
       val leaderEpoch = 5
       val remoteBrokerId = brokerId + 1
       val replicas = List[Integer](brokerId, remoteBrokerId).asJava
   
       spyPartition.createLogIfNotExists(isNew = false, isFutureReplica = 
false, offsetCheckpoints, None)
   
       val initializeTimeMs = time.milliseconds()
       assertTrue(spyPartition.makeLeader(
           new LeaderAndIsrPartitionState()
             .setControllerEpoch(controllerEpoch)
             .setLeader(brokerId)
             .setLeaderEpoch(leaderEpoch)
             .setIsr(List[Integer](brokerId).asJava)
             .setPartitionEpoch(1)
             .setReplicas(replicas)
             .setIsNew(true),
           offsetCheckpoints, None), "Expected become leader transition to 
succeed")
   
       doAnswer(_ => {
         // simulate topic is deleted at the moment
         spyPartition.delete()
         val replica = new Replica(remoteBrokerId, topicPartition)
         spyPartition.updateFollowerFetchState(replica, 
mock(classOf[LogOffsetMetadata]), 0, initializeTimeMs, 0, 0)
         mock(classOf[LogReadInfo])
       }).when(spyPartition).fetchRecords(any(), any(), anyLong(), anyInt(), 
anyBoolean(), anyBoolean())
   
       fetchFollower(spyPartition, replicaId = remoteBrokerId, fetchOffset = 3L)
     }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to