mjsax commented on code in PR #17005: URL: https://github.com/apache/kafka/pull/17005#discussion_r1731648217
########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -555,12 +555,24 @@ public class StreamsConfig extends AbstractConfig { /** {@code default.deserialization.exception.handler} */ Review Comment: We should update the JavaDocs, too. Using `@deprecated` annotation and pointing to the new config to be used. ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1907,16 +1929,42 @@ public TimestampExtractor defaultTimestampExtractor() { return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } + public DeserializationExceptionHandler getDeserializationExceptionHandler() { + if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) { + return deserializationExceptionHandler(); + } else { + return defaultDeserializationExceptionHandler(); + } + } + @SuppressWarnings("WeakerAccess") - public DeserializationExceptionHandler defaultDeserializationExceptionHandler() { + private DeserializationExceptionHandler defaultDeserializationExceptionHandler() { return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); } @SuppressWarnings("WeakerAccess") - public ProductionExceptionHandler defaultProductionExceptionHandler() { + public DeserializationExceptionHandler deserializationExceptionHandler() { + return getConfiguredInstance(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); + } + + public ProductionExceptionHandler getProductionExceptionHandler() { Review Comment: Some comments as above ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -2552,7 +2552,8 @@ public Set<TopicPartition> partitions() { } @ParameterizedTest - @MethodSource("data") + @MethodSource("data") + @SuppressWarnings("deprecation") Review Comment: This test verifies that the handler is doing the right thing. 
Thus, we should update the test to use the new config and not add this annotation ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -555,12 +555,24 @@ public class StreamsConfig extends AbstractConfig { /** {@code default.deserialization.exception.handler} */ @SuppressWarnings("WeakerAccess") + @Deprecated public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler"; + @Deprecated public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface."; + /** {@code deserialization.exception.handler} */ + @SuppressWarnings("WeakerAccess") + public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "deserialization.exception.handler"; + Review Comment: nit: no empty line ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -889,6 +901,11 @@ public class StreamsConfig extends AbstractConfig { LogAndFailExceptionHandler.class.getName(), Importance.MEDIUM, DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) + .define(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, Review Comment: Please insert at the right place (we keep it ordered alphabetically) ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -555,12 +555,24 @@ public class StreamsConfig extends AbstractConfig { /** {@code default.deserialization.exception.handler} */ @SuppressWarnings("WeakerAccess") + @Deprecated public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler"; + @Deprecated public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface."; + 
/** {@code deserialization.exception.handler} */ + @SuppressWarnings("WeakerAccess") + public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "deserialization.exception.handler"; + + protected static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface."; /** {@code default.production.exception.handler} */ @SuppressWarnings("WeakerAccess") + @Deprecated public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler"; + + /** {@code production.exception.handler} */ + @SuppressWarnings("WeakerAccess") + public static final String PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "production.exception.handler"; private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> interface."; Review Comment: nit: rename variable, dropping `DEFAULT_` to align to config name. 
########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1907,16 +1929,42 @@ public TimestampExtractor defaultTimestampExtractor() { return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } + public DeserializationExceptionHandler getDeserializationExceptionHandler() { + if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) { + return deserializationExceptionHandler(); + } else { + return defaultDeserializationExceptionHandler(); + } + } + @SuppressWarnings("WeakerAccess") - public DeserializationExceptionHandler defaultDeserializationExceptionHandler() { + private DeserializationExceptionHandler defaultDeserializationExceptionHandler() { Review Comment: This is public API, and we cannot just make it private -- we can only deprecate it -- as above, must be covered by the KIP ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -555,12 +555,24 @@ public class StreamsConfig extends AbstractConfig { /** {@code default.deserialization.exception.handler} */ @SuppressWarnings("WeakerAccess") + @Deprecated public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler"; + @Deprecated public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface."; + /** {@code deserialization.exception.handler} */ + @SuppressWarnings("WeakerAccess") + public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "deserialization.exception.handler"; + + protected static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface."; Review Comment: Why `protected` -- should be `private`? 
nit: missing empty line below. ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -555,12 +555,24 @@ public class StreamsConfig extends AbstractConfig { /** {@code default.deserialization.exception.handler} */ @SuppressWarnings("WeakerAccess") + @Deprecated public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler"; + @Deprecated public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface."; + /** {@code deserialization.exception.handler} */ + @SuppressWarnings("WeakerAccess") + public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "deserialization.exception.handler"; + + protected static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface."; /** {@code default.production.exception.handler} */ Review Comment: Update JavaDocs? 
########## streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java: ########## @@ -93,6 +95,11 @@ public class TopologyConfig extends AbstractConfig { null, Importance.MEDIUM, DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) + .define(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, Review Comment: please insert at right place (alphabetically) ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -919,6 +936,11 @@ public class StreamsConfig extends AbstractConfig { DefaultProductionExceptionHandler.class.getName(), Importance.MEDIUM, DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC) + .define(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, Review Comment: Please insert at the right place (we keep it ordered alphabetically) ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java: ########## @@ -113,6 +115,7 @@ public static void handleDeserializationFailure(final DeserializationExceptionHa throw new StreamsException("Deserialization exception handler is set to fail upon" + " a deserialization error. If you would rather have the streaming pipeline" + " continue after a deserialization error, please set the " + + DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " or the deprecated exception handler " + Review Comment: We should just point to the new one and not refer to the deprecated one at all. 
(For this case, we also don't need the `@SuppressWarnings` annotation.) ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1907,16 +1929,42 @@ public TimestampExtractor defaultTimestampExtractor() { return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } + public DeserializationExceptionHandler getDeserializationExceptionHandler() { Review Comment: ```suggestion public DeserializationExceptionHandler deserializationExceptionHandler() { ``` ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1907,16 +1929,42 @@ public TimestampExtractor defaultTimestampExtractor() { return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } + public DeserializationExceptionHandler getDeserializationExceptionHandler() { + if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) { + return deserializationExceptionHandler(); + } else { + return defaultDeserializationExceptionHandler(); + } + } + @SuppressWarnings("WeakerAccess") - public DeserializationExceptionHandler defaultDeserializationExceptionHandler() { + private DeserializationExceptionHandler defaultDeserializationExceptionHandler() { return getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); } @SuppressWarnings("WeakerAccess") - public ProductionExceptionHandler defaultProductionExceptionHandler() { + public DeserializationExceptionHandler deserializationExceptionHandler() { Review Comment: Seems to be redundant to `[get]DeserializationExceptionHandler` above? 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java: ########## @@ -1158,6 +1159,75 @@ public void shouldSkipOnDeserializationErrorsWhenReprocessing() { assertEquals(0, stateRestoreCallback.restored.size()); } + @SuppressWarnings("deprecation") + @Test + public void verifyExceptionHandlerAcceptNewConfigWhenBothArePresent() { Review Comment: This test raises the question of whether we should log a WARN if both are set? But why do we test this in `GlobalStateManagerImplTest`? Seems this should go into `StreamsConfigTest`? ########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -1907,16 +1929,42 @@ public TimestampExtractor defaultTimestampExtractor() { return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } + public DeserializationExceptionHandler getDeserializationExceptionHandler() { Review Comment: We should not use the `get` prefix. Also, this is public API and the KIP must cover it -- can you update the KIP accordingly, and send an update to the VOTE thread? Or: we don't add it as `public` -- I personally always found that having these methods be `public` is a leaky abstraction.... 
But there was some disagreement about it: https://github.com/apache/kafka/pull/14548 ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java: ########## @@ -1066,6 +1067,41 @@ public void shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() { assertThat(topologyBuilder.topologyConfigs().parseStoreType(), equalTo(Materialized.StoreType.IN_MEMORY)); } + @SuppressWarnings("deprecation") + @Test + public void exceptionHandlerShouldAcceptNewConfig() { + final Properties topologyOverrides = new Properties(); + topologyOverrides.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class); + topologyOverrides.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + + final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig()); + final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder( + new TopologyConfig( + "my-topology", + config, + topologyOverrides) + ); + + assertThat(topologyBuilder.topologyConfigs().getTaskConfig().deserializationExceptionHandler.getClass(), equalTo(LogAndContinueExceptionHandler.class)); + } + + @Test + public void exceptionHandlerShouldAcceptNewConfigNoOtherDeprecatedConfigPresent() { Review Comment: Instead of adding a new test, should we just update an existing test which uses the old config now, and let it use the new config instead (maybe `shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps`)? 
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java: ########## @@ -259,6 +259,11 @@ public void setDeserializationExceptionHandler(final DeserializationExceptionHan this.deserializationExceptionHandler = deserializationExceptionHandler; } + //Visible for testing + public DeserializationExceptionHandler getDeserializationExceptionHandler() { Review Comment: ```suggestion public DeserializationExceptionHandler deserializationExceptionHandler() { ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java: ########## @@ -1066,6 +1067,41 @@ public void shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() { assertThat(topologyBuilder.topologyConfigs().parseStoreType(), equalTo(Materialized.StoreType.IN_MEMORY)); } + @SuppressWarnings("deprecation") + @Test + public void exceptionHandlerShouldAcceptNewConfig() { Review Comment: ```suggestion public void newDeserializationExceptionHandlerConfigShouldOverwriteOldOne() { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org