iit2009060 commented on code in PR #21535:
URL: https://github.com/apache/kafka/pull/21535#discussion_r2877183787


##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java:
##########
@@ -342,13 +350,144 @@ public void shouldGetToRunningWithOnlyGlobalTopology() throws Exception {
         kafkaStreams.close();
     }
 
+    private void createBuilderWithFailedProcessor() {
+        builder = new StreamsBuilder();
+        builder.addGlobalStore(
+                Stores.keyValueStoreBuilder(
+                        Stores.inMemoryKeyValueStore("test-global-store"),
+                        Serdes.Long(),
+                        Serdes.String()
+                ),
+                globalTableTopic,
+                Consumed.with(Serdes.Long(), Serdes.String()),
+                () -> new ContextualProcessor<Long, String, Void, Void>() {
+                    @Override
+                    public void process(org.apache.kafka.streams.processor.api.Record<Long, String> record) {
+                        if (record.key() == 2L) {
+                            throw new RuntimeException("Test processing exception");
+                        }
+                    }
+                }
+        );
+    }
+
+    @Test
+    public void testProcessingExceptionHandlerContinueEnabledRestorationPhase() throws Exception {
+        createBuilderWithFailedProcessor();
+        // enable processing exception handler invoked config
+        TestGlobalProcessingExceptionHandler.shouldResume = true;
+        streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,true);
+        streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+            TestGlobalProcessingExceptionHandler.class);
+
+        produceInitialGlobalTableValues();
+        startStreams();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, Duration.ofSeconds(30));
+
+        assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
+    }
+    @Test
+    public void testProcessingExceptionHandlerFailEnabledRestorationPhase() throws Exception {
+        createBuilderWithFailedProcessor();
+        // enable processing exception handler invoked config
+        TestGlobalProcessingExceptionHandler.shouldResume = false;
+        streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,true);
+        streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+                TestGlobalProcessingExceptionHandler.class);
+
+        produceInitialGlobalTableValues();
+        assertThrows(StreamsException.class, () -> {
+            startStreams();
+        });
+        assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
+
+    }
+    @Test
+    public void testProcessingExceptionHandlerDisabledRestorationPhase() throws Exception {
+        createBuilderWithFailedProcessor();
+        // enable processing exception handler invoked config
+        TestGlobalProcessingExceptionHandler.shouldResume = false;
+        streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,false);
+        streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+                TestGlobalProcessingExceptionHandler.class);
+
+        produceInitialGlobalTableValues();
+        assertThrows(StreamsException.class, () -> {
+            startStreams();
+        });
+        assertFalse(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
+
+    }
+    @Test
+    public void testProcessingExceptionHandlerContinueEnabledRunTimePhase() throws Exception {
+        createBuilderWithFailedProcessor();
+        // enable processing exception handler invoked config
+        TestGlobalProcessingExceptionHandler.shouldResume = true;
+        streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,true);
+        streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+                TestGlobalProcessingExceptionHandler.class);
+
+        startStreams();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, Duration.ofSeconds(30));
+        produceInitialGlobalTableValues();
+        
+        TestUtils.waitForCondition(
+            () -> TestGlobalProcessingExceptionHandler.handlerInvoked.get(),
+            Duration.ofSeconds(30).toMillis(),
+            "Handler was not invoked for key 2L"
+        );
+    }
+    @Test
+    public void testProcessingExceptionHandlerFailEnabledRunTimePhase() throws Exception {
+        createBuilderWithFailedProcessor();
+        // enable processing exception handler invoked config
+        streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,true);
+        streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+                TestGlobalProcessingExceptionHandler.class);
+
+        startStreams();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, Duration.ofSeconds(30));
+        produceInitialGlobalTableValues();
+        waitForApplicationState(singletonList(kafkaStreams), State.ERROR, Duration.ofSeconds(30));
+        assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
+    }
+    @Test
+    public void testProcessingExceptionHandlerDisabledRunTimePhase() throws Exception {
+        createBuilderWithFailedProcessor();
+        // enable processing exception handler invoked config
+        streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,false);
+        streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+                TestGlobalProcessingExceptionHandler.class);
+
+        startStreams();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, Duration.ofSeconds(30));
+        produceInitialGlobalTableValues();
+        waitForApplicationState(singletonList(kafkaStreams), State.ERROR, Duration.ofSeconds(30));
+        assertFalse(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
+    }
+
+    public static class TestGlobalProcessingExceptionHandler implements ProcessingExceptionHandler {
+        static AtomicBoolean handlerInvoked = new AtomicBoolean(false);
+        static boolean shouldResume = false;
+
+        @Override
+        public Response handleError(ErrorHandlerContext context, org.apache.kafka.streams.processor.api.Record<?, ?> record, Exception exception) {
+            handlerInvoked.set(true);
+            return shouldResume ? Response.resume() : Response.fail();
+        }

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to