yashmayya commented on code in PR #13184:
URL: https://github.com/apache/kafka/pull/13184#discussion_r1104098263


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java:
##########
@@ -16,54 +16,49 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import java.util.Map;
 
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 /**
- * Decorator for a {@link Transformation} which applies the delegate only when 
a
- * {@link Predicate} is true (or false, according to {@code negate}).
+ * Wrapper for a {@link Transformation} and corresponding optional {@link 
Predicate }
+ * which applies the transformation when the {@link Predicate} is true (or 
false, according to {@code negate}).
+ * If no {@link Predicate} is provided, the transformation will be 
unconditionally applied.
  * @param <R> The type of record (must be an implementation of {@link 
ConnectRecord})
  */
-public class PredicatedTransformation<R extends ConnectRecord<R>> implements 
Transformation<R> {
+public class TransformationStage<R extends ConnectRecord<R>> implements 
AutoCloseable {
 
     static final String PREDICATE_CONFIG = "predicate";
     static final String NEGATE_CONFIG = "negate";
-    final Predicate<R> predicate;
-    final Transformation<R> delegate;
-    final boolean negate;
+    private final Predicate<R> predicate;
+    private final Transformation<R> delegate;
+    private final boolean negate;
 
-    PredicatedTransformation(Predicate<R> predicate, boolean negate, 
Transformation<R> delegate) {
+    TransformationStage(Transformation<R> delegate) {
+        this(null, false, delegate);
+    }
+
+    TransformationStage(Predicate<R> predicate, boolean negate, 
Transformation<R> delegate) {
         this.predicate = predicate;
         this.negate = negate;
         this.delegate = delegate;
     }
 
-    @Override
-    public void configure(Map<String, ?> configs) {
-        throw new ConnectException(PredicatedTransformation.class.getName() + 
".configure() " +
-                "should never be called directly.");
+    public Class<? extends Transformation<R>> transformClass() {
+        @SuppressWarnings("unchecked")
+        Class<? extends Transformation<R>> transformClass = (Class<? extends 
Transformation<R>>) delegate.getClass();
+        return transformClass;
     }
 
-    @Override
     public R apply(R record) {
-        if (negate ^ predicate.test(record)) {
+        if (predicate == null || negate ^ predicate.test(record)) {
             return delegate.apply(record);
         }
         return record;
     }
 
-    @Override
-    public ConfigDef config() {
-        throw new ConnectException(PredicatedTransformation.class.getName() + 
".config() " +
-                "should never be called directly.");
-    }
-
     @Override
     public void close() {
         Utils.closeQuietly(delegate, "predicated transformation");

Review Comment:
   ```suggestion
           Utils.closeQuietly(delegate, "transformation");
   ```
   nit: The label passed to `closeQuietly` should just say "transformation", since this may or may not be a predicated transformation now.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -270,28 +270,28 @@ public boolean includeRecordDetailsInErrorLog() {
     /**
      * Returns the initialized list of {@link Transformation} which are 
specified in {@link #TRANSFORMS_CONFIG}.
      */
-    public <R extends ConnectRecord<R>> List<Transformation<R>> 
transformations() {
+    public <R extends ConnectRecord<R>> List<TransformationStage<R>> 
transformations() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
 
-        final List<Transformation<R>> transformations = new 
ArrayList<>(transformAliases.size());
+        final List<TransformationStage<R>> transformations = new 
ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
 
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = 
Utils.newInstance(getClass(prefix + "type"), Transformation.class);
                 Map<String, Object> configs = originalsWithPrefix(prefix);
-                Object predicateAlias = 
configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-                Object negate = 
configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+                Object predicateAlias = 
configs.remove(TransformationStage.PREDICATE_CONFIG);
+                Object negate = 
configs.remove(TransformationStage.NEGATE_CONFIG);
                 transformation.configure(configs);
                 if (predicateAlias != null) {

Review Comment:
   Thanks, the analysis makes sense and I agree that `null` seems like an 
appropriate default value here.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java:
##########
@@ -147,10 +151,10 @@ public void singleTransform() {
         props.put("transforms.a.type", SimpleTransformation.class.getName());
         props.put("transforms.a.magic.number", "42");
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, 
props);
-        final List<Transformation<R>> transformations = 
config.transformations();
+        final List<TransformationStage<SinkRecord>> transformations = 
config.transformations();
         assertEquals(1, transformations.size());
-        final SimpleTransformation<R> xform = (SimpleTransformation<R>) 
transformations.get(0);
-        assertEquals(42, xform.magicNumber);
+        final TransformationStage<SinkRecord> xform = transformations.get(0);
+        assertEquals(42, 
xform.apply(DUMMY_RECORD).kafkaPartition().intValue());

Review Comment:
   Sounds good, I didn't really have any concerns with these changes; just 
curious about why they were being made.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -494,15 +500,25 @@ private void expectTopicCreation(String topic) {
         }
     }
 
+    private void mockSinkTransform() {
+        FaultyPassthrough<SinkRecord> faultyPassthrough = new 
FaultyPassthrough<>();
+        @SuppressWarnings("unchecked")
+        Class<? extends Transformation<?>> value = (Class<? extends 
Transformation<?>>) (Class<?>) FaultyPassthrough.class;

Review Comment:
   Huh, interesting, TIL.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java:
##########
@@ -33,6 +34,7 @@
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;

Review Comment:
   The `assertFalse` static import added here is unused and is causing a checkstyle failure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to