Tom Bentley created KAFKA-9673:
----------------------------------

             Summary: Conditionally apply SMTs
                 Key: KAFKA-9673
                 URL: https://issues.apache.org/jira/browse/KAFKA-9673
             Project: Kafka
          Issue Type: New Feature
          Components: KafkaConnect
            Reporter: Tom Bentley
            Assignee: Tom Bentley


KAFKA-7052 ended up using an IllegalArgumentException (IAE) with a message, 
rather than a NullPointerException (NPE), when an SMT is applied to a record 
lacking a given field. It's still not possible to apply an SMT conditionally, 
which is what things like Debezium really need in order to apply 
transformations only to non-schema-change events.

[~rhauch] suggested a mechanism to conditionally apply any SMT, but was 
concerned about the possibility of a naming collision (assuming it was 
configured via a simple config property).

I'd like to propose something which would solve this problem without the 
possibility of such collisions. The idea is to have a higher-level SMT which 
applies an arbitrary transformation (or transformation chain) according to 
some predicate on the record.

More concretely, it might be configured like this:

{noformat}
  transforms.conditionalExtract.type: Conditional
  transforms.conditionalExtract.transforms: extractInt
  transforms.conditionalExtract.transforms.extractInt.type: org.apache.kafka.connect.transforms.ExtractField$Key
  transforms.conditionalExtract.transforms.extractInt.field: c1
  transforms.conditionalExtract.condition: topic-matches:<someRegexHere>
{noformat}

* The {{Conditional}} SMT is configured with its own list of transforms 
({{transforms.conditionalExtract.transforms}}) to apply. This would work just 
like the top level {{transforms}} config, so subkeys can be used to configure 
these transforms in the usual way.
* The {{condition}} config defines the predicate that determines when the 
transforms are applied to a record, using a {{<condition-type>:<parameters>}} 
syntax.
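To make the intended runtime semantics concrete, here is a minimal Java sketch of what a {{Conditional}} SMT could do per record. It deliberately avoids the real Connect API: {{SimpleRecord}} and {{Conditional}} are hypothetical stand-ins, with {{java.util.function}} types playing the roles of {{Transformation}} and the condition. Records failing the condition pass through unchanged; otherwise the nested chain runs in order, and a {{null}} result drops the record, mirroring how Connect treats a {{null}} return from a transformation.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a Connect record: only the parts this sketch needs.
record SimpleRecord(String topic, Object key, Object value) {}

// Hypothetical sketch of the proposed Conditional SMT: apply a nested chain
// of transforms only when the configured condition holds for the record.
final class Conditional implements UnaryOperator<SimpleRecord> {
    private final Predicate<SimpleRecord> condition;
    private final List<UnaryOperator<SimpleRecord>> transforms;

    Conditional(Predicate<SimpleRecord> condition,
                List<UnaryOperator<SimpleRecord>> transforms) {
        this.condition = condition;
        this.transforms = List.copyOf(transforms);
    }

    @Override
    public SimpleRecord apply(SimpleRecord record) {
        if (!condition.test(record)) {
            return record; // condition false: pass the record through untouched
        }
        SimpleRecord current = record;
        for (UnaryOperator<SimpleRecord> t : transforms) {
            current = t.apply(current);
            if (current == null) {
                return null; // like Connect: null means "drop this record"
            }
        }
        return current;
    }
}
```

Wrapping the nested chain, rather than adding a condition key to every SMT, is what keeps the condition's config keys in their own namespace and avoids the collision concern.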

We could initially support three condition types:

*{{topic-matches:<pattern>}}* The transformation would be applied if the 
record's topic name matched the given regular expression pattern. For example, 
the following would apply the transformation on records being sent to any topic 
with a name beginning with "my-prefix-":
{noformat}
       transforms.conditionalExtract.condition: topic-matches:my-prefix-.*
{noformat}
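A sketch of how {{topic-matches}} might be evaluated. It assumes the pattern must match the whole topic name ({{Matcher.matches()}}) rather than any substring ({{Matcher.find()}}); that is an assumption, though it is what makes {{my-prefix-.*}} mean "begins with my-prefix-". {{TopicRecord}} and {{TopicMatches}} are hypothetical names.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical stand-in for a record; only the topic matters here.
record TopicRecord(String topic) {}

// Hypothetical predicate for "topic-matches:<pattern>".
final class TopicMatches implements Predicate<TopicRecord> {
    private final Pattern pattern;

    TopicMatches(String regex) {
        // Compile once at configuration time, so an invalid regex fails fast
        // (PatternSyntaxException) instead of on every record.
        this.pattern = Pattern.compile(regex);
    }

    @Override
    public boolean test(TopicRecord record) {
        // matches() requires the entire topic name to match the pattern.
        return pattern.matcher(record.topic()).matches();
    }
}
```

With {{find()}} instead, {{my-prefix-.*}} would also match a topic such as "orders-my-prefix-x", which is probably not what users would expect from the example above.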
   
*{{has-header:<header-name>}}* The transformation would be applied if the 
record had at least one header with the given name. For example, the following 
will apply the transformation on records with at least one header with the name 
"my-header":
{noformat}
       transforms.conditionalExtract.condition: has-header:my-header
{noformat}
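A sketch of the {{has-header}} check, using hypothetical {{Header}}, {{HeaderedRecord}} and {{HasHeader}} types. Since a record may carry several headers with the same name, the predicate only asks whether at least one exists.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-ins: a header is a name/value pair, and a record may
// carry several headers with the same name.
record Header(String key, Object value) {}
record HeaderedRecord(String topic, List<Header> headers) {}

// Hypothetical predicate for "has-header:<header-name>".
final class HasHeader implements Predicate<HeaderedRecord> {
    private final String headerName;

    HasHeader(String headerName) {
        this.headerName = headerName;
    }

    @Override
    public boolean test(HeaderedRecord record) {
        // True if at least one header carries the configured name.
        return record.headers().stream().anyMatch(h -> h.key().equals(headerName));
    }
}
```

Against the real Connect API, the same check could probably be expressed as {{record.headers().lastWithName(name) != null}}.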
   
*{{not:<condition-name>}}* This would negate the result of another condition, 
named {{<condition-name>}} and configured under the {{condition}} config 
prefix. For example, the following would apply the transformation to records 
which lack any header with the name "my-header":

{noformat}
      transforms.conditionalExtract.condition: not:hasMyHeader
      transforms.conditionalExtract.condition.hasMyHeader: has-header:my-header
{noformat}
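One way the {{<condition-type>:<parameters>}} syntax, including {{not:}} lookups of named conditions, might be parsed into a predicate. {{Rec}} and {{ConditionParser}} are hypothetical; the nesting limit is an assumption added to reject cycles such as a condition {{x}} defined as {{not:x}}.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical stand-in for a record: its topic and the names of its headers.
record Rec(String topic, Set<String> headerNames) {}

// Hypothetical parser for "<condition-type>:<parameters>" specs, where
// "not:<name>" looks up another condition under "<prefix>.<name>".
final class ConditionParser {
    private static final int MAX_DEPTH = 16; // rejects cycles like x = not:x

    private final Map<String, String> config;
    private final String prefix;

    ConditionParser(Map<String, String> config, String prefix) {
        this.config = config;
        this.prefix = prefix;
    }

    Predicate<Rec> parse(String spec) {
        return parse(spec, 0);
    }

    private Predicate<Rec> parse(String spec, int depth) {
        if (depth > MAX_DEPTH) {
            throw new IllegalArgumentException("Condition nesting too deep (cycle?): " + spec);
        }
        // Split on the first ':' only, so a regex parameter may itself contain ':'.
        int colon = spec.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Expected <condition-type>:<parameters>, got: " + spec);
        }
        String type = spec.substring(0, colon);
        String param = spec.substring(colon + 1);
        switch (type) {
            case "topic-matches": {
                Pattern pattern = Pattern.compile(param);
                return r -> pattern.matcher(r.topic()).matches();
            }
            case "has-header":
                return r -> r.headerNames().contains(param);
            case "not": {
                String named = config.get(prefix + "." + param);
                if (named == null) {
                    throw new IllegalArgumentException("No condition named '" + param + "' under " + prefix);
                }
                return parse(named, depth + 1).negate();
            }
            default:
                throw new IllegalArgumentException("Unknown condition type: " + type);
        }
    }
}
```

Resolving {{not:}} eagerly at configuration time means a dangling or cyclic reference is reported when the connector is configured rather than when the first record arrives.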

I foresee one implementation concern with this approach: currently 
{{Transformation}} has to return a fixed {{ConfigDef}}, whereas this proposal 
would require something more flexible, so that the config parameters can 
depend on the listed transform aliases (and similarly on the named conditions 
referenced by {{not:}}). I think this could be done, for example, by adding a 
{{default}} method to {{Transformation}} which returns the {{ConfigDef}} for a 
given config.
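A sketch of the {{default}}-method idea. To stay self-contained it does not use Kafka's classes: {{KeySet}} is a hypothetical stand-in for {{ConfigDef}} (just the set of accepted keys), {{SketchTransformation}} stands in for {{Transformation}}, and the {{config(Map)}} overload is the hypothetical addition. Existing implementations keep working because the default simply delegates to the fixed {{config()}}.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for ConfigDef: just the set of accepted config keys.
record KeySet(Set<String> keys) {}

interface SketchTransformation {
    // Today's shape: a fixed definition, independent of the supplied config.
    KeySet config();

    // Hypothetical addition: may inspect the config to decide which further
    // keys are valid. The default preserves existing behaviour.
    default KeySet config(Map<String, String> props) {
        return config();
    }
}

// Hypothetical Conditional that derives per-alias keys from "transforms".
final class ConditionalTransformSketch implements SketchTransformation {
    @Override
    public KeySet config() {
        return new KeySet(Set.of("transforms", "condition"));
    }

    @Override
    public KeySet config(Map<String, String> props) {
        Set<String> keys = new HashSet<>(config().keys());
        String aliases = props.getOrDefault("transforms", "");
        for (String alias : aliases.split(",")) {
            String a = alias.trim();
            if (!a.isEmpty()) {
                keys.add("transforms." + a + ".type"); // each nested alias needs a type
            }
        }
        return new KeySet(keys);
    }
}
```

Keys here are relative to the SMT's own prefix ({{transforms.conditionalExtract.}}), since Connect passes each transformation only its own sub-config.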

Obviously this would require a KIP, but before I spend any more time on this 
I'd be interested in your thoughts [~rhauch], [~rmoff], [~gunnar.morling].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
