I would suggest first having some concrete implementations in a separate repo. After verifying their functionality and performance, we can then move them into the main Pulsar repo.
On Fri, Jun 3, 2022 at 5:09 AM Enrico Olivelli <eolive...@gmail.com> wrote:

> Overall I agree with the proposal.
> I left some minor feedback on the issue.
>
> Thank you
>
> Enrico
>
> On Thu, Jun 2, 2022 at 4:57 PM Christophe Bornet
> <bornet.ch...@gmail.com> wrote:
> >
> > Dear Pulsar community,
> >
> > I opened PIP-173 https://github.com/apache/pulsar/issues/15902 to create a
> > built-in Function implementing the most common basic transformations.
> >
> > Let me know what you think.
> >
> > Best regards,
> >
> > Christophe
> >
> > ------
> >
> > ## Motivation
> >
> > Currently, when users want to modify the data in Pulsar, they need to write
> > a Function.
> > For a lot of use cases, it would be handy for them to be able to use a
> > ready-made built-in Function that implements the most common basic
> > transformations, like the ones available in [Kafka Connect’s SMTs](https://docs.confluent.io/platform/current/connect/transforms/overview.html).
> > This relieves users of the burden of writing the Function themselves,
> > understanding the intricacies of Pulsar Schemas, and coding in a language they
> > may not master (probably Java if they want to do advanced stuff), and lets them
> > benefit from battle-tested, maintained, performance-optimised code.
> >
> > ## Goal
> >
> > This PIP is about providing a `TransformFunction` that executes a sequence
> > of basic transformations on the data.
> > The `TransformFunction` shall be easy to configure and launchable as a
> > built-in NAR.
> > The `TransformFunction` shall be able to apply a sequence of common
> > transformations in-memory, so we don’t need to execute the
> > `TransformFunction` multiple times and read/write to a topic each time.
> >
> > This PIP is not about appending such a Function to a Source or a Sink.
> > While this is the ultimate goal, so that we can provide an experience similar to
> > Kafka SMTs and avoid a read/write to a topic, this work will be done in a
> > future PIP.
> > It is expected that the code written for this PIP will be reusable in this
> > future work.
> >
> > ## API Changes
> >
> > This PIP will introduce a new `transform` module in the `pulsar-function`
> > multi-module project. The produced artifact will be a NAR of the
> > `TransformFunction`.
> >
> > ## Implementation
> >
> > When it processes a record, `TransformFunction` will:
> >
> > * Create a mutable `TransformContext` structure that contains:
> >
> > ```java
> > import java.util.Map;
> > import lombok.Data;
> > import org.apache.pulsar.client.api.Schema;
> > import org.apache.pulsar.common.schema.KeyValueEncodingType;
> > import org.apache.pulsar.functions.api.Context;
> >
> > @Data
> > public class TransformContext {
> >     private Context context;
> >     private Schema<?> keySchema;
> >     private Object keyObject;
> >     private boolean keyModified;
> >     private Schema<?> valueSchema;
> >     private Object valueObject;
> >     private boolean valueModified;
> >     private KeyValueEncodingType keyValueEncodingType;
> >     private String key;
> >     private Map<String, String> properties;
> >     private String outputTopic;
> > }
> > ```
> >
> > If the record is a `KeyValue`, the key and value schemas and objects are
> > unpacked. Otherwise, `keySchema` and `keyObject` are null.
> >
> > * Call, in sequence, the `process` method of each `TransformStep` in a series
> > on this `TransformContext`:
> >
> > ```java
> > public interface TransformStep {
> >     void process(TransformContext transformContext) throws Exception;
> > }
> > ```
> >
> > Each `TransformStep` can then modify the `TransformContext` as needed.
> >
> > * Call the `send()` method of the `TransformContext`, which will create the
> > message to send to the `outputTopic`, repacking the `KeyValue` if needed.
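The record-processing flow described above (one mutable context, steps applied in order) can be sketched in plain Java. This is a minimal, self-contained sketch under stated assumptions, not the PIP's implementation: `SimpleTransformContext`, `DropKeyFieldsStep`, `MergeKeyValueStep`, `UnwrapKeyValueStep` and `runExample` are hypothetical names, the key and value are plain `Map`s instead of Schema-typed Pulsar objects, and the final `send()` is omitted. It replays the first three steps of the PIP's example config (`drop-fields` on the key, `merge-key-value`, `unwrap-key-value`).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TransformPipelineSketch {

    // Hypothetical, schema-less stand-in for the proposed TransformContext:
    // key and value are plain maps instead of Schema-typed objects.
    static class SimpleTransformContext {
        Map<String, Object> keyObject; // null when the record is not a KeyValue
        Map<String, Object> valueObject;
        boolean keyModified;
        boolean valueModified;

        SimpleTransformContext(Map<String, Object> keyObject, Map<String, Object> valueObject) {
            this.keyObject = keyObject;
            this.valueObject = valueObject;
        }
    }

    // Same shape as the proposed TransformStep, applied to the stand-in context.
    interface TransformStep {
        void process(SimpleTransformContext ctx) throws Exception;
    }

    // Hypothetical "drop-fields" step, limited here to the key part.
    static class DropKeyFieldsStep implements TransformStep {
        private final Set<String> fields;

        DropKeyFieldsStep(Set<String> fields) {
            this.fields = fields;
        }

        @Override
        public void process(SimpleTransformContext ctx) {
            if (ctx.keyObject != null && ctx.keyObject.keySet().removeAll(fields)) {
                ctx.keyModified = true;
            }
        }
    }

    // Hypothetical "merge-key-value" step: copies the key fields into the value.
    // Assumption: on a field-name clash, the value's field wins.
    static class MergeKeyValueStep implements TransformStep {
        @Override
        public void process(SimpleTransformContext ctx) {
            if (ctx.keyObject == null) {
                return; // not a KeyValue record, nothing to merge
            }
            Map<String, Object> merged = new LinkedHashMap<>(ctx.keyObject);
            merged.putAll(ctx.valueObject);
            ctx.valueObject = merged;
            ctx.valueModified = true;
        }
    }

    // Hypothetical "unwrap-key-value" step: drops the key and keeps only the value.
    static class UnwrapKeyValueStep implements TransformStep {
        @Override
        public void process(SimpleTransformContext ctx) {
            ctx.keyObject = null;
        }
    }

    // Runs every step in order on the same in-memory context (no topic round-trip).
    static void runSteps(List<TransformStep> steps, SimpleTransformContext ctx) throws Exception {
        for (TransformStep step : steps) {
            step.process(ctx);
        }
    }

    // Replays the first three steps of the PIP's example config.
    static SimpleTransformContext runExample() {
        Map<String, Object> key = new LinkedHashMap<>();
        key.put("keyField1", "key1");
        key.put("keyField2", "key2");
        key.put("keyField3", "key3");
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("valueField1", "value1");
        value.put("valueField2", "value2");
        value.put("valueField3", "value3");

        SimpleTransformContext ctx = new SimpleTransformContext(key, value);
        List<TransformStep> steps = List.of(
                new DropKeyFieldsStep(Set.of("keyField1", "keyField2")),
                new MergeKeyValueStep(),
                new UnwrapKeyValueStep());
        try {
            runSteps(steps, ctx);
        } catch (Exception e) {
            throw new IllegalStateException("transform step failed", e);
        }
        return ctx;
    }

    public static void main(String[] args) {
        // prints {keyField3=key3, valueField1=value1, valueField2=value2, valueField3=value3}
        System.out.println(runExample().valueObject);
    }
}
```

The printed map matches the AVRO record shown after the `unwrap-key-value` step in the PIP's step-by-step example. Keeping all steps on one mutable context is what allows the chain to run in memory without writing to an intermediate topic between steps.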
> >
> > The `TransformFunction` will read its configuration as JSON from
> > `userConfig` in the format:
> >
> > ```json
> > {
> >   "steps": [
> >     {
> >       "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
> >     },
> >     {
> >       "type": "merge-key-value"
> >     },
> >     {
> >       "type": "unwrap-key-value"
> >     },
> >     {
> >       "type": "cast", "schema-type": "STRING"
> >     }
> >   ]
> > }
> > ```
> >
> > Each step is defined by its `type` and uses its own arguments.
> >
> > This example config, applied to a `KeyValue<AVRO, AVRO>` input record with
> > value `{key={keyField1: key1, keyField2: key2, keyField3: key3},
> > value={valueField1: value1, valueField2: value2, valueField3: value3}}`,
> > gives the following after each step:
> >
> > ```
> > {key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
> > |
> > | "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
> > |
> > {key={keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
> > |
> > | "type": "merge-key-value"
> > |
> > {key={keyField3: key3}, value={keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
> > |
> > | "type": "unwrap-key-value"
> > |
> > {keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3} (AVRO)
> > |
> > | "type": "cast", "schema-type": "STRING"
> > |
> > {"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"} (STRING)
> > ```
> >
> > `TransformFunction` will be built as a NAR including a `pulsar-io.yaml`
> > service file so it can be registered as a built-in function with the name
> > `transform`.
> >
> > ## Rejected Alternatives
> >
> > None

-- 
Best Regards,
Neng