Dear Pulsar community,

I opened PIP-173 https://github.com/apache/pulsar/issues/15902 to create a built-in Function implementing the most common basic transformations.
Let me know what you think.

Best regards,
Christophe

------

## Motivation

Currently, when users want to modify the data in Pulsar, they need to write a Function.
For a lot of use cases, it would be handy to have a ready-made built-in Function that implements the most common basic transformations, like the ones available in [Kafka Connect’s SMTs](https://docs.confluent.io/platform/current/connect/transforms/overview.html).
This relieves users of several burdens: writing the Function themselves, understanding the intricacies of Pulsar Schemas, and coding in a language they may not master (probably Java if they want to do anything advanced). It also lets them benefit from battle-tested, maintained, performance-optimised code.

## Goal

This PIP is about providing a `TransformFunction` that executes a sequence of basic transformations on the data.
The `TransformFunction` shall be easy to configure and launchable as a built-in NAR.
The `TransformFunction` shall be able to apply a sequence of common transformations in memory, so that we don’t need to execute the `TransformFunction` multiple times and read from and write to a topic each time.

This PIP is not about appending such a Function to a Source or a Sink.
That is the ultimate goal, because it would provide an experience similar to Kafka SMTs and avoid a read/write to a topic, but this work will be done in a future PIP.
The code written for this PIP is expected to be reusable in that future work.

## API Changes

This PIP will introduce a new `transform` module in the `pulsar-function` multi-module project.
The produced artifact will be a NAR of the `TransformFunction`.
## Implementation

When it processes a record, `TransformFunction` will:

* Create a mutable structure `TransformContext` that contains:

  ```java
  import java.util.Map;

  import lombok.Data;
  import org.apache.pulsar.client.api.Schema;
  import org.apache.pulsar.common.schema.KeyValueEncodingType;
  import org.apache.pulsar.functions.api.Context;

  @Data // Lombok generates getters, setters, equals/hashCode and toString
  public class TransformContext {
      private Context context;          // Pulsar Function context of the current invocation
      private Schema<?> keySchema;      // null if the record is not a KeyValue
      private Object keyObject;         // null if the record is not a KeyValue
      private boolean keyModified;
      private Schema<?> valueSchema;
      private Object valueObject;
      private boolean valueModified;
      private KeyValueEncodingType keyValueEncodingType;
      private String key;
      private Map<String, String> properties;
      private String outputTopic;
      // send() and other methods omitted
  }
  ```

  If the record is a `KeyValue`, the key and value schemas and objects are unpacked. Otherwise, `keySchema` and `keyObject` are null.
* Call, in sequence, the `process` method of a series of `TransformStep`s on this `TransformContext`:

  ```java
  public interface TransformStep {
      void process(TransformContext transformContext) throws Exception;
  }
  ```

  Each `TransformStep` can then modify the `TransformContext` as needed.
* Call the `send()` method of the `TransformContext`, which will create the message to send to `outputTopic`, repacking the `KeyValue` if needed.

The `TransformFunction` will read its configuration as JSON from `userConfig` in the following format:

```json
{
  "steps": [
    {
      "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
    },
    {
      "type": "merge-key-value"
    },
    {
      "type": "unwrap-key-value"
    },
    {
      "type": "cast", "schema-type": "STRING"
    }
  ]
}
```

Each step is defined by its `type` and takes its own arguments.
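The processing loop described above (build a context, run each `TransformStep` on it in order, then send once) can be sketched in plain Java. This is a hypothetical illustration, not the proposed implementation. `SketchContext` is a cut-down stand-in for `TransformContext` with no Pulsar `Schema` or `Context`, and records are modelled as flat `Map<String, Object>` instead of AVRO records. The configuration is passed as already-parsed maps, because the JDK has no built-in JSON parser. Several step behaviours are assumptions made for the sketch:

* `drop-fields` applies to both parts when `part` is absent.
* `merge-key-value` lets value fields win on name clashes.
* `cast` only supports `STRING` and only converts the value.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: flat maps stand in for AVRO records, SketchContext for TransformContext.
class TransformPipelineSketch {

    static class SketchContext {
        Object keyObject;   // null when the record is not (or no longer) a KeyValue
        Object valueObject;
    }

    // Same shape as the proposed TransformStep interface.
    interface TransformStep {
        void process(SketchContext ctx) throws Exception;
    }

    @SuppressWarnings("unchecked")
    static Map<String, Object> asMap(Object o) {
        return (Map<String, Object>) o;
    }

    // drop-fields: copy the map without the listed fields (both parts if "part" is absent: assumption).
    static TransformStep dropFields(List<String> fields, String part) {
        return ctx -> {
            if (ctx.keyObject != null && (part == null || part.equals("key"))) {
                Map<String, Object> k = new LinkedHashMap<>(asMap(ctx.keyObject));
                k.keySet().removeAll(fields);
                ctx.keyObject = k;
            }
            if (part == null || part.equals("value")) {
                Map<String, Object> v = new LinkedHashMap<>(asMap(ctx.valueObject));
                v.keySet().removeAll(fields);
                ctx.valueObject = v;
            }
        };
    }

    // merge-key-value: key fields first, then value fields (value wins on clashes: assumption).
    static final TransformStep MERGE_KEY_VALUE = ctx -> {
        if (ctx.keyObject == null) return;
        Map<String, Object> merged = new LinkedHashMap<>(asMap(ctx.keyObject));
        merged.putAll(asMap(ctx.valueObject));
        ctx.valueObject = merged;
    };

    // unwrap-key-value: drop the key so only the value remains.
    static final TransformStep UNWRAP_KEY_VALUE = ctx -> ctx.keyObject = null;

    // cast to STRING: render the flat value map as JSON text (no nesting, minimal escaping).
    static final TransformStep CAST_TO_STRING = ctx -> ctx.valueObject = asMap(ctx.valueObject).entrySet().stream()
            .map(e -> quote(e.getKey()) + ": " + quote(String.valueOf(e.getValue())))
            .collect(Collectors.joining(", ", "{", "}"));

    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Turns the already-parsed "steps" array into TransformStep instances, dispatching on "type".
    static List<TransformStep> buildSteps(List<Map<String, String>> steps) {
        List<TransformStep> result = new ArrayList<>();
        for (Map<String, String> step : steps) {
            switch (step.get("type")) {
                case "drop-fields":
                    result.add(dropFields(List.of(step.get("fields").split(",")), step.get("part")));
                    break;
                case "merge-key-value":
                    result.add(MERGE_KEY_VALUE);
                    break;
                case "unwrap-key-value":
                    result.add(UNWRAP_KEY_VALUE);
                    break;
                case "cast":
                    if (!"STRING".equals(step.get("schema-type"))) {
                        throw new IllegalArgumentException("This sketch only casts to STRING");
                    }
                    result.add(CAST_TO_STRING);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown step type: " + step.get("type"));
            }
        }
        return result;
    }

    // Applies all steps in order to one in-memory context: no topic is read or written in between.
    static SketchContext run(Object key, Object value, List<TransformStep> steps) throws Exception {
        SketchContext ctx = new SketchContext();
        ctx.keyObject = key;
        ctx.valueObject = value;
        for (TransformStep step : steps) {
            step.process(ctx);
        }
        return ctx; // the real function would now call send() to repack and publish
    }
}
```

Try it with the four steps of the example configuration and a key and value built as insertion-ordered `LinkedHashMap`s. Afterwards, `keyObject` should be null and `valueObject` should equal the final `STRING` line of the walkthrough that follows. Keeping all steps on one mutable context is what allows the chain to run without an intermediate topic.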
The JSON configuration example above, applied to a `KeyValue<AVRO, AVRO>` input record with value `{key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}}`, gives the following result after each step:

```
{key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
    |
    | "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
    |
{key={keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
    |
    | "type": "merge-key-value"
    |
{key={keyField3: key3}, value={keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
    |
    | "type": "unwrap-key-value"
    |
{keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3} (AVRO)
    |
    | "type": "cast", "schema-type": "STRING"
    |
{"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"} (STRING)
```

`TransformFunction` will be built as a NAR including a `pulsar-io.yaml` service file, so that it can be registered as a built-in function named `transform`.

## Rejected Alternatives

None