mjsax commented on code in PR #17005: URL: https://github.com/apache/kafka/pull/17005#discussion_r1733654478
########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -553,15 +553,35 @@ public class StreamsConfig extends AbstractConfig { @SuppressWarnings("WeakerAccess") public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; - /** {@code default.deserialization.exception.handler} */ + /** + * {@code default.deserialization.exception.handler} + * @deprecated since 4.0. + * Use deserialization.exception.handler instead + */ @SuppressWarnings("WeakerAccess") + @Deprecated public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler"; + @Deprecated public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface."; - /** {@code default.production.exception.handler} */ + /** {@code deserialization.exception.handler} */ + @SuppressWarnings("WeakerAccess") + public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "deserialization.exception.handler"; + protected static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface."; + + /** + * {@code default.production.exception.handler} + * @deprecated since 4.0. 
+ * Use production.exception.handler instead Review Comment: as above ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1907,13 +1937,38 @@ public TimestampExtractor defaultTimestampExtractor() { return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } + public DeserializationExceptionHandler deserializationExceptionHandler() { + if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null && + getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) { + log.warn("Both the deprecated and new config for deserialization exception handler are configured !!"); + } + if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) { + return getConfiguredInstance(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); + } else { + return defaultDeserializationExceptionHandler(); + } + } + + /** + * @deprecated as of kafka 4.0. Use deserializationExceptionHandler() instead Review Comment: ```suggestion * @deprecated as of kafka 4.0; use {@link #deserializationExceptionHandler()} instead ``` ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1907,13 +1937,38 @@ public TimestampExtractor defaultTimestampExtractor() { return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } + public DeserializationExceptionHandler deserializationExceptionHandler() { + if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null && + getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) { + log.warn("Both the deprecated and new config for deserialization exception handler are configured !!"); Review Comment: ```suggestion log.warn("Both the deprecated and new config for deserialization exception handler are configured. 
The deprecated one will be ignored."); ``` ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -918,12 +938,7 @@ public class StreamsConfig extends AbstractConfig { Type.CLASS, DefaultProductionExceptionHandler.class.getName(), Importance.MEDIUM, - DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC) - .define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, Review Comment: Thanks for the side cleanup! ########## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ########## @@ -1622,6 +1624,30 @@ public void testInvalidProcessingExceptionHandler() { ); } + @Test + public void testDeserializationExceptionHandlerWhenOnlyNewConfigIsSet() { + props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + streamsConfig = new StreamsConfig(props); + assertEquals(LogAndContinueExceptionHandler.class, streamsConfig.deserializationExceptionHandler().getClass()); + } + + @SuppressWarnings("deprecation") + @Test + public void testDeserializationExceptionHandlerWhenBothConfigsAreSet() { + props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class); + streamsConfig = new StreamsConfig(props); + assertEquals(LogAndContinueExceptionHandler.class, streamsConfig.deserializationExceptionHandler().getClass()); + } + + @SuppressWarnings("deprecation") + @Test + public void testDeserializationExceptionHandlerWhenOnlyOldConfigIsSet() { + props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + streamsConfig = new StreamsConfig(props); + assertEquals(LogAndFailExceptionHandler.class, streamsConfig.deserializationExceptionHandler().getClass()); Review Comment: This one should expect `LogAndContinue` right? 
(I think the fact that this test currently passes exposes the bug: the code uses `getClass()` instead of `originals().containsKey()`.) ########## streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java: ########## @@ -93,6 +95,11 @@ public class TopologyConfig extends AbstractConfig { null, Importance.MEDIUM, DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) + .define(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, Review Comment: Seems this was not addressed yet. ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -553,15 +553,35 @@ public class StreamsConfig extends AbstractConfig { @SuppressWarnings("WeakerAccess") public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; - /** {@code default.deserialization.exception.handler} */ + /** + * {@code default.deserialization.exception.handler} + * @deprecated since 4.0. + * Use deserialization.exception.handler instead Review Comment: Make one line: ``` @deprecated since 4.0; use {@link #DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG} instead ``` ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -918,12 +938,7 @@ public class StreamsConfig extends AbstractConfig { Type.CLASS, DefaultProductionExceptionHandler.class.getName(), Importance.MEDIUM, - DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC) - .define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, - Type.CLASS, - LogAndFailProcessingExceptionHandler.class.getName(), - Importance.MEDIUM, - PROCESSING_EXCEPTION_HANDLER_CLASS_DOC) + PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC) Review Comment: Should this stay `DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC`? 
########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1907,13 +1937,38 @@ public TimestampExtractor defaultTimestampExtractor() { return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } + public DeserializationExceptionHandler deserializationExceptionHandler() { + if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null && + getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) { + log.warn("Both the deprecated and new config for deserialization exception handler are configured !!"); + } + if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) { + return getConfiguredInstance(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); + } else { + return defaultDeserializationExceptionHandler(); + } + } + + /** + * @deprecated as of kafka 4.0. Use deserializationExceptionHandler() instead + * @return DeserializationExceptionHandler + */ + @Deprecated @SuppressWarnings("WeakerAccess") public DeserializationExceptionHandler defaultDeserializationExceptionHandler() { return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); } + public ProductionExceptionHandler productionExceptionHandler() { + if (getClass(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) { Review Comment: This should work the same as for deserialization handler, including the check if both old and new configs are set? 
########## streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java: ########## @@ -223,11 +230,15 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo timestampExtractorSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } - if (isTopologyOverride(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, topologyOverrides)) { - deserializationExceptionHandlerSupplier = () -> getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); - log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)); + final String deserializationExceptionHandlerKey = getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null ? Review Comment: as above: `getClass` -> `originals().containsKey()` ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1907,13 +1937,38 @@ public TimestampExtractor defaultTimestampExtractor() { return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } + public DeserializationExceptionHandler deserializationExceptionHandler() { + if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null && + getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) { + log.warn("Both the deprecated and new config for deserialization exception handler are configured !!"); + } + if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) { Review Comment: as above ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1907,13 +1937,38 @@ public TimestampExtractor defaultTimestampExtractor() { return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } + public DeserializationExceptionHandler 
deserializationExceptionHandler() { + if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null && + getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) { + log.warn("Both the deprecated and new config for deserialization exception handler are configured !!"); + } + if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) { + return getConfiguredInstance(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); + } else { + return defaultDeserializationExceptionHandler(); + } + } + + /** + * @deprecated as of kafka 4.0. Use deserializationExceptionHandler() instead + * @return DeserializationExceptionHandler + */ + @Deprecated @SuppressWarnings("WeakerAccess") public DeserializationExceptionHandler defaultDeserializationExceptionHandler() { return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); } + public ProductionExceptionHandler productionExceptionHandler() { + if (getClass(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) { + return getConfiguredInstance(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class); + } else { + return defaultProductionExceptionHandler(); + } + } + @SuppressWarnings("WeakerAccess") - public ProductionExceptionHandler defaultProductionExceptionHandler() { + private ProductionExceptionHandler defaultProductionExceptionHandler() { Review Comment: We cannot make public stuff private -- we need to first deprecate it ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1907,13 +1937,38 @@ public TimestampExtractor defaultTimestampExtractor() { return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } + public DeserializationExceptionHandler deserializationExceptionHandler() { + if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null && Review Comment: `getClass` would return the 
default if not set. I think we would need to use ``` originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) ``` instead. We should also add a test that verifies the log line is printed (cf. helper `LogCaptureAppender.java`). ########## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ########## @@ -1622,6 +1624,30 @@ public void testInvalidProcessingExceptionHandler() { ); } + @Test + public void testDeserializationExceptionHandlerWhenOnlyNewConfigIsSet() { + props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + streamsConfig = new StreamsConfig(props); + assertEquals(LogAndContinueExceptionHandler.class, streamsConfig.deserializationExceptionHandler().getClass()); + } + + @SuppressWarnings("deprecation") + @Test + public void testDeserializationExceptionHandlerWhenBothConfigsAreSet() { + props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class); + streamsConfig = new StreamsConfig(props); + assertEquals(LogAndContinueExceptionHandler.class, streamsConfig.deserializationExceptionHandler().getClass()); Review Comment: We can add an assertion for the WARN log line in this test. 
########## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ########## @@ -1622,6 +1624,30 @@ public void testInvalidProcessingExceptionHandler() { ); } + @Test + public void testDeserializationExceptionHandlerWhenOnlyNewConfigIsSet() { Review Comment: ```suggestion public void shouldSetAndGetDeserializationExceptionHandlerWhenOnlyNewConfigIsSet() { ``` ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1907,13 +1937,38 @@ public TimestampExtractor defaultTimestampExtractor() { return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } + public DeserializationExceptionHandler deserializationExceptionHandler() { + if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null && + getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) { + log.warn("Both the deprecated and new config for deserialization exception handler are configured !!"); + } + if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) { + return getConfiguredInstance(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); + } else { + return defaultDeserializationExceptionHandler(); + } + } + + /** + * @deprecated as of kafka 4.0. 
Use deserializationExceptionHandler() instead + * @return DeserializationExceptionHandler Review Comment: I think we can omit this one ########## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ########## @@ -1622,6 +1624,30 @@ public void testInvalidProcessingExceptionHandler() { ); } + @Test + public void testDeserializationExceptionHandlerWhenOnlyNewConfigIsSet() { + props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + streamsConfig = new StreamsConfig(props); + assertEquals(LogAndContinueExceptionHandler.class, streamsConfig.deserializationExceptionHandler().getClass()); + } + + @SuppressWarnings("deprecation") + @Test + public void testDeserializationExceptionHandlerWhenBothConfigsAreSet() { + props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class); + streamsConfig = new StreamsConfig(props); + assertEquals(LogAndContinueExceptionHandler.class, streamsConfig.deserializationExceptionHandler().getClass()); + } + + @SuppressWarnings("deprecation") + @Test + public void testDeserializationExceptionHandlerWhenOnlyOldConfigIsSet() { Review Comment: ```suggestion public void shouldUseOldDeserializationExceptionHandlerWhenOnlyOldConfigIsSet() { ``` ########## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ########## @@ -1622,6 +1624,30 @@ public void testInvalidProcessingExceptionHandler() { ); } + @Test + public void testDeserializationExceptionHandlerWhenOnlyNewConfigIsSet() { + props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + streamsConfig = new StreamsConfig(props); + assertEquals(LogAndContinueExceptionHandler.class, streamsConfig.deserializationExceptionHandler().getClass()); + } + + @SuppressWarnings("deprecation") + @Test + 
public void testDeserializationExceptionHandlerWhenBothConfigsAreSet() { Review Comment: ```suggestion public void shouldUseNewDeserializationExceptionHandlerWhenBothConfigsAreSet() { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org