Hi Neng,

See the comment from Enrico in the PIP issue
<https://github.com/apache/pulsar/issues/15902>. We've put this in
"Rejected alternatives" for the reasons that:

   - it won't be easily available to all Pulsar users
   - it would be hard to guarantee compatibility with many Pulsar versions,
   and the Transformations will use many advanced features of Pulsar APIs

I agree with Enrico that we should maintain this function in the main repo.
I'll also add that:

   - This is a key feature that Kafka has
   <https://docs.confluent.io/platform/current/connect/transforms/overview.html>
   and that is lacking in Pulsar
   - A lot of users are asking for it (especially CDC users)
   - The next step will be to be able to chain this function with a
   Sink/Source to have an experience similar to Kafka

I hope this answers your concerns about this PIP.

Best regards

Christophe

On Wed, Jun 8, 2022 at 19:43, Neng Lu <freen...@gmail.com> wrote:

> I would suggest first having some concrete implementations in a separate
> repo.
> After verifying their functionality and performance, we can then move them
> into the main Pulsar repo.
>
> On Fri, Jun 3, 2022 at 5:09 AM Enrico Olivelli <eolive...@gmail.com>
> wrote:
>
> > Overall I agree with the proposal.
> > I left some minor feedback on the issue
> >
> > Thank you
> >
> > Enrico
> >
> > On Thu, Jun 2, 2022 at 16:57 Christophe Bornet
> > <bornet.ch...@gmail.com> wrote:
> > >
> > > Dear Pulsar community,
> > >
> > > I opened PIP-173 https://github.com/apache/pulsar/issues/15902 to create a
> > > built-in Function implementing the most common basic transformations.
> > >
> > > Let me know what you think.
> > >
> > > Best regards,
> > >
> > > Christophe
> > >
> > > ------
> > >
> > > ## Motivation
> > >
> > > Currently, when users want to modify the data in Pulsar, they need to
> > > write a Function.
> > > For a lot of use cases, it would be handy for them to be able to use a
> > > ready-made built-in Function that implements the most common basic
> > > transformations like the ones available in [Kafka Connect’s SMTs](https://docs.confluent.io/platform/current/connect/transforms/overview.html).
> > > This relieves users of the burden of writing the Function themselves,
> > > understanding the intricacies of Pulsar Schemas, and coding in a language
> > > that they may not master (probably Java if they want to do advanced stuff).
> > > They also benefit from battle-tested, maintained, performance-optimised code.
> > >
> > > ## Goal
> > >
> > > This PIP is about providing a `TransformFunction` that executes a sequence
> > > of basic transformations on the data.
> > > The `TransformFunction` shall be easy to configure and launchable as a
> > > built-in NAR.
> > > The `TransformFunction` shall be able to apply a sequence of common
> > > transformations in-memory so we don’t need to execute the
> > > `TransformFunction` multiple times and read/write to a topic each time.
> > >
> > > This PIP is not about appending such a Function to a Source or a Sink.
> > > While this is the ultimate goal, so we can provide an experience similar to
> > > Kafka SMTs and avoid a read/write to a topic, this work will be done in a
> > > future PIP.
> > > It is expected that the code written for this PIP will be reusable in this
> > > future work.
> > >
> > > ## API Changes
> > >
> > > This PIP will introduce a new `transform` module in the `pulsar-function`
> > > multi-module project. The produced artifact will be a NAR of the
> > > `TransformFunction`.
> > >
> > > ## Implementation
> > >
> > > When it processes a record, `TransformFunction` will:
> > >
> > > * Create a mutable structure `TransformContext` that contains
> > >
> > > ```java
> > > @Data
> > > public class TransformContext {
> > >     private Context context;
> > >     private Schema<?> keySchema;
> > >     private Object keyObject;
> > >     private boolean keyModified;
> > >     private Schema<?> valueSchema;
> > >     private Object valueObject;
> > >     private boolean valueModified;
> > >     private KeyValueEncodingType keyValueEncodingType;
> > >     private String key;
> > >     private Map<String, String> properties;
> > >     private String outputTopic;
> > > }
> > > ```
> > >
> > > If the record is a `KeyValue`, the key and value schemas and objects are
> > > unpacked. Otherwise the `keySchema` and `keyObject` are null.
> > >
> > > * Call in sequence the process method of a series of `TransformStep` on
> > > this `TransformContext`
> > >
> > > ```java
> > > public interface TransformStep {
> > >     void process(TransformContext transformContext) throws Exception;
> > > }
> > > ```
> > >
> > > Each `TransformStep` can then modify the `TransformContext` as needed.
> > >
> > > * Call the `send()` method of the `TransformContext` which will create the
> > > message to send to the `outputTopic`, repacking the `KeyValue` if needed.
> > >
> > > The `TransformFunction` will read its configuration as JSON from
> > > `userConfig` in the format:
> > >
> > > ```json
> > > {
> > >   "steps": [
> > >     {
> > >       "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
> > >     },
> > >     {
> > >       "type": "merge-key-value"
> > >     },
> > >     {
> > >       "type": "unwrap-key-value"
> > >     },
> > >     {
> > >       "type": "cast", "schema-type": "STRING"
> > >     }
> > >   ]
> > > }
> > > ```
> > >
> > > Each step is defined by its `type` and uses its own arguments.
> > >
> > > This example config applied on a KeyValue<AVRO, AVRO> input record with
> > > value `{key={keyField1: key1, keyField2: key2, keyField3: key3},
> > > value={valueField1: value1, valueField2: value2, valueField3: value3}}`
> > > will give after each step:
> > > ```
> > > {key={keyField1: key1, keyField2: key2, keyField3: key3},
> > > value={valueField1: value1, valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
> > >            |
> > >            | "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
> > >            |
> > > {key={keyField3: key3}, value={valueField1: value1, valueField2: value2,
> > > valueField3: value3}} (KeyValue<AVRO, AVRO>)
> > >            |
> > >            | "type": "merge-key-value"
> > >            |
> > > {key={keyField3: key3}, value={keyField3: key3, valueField1: value1,
> > > valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
> > >            |
> > >            | "type": "unwrap-key-value"
> > >            |
> > > {keyField3: key3, valueField1: value1, valueField2: value2,
> > > valueField3: value3} (AVRO)
> > >            |
> > >            | "type": "cast", "schema-type": "STRING"
> > >            |
> > > {"keyField3": "key3", "valueField1": "value1", "valueField2": "value2",
> > > "valueField3": "value3"} (STRING)
> > > ```
> > >
> > > `TransformFunction` will be built as a NAR including a `pulsar-io.yaml`
> > > service file so it can be registered as a built-in function with name
> > > `transform`.
> > >
> > > ## Rejected Alternatives
> > >
> > > None
> >
>
>
> --
> Best Regards,
> Neng
>
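
For readers following the thread, the step pipeline described in the quoted PIP (drop-fields, merge-key-value, unwrap-key-value, cast) can be illustrated with a minimal sketch. This is not the PIP's implementation: `SketchContext`, `SketchStep`, and `TransformSketch` are hypothetical names, plain `Map`s stand in for AVRO records, schemas are left out entirely, and the "cast" step uses `String.valueOf` rather than real JSON encoding.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the PIP's TransformContext:
// plain maps replace AVRO records, and schemas are omitted.
class SketchContext {
    Map<String, Object> key;  // null once the KeyValue has been unwrapped
    Object value;             // a Map<String, Object>, or a String after the cast

    SketchContext(Map<String, Object> key, Map<String, Object> value) {
        this.key = key;
        this.value = value;
    }
}

// Same shape as the TransformStep interface quoted above, on the simplified context.
interface SketchStep {
    void process(SketchContext ctx) throws Exception;
}

class TransformSketch {
    // "drop-fields" with "part": "key": remove the listed fields from the key.
    static SketchStep dropKeyFields(String... fields) {
        return ctx -> {
            for (String field : fields) {
                ctx.key.remove(field);
            }
        };
    }

    // "merge-key-value": build a new value holding the key fields, then the value fields.
    static SketchStep mergeKeyValue() {
        return ctx -> {
            Map<String, Object> merged = new LinkedHashMap<>(ctx.key);
            merged.putAll(valueAsMap(ctx));
            ctx.value = merged;
        };
    }

    // "unwrap-key-value": keep only the value part of the record.
    static SketchStep unwrapKeyValue() {
        return ctx -> {
            ctx.key = null;
        };
    }

    // "cast" to STRING: Map.toString() is used here as an assumption, not JSON.
    static SketchStep castToString() {
        return ctx -> {
            ctx.value = String.valueOf(ctx.value);
        };
    }

    @SuppressWarnings("unchecked")
    private static Map<String, Object> valueAsMap(SketchContext ctx) {
        return (Map<String, Object>) ctx.value;
    }

    // Apply every step in order, in memory, on the same mutable context.
    static SketchContext run(List<SketchStep> steps, SketchContext ctx) {
        for (SketchStep step : steps) {
            try {
                step.process(ctx);
            } catch (Exception e) {
                throw new IllegalStateException("transform step failed", e);
            }
        }
        return ctx;
    }

    // Rebuilds the example record from the PIP and runs the four configured steps.
    static SketchContext example() {
        Map<String, Object> key = new LinkedHashMap<>();
        key.put("keyField1", "key1");
        key.put("keyField2", "key2");
        key.put("keyField3", "key3");
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("valueField1", "value1");
        value.put("valueField2", "value2");
        value.put("valueField3", "value3");
        List<SketchStep> steps = Arrays.asList(
                dropKeyFields("keyField1", "keyField2"),
                mergeKeyValue(),
                unwrapKeyValue(),
                castToString());
        return run(steps, new SketchContext(key, value));
    }

    public static void main(String[] args) {
        System.out.println(example().value);
    }
}
```

Running `TransformSketch.main` prints the final value, with `keyField3` ahead of the three value fields, which mirrors the last stage of the PIP's diagram. Because every step mutates one shared context, the whole sequence runs in memory with no intermediate topic, which is the property the Goal section asks for.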
