I would suggest first building some concrete implementations in a separate
repo.
After verifying their functionality and performance, we can move them into
the main Pulsar repo.

On Fri, Jun 3, 2022 at 5:09 AM Enrico Olivelli <eolive...@gmail.com> wrote:

> Overall I agree with the proposal.
> I left some minor feedback on the issue
>
> Thank you
>
> Enrico
>
> On Thu, Jun 2, 2022 at 16:57 Christophe Bornet
> <bornet.ch...@gmail.com> wrote:
> >
> > Dear Pulsar community,
> >
> > I opened PIP-173 https://github.com/apache/pulsar/issues/15902 to create a
> > built-in Function implementing the most common basic transformations.
> >
> > Let me know what you think.
> >
> > Best regards,
> >
> > Christophe
> >
> > ------
> >
> > ## Motivation
> >
> > Currently, when users want to modify the data in Pulsar, they need to write
> > a Function.
> > For a lot of use cases, it would be handy for them to be able to use a
> > ready-made built-in Function that implements the most common basic
> > transformations like the ones available in [Kafka Connect’s SMTs](https://docs.confluent.io/platform/current/connect/transforms/overview.html).
> > This relieves users of the burden of writing the Function themselves,
> > understanding the quirks of Pulsar Schemas, and coding in a language that
> > they may not master (probably Java if they want to do advanced stuff).
> > They also benefit from battle-tested, maintained, performance-optimised
> > code.
> >
> > ## Goal
> >
> > This PIP is about providing a `TransformFunction` that executes a sequence
> > of basic transformations on the data.
> > The `TransformFunction` shall be easy to configure and launchable as a
> > built-in NAR.
> > The `TransformFunction` shall be able to apply a sequence of common
> > transformations in-memory so we don’t need to execute the
> > `TransformFunction` multiple times and read/write to a topic each time.
> >
> > This PIP is not about appending such a Function to a Source or a Sink.
> > While this is the ultimate goal, so we can provide an experience similar to
> > Kafka SMTs and avoid a read/write to a topic, this work will be done in a
> > future PIP.
> > It is expected that the code written for this PIP will be reusable in this
> > future work.
> >
> > ## API Changes
> >
> > This PIP will introduce a new `transform` module in the `pulsar-function`
> > multi-module project. The produced artifact will be a NAR of the
> > `TransformFunction`.
> >
> > ## Implementation
> >
> > When it processes a record, `TransformFunction` will:
> >
> > * Create a mutable structure `TransformContext` that contains
> >
> > ```java
> > @Data
> > public class TransformContext {
> >     private Context context;
> >     private Schema<?> keySchema;
> >     private Object keyObject;
> >     private boolean keyModified;
> >     private Schema<?> valueSchema;
> >     private Object valueObject;
> >     private boolean valueModified;
> >     private KeyValueEncodingType keyValueEncodingType;
> >     private String key;
> >     private Map<String, String> properties;
> >     private String outputTopic;
> >     // ... methods such as send() omitted
> > }
> > ```
> >
> > If the record is a `KeyValue`, the key and value schemas and objects are
> > unpacked. Otherwise the `keySchema` and `keyObject` are null.
> >
> > * Call, in sequence, the `process` method of each `TransformStep` in a
> > series on this `TransformContext`
> >
> > ```java
> > public interface TransformStep {
> >     void process(TransformContext transformContext) throws Exception;
> > }
> > ```
> >
> > Each `TransformStep` can then modify the `TransformContext` as needed.
> >
> > * Call the `send()` method of the `TransformContext`, which will create the
> > message to send to the `outputTopic`, repacking the `KeyValue` if needed.
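
The three steps above can be sketched roughly as follows. This is only a minimal, dependency-free illustration and not code from the PIP: `SketchContext`, `SketchStep` and `SketchPipeline` are hypothetical names, plain maps stand in for the Pulsar schemas and objects, and the final `send()` call is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for TransformContext: plain maps replace the Pulsar
// schemas and objects so the sketch runs without any Pulsar dependency.
class SketchContext {
    Map<String, Object> key;   // null when the input record is not a KeyValue
    Map<String, Object> value;
    String outputTopic;

    SketchContext(Map<String, Object> key, Map<String, Object> value, String outputTopic) {
        // Copy the inputs so that steps never mutate the caller's maps.
        this.key = (key == null) ? null : new LinkedHashMap<>(key);
        this.value = new LinkedHashMap<>(value);
        this.outputTopic = outputTopic;
    }
}

// Same shape as the TransformStep interface quoted above.
interface SketchStep {
    void process(SketchContext context) throws Exception;
}

final class SketchPipeline {
    private final List<SketchStep> steps;

    SketchPipeline(List<SketchStep> steps) {
        this.steps = new ArrayList<>(steps);
    }

    // Applies every step in order to one shared, mutable context, entirely in
    // memory. A real function would then call send() on the context.
    SketchContext run(SketchContext context) throws Exception {
        for (SketchStep step : steps) {
            step.process(context);
        }
        return context;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("name", "pulsar");

        List<SketchStep> steps = new ArrayList<>();
        steps.add(ctx -> ctx.value.put("seen", true));       // add a field
        steps.add(ctx -> ctx.outputTopic = "routed-topic");  // re-route the record

        SketchContext result =
                new SketchPipeline(steps).run(new SketchContext(null, value, "default-topic"));
        System.out.println(result.outputTopic + " " + result.value);
    }
}
```

Because all steps share one context, intermediate results never touch a topic, which is the point of chaining the transformations inside a single Function.
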
> >
> > The `TransformFunction` will read its configuration as JSON from
> > `userConfig` in the format:
> >
> > ```json
> > {
> >   "steps": [
> >     {
> >       "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
> >     },
> >     {
> >       "type": "merge-key-value"
> >     },
> >     {
> >       "type": "unwrap-key-value"
> >     },
> >     {
> >       "type": "cast", "schema-type": "STRING"
> >     }
> >   ]
> > }
> > ```
> >
> > Each step is defined by its `type` and uses its own arguments.
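
One way the `steps` list could be validated and dispatched on `type` is sketched below. This is an assumption-laden illustration, not the PIP's implementation: `StepConfigSketch` and `StepSpec` are hypothetical names, the config is assumed to arrive already parsed as a `Map<String, Object>`, and treating `part` as optional is my own guess.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class StepConfigSketch {
    // Validated form of one entry of "steps": its type plus its own arguments.
    static final class StepSpec {
        final String type;
        final Map<String, Object> args;

        StepSpec(String type, Map<String, Object> args) {
            this.type = type;
            this.args = args;
        }
    }

    static List<StepSpec> parse(Map<String, Object> userConfig) {
        Object raw = userConfig.get("steps");
        if (!(raw instanceof List)) {
            throw new IllegalArgumentException("'steps' must be a list");
        }
        List<StepSpec> specs = new ArrayList<>();
        for (Object entry : (List<?>) raw) {
            if (!(entry instanceof Map)) {
                throw new IllegalArgumentException("each step must be an object");
            }
            Map<?, ?> step = (Map<?, ?>) entry;
            String type = String.valueOf(step.get("type"));
            Map<String, Object> args = new LinkedHashMap<>();
            switch (type) {
                case "drop-fields":
                    Object fields = step.get("fields");
                    if (!(fields instanceof String)) {
                        throw new IllegalArgumentException("drop-fields needs a 'fields' string");
                    }
                    // "fields" is a comma-separated list of field names.
                    args.put("fields", Arrays.asList(((String) fields).trim().split("\\s*,\\s*")));
                    // Assumption: "part" may be absent, in which case it stays null.
                    args.put("part", step.get("part"));
                    break;
                case "merge-key-value":
                case "unwrap-key-value":
                    break; // no arguments in the example config
                case "cast":
                    args.put("schema-type", step.get("schema-type"));
                    break;
                default:
                    throw new IllegalArgumentException("unknown step type: " + type);
            }
            specs.add(new StepSpec(type, args));
        }
        return specs;
    }
}
```

Failing fast on an unknown `type` at configuration time keeps misconfigured steps from surfacing only when the first record is processed.
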
> >
> > This example config, applied to a KeyValue<AVRO, AVRO> input record with
> > value `{key={keyField1: key1, keyField2: key2, keyField3: key3},
> > value={valueField1: value1, valueField2: value2, valueField3: value3}}`,
> > produces the following after each step:
> > ```
> > {key={keyField1: key1, keyField2: key2, keyField3: key3},
> > value={valueField1: value1, valueField2: value2, valueField3:
> > value3}} (KeyValue<AVRO, AVRO>)
> >            |
> >            | "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
> >            |
> > {key={keyField3: key3}, value={valueField1: value1, valueField2: value2,
> > valueField3: value3}} (KeyValue<AVRO, AVRO>)
> >            |
> >            | "type": "merge-key-value"
> >            |
> > {key={keyField3: key3}, value={keyField3: key3, valueField1: value1,
> > valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
> >            |
> >            | "type": "unwrap-key-value"
> >            |
> > {keyField3: key3, valueField1: value1, valueField2: value2, valueField3:
> > value3} (AVRO)
> >            |
> >            | "type": "cast", "schema-type": "STRING"
> >            |
> > {"keyField3": "key3", "valueField1": "value1", "valueField2": "value2",
> > "valueField3": "value3"} (STRING)
> > ```
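
The trace above can be reproduced with plain maps, as a rough sketch. Everything here is illustrative rather than taken from the PIP: `TransformTraceSketch` and its methods are hypothetical, flat `Map<String, String>` records stand in for AVRO, value fields are assumed to win over key fields on a name clash, and the STRING cast is a naive JSON rendering without escaping.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class TransformTraceSketch {
    // drop-fields: remove the named fields from one part (key or value).
    static void dropFields(Map<String, String> part, String... fields) {
        part.keySet().removeAll(Arrays.asList(fields));
    }

    // merge-key-value: key fields first, then value fields, as in the trace.
    // Assumption: on a name clash the value field overwrites the key field.
    static Map<String, String> mergeKeyValue(Map<String, String> key, Map<String, String> value) {
        Map<String, String> merged = new LinkedHashMap<>(key);
        merged.putAll(value);
        return merged;
    }

    // cast to STRING: render a flat record as JSON text (no escaping; sketch only).
    static String castToString(Map<String, String> record) {
        StringJoiner json = new StringJoiner(", ", "{", "}");
        for (Map.Entry<String, String> e : record.entrySet()) {
            json.add("\"" + e.getKey() + "\": \"" + e.getValue() + "\"");
        }
        return json.toString();
    }

    static String run() {
        Map<String, String> key = new LinkedHashMap<>();
        key.put("keyField1", "key1");
        key.put("keyField2", "key2");
        key.put("keyField3", "key3");
        Map<String, String> value = new LinkedHashMap<>();
        value.put("valueField1", "value1");
        value.put("valueField2", "value2");
        value.put("valueField3", "value3");

        dropFields(key, "keyField1", "keyField2");            // step 1, part = key
        Map<String, String> merged = mergeKeyValue(key, value); // step 2
        Map<String, String> unwrapped = merged;                // step 3: keep the value only
        return castToString(unwrapped);                        // step 4
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```
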
> >
> > `TransformFunction` will be built as a NAR including a `pulsar-io.yaml`
> > service file so it can be registered as a built-in function with name
> > `transform`.
> >
> > ## Rejected Alternatives
> >
> > None
>


-- 
Best Regards,
Neng
