GitHub user pointearth created a discussion: Pulsar function Should support 
Generic output Message

**Is your feature request related to a problem? Please describe.**
In a Pulsar function, I don't want to bind the output type O to one particular class in
`public interface Function<I, O> {
    O process(I var1, Context var2) throws Exception;
}`

I have a POJO class, but the output should always be a wrapper class around it.
The only difference between the wrapper and the POJO is that the wrapper adds a
JSON field to whichever POJO class it wraps.
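
To make the wrapper idea concrete, here is a minimal, dependency-free sketch. The names `JsonWrapped`, `ToJson`, and `WrapDemo` are hypothetical and not part of Pulsar; the `User` class only mirrors the two fields used later in this post, and the hand-written serializer stands in for a library such as Gson.

```java
// Hypothetical POJO with the same two fields as the User class in this post.
class User {
    final int age;
    final String name;

    User(int age, String name) {
        this.age = age;
        this.name = name;
    }
}

// Hypothetical serializer hook; real code would likely delegate to Gson or Jackson.
interface ToJson<T> {
    String apply(T value);
}

// Hypothetical wrapper: the original POJO plus one extra JSON string field.
final class JsonWrapped<T> {
    final T payload;
    final String jsonForEvent;

    private JsonWrapped(T payload, String jsonForEvent) {
        this.payload = payload;
        this.jsonForEvent = jsonForEvent;
    }

    // Builds the wrapper by serializing the payload once, up front.
    static <T> JsonWrapped<T> wrap(T payload, ToJson<T> toJson) {
        return new JsonWrapped<>(payload, toJson.apply(payload));
    }
}

class WrapDemo {
    // Naive serializer for illustration only: it does not escape special characters.
    static String userToJson(User u) {
        return "{\"age\":" + u.age + ",\"name\":\"" + u.name + "\"}";
    }

    public static void main(String[] args) {
        JsonWrapped<User> wrapped =
                JsonWrapped.wrap(new User(27, "Simon Generic"), WrapDemo::userToJson);
        System.out.println(wrapped.jsonForEvent); // {"age":27,"name":"Simon Generic"}
    }
}
```

The same `JsonWrapped<T>` works for any POJO, which is why a fixed output class per function feels unnecessary here.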

We can produce a generic message with pulsar.client.api:
`RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record(USER_CONST.SCHEMA_NAME);
recordSchemaBuilder.field(USER_CONST.AGE).type(SchemaType.INT32);
recordSchemaBuilder.field(USER_CONST.NAME).type(SchemaType.STRING);
recordSchemaBuilder.field(USER_CONST.JSON_FOR_EVENT).type(SchemaType.STRING);
SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);
GenericSchema userSchema = Schema.generic(schemaInfo);
Producer<GenericRecord> producer = pulsarClient.newProducer(userSchema)
        .topic(TOPIC_CONST.TOPIC_FUN_GENERIC)
        .create();
User user = new User();
user.age = 27;
user.name = "Simon Generic";
Gson gson = new Gson();
String jsonForEvent = gson.toJson(user);
GenericRecordBuilder genericRecordBuilder = userSchema.newRecordBuilder();
GenericRecord genericRecord = genericRecordBuilder
        .set(USER_CONST.AGE, user.age)
        .set(USER_CONST.NAME, user.name)
        .set(USER_CONST.JSON_FOR_EVENT, jsonForEvent)
        .build();`

However, I have tried many ways to produce such a dynamic object from a Pulsar function, for example:
`public class FunWrapGeneric implements Function<String, GenericRecord> {
    @Override
    public GenericRecord process(String input, Context context) throws Exception {`

but it always fails. Can we support this?
 
**Describe the solution you'd like**
1. Add GenericRecord support to Pulsar functions:
`public class FunWrapGeneric implements Function<String, GenericRecord>`
2. Or add a generic wrapper class:
`public class FunWrapGeneric implements Function<String, RecordWrap<T>> {
    @Override
    public RecordWrap<T> process(String input, Context context) throws Exception {`
where the RecordWrap interface supports adding fields to, or removing fields from, T.

3. Or add a variant of the Pulsar function interface that accepts a generic schema type as a type parameter:
`FunWrapGeneric implements Function<String, Schema>`
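
As a rough illustration of option 2, the sketch below shows one shape a `RecordWrap`-like type could take. Everything in it is an assumption made for discussion: `RecordWrapSketch`, `SampleUser`, and the method names `addField` and `removeField` are hypothetical and do not exist in Pulsar. It keeps the wrapped value next to a named-field view, loosely in the spirit of how `GenericRecord` exposes fields by name, and leaves schema generation out entirely.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the proposed RecordWrap<T>; not an existing Pulsar type.
final class RecordWrapSketch<T> {
    private final T source;
    private final Map<String, Object> fields;

    // The caller supplies the source object's fields by name; a real
    // implementation might derive them from a schema or via reflection.
    RecordWrapSketch(T source, Map<String, Object> sourceFields) {
        this.source = source;
        this.fields = new LinkedHashMap<>(sourceFields);
    }

    T source() {
        return source;
    }

    // Adds (or overwrites) a field in the output view.
    RecordWrapSketch<T> addField(String name, Object value) {
        fields.put(name, value);
        return this;
    }

    // Drops a field from the output view; the source object is left untouched.
    RecordWrapSketch<T> removeField(String name) {
        fields.remove(name);
        return this;
    }

    // Read-only view of the fields that would be written to the output topic.
    Map<String, Object> fields() {
        return Collections.unmodifiableMap(fields);
    }
}

// Hypothetical POJO used only by the demo below.
final class SampleUser {
    final int age = 27;
    final String name = "Simon Generic";
}

class RecordWrapDemo {
    public static void main(String[] args) {
        SampleUser user = new SampleUser();
        Map<String, Object> userFields = new LinkedHashMap<>();
        userFields.put("age", user.age);
        userFields.put("name", user.name);

        RecordWrapSketch<SampleUser> wrap = new RecordWrapSketch<>(user, userFields)
                .addField("jsonForEvent", "{\"age\":27,\"name\":\"Simon Generic\"}")
                .removeField("age");

        System.out.println(wrap.fields().keySet());
    }
}
```

A function returning such a type would still need a way to derive an output schema from the field view at runtime, which is the part that would require support from Pulsar itself.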

cc @sijie @codelipenghui 


GitHub link: https://github.com/apache/pulsar/discussions/18875
