satishd commented on code in PR #17737:
URL: https://github.com/apache/kafka/pull/17737#discussion_r1836281096


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -790,8 +790,11 @@ protected LogContext getLogContext() {
         }
 
         public void run() {
-            if (isCancelled())
+            if (isCancelled() || 
!remoteLogMetadataManager.isReady(topicIdPartition)) {
+                logger.debug("Skipping the current run for tpId {} as it is 
either cancelled: {} or " +

Review Comment:
   nit: change `tpId` to `topic-partition`



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -790,8 +790,11 @@ protected LogContext getLogContext() {
         }
 
         public void run() {
-            if (isCancelled())
+            if (isCancelled() || 
!remoteLogMetadataManager.isReady(topicIdPartition)) {
+                logger.debug("Skipping the current run for tpId {} as it is 
either cancelled: {} or " +
+                        "remote log metadata is not ready", topicIdPartition, 
isCancelled());

Review Comment:
   nit: Good to add a specific value for 
`!remoteLogMetadataManager.isReady(topicIdPartition)` in the statement. One way 
to know with this change is that if cancelled is true then this value should be 
true for now. It is better to explicitly add that as we may have more changes 
in the condition.



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -3643,6 +3649,34 @@ int lookupPositionForOffset(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata, l
         assertEquals(273, 
fetchDataInfo.fetchOffsetMetadata.relativePositionInSegment);
     }
 
+    @Test
+    public void testRLMOpsWhenMetadataIsNotReady() throws InterruptedException 
{
+        CountDownLatch latch = new CountDownLatch(2);
+        when(remoteLogMetadataManager.isReady(any(TopicIdPartition.class)))
+                .thenAnswer(ans -> {
+                    latch.countDown();
+                    return false;
+                });
+        remoteLogManager.startup();
+        remoteLogManager.onLeadershipChange(
+                Collections.singleton(mockPartition(leaderTopicIdPartition)),
+                Collections.singleton(mockPartition(followerTopicIdPartition)),
+                topicIds
+        );
+        assertNotNull(remoteLogManager.rlmCopyTask(leaderTopicIdPartition));
+        
assertNotNull(remoteLogManager.leaderExpirationTask(leaderTopicIdPartition));
+        assertNotNull(remoteLogManager.followerTask(followerTopicIdPartition));
+
+        latch.await(5, TimeUnit.SECONDS);

Review Comment:
   The earlier `remoteLogManager.onLeadershipChange` should have called 
`remoteLogMetadataManager.isReady` and return true. But if there are any 
regressions, it will wait for 5 seconds and the test will fail. Feel free to 
add a similar comment in the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to