Overall I agree with the proposal.
I left some minor feedback on the issue.

Thank you

Enrico

On Thu, Jun 2, 2022 at 16:57 Christophe Bornet
<bornet.ch...@gmail.com> wrote:
>
> Dear Pulsar community,
>
> I opened PIP-173 https://github.com/apache/pulsar/issues/15902 to create a
> built-in Function implementing the most common basic transformations.
>
> Let me know what you think.
>
> Best regards,
>
> Christophe
>
> ------
>
> ## Motivation
>
> Currently, when users want to modify the data in Pulsar, they need to write
> a Function.
> For many use cases, it would be handy for them to be able to use a
> ready-made built-in Function that implements the most common basic
> transformations, like the ones available in
> [Kafka Connect’s SMTs](https://docs.confluent.io/platform/current/connect/transforms/overview.html).
> This relieves users of the burden of writing the Function themselves,
> understanding the subtleties of Pulsar Schemas and coding in a language that
> they may not master (probably Java if they want to do advanced things), and
> lets them benefit from battle-tested, maintained, performance-optimised code.
>
> ## Goal
>
> This PIP is about providing a `TransformFunction` that executes a sequence
> of basic transformations on the data.
> The `TransformFunction` shall be easy to configure and launchable as a
> built-in NAR.
> The `TransformFunction` shall be able to apply a sequence of common
> transformations in memory, so we don’t need to execute the
> `TransformFunction` multiple times and read from/write to a topic each time.
>
> This PIP is not about attaching such a Function to a Source or a Sink.
> While that is the ultimate goal, so that we can provide an experience
> similar to Kafka SMTs and avoid a read/write to a topic, that work will be
> done in a future PIP.
> The code written for this PIP is expected to be reusable in that future
> work.
>
> ## API Changes
>
> This PIP will introduce a new `transform` module in the `pulsar-functions`
> multi-module project. The produced artifact will be a NAR packaging the
> `TransformFunction`.
>
> ## Implementation
>
> When it processes a record, `TransformFunction` will:
>
> * Create a mutable `TransformContext` structure that contains:
>
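> ```java
> import java.util.Map;
> import lombok.Data;
> import org.apache.pulsar.client.api.Schema;
> import org.apache.pulsar.common.schema.KeyValueEncodingType;
> import org.apache.pulsar.functions.api.Context;
>
> @Data
> public class TransformContext {
>     private Context context;
>     // Key part: null when the input record is not a KeyValue
>     private Schema<?> keySchema;
>     private Object keyObject;
>     private boolean keyModified;
>     // Value part (the whole payload when the record is not a KeyValue)
>     private Schema<?> valueSchema;
>     private Object valueObject;
>     private boolean valueModified;
>     private KeyValueEncodingType keyValueEncodingType;
>     // Message key, properties and destination of the output message
>     private String key;
>     private Map<String, String> properties;
>     private String outputTopic;
> }
> ```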
>
> If the record is a `KeyValue`, the key and value schemas and objects are
> unpacked. Otherwise, `keySchema` and `keyObject` are null.
>
> * Call, in sequence, the `process` method of each `TransformStep` on this
> `TransformContext`:
>
> ```java
> public interface TransformStep {
>     void process(TransformContext transformContext) throws Exception;
> }
> ```
>
> Each `TransformStep` can then modify the `TransformContext` as needed.
>
> * Call the `send()` method of the `TransformContext`, which creates the
> message to send to `outputTopic`, repacking the `KeyValue` if needed.
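The three stages above (unpack, run the steps in order, repack) can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the proposed implementation: `KvPair`, `MiniContext`, `MiniStep` and `MiniTransformFunction` are hypothetical stand-ins for Pulsar's `KeyValue`, `TransformContext`, `TransformStep` and `TransformFunction`, with schemas, properties and the output topic left out and record parts modelled as `Map`s so the example compiles without Pulsar on the classpath.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Pulsar's KeyValue payload.
final class KvPair {
    final Map<String, Object> key;
    final Map<String, Object> value;

    KvPair(Map<String, Object> key, Map<String, Object> value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical, reduced TransformContext: schemas, properties and topic omitted.
final class MiniContext {
    Map<String, Object> keyObject;   // null when the record is not a KeyValue
    Map<String, Object> valueObject;
}

// Mirrors the shape of the proposed TransformStep interface.
@FunctionalInterface
interface MiniStep {
    void process(MiniContext ctx) throws Exception;
}

final class MiniTransformFunction {
    private final List<MiniStep> steps = new ArrayList<>();

    MiniTransformFunction add(MiniStep step) {
        steps.add(step);
        return this;
    }

    @SuppressWarnings("unchecked")
    Object process(Object input) throws Exception {
        // 1. Unpack: split a KeyValue into key and value; otherwise the key stays null.
        MiniContext ctx = new MiniContext();
        if (input instanceof KvPair) {
            KvPair kv = (KvPair) input;
            ctx.keyObject = new LinkedHashMap<>(kv.key);
            ctx.valueObject = new LinkedHashMap<>(kv.value);
        } else {
            ctx.valueObject = new LinkedHashMap<>((Map<String, Object>) input);
        }
        // 2. Run every step in order on the same in-memory context.
        for (MiniStep step : steps) {
            step.process(ctx);
        }
        // 3. Repack: rebuild a KeyValue only if a key part is still present (assumption).
        return ctx.keyObject != null ? new KvPair(ctx.keyObject, ctx.valueObject) : ctx.valueObject;
    }
}
```

Copying the maps into the context keeps the input record untouched. The proposal does not say whether the real implementation copies or mutates in place. Treating "key set to null" as the signal to emit a plain value is also an assumption of this sketch.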
>
> The `TransformFunction` will read its configuration as JSON from
> `userConfig` in the following format:
>
> ```json
> {
>   "steps": [
>     {
>       "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
>     },
>     {
>       "type": "merge-key-value"
>     },
>     {
>       "type": "unwrap-key-value"
>     },
>     {
>       "type": "cast", "schema-type": "STRING"
>     }
>   ]
> }
> ```
>
> Each step is defined by its `type` and uses its own arguments.
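One way to turn the `steps` array into step instances is a registry keyed by `type`, where each factory reads its own arguments. The sketch below is illustrative only: it assumes the JSON has already been parsed into a `List<Map<String, Object>>` (the JDK has no JSON parser), `StepFactory` and `ValueStep` are hypothetical names, only `drop-fields` is registered, and its `part` argument is ignored (the step always acts on a value map).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

final class StepFactory {
    // Hypothetical step shape: mutates a record part held as a Map.
    interface ValueStep extends Consumer<Map<String, Object>> {}

    // Maps a step "type" to a factory that reads that step's own arguments.
    private static final Map<String, Function<Map<String, Object>, ValueStep>> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put("drop-fields", args -> {
            // "fields" is a comma-separated list, e.g. "keyField1,keyField2".
            List<String> fields = Arrays.asList(((String) args.get("fields")).split("\\s*,\\s*"));
            return value -> value.keySet().removeAll(fields);
        });
    }

    // Builds the step list from the parsed "steps" array, dispatching on "type".
    static List<ValueStep> fromConfig(List<Map<String, Object>> stepConfigs) {
        List<ValueStep> steps = new ArrayList<>();
        for (Map<String, Object> cfg : stepConfigs) {
            String type = (String) cfg.get("type");
            Function<Map<String, Object>, ValueStep> factory = REGISTRY.get(type);
            if (factory == null) {
                throw new IllegalArgumentException("Unknown transform step type: " + type);
            }
            steps.add(factory.apply(cfg));
        }
        return steps;
    }
}
```

Failing fast on an unknown `type` means a misconfigured Function fails at startup rather than silently skipping a step. That is a design suggestion and not something the proposal specifies.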
>
> Applied to a `KeyValue<AVRO, AVRO>` input record with value
> `{key={keyField1: key1, keyField2: key2, keyField3: key3},
> value={valueField1: value1, valueField2: value2, valueField3: value3}}`,
> this example config gives the following result after each step:
> ```
> {key={keyField1: key1, keyField2: key2, keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
>            |
>            | "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
>            |
> {key={keyField3: key3}, value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
>            |
>            | "type": "merge-key-value"
>            |
> {key={keyField3: key3}, value={keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
>            |
>            | "type": "unwrap-key-value"
>            |
> {keyField3: key3, valueField1: value1, valueField2: value2, valueField3: value3} (AVRO)
>            |
>            | "type": "cast", "schema-type": "STRING"
>            |
> {"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"} (STRING)
> ```
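The walkthrough above can be reproduced with a small sketch. It is a hypothetical model, not the proposed code: AVRO records are represented as ordered `Map`s of string fields, a `null` key means the record is no longer a `KeyValue`, the name-clash rule in `mergeKeyValue` is an assumption, and the STRING cast is a naive JSON rendering without escaping.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch: AVRO records are modelled as ordered Maps of String fields,
// and key == null means the record is no longer a KeyValue.
final class PipelineDemo {
    Map<String, String> key;
    Map<String, String> value;
    String stringValue; // set by castToString()

    // "drop-fields" with "part": "key"
    void dropKeyFields(List<String> fields) {
        key.keySet().removeAll(fields);
    }

    // "merge-key-value": key fields first, then value fields;
    // on a name clash the value field wins (assumption).
    void mergeKeyValue() {
        Map<String, String> merged = new LinkedHashMap<>(key);
        merged.putAll(value);
        value = merged;
    }

    // "unwrap-key-value": keep only the value part.
    void unwrapKeyValue() {
        key = null;
    }

    // "cast" to STRING: naive JSON rendering without escaping (sketch only).
    void castToString() {
        StringJoiner json = new StringJoiner(", ", "{", "}");
        value.forEach((k, v) -> json.add("\"" + k + "\": \"" + v + "\""));
        stringValue = json.toString();
    }

    public static void main(String[] args) {
        PipelineDemo r = new PipelineDemo();
        r.key = new LinkedHashMap<>();
        r.key.put("keyField1", "key1");
        r.key.put("keyField2", "key2");
        r.key.put("keyField3", "key3");
        r.value = new LinkedHashMap<>();
        r.value.put("valueField1", "value1");
        r.value.put("valueField2", "value2");
        r.value.put("valueField3", "value3");

        r.dropKeyFields(List.of("keyField1", "keyField2"));
        r.mergeKeyValue();
        r.unwrapKeyValue();
        r.castToString();
        // prints {"keyField3": "key3", "valueField1": "value1", "valueField2": "value2", "valueField3": "value3"}
        System.out.println(r.stringValue);
    }
}
```

Using `LinkedHashMap` keeps the key fields ahead of the value fields after the merge, which matches the field order shown in the last two stages of the walkthrough.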
>
> `TransformFunction` will be built as a NAR including a `pulsar-io.yaml`
> service file so that it can be registered as a built-in function under the
> name `transform`.
>
> ## Rejected Alternatives
>
> None
