Hi Neng,

See the comment from Enrico in the PIP issue <https://github.com/apache/pulsar/issues/15902>.
We've put this in "Rejected alternatives" for the following reasons:
- it won't be easily available to all Pulsar users
- it would be hard to guarantee compatibility with many Pulsar versions, and the Transformations will use many advanced features of Pulsar APIs

I agree with Enrico that we should maintain this function in the main repo.
I'll also add that:
- This is a key feature that Kafka has <https://docs.confluent.io/platform/current/connect/transforms/overview.html> and that is lacking in Pulsar
- A lot of users are asking for it (especially CDC users)
- The next step will be to be able to chain this function with a Sink/Source to have an experience similar to Kafka

I hope this answers your concerns about this PIP.

Best regards

Christophe

On Wed, Jun 8, 2022 at 19:43, Neng Lu <freen...@gmail.com> wrote:

> I would suggest first having some concrete implementations in a separate
> repo.
> After verifying its functionality and performance, then we can move into
> the main pulsar repo.
>
> On Fri, Jun 3, 2022 at 5:09 AM Enrico Olivelli <eolive...@gmail.com>
> wrote:
>
> > Overall I agree with the proposal.
> > I left some minor feedback on the issue
> >
> > Thank you
> >
> > Enrico
> >
> > On Thu, Jun 2, 2022 at 16:57, Christophe Bornet
> > <bornet.ch...@gmail.com> wrote:
> > >
> > > Dear Pulsar community,
> > >
> > > I opened PIP-173 https://github.com/apache/pulsar/issues/15902 to create a
> > > built-in Function implementing the most common basic transformations.
> > >
> > > Let me know what you think.
> > >
> > > Best regards,
> > >
> > > Christophe
> > >
> > > ------
> > >
> > > ## Motivation
> > >
> > > Currently, when users want to modify the data in Pulsar, they need to write
> > > a Function.
> > > For a lot of use cases, it would be handy for them to be able to use a
> > > ready-made built-in Function that implements the most common basic
> > > transformations like the ones available in [Kafka Connect’s SMTs](https://docs.confluent.io/platform/current/connect/transforms/overview.html).
> > > This relieves users of the burden of writing the Function themselves,
> > > understanding the quirks of Pulsar Schemas, and coding in a language that they
> > > may not master (probably Java if they want to do advanced stuff), and they
> > > benefit from battle-tested, maintained, performance-optimised code.
> > >
> > > ## Goal
> > >
> > > This PIP is about providing a `TransformFunction` that executes a sequence
> > > of basic transformations on the data.
> > > The `TransformFunction` shall be easy to configure and launchable as a
> > > built-in NAR.
> > > The `TransformFunction` shall be able to apply a sequence of common
> > > transformations in-memory so we don’t need to execute the
> > > `TransformFunction` multiple times and read/write to a topic each time.
> > >
> > > This PIP is not about appending such a Function to a Source or a Sink.
> > > While this is the ultimate goal, so we can provide an experience similar to
> > > Kafka SMTs and avoid a read/write to a topic, this work will be done in a
> > > future PIP.
> > > It is expected that the code written for this PIP will be reusable in this
> > > future work.
> > >
> > > ## API Changes
> > >
> > > This PIP will introduce a new `transform` module in the `pulsar-function`
> > > multi-module project. The produced artifact will be a NAR of the
> > > TransformFunction.
> > >
> > > ## Implementation
> > >
> > > When it processes a record, `TransformFunction` will:
> > >
> > > * Create a mutable structure `TransformContext` that contains
> > >
> > > ```java
> > > @Data
> > > public class TransformContext {
> > >     private Context context;
> > >     private Schema<?> keySchema;
> > >     private Object keyObject;
> > >     private boolean keyModified;
> > >     private Schema<?> valueSchema;
> > >     private Object valueObject;
> > >     private boolean valueModified;
> > >     private KeyValueEncodingType keyValueEncodingType;
> > >     private String key;
> > >     private Map<String, String> properties;
> > >     private String outputTopic;
> > > }
> > > ```
> > >
> > > If the record is a `KeyValue`, the key and value schemas and objects are
> > > unpacked. Otherwise the `keySchema` and `keyObject` are null.
> > >
> > > * Call in sequence the `process` method of a series of `TransformStep` on
> > > this `TransformContext`
> > >
> > > ```java
> > > public interface TransformStep {
> > >     void process(TransformContext transformContext) throws Exception;
> > > }
> > > ```
> > >
> > > Each `TransformStep` can then modify the `TransformContext` as needed.
> > >
> > > * Call the `send()` method of the `TransformContext` which will create the
> > > message to send to the `outputTopic`, repacking the KeyValue if needed.
> > >
> > > The `TransformFunction` will read its configuration as JSON from
> > > `userConfig` in the format:
> > >
> > > ```json
> > > {
> > >   "steps": [
> > >     {
> > >       "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
> > >     },
> > >     {
> > >       "type": "merge-key-value"
> > >     },
> > >     {
> > >       "type": "unwrap-key-value"
> > >     },
> > >     {
> > >       "type": "cast", "schema-type": "STRING"
> > >     }
> > >   ]
> > > }
> > > ```
> > >
> > > Each step is defined by its `type` and uses its own arguments.
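
To make the step sequence above more concrete, here is a minimal sketch of how a list of `TransformStep`s could be applied in memory. It is only an illustration under simplifying assumptions, not the code proposed in the PIP: records are plain `Map`s rather than Pulsar `Schema`/`GenericRecord` objects, the `TransformContext` is reduced to a key and a value, and the names `TransformSketch`, `dropKeyFields`, `mergeKeyValue`, `unwrapKeyValue`, `apply` and `demo` are hypothetical. The `cast` step and `send()` are left out.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the PIP's types (assumption:
// records are plain maps, not Pulsar Schema/GenericRecord objects).
final class TransformSketch {

    /** Reduced TransformContext: only the key and value objects. */
    static final class TransformContext {
        Map<String, Object> key;   // null when the record is not a KeyValue
        Map<String, Object> value;

        TransformContext(Map<String, Object> key, Map<String, Object> value) {
            this.key = key == null ? null : new LinkedHashMap<>(key);
            this.value = new LinkedHashMap<>(value);
        }
    }

    /** Same shape as the TransformStep interface quoted above. */
    interface TransformStep {
        void process(TransformContext ctx) throws Exception;
    }

    /** Sketch of "drop-fields" with "part": "key". */
    static TransformStep dropKeyFields(String... fields) {
        return ctx -> {
            if (ctx.key != null) {
                for (String field : fields) {
                    ctx.key.remove(field);
                }
            }
        };
    }

    /** Sketch of "merge-key-value": key fields first, then value fields.
     *  Assumption: on a name clash the value field wins. */
    static TransformStep mergeKeyValue() {
        return ctx -> {
            if (ctx.key == null) {
                return;
            }
            Map<String, Object> merged = new LinkedHashMap<>(ctx.key);
            merged.putAll(ctx.value);
            ctx.value = merged;
        };
    }

    /** Sketch of "unwrap-key-value": keep only the value part. */
    static TransformStep unwrapKeyValue() {
        return ctx -> ctx.key = null;
    }

    /** Runs every step in order on the same mutable context. */
    static TransformContext apply(TransformContext ctx, List<TransformStep> steps) throws Exception {
        for (TransformStep step : steps) {
            step.process(ctx);
        }
        return ctx;
    }

    /** Runs the first three steps of the example configuration. */
    static TransformContext demo() {
        Map<String, Object> key = new LinkedHashMap<>();
        key.put("keyField1", "key1");
        key.put("keyField2", "key2");
        key.put("keyField3", "key3");
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("valueField1", "value1");
        value.put("valueField2", "value2");
        value.put("valueField3", "value3");
        try {
            return apply(new TransformContext(key, value), List.of(
                    dropKeyFields("keyField1", "keyField2"),
                    mergeKeyValue(),
                    unwrapKeyValue()));
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        TransformContext out = demo();
        System.out.println("key=" + out.key + " value=" + out.value);
    }
}
```

Because every step mutates the same context, the record is read once and written once no matter how many steps are configured, which is the point of running the whole sequence inside a single Function.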
> > >
> > > This example config applied on a KeyValue<AVRO, AVRO> input record with
> > > value `{key={keyField1: key1, keyField2: key2, keyField3: key3},
> > > value={valueField1: value1, valueField2: value2, valueField3: value3}}`
> > > will give after each step:
> > > ```
> > > {key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
> > >            |
> > >            | "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
> > >            |
> > > {key={keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
> > >            |
> > >            | "type": "merge-key-value"
> > >            |
> > > {key={keyField3: key3}, value={keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
> > >            |
> > >            | "type": "unwrap-key-value"
> > >            |
> > > {keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3} (AVRO)
> > >            |
> > >            | "type": "cast", "schema-type": "STRING"
> > >            |
> > > {"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"} (STRING)
> > > ```
> > >
> > > `TransformFunction` will be built as a NAR including a `pulsar-io.yaml`
> > > service file so it can be registered as a built-in function with name
> > > `transform`.
> > >
> > > ## Rejected Alternatives
> > >
> > > None
> > >
>
> --
> Best Regards,
> Neng
>