Dear Pulsar community,

I opened PIP-173 https://github.com/apache/pulsar/issues/15902 to create a
built-in Function implementing the most common basic transformations.

Let me know what you think.

Best regards,

Christophe

------

## Motivation

Currently, when users want to modify the data in Pulsar, they need to write
a Function.
For many use cases, it would be handy to use a ready-made built-in
Function that implements the most common basic transformations, like the
ones available in
[Kafka Connect’s SMTs](https://docs.confluent.io/platform/current/connect/transforms/overview.html).
This relieves users of the burden of writing the Function themselves,
understanding the intricacies of Pulsar Schemas, and coding in a language
that they may not master (probably Java if they want to do advanced stuff).
They also benefit from battle-tested, maintained, performance-optimised
code.

## Goal

This PIP is about providing a `TransformFunction` that executes a sequence
of basic transformations on the data.
The `TransformFunction` shall be easy to configure and launchable as a
built-in NAR.
The `TransformFunction` shall be able to apply a sequence of common
transformations in-memory so we don’t need to execute the
`TransformFunction` multiple times and read/write to a topic each time.

This PIP is not about appending such a Function to a Source or a Sink.
That is the ultimate goal, since it would provide an experience similar to
Kafka Connect’s SMTs and avoid an extra read/write to a topic, but that
work will be done in a future PIP.
The code written for this PIP is expected to be reusable in that future
work.

## API Changes

This PIP will introduce a new `transform` module in the `pulsar-function`
multi-module project. The produced artifact will be a NAR of the
`TransformFunction`.

## Implementation

When it processes a record, `TransformFunction` will:

* Create a mutable structure `TransformContext` that contains:

```java
@Data
public class TransformContext {
    private Context context;
    private Schema<?> keySchema;
    private Object keyObject;
    private boolean keyModified;
    private Schema<?> valueSchema;
    private Object valueObject;
    private boolean valueModified;
    private KeyValueEncodingType keyValueEncodingType;
    private String key;
    private Map<String, String> properties;
    private String outputTopic;
}
```

If the record is a `KeyValue`, the key and value schemas and objects are
unpacked. Otherwise, `keySchema` and `keyObject` are null.

* Call, in sequence, the `process` method of each configured
`TransformStep` on this `TransformContext`:

```java
public interface TransformStep {
    void process(TransformContext transformContext) throws Exception;
}
```

Each `TransformStep` can then modify the `TransformContext` as needed.

* Call the `send()` method of the `TransformContext`, which creates the
message to send to the `outputTopic`, repacking the `KeyValue` if needed.
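
The three steps above can be sketched in plain Java as follows. This is
only an illustration under simplifying assumptions: `SketchContext`,
`SketchStep`, `UnwrapKeyValue` and `SketchPipeline` are hypothetical
stand-ins that use maps instead of Pulsar schemas and objects, and `send()`
records its output in a list instead of publishing to a topic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for TransformContext: records are plain maps here.
class SketchContext {
    Map<String, Object> key;   // null when the record is not a KeyValue
    Map<String, Object> value;
    String outputTopic;
    // Hypothetical capture of what send() emits, instead of a real producer.
    final List<String> sent = new ArrayList<>();

    SketchContext(Map<String, Object> key, Map<String, Object> value, String outputTopic) {
        this.key = key == null ? null : new LinkedHashMap<>(key);
        this.value = new LinkedHashMap<>(value);
        this.outputTopic = outputTopic;
    }

    // Repack the (possibly modified) key and value into one output message.
    void send() {
        String payload = key == null
                ? value.toString()
                : "{key=" + key + ", value=" + value + "}";
        sent.add(outputTopic + " <- " + payload);
    }
}

// Same shape as the TransformStep interface, on the simplified context.
interface SketchStep {
    void process(SketchContext context) throws Exception;
}

// Hypothetical step: drop the key so that only the value is sent.
class UnwrapKeyValue implements SketchStep {
    @Override
    public void process(SketchContext context) {
        context.key = null;
    }
}

final class SketchPipeline {
    // Run every step in order on the same context, then send once.
    static SketchContext run(List<SketchStep> steps, SketchContext context) throws Exception {
        for (SketchStep step : steps) {
            step.process(context);
        }
        context.send();
        return context;
    }
}
```

Because every step mutates the same in-memory context, the record is read
once and written once, whatever the number of steps.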

The `TransformFunction` will read its configuration as JSON from
`userConfig` in the format:

```json
{
  "steps": [
    {
      "type": "drop-fields", "fields": "keyField1,keyField2", "part": "key"
    },
    {
      "type": "merge-key-value"
    },
    {
      "type": "unwrap-key-value"
    },
    {
      "type": "cast", "schema-type": "STRING"
    }
  ]
}
```

Each step is defined by its `type` and uses its own arguments.
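
As an illustration of how such a configuration could be turned into step
definitions, here is a minimal sketch. It assumes the JSON has already been
parsed into nested maps and lists. `StepConfig`, `StepConfigParser` and the
set of known types are hypothetical names chosen for this example, not part
of the proposal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical parsed form of one entry of the "steps" array.
final class StepConfig {
    final String type;
    final Map<String, Object> args; // every property of the step except "type"

    StepConfig(String type, Map<String, Object> args) {
        this.type = type;
        this.args = args;
    }
}

final class StepConfigParser {
    // Only the step types that appear in the example config.
    static final Set<String> KNOWN_TYPES =
            Set.of("drop-fields", "merge-key-value", "unwrap-key-value", "cast");

    // Validate the "steps" list and keep the declared order of the steps.
    static List<StepConfig> parse(Map<String, Object> userConfig) {
        Object steps = userConfig.get("steps");
        if (!(steps instanceof List)) {
            throw new IllegalArgumentException("'steps' must be a list");
        }
        List<StepConfig> result = new ArrayList<>();
        for (Object entry : (List<?>) steps) {
            if (!(entry instanceof Map)) {
                throw new IllegalArgumentException("each step must be an object");
            }
            Map<String, Object> args = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) entry).entrySet()) {
                args.put(String.valueOf(e.getKey()), e.getValue());
            }
            Object type = args.remove("type");
            if (!(type instanceof String) || !KNOWN_TYPES.contains(type)) {
                throw new IllegalArgumentException("unknown step type: " + type);
            }
            result.add(new StepConfig((String) type, args));
        }
        return result;
    }

    // "fields" is a comma-separated string in the example config.
    static List<String> splitFields(String fields) {
        return Arrays.asList(fields.trim().split("\\s*,\\s*"));
    }
}
```

Rejecting unknown types while parsing means that a typo in the
configuration surfaces when the function starts rather than when the first
record is processed.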

Applied to a `KeyValue<AVRO, AVRO>` input record with value
`{key={keyField1: key1, keyField2: key2, keyField3: key3},
value={valueField1: value1, valueField2: value2, valueField3: value3}}`,
this example config gives the following result after each step:
```
{key={keyField1: key1, keyField2: key2, keyField3: key3},
value={valueField1: value1, valueField2: value2, valueField3:
value3}}(KeyValue<AVRO, AVRO>)
           |
           | "type": "drop-fields", "fields": "keyField1,keyField2",
"part": "key"
           |
{key={keyField3: key3}, value={valueField1: value1, valueField2: value2,
valueField3: value3}} (KeyValue<AVRO, AVRO>)
           |
           | "type": "merge-key-value"
           |
{key={keyField3: key3}, value={keyField3: key3, valueField1: value1,
valueField2: value2, valueField3: value3}} (KeyValue<AVRO, AVRO>)
           |
           | "type": "unwrap-key-value"
           |
{keyField3: key3, valueField1: value1, valueField2: value2, valueField3:
value3} (AVRO)
           |
           | "type": "cast", "schema-type": "STRING"
           |
{"keyField3": "key3", "valueField1": "value1", "valueField2": "value2",
"valueField3": "value3"} (STRING)
```
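
The same trace can be reproduced on plain maps with the sketch below. It
makes several assumptions: `ExampleTrace` and its methods are hypothetical,
all fields are strings, no JSON escaping is done, and when a key field and a
value field share a name the value field wins (the proposal does not
specify this case).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class ExampleTrace {
    // drop-fields: return a copy of the record without the listed fields.
    static Map<String, String> dropFields(Map<String, String> record, List<String> fields) {
        Map<String, String> result = new LinkedHashMap<>(record);
        result.keySet().removeAll(fields);
        return result;
    }

    // merge-key-value: key fields first, then value fields, as in the trace.
    // Assumption: on a name collision the value field overrides the key field.
    static Map<String, String> mergeKeyValue(Map<String, String> key, Map<String, String> value) {
        Map<String, String> merged = new LinkedHashMap<>(key);
        merged.putAll(value);
        return merged;
    }

    // cast to STRING: render a flat record as JSON text (no escaping).
    static String castToString(Map<String, String> record) {
        return record.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\": \"" + e.getValue() + "\"")
                .collect(Collectors.joining(", ", "{", "}"));
    }

    // Apply the four steps of the example config in order.
    static String run(Map<String, String> key, Map<String, String> value) {
        Map<String, String> newKey = dropFields(key, List.of("keyField1", "keyField2"));
        Map<String, String> newValue = mergeKeyValue(newKey, value);
        // unwrap-key-value: from here on only the value is kept.
        return castToString(newValue);
    }
}
```

Calling `ExampleTrace.run` with the key and value of the input record above
returns the JSON string shown on the last line of the trace.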

`TransformFunction` will be built as a NAR including a `pulsar-io.yaml`
service file so it can be registered as a built-in function with name
`transform`.

## Rejected Alternatives

None
