Overall I agree with the proposal. I left some minor feedback on the issue.

Thank you,
Enrico

On Thu, 2 Jun 2022 at 16:57, Christophe Bornet <bornet.ch...@gmail.com> wrote:
>
> Dear Pulsar community,
>
> I opened PIP-173 https://github.com/apache/pulsar/issues/15902 to create a
> built-in Function implementing the most common basic transformations.
>
> Let me know what you think.
>
> Best regards,
>
> Christophe
>
> ------
>
> ## Motivation
>
> Currently, when users want to modify the data in Pulsar, they need to write
> a Function.
> For many use cases, it would be handy for them to be able to use a
> ready-made built-in Function that implements the most common basic
> transformations, like the ones available in
> [Kafka Connect’s SMTs](https://docs.confluent.io/platform/current/connect/transforms/overview.html).
> This relieves users of the burden of writing the Function themselves,
> understanding the quirks of Pulsar Schemas, and coding in a language that
> they may not master (probably Java if they want to do advanced stuff). They
> also benefit from battle-tested, maintained, performance-optimised code.
>
> ## Goal
>
> This PIP is about providing a `TransformFunction` that executes a sequence
> of basic transformations on the data.
> The `TransformFunction` shall be easy to configure and launchable as a
> built-in NAR.
> The `TransformFunction` shall be able to apply a sequence of common
> transformations in-memory so we don’t need to execute the
> `TransformFunction` multiple times and read/write to a topic each time.
>
> This PIP is not about appending such a Function to a Source or a Sink.
> While this is the ultimate goal, so we can provide an experience similar to
> Kafka SMTs and avoid a read/write to a topic, this work will be done in a
> future PIP.
> It is expected that the code written for this PIP will be reusable in this
> future work.
>
> ## API Changes
>
> This PIP will introduce a new `transform` module in the `pulsar-function`
> multi-module project. The produced artifact will be a NAR of the
> TransformFunction.
>
> ## Implementation
>
> When it processes a record, `TransformFunction` will:
>
> * Create a mutable structure `TransformContext` that contains
>
> ```java
> @Data
> public class TransformContext {
>     private Context context;
>     private Schema<?> keySchema;
>     private Object keyObject;
>     private boolean keyModified;
>     private Schema<?> valueSchema;
>     private Object valueObject;
>     private boolean valueModified;
>     private KeyValueEncodingType keyValueEncodingType;
>     private String key;
>     private Map<String, String> properties;
>     private String outputTopic;
> }
> ```
>
> If the record is a `KeyValue`, the key and value schemas and objects are
> unpacked. Otherwise the `keySchema` and `keyObject` are null.
>
> * Call in sequence the `process` method of a series of `TransformStep` on
> this `TransformContext`
>
> ```java
> public interface TransformStep {
>     void process(TransformContext transformContext) throws Exception;
> }
> ```
>
> Each `TransformStep` can then modify the `TransformContext` as needed.
>
> * Call the `send()` method of the `TransformContext`, which will create the
> message to send to the `outputTopic`, repacking the KeyValue if needed.
>
> The `TransformFunction` will read its configuration as JSON from
> `userConfig` in the format:
>
> ```json
> {
>   "steps": [
>     {
>       "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
>     },
>     {
>       "type": "merge-key-value"
>     },
>     {
>       "type": "unwrap-key-value"
>     },
>     {
>       "type": "cast", "schema-type": "STRING"
>     }
>   ]
> }
> ```
>
> Each step is defined by its `type` and uses its own arguments.
>
> This example config, applied to a KeyValue<AVRO, AVRO> input record with
> value `{key={keyField1: key1, keyField2: key2, keyField3: key3},
> value={valueField1: value1, valueField2: value2, valueField3: value3}}`,
> will give after each step:
> ```
> {key={keyField1: key1, keyField2: key2, keyField3: key3},
> value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
>            |
>            | "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
>            |
> {key={keyField3: key3},
> value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
>            |
>            | "type": "merge-key-value"
>            |
> {key={keyField3: key3},
> value={keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
>            |
>            | "type": "unwrap-key-value"
>            |
> {keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3} (AVRO)
>            |
>            | "type": "cast", "schema-type": "STRING"
>            |
> {"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"} (STRING)
> ```
>
> `TransformFunction` will be built as a NAR including a `pulsar-io.yaml`
> service file so it can be registered as a built-in function with name
> `transform`.
>
> ## Rejected Alternatives
>
> None
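
For anyone following the thread, the four-step example in the quoted proposal can be traced with a small plain-Java sketch. This is only an illustration under simplifying assumptions: `TransformSketch`, `Ctx`, `Step` and the helper methods are hypothetical names, plain `Map`s stand in for Avro records and Pulsar schemas, and nothing here uses the Pulsar API or reflects the actual PIP-173 implementation.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, simplified model of the step pipeline; not the PIP's classes.
class TransformSketch {

    /** Stand-in for TransformContext: maps replace Avro records and schemas. */
    static class Ctx {
        Map<String, Object> key; // null once the KeyValue has been unwrapped
        Object value;            // a Map of fields, or a String after the cast

        Ctx(Map<String, Object> key, Map<String, Object> value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Same single-method shape as TransformStep in the proposal. */
    interface Step {
        void process(Ctx ctx) throws Exception;
    }

    /** "drop-fields" with part=key: remove the named fields from the key. */
    static Step dropKeyFields(String... fields) {
        return ctx -> ctx.key.keySet().removeAll(Arrays.asList(fields));
    }

    /** "merge-key-value": copy key fields into the value, key fields first. */
    @SuppressWarnings("unchecked")
    static Step mergeKeyValue() {
        return ctx -> {
            Map<String, Object> merged = new LinkedHashMap<>(ctx.key);
            merged.putAll((Map<String, Object>) ctx.value);
            ctx.value = merged;
        };
    }

    /** "unwrap-key-value": keep only the value part. */
    static Step unwrapKeyValue() {
        return ctx -> ctx.key = null;
    }

    /** "cast" to STRING: naive JSON-like rendering, no escaping (sketch only). */
    @SuppressWarnings("unchecked")
    static Step castToString() {
        return ctx -> ctx.value = ((Map<String, Object>) ctx.value).entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\": \"" + e.getValue() + "\"")
                .collect(Collectors.joining(", ", "{", "}"));
    }

    /** Apply every step in order to the same mutable context. */
    static Ctx run(Ctx ctx, List<Step> steps) {
        for (Step step : steps) {
            try {
                step.process(ctx);
            } catch (Exception e) {
                throw new IllegalStateException("step failed", e);
            }
        }
        return ctx;
    }

    /** Builds the input record from the example and runs the four steps. */
    static Ctx example() {
        Map<String, Object> key = new LinkedHashMap<>();
        key.put("keyField1", "key1");
        key.put("keyField2", "key2");
        key.put("keyField3", "key3");
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("valueField1", "value1");
        value.put("valueField2", "value2");
        value.put("valueField3", "value3");
        return run(new Ctx(key, value), Arrays.asList(
                dropKeyFields("keyField1", "keyField2"),
                mergeKeyValue(),
                unwrapKeyValue(),
                castToString()));
    }

    public static void main(String[] args) {
        System.out.println(example().value);
    }
}
```

Running `main` prints the final string of the example; the intermediate states can be inspected by shortening the step list passed to `run`. Keeping one mutable context that every step edits in place is what lets the whole sequence run in memory without a topic round-trip between steps.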