C0urante commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r428333743



##########
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * A predicate which is true for records which are tombstones (i.e. have null key).
+ * @param <R> The type of connect record.
+ */
+public class RecordIsTombstone<R extends ConnectRecord<R>> implements Predicate<R> {
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef();

Review comment:
       Probably won't impact performance too much but we could technically use a single `ConfigDef` instance for the entire class instead of creating a new one every time this method is called.
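   
   Roughly what I'm picturing (the field name is just a placeholder, not part of the PR):
   
   ```java
   // Reuse a single empty ConfigDef for the whole class instead of allocating
   // a new one on every config() call.
   private static final ConfigDef CONFIG_DEF = new ConfigDef();
   
   @Override
   public ConfigDef config() {
       return CONFIG_DEF;
   }
   ```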

##########
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * A predicate which is true for records which are tombstones (i.e. have null key).
+ * @param <R> The type of connect record.
+ */
+public class RecordIsTombstone<R extends ConnectRecord<R>> implements Predicate<R> {
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef();
+    }
+
+    @Override
+    public boolean test(R record) {
+        return record.key() == null;

Review comment:
       I think we want to check the value instead of the key here?
   ```suggestion
           return record.value() == null;
   ```

##########
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/RecordIsTombstone.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * A predicate which is true for records which are tombstones (i.e. have null key).

Review comment:
       I think a tombstone is defined as a record with a null value, not a null key:
   ```suggestion
    * A predicate which is true for records which are tombstones (i.e. have null values).
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+
+/**
+ * Decorator for a {@link Transformation} which applies the delegate only when a
+ * {@link Predicate} is true (or false, according to {@code negate}).
+ * @param <R>
+ */
+class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    /*test*/ final Predicate<R> predicate;
+    /*test*/ final Transformation<R> delegate;
+    /*test*/ final boolean negate;
+
+    PredicatedTransformation(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
+        this.predicate = predicate;
+        this.negate = negate;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+
+    }
+
+    @Override
+    public R apply(R record) {
+        if (negate ^ predicate.test(record)) {
+            return delegate.apply(record);
+        }
+        return record;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return null;

Review comment:
       Same comment here as with `Filter`; probably want to return a non-null `ConfigDef` here.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -257,12 +273,25 @@ public boolean includeRecordDetailsInErrorLog() {
         final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+
             try {
                 @SuppressWarnings("unchecked")
                final Transformation<R> transformation = getClass(prefix + "type").asSubclass(Transformation.class)
                         .getDeclaredConstructor().newInstance();
-                transformation.configure(originalsWithPrefix(prefix));
-                transformations.add(transformation);
+                Map<String, Object> configs = originalsWithPrefix(prefix);
+                Object predicateAlias = configs.remove("predicate");
+                Object negate = configs.remove("negate");

Review comment:
       Do we need to remove these properties here, or can we just read them? Removing might cause issues with SMTs that have config properties with these names; would leaving them in be likely to cause issues as well?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -257,12 +273,25 @@ public boolean includeRecordDetailsInErrorLog() {
         final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+
             try {
                 @SuppressWarnings("unchecked")
                final Transformation<R> transformation = getClass(prefix + "type").asSubclass(Transformation.class)
                         .getDeclaredConstructor().newInstance();
-                transformation.configure(originalsWithPrefix(prefix));
-                transformations.add(transformation);
+                Map<String, Object> configs = originalsWithPrefix(prefix);
+                Object predicateAlias = configs.remove("predicate");
+                Object negate = configs.remove("negate");
+                transformation.configure(configs);
+                if (predicateAlias != null) {
+                    String predicatePrefix = "predicates." + predicateAlias + ".";
+                    @SuppressWarnings("unchecked")
+                    Predicate<R> predicate = getClass(predicatePrefix + "type").asSubclass(Predicate.class)
+                            .getDeclaredConstructor().newInstance();
+                    predicate.configure(originalsWithPrefix(predicatePrefix));
+                    transformations.add(new PredicatedTransformation<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));

Review comment:
       Just curious, why directly parse the `negate` property here instead of doing that in `PredicatedTransformation::configure`?
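   
   Just as a sketch of what I mean (this would mean `negate` couldn't stay `final`, and the `"negate"` key name is assumed to match the connector config):
   
   ```java
   // Hypothetical: let PredicatedTransformation parse its own "negate" flag in configure().
   @Override
   public void configure(Map<String, ?> configs) {
       Object negate = configs.get("negate");
       this.negate = negate != null && Boolean.parseBoolean(negate.toString());
   }
   ```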

##########
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * A predicate which is true for records with at least one header with the configured name.
+ * @param <R> The type of connect record.
+ */
+public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R> {
+
+    private static final String NAME_CONFIG_KEY = "name";
+    private String name;
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef().define(NAME_CONFIG_KEY, ConfigDef.Type.STRING, null,
+                new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
+                "The header name.");
+    }
+
+    @Override
+    public boolean test(R record) {
+        return record.headers().allWithName(name).hasNext();

Review comment:
       Might want to break this up to avoid an NPE since [Headers::allWithName](https://github.com/apache/kafka/blob/67770072da1bd13762af978faaa278c4039167a2/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java#L52) is technically allowed to return `null` in some situations:
   ```suggestion
           Iterator<Header> headersWithName = record.headers().allWithName(name);
           return headersWithName != null ? headersWithName.hasNext() : false;
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -257,12 +273,25 @@ public boolean includeRecordDetailsInErrorLog() {
         final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+
             try {
                 @SuppressWarnings("unchecked")
                final Transformation<R> transformation = getClass(prefix + "type").asSubclass(Transformation.class)
                         .getDeclaredConstructor().newInstance();
-                transformation.configure(originalsWithPrefix(prefix));
-                transformations.add(transformation);
+                Map<String, Object> configs = originalsWithPrefix(prefix);
+                Object predicateAlias = configs.remove("predicate");
+                Object negate = configs.remove("negate");
+                transformation.configure(configs);
+                if (predicateAlias != null) {
+                    String predicatePrefix = "predicates." + predicateAlias + ".";
+                    @SuppressWarnings("unchecked")
+                    Predicate<R> predicate = getClass(predicatePrefix + "type").asSubclass(Predicate.class)
+                            .getDeclaredConstructor().newInstance();

Review comment:
       Blegh, was hoping we might be able to use `Utils::newInstance` or `AbstractConfig::newConfiguredInstance` but it looks like neither quite does what we need; the former doesn't do casting to a subclass unless you give it the FQCN of a class instead of an already-loaded `Class<?>` object, and the latter doesn't give enough control over exactly which properties the new instance is configured with.
   
   Since we're using this same logic in several different places in this file alone, we might consider expanding the `Utils` class with a new utility method that does this for us. Maybe something like:
   
   ```java
   public static <T> T newInstance(Class<?> klass, Class<T> baseClass) {
       // Return an instance of klass that has been automatically cast to the type of baseClass
   }
   ```
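   
   Fleshed out a bit, that could look roughly like this (the error handling here is just a guess at what we'd want, not an existing API):
   
   ```java
   // Hypothetical Utils addition: instantiate klass via its no-arg constructor
   // and cast the result to baseClass, failing fast if it isn't a subclass.
   // (Wrapping failures in KafkaException is just one option.)
   public static <T> T newInstance(Class<?> klass, Class<T> baseClass) {
       if (!baseClass.isAssignableFrom(klass)) {
           throw new KafkaException(klass.getName() + " is not a subclass of " + baseClass.getName());
       }
       try {
           return baseClass.cast(klass.getDeclaredConstructor().newInstance());
       } catch (ReflectiveOperationException e) {
           throw new KafkaException("Could not instantiate " + klass.getName(), e);
       }
   }
   ```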

##########
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/HasHeaderKey.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * A predicate which is true for records with at least one header with the configured name.
+ * @param <R> The type of connect record.
+ */
+public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R> {
+
+    private static final String NAME_CONFIG_KEY = "name";
+    private String name;
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef().define(NAME_CONFIG_KEY, ConfigDef.Type.STRING, null,
+                new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
+                "The header name.");
+    }
+
+    @Override
+    public boolean test(R record) {
+        return record.headers().allWithName(name).hasNext();
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        this.name = (String) configs.get(NAME_CONFIG_KEY);

Review comment:
       Might consider using a [SimpleConfig](https://github.com/apache/kafka/blob/67770072da1bd13762af978faaa278c4039167a2/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SimpleConfig.java) here. Won't make a huge difference with the class as-is, but will make it easier to make changes in the future if we ever want to expand on the configurability of this predicate.
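   
   Something along these lines (just a sketch):
   
   ```java
   // Parse and validate the raw map against this predicate's ConfigDef via
   // SimpleConfig instead of casting the raw value directly.
   @Override
   public void configure(Map<String, ?> configs) {
       SimpleConfig config = new SimpleConfig(config(), configs);
       this.name = config.getString(NAME_CONFIG_KEY);
   }
   ```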

##########
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Filter.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * Drops all records, filtering them from subsequent transformations in the chain.
+ * This is intended to be used conditionally to filter out records matching (or not matching)
+ * a particular {@link org.apache.kafka.connect.transforms.predicates.Predicate}.
+ * @param <R> The type of record.
+ */
+public class Filter<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    @Override
+    public R apply(R record) {
+        return null;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return null;

Review comment:
       I think this will fail during validation since [transformations must provide non-null ConfigDefs](https://github.com/apache/kafka/blob/67770072da1bd13762af978faaa278c4039167a2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L355-L363). Might want to instantiate a single empty static `ConfigDef` object for the class and just return that?

##########
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/predicates/TopicNameMatches.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.predicates;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+/**
+ * A predicate which is true for records with a topic name that matches the configured regular expression.
+ * @param <R> The type of connect record.
+ */
+public class TopicNameMatches<R extends ConnectRecord<R>> implements Predicate<R> {
+
+    public static final String PATTERN_CONFIG_KEY = "pattern";
+    private Pattern pattern;
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef().define(PATTERN_CONFIG_KEY, ConfigDef.Type.STRING, null,
+                new ConfigDef.Validator() {
+                    @Override
+                    public void ensureValid(String name, Object value) {
+                        if (value != null) {
+                            compile(name, value);
+                        }
+                    }
+                }, ConfigDef.Importance.MEDIUM,
+                "A Java regular expression for matching against the name of a 
record's topic.");
+    }
+
+    private Pattern compile(String name, Object value) {
+        try {
+            return Pattern.compile((String) value);
+        } catch (PatternSyntaxException e) {
+            throw new ConfigException(name, value, "entry must be a Java-compatible regular expression: " + e.getMessage());
+        }
+    }
+
+    @Override
+    public boolean test(R record) {
+        return record.topic() != null && 
pattern.matcher(record.topic()).matches();
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        this.pattern = compile(PATTERN_CONFIG_KEY, configs.get(PATTERN_CONFIG_KEY));

Review comment:
       Same comment as elsewhere: might want to use a `SimpleConfig` in this class.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

