kirktrue commented on code in PR #21117:
URL: https://github.com/apache/kafka/pull/21117#discussion_r2669503166


##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -11749,4 +11750,84 @@ public void testDescribeTopicsTimeoutWhenNoBrokerResponds() throws Exception {
             assertTrue(duration >= 150L && duration < 30000);
         }
     }
+
+    /**
+     * Test that OutOfMemoryError is properly propagated and not masked as TimeoutException.
+     * This test simulates an OOM error during response processing and verifies it propagates
+     * without being wrapped. This is a regression test for KAFKA-19932.
+     */
+    @Test
+    public void testOutOfMemoryErrorPropagation() throws Exception {
+        MockTime time = new MockTime();
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, mockCluster(1, 0),
+                AdminClientConfig.RETRIES_CONFIG, "2",
+                AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "100")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Create a spy of MetadataResponse that throws OutOfMemoryError when topicMetadata() is accessed
+            // The AdminClient calls response.topicMetadata() in listTopics handleResponse(), which will trigger the OOM
+            MetadataResponseData data = new MetadataResponseData();
+            MetadataResponse realResponse = new MetadataResponse(data, ApiKeys.METADATA.latestVersion());
+            MetadataResponse spyResponse = spy(realResponse);
+
+            // Configure the spy to throw OutOfMemoryError when topicMetadata() is called
+            // This simulates an OOM occurring during response processing
+            doThrow(new OutOfMemoryError("Simulated OOM during response handling"))
+                .when(spyResponse).topicMetadata();
+
+            // Prepare the mocked response that will throw OOM
+            env.kafkaClient().prepareResponse(spyResponse);
+
+            // Make the listTopics call - this will internally trigger a metadata request
+            ListTopicsResult result = env.adminClient().listTopics(new ListTopicsOptions().timeoutMs(10000));
+
+            // The OOM should propagate as-is, not be wrapped in TimeoutException
+            // We expect ExecutionException wrapping the OutOfMemoryError (this is standard Future behavior)
+            ExecutionException exception = assertThrows(ExecutionException.class, () -> result.names().get());
+            assertInstanceOf(OutOfMemoryError.class, exception.getCause(),
+                    "Expected OutOfMemoryError to be propagated, but got: " + exception.getCause());
+            assertEquals("Simulated OOM during response handling", exception.getCause().getMessage());

Review Comment:
   Another option is to create the `OutOfMemoryError` outside of the 
`doThrow()` call and then assert against that instance with `assertEquals()`.
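
One reading of this suggestion, sketched with JDK classes only: build the error once, hand that same instance to whatever throws it, and compare the propagated cause against it. In the sketch a `CompletableFuture` stands in for `result.names()`, and the class and method names (`SameErrorInstanceSketch`, `causeOf`) are hypothetical, not part of the PR. Because `Throwable` does not override `equals()`, an `assertEquals()` on two throwables amounts to an identity check, which is what the plain `!=` comparison below does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// JDK-only sketch; the CompletableFuture is an assumed stand-in for result.names().
class SameErrorInstanceSketch {

    // Returns the cause carried by the ExecutionException, or null if the future succeeded.
    static Throwable causeOf(CompletableFuture<?> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the future", e);
        }
    }

    public static void main(String[] args) {
        // Create the error once, outside the stubbing call, so the assertion can refer to it.
        OutOfMemoryError expected = new OutOfMemoryError("Simulated OOM during response handling");

        // In the real test this instance would be passed to doThrow(expected).
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(expected);

        // The propagated cause should be the very same instance, not merely an equal message.
        Throwable actual = causeOf(future);
        if (actual != expected) {
            throw new AssertionError("Expected the original OutOfMemoryError, but got: " + actual);
        }
        System.out.println("Original error instance was propagated");
    }
}
```

Comparing the instance rather than the message string avoids repeating the literal and also catches the case where the error is re-created or wrapped along the way.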



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -11749,4 +11750,84 @@ public void testDescribeTopicsTimeoutWhenNoBrokerResponds() throws Exception {
             assertTrue(duration >= 150L && duration < 30000);
         }
     }
+
+    /**
+     * Test that OutOfMemoryError is properly propagated and not masked as TimeoutException.
+     * This test simulates an OOM error during response processing and verifies it propagates
+     * without being wrapped. This is a regression test for KAFKA-19932.
+     */
+    @Test
+    public void testOutOfMemoryErrorPropagation() throws Exception {
+        MockTime time = new MockTime();
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, mockCluster(1, 0),
+                AdminClientConfig.RETRIES_CONFIG, "2",
+                AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "100")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Create a spy of MetadataResponse that throws OutOfMemoryError when topicMetadata() is accessed
+            // The AdminClient calls response.topicMetadata() in listTopics handleResponse(), which will trigger the OOM
+            MetadataResponseData data = new MetadataResponseData();
+            MetadataResponse realResponse = new MetadataResponse(data, ApiKeys.METADATA.latestVersion());
+            MetadataResponse spyResponse = spy(realResponse);

Review Comment:
   Is it possible to use a generic "mock" here instead of a spy?



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -11749,4 +11750,84 @@ public void testDescribeTopicsTimeoutWhenNoBrokerResponds() throws Exception {
             assertTrue(duration >= 150L && duration < 30000);
         }
     }
+
+    /**
+     * Test that OutOfMemoryError is properly propagated and not masked as TimeoutException.
+     * This test simulates an OOM error during response processing and verifies it propagates
+     * without being wrapped. This is a regression test for KAFKA-19932.
+     */
+    @Test
+    public void testOutOfMemoryErrorPropagation() throws Exception {
+        MockTime time = new MockTime();
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, mockCluster(1, 0),
+                AdminClientConfig.RETRIES_CONFIG, "2",
+                AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "100")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Create a spy of MetadataResponse that throws OutOfMemoryError when topicMetadata() is accessed
+            // The AdminClient calls response.topicMetadata() in listTopics handleResponse(), which will trigger the OOM
+            MetadataResponseData data = new MetadataResponseData();
+            MetadataResponse realResponse = new MetadataResponse(data, ApiKeys.METADATA.latestVersion());
+            MetadataResponse spyResponse = spy(realResponse);
+
+            // Configure the spy to throw OutOfMemoryError when topicMetadata() is called
+            // This simulates an OOM occurring during response processing
+            doThrow(new OutOfMemoryError("Simulated OOM during response handling"))
+                .when(spyResponse).topicMetadata();
+
+            // Prepare the mocked response that will throw OOM
+            env.kafkaClient().prepareResponse(spyResponse);
+
+            // Make the listTopics call - this will internally trigger a metadata request
+            ListTopicsResult result = env.adminClient().listTopics(new ListTopicsOptions().timeoutMs(10000));
+
+            // The OOM should propagate as-is, not be wrapped in TimeoutException
+            // We expect ExecutionException wrapping the OutOfMemoryError (this is standard Future behavior)
+            ExecutionException exception = assertThrows(ExecutionException.class, () -> result.names().get());
+            assertInstanceOf(OutOfMemoryError.class, exception.getCause(),
+                    "Expected OutOfMemoryError to be propagated, but got: " + exception.getCause());

Review Comment:
   Could you use `TestUtils.assertFutureThrows()` here? It would be slightly 
less boilerplate.
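
To illustrate the boilerplate such a helper removes, here is a minimal JDK-only sketch of a helper of that kind. It is a hypothetical stand-in, not Kafka's `TestUtils` code: the class name, the parameter order, and the choice to return the cause are all assumptions, so check the actual signature in `TestUtils` before relying on it. A `CompletableFuture` again stands in for `result.names()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureThrowsSketch {

    // Hypothetical stand-in for a helper such as TestUtils.assertFutureThrows(); not Kafka's code.
    // Waits for the future, checks the type of the ExecutionException's cause, and returns the cause.
    static <T extends Throwable> T assertFutureThrows(Class<T> expectedCause, Future<?> future) {
        try {
            future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (!expectedCause.isInstance(cause)) {
                throw new AssertionError(
                    "Expected cause " + expectedCause.getName() + ", but got: " + cause);
            }
            return expectedCause.cast(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        }
        // Reaching this point means the future completed normally.
        throw new AssertionError("Expected the future to fail with " + expectedCause.getName());
    }

    public static void main(String[] args) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new OutOfMemoryError("Simulated OOM during response handling"));

        // One call replaces the assertThrows(ExecutionException) + assertInstanceOf pair.
        OutOfMemoryError error = assertFutureThrows(OutOfMemoryError.class, future);
        System.out.println(error.getMessage());
    }
}
```

Returning the typed cause lets the caller follow up with further assertions, such as a message or identity check, without another cast.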



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
