satishd commented on code in PR #17737:
URL: https://github.com/apache/kafka/pull/17737#discussion_r1836281096
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -790,8 +790,11 @@ protected LogContext getLogContext() {
         }
         public void run() {
-            if (isCancelled())
+            if (isCancelled() || !remoteLogMetadataManager.isReady(topicIdPartition)) {
+                logger.debug("Skipping the current run for tpId {} as it is either cancelled: {} or " +

Review Comment:
   nit: change `tpId` to `topic-partition`

##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -790,8 +790,11 @@ protected LogContext getLogContext() {
         }
         public void run() {
-            if (isCancelled())
+            if (isCancelled() || !remoteLogMetadataManager.isReady(topicIdPartition)) {
+                logger.debug("Skipping the current run for tpId {} as it is either cancelled: {} or " +
+                        "remote log metadata is not ready", topicIdPartition, isCancelled());

Review Comment:
   nit: It would be good to log the specific value of `!remoteLogMetadataManager.isReady(topicIdPartition)` in this statement as well. With the current change it can be inferred: if cancelled is false, the remote log metadata must be not ready. It is better to log it explicitly, though, as we may add more changes to the condition.
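A minimal sketch of the second suggestion, assuming each condition is captured in a local variable before the check so the debug message can report both. `SkipCheckSketch`, `skipMessage`, and the `String` topic-partition are hypothetical stand-ins for the task, its `run()` method, and `TopicIdPartition`. The message is returned rather than passed to `logger.debug` so the behaviour is easy to check. Unlike the short-circuiting `||` in the diff, this sketch calls the readiness check even when the task is cancelled.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

/**
 * Hypothetical sketch (not Kafka code): the task's skip check with both
 * conditions captured in local variables so the debug message reports each one.
 */
public class SkipCheckSketch {

    /** Returns the debug message to log when the run is skipped, or null when it should proceed. */
    static String skipMessage(String topicPartition,
                              BooleanSupplier isCancelled,
                              Predicate<String> isMetadataReady) {
        // Evaluate each condition exactly once; unlike `a || b`, this always calls isMetadataReady.
        boolean cancelled = isCancelled.getAsBoolean();
        boolean metadataNotReady = !isMetadataReady.test(topicPartition);
        if (cancelled || metadataNotReady) {
            return String.format(
                "Skipping the current run for topic-partition %s as it is either cancelled: %b "
                    + "or remote log metadata is not ready: %b",
                topicPartition, cancelled, metadataNotReady);
        }
        return null; // neither condition holds, so the run proceeds
    }

    public static void main(String[] args) {
        // Not cancelled, metadata not ready: the message names the actual reason.
        System.out.println(skipMessage("foo-0", () -> false, tp -> false));
        // Cancelled while metadata is ready: still skipped, and the message says which one.
        System.out.println(skipMessage("foo-0", () -> true, tp -> true));
        // Neither condition holds: null, so the run proceeds.
        System.out.println(skipMessage("foo-0", () -> false, tp -> true));
    }
}
```

In the task itself, the same idea would use SLF4J-style placeholders, e.g. `logger.debug("... cancelled: {} or remote log metadata is not ready: {}", topicIdPartition, cancelled, metadataNotReady)`. Capturing the booleans first also avoids calling `isCancelled()` a second time just for the log arguments.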
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -3643,6 +3649,34 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, l
         assertEquals(273, fetchDataInfo.fetchOffsetMetadata.relativePositionInSegment);
     }
+    @Test
+    public void testRLMOpsWhenMetadataIsNotReady() throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(2);
+        when(remoteLogMetadataManager.isReady(any(TopicIdPartition.class)))
+                .thenAnswer(ans -> {
+                    latch.countDown();
+                    return false;
+                });
+        remoteLogManager.startup();
+        remoteLogManager.onLeadershipChange(
+                Collections.singleton(mockPartition(leaderTopicIdPartition)),
+                Collections.singleton(mockPartition(followerTopicIdPartition)),
+                topicIds
+        );
+        assertNotNull(remoteLogManager.rlmCopyTask(leaderTopicIdPartition));
+        assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
+        assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition));
+
+        latch.await(5, TimeUnit.SECONDS);

Review Comment:
   The earlier `remoteLogManager.onLeadershipChange` call should already have triggered calls to `remoteLogMetadataManager.isReady`, so `latch.await` should return true without waiting. If there is a regression, it will wait for the full 5 seconds and the test will fail. Feel free to add a similar comment in the code.

-- 
This is an automated message from the Apache Git Service.
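On the last review comment: `CountDownLatch.await(long, TimeUnit)` returns `false` instead of throwing when the timeout elapses. A regression therefore fails the test only if that result, or something after it, is asserted. Below is a minimal plain-Java sketch of the pattern, assuming a `ScheduledExecutorService` can stand in for the RLM task scheduler and each scheduled job for a task's readiness check. `LatchAwaitSketch` and `readinessCheckedInTime` are hypothetical names. In the JUnit test, the equivalent would be `assertTrue(latch.await(5, TimeUnit.SECONDS))`, unless the excerpt, which is cut off after this line, already asserts on it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch (not Kafka code) of waiting on a latch with an explicit timeout check. */
public class LatchAwaitSketch {

    /**
     * Schedules `tasks` jobs that each count the latch down, mimicking tasks that call the
     * readiness check, and reports whether `expectedCalls` happened within the timeout.
     */
    static boolean readinessCheckedInTime(int tasks, int expectedCalls, long timeoutMs)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(expectedCalls);
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        try {
            for (int i = 0; i < tasks; i++) {
                // Stand-in for a scheduled task whose run() first asks whether metadata is ready.
                pool.schedule(latch::countDown, 10, TimeUnit.MILLISECONDS);
            }
            // Returns false (it does not throw) if the timeout elapses before the count reaches zero.
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Tasks ran and called the check: the latch opens well before the timeout.
        System.out.println(readinessCheckedInTime(3, 2, 5_000));
        // Simulated regression (no task calls the check): false after the 100 ms timeout.
        System.out.println(readinessCheckedInTime(0, 2, 100));
    }
}
```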