yashmayya commented on code in PR #13184: URL: https://github.com/apache/kafka/pull/13184#discussion_r1104098263
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java: ########## @@ -16,54 +16,49 @@ */ package org.apache.kafka.connect.runtime; -import java.util.Map; -import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.ConnectRecord; -import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; /** - * Decorator for a {@link Transformation} which applies the delegate only when a - * {@link Predicate} is true (or false, according to {@code negate}). + * Wrapper for a {@link Transformation} and corresponding optional {@link Predicate } + * which applies the transformation when the {@link Predicate} is true (or false, according to {@code negate}). + * If no {@link Predicate} is provided, the transformation will be unconditionally applied. * @param <R> The type of record (must be an implementation of {@link ConnectRecord}) */ -public class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> { +public class TransformationStage<R extends ConnectRecord<R>> implements AutoCloseable { static final String PREDICATE_CONFIG = "predicate"; static final String NEGATE_CONFIG = "negate"; - final Predicate<R> predicate; - final Transformation<R> delegate; - final boolean negate; + private final Predicate<R> predicate; + private final Transformation<R> delegate; + private final boolean negate; - PredicatedTransformation(Predicate<R> predicate, boolean negate, Transformation<R> delegate) { + TransformationStage(Transformation<R> delegate) { + this(null, false, delegate); + } + + TransformationStage(Predicate<R> predicate, boolean negate, Transformation<R> delegate) { this.predicate = predicate; this.negate = negate; this.delegate = delegate; } - @Override - public void configure(Map<String, ?> configs) { - throw new 
ConnectException(PredicatedTransformation.class.getName() + ".configure() " + - "should never be called directly."); + public Class<? extends Transformation<R>> transformClass() { + @SuppressWarnings("unchecked") + Class<? extends Transformation<R>> transformClass = (Class<? extends Transformation<R>>) delegate.getClass(); + return transformClass; } - @Override public R apply(R record) { - if (negate ^ predicate.test(record)) { + if (predicate == null || negate ^ predicate.test(record)) { return delegate.apply(record); } return record; } - @Override - public ConfigDef config() { - throw new ConnectException(PredicatedTransformation.class.getName() + ".config() " + - "should never be called directly."); - } - @Override public void close() { Utils.closeQuietly(delegate, "predicated transformation"); Review Comment: ```suggestion Utils.closeQuietly(delegate, "transformation"); ``` nit: Since this may or may not be a predicated transformation now. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java: ########## @@ -270,28 +270,28 @@ public boolean includeRecordDetailsInErrorLog() { /** * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}. */ - public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() { + public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformations() { final List<String> transformAliases = getList(TRANSFORMS_CONFIG); - final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size()); + final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size()); for (String alias : transformAliases) { final String prefix = TRANSFORMS_CONFIG + "." 
+ alias + "."; try { @SuppressWarnings("unchecked") final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class); Map<String, Object> configs = originalsWithPrefix(prefix); - Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG); - Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG); + Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG); + Object negate = configs.remove(TransformationStage.NEGATE_CONFIG); transformation.configure(configs); if (predicateAlias != null) { Review Comment: Thanks, the analysis makes sense and I agree that `null` seems like an appropriate default value here. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java: ########## @@ -147,10 +151,10 @@ public void singleTransform() { props.put("transforms.a.type", SimpleTransformation.class.getName()); props.put("transforms.a.magic.number", "42"); final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props); - final List<Transformation<R>> transformations = config.transformations(); + final List<TransformationStage<SinkRecord>> transformations = config.transformations(); assertEquals(1, transformations.size()); - final SimpleTransformation<R> xform = (SimpleTransformation<R>) transformations.get(0); - assertEquals(42, xform.magicNumber); + final TransformationStage<SinkRecord> xform = transformations.get(0); + assertEquals(42, xform.apply(DUMMY_RECORD).kafkaPartition().intValue()); Review Comment: Sounds good, I didn't really have any concerns with these changes; just curious about why they were being made. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java: ########## @@ -494,15 +500,25 @@ private void expectTopicCreation(String topic) { } } + private void mockSinkTransform() { + FaultyPassthrough<SinkRecord> faultyPassthrough = new FaultyPassthrough<>(); + @SuppressWarnings("unchecked") + Class<? extends Transformation<?>> value = (Class<? extends Transformation<?>>) (Class<?>) FaultyPassthrough.class; Review Comment: Huh, interesting, TIL. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java: ########## @@ -33,6 +34,7 @@ import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; Review Comment: This unused import is causing a checkstyle failure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org