Jerry

Il giorno gio 18 mar 2021 alle ore 03:07 Jerry Peng
<jerry.boyang.p...@gmail.com> ha scritto:
>
> Hi Enrico,
>
> Thanks for taking the initiative for improvements on Pulsar IO!
>
> I have questions in regards to the following statements
>
>
> *Problem 1: Pulsar Sinks must declare at compile time the data type they
> support*
> > So I would like to see Sink<Object> and let the implementation deal
> with Record<Object> that will give access to the decoded payload and
> to the Schema.
>
> What is the point of having a schema for a generic POJO object?  The point
> of a schema is to provide boundaries on what the structure of the data can
> be.  However, having a schema that supports a generic POJO that can have
> any structure seems paradoxical to me.
>
> Currently, the input schema and output schema for source, sink, and
> functions can be undefined i.e. there is no schema, which means that the
> source/sink/function will either write, read and write, or just read byte[]
> which is the most generic data representation there can be.  Why is this
> not sufficient for your use case?  Can the Pulsar IO wrapper code for Kafka
> connect just read or write byte[] and let the high up abstractions
> determine how to deserialize it?

For Sources you can have a Source<byte[]> and you can write records
with any schema.
This is good and it works well.

But it doesn't work for Sinks
currently if you declare a Sink<byte[]> you will see as schema Schema.BYTES.
If we had a Schema object that is preconfigured for decoding the
payload probably we will be done.
Generic sinks will be Sink<byte[]> and they will decode the payload with:

public void write(Record<byte[]> record) throws Exception {
    Schema<?> schema = record.getSchema();
   SchemaType type = schema.getSchemaInfo().getType();
   Object decoded = schema.decode(record.getValue());
}

but this does not work, because we have a strongly typed system and
you MUST write:

public void write(Record<byte[]> record) throws Exception {
    Schema<byte[]> schema = record.getSchema();
   SchemaType type = schema.getSchemaInfo().getType();
   Object decoded = record.getValue();
}

if I have Sink<Object> it becomes

public void write(Object record) throws Exception {
    Schema<Object> schema = record.getSchema();
   SchemaType type = schema.getSchemaInfo().getType();
   Object decoded = record.getValue();

   // switch on SchemaType ....
}


>
> *Problem 2: Pulsar IO Record does not handle non-String keys*
>
> The reason why Pulsar IO Record only supports specifying a key as a string
> is because Pulsar only supports specifying keys as strings. The key
> specified in Pulsar IO Record is also used as the partition key to
> determine which partition a message will be published into.  If the key
> returned from Kafka connect connector also needs to be used as the
> partition key then the method you specified will not work.  You will still
> need to translate the key back into a string or implement your own custom
> message router.

At the low level this is true, but we already have KeyValue#SEPARATED
that brings these features:
- you have a schema for the Key
- you have a schema for the Value
- your Key is stored in the Message key, and you can think it as an
Object (it is encoded to byte[] using the KeySchema and it is stored
in base64 as string into the message key)

It is not super efficient, due to the base64 encoding, but it is a
good tradeoff and it is already supported by Pulsar.

>
> Can you also explain how the additional methods you are suggesting to add
> will be used?  What entities will be implementing methods and setting the
> value? What entities will use those values? And how those values will be
> used.

If you are using KeyValue (especially in SEPARATED flavour) you
essentially will see a record as a key value pair of <Object, Object>,
with the key stored in the message key.

Most of these ideas come from porting connectors from Kafka Connect to
Pulsar IO.
I would like to start a separate thread about discussing a convention
for the mapping but the short version is:
When you come from Kafka you have a KeyValue<Object, Object>  in
SEPARATED encoding (this may be simplified in case of String or byte[]
keys, but let's stick to the most complicated case).
So your producer (probably a KafkaSource or a KafkaConnectSource)
writes to Pulsar a KeyValue and uses KeyValueSchema with SEPARATED
encoding.

On the Sink side you have your connector that (in Kafka) is used to
deal with an Object key and an Object value.
With those new methods you can use the logical key and the logical
value (and you can use the keySchema and the valueSchema)
and you have 1-1 mapping with what you did in Kafka.

This is particularly important if you have users that are trying to
migrate from Kafka to Pulsar, and there is no easy and no standard way
to do it

>
>
> *Problem 3:  Pulsar IO Records do not allow NULL values*
> Don't have a problem with this.  The use case you provided makes sense.

I will send a patch, thanks

Enrico

>
> On Wed, Mar 17, 2021 at 2:04 AM Enrico Olivelli <eolive...@gmail.com> wrote:
>
> > Hello everyone,
> >
> > I would like to share some proposals about Pulsar IO.  Based on my
> > experience porting a nontrivial connector for Cassandra from Kafka
> > Connect to Pulsar IO, I would like to identify some places where we
> > can improve the Pulsar IO API for both native Pulsar connectors and to
> > improve feature parity with Kafka Connect.  Ultimately, I would like
> > to provide a Kafka Connect compatibility library so that people can
> > use arbitrary Kafka connectors with Pulsar without having to port each
> > one.
> >
> > Problem 1: Pulsar Sinks must declare at compile time the data type they
> > support
> >
> > You can have a Sink<byte[]>, Sink<GenericRecord>, Sink<KeyValue>, but
> > you cannot have only one single Sink implementation that deals with
> > every kind of schema.
> >
> > In Kafka Connect you are used to dealing with Object at compile time
> > and deal with data types and Schemas in the implementation.
> >
> > So I would like to see Sink<Object> and let the implementation deal
> > with Record<Object> that will give access to the decoded payload and
> > to the Schema.
> >
> > I have started a work to support Schema<Object> here, that is the
> > first step in the direction of supporting Sink<Object>
> > https://github.com/apache/pulsar/pull/9895
> >
> >
> > Problem 2: Pulsar IO Record does not handle non-String keys
> >
> > We only have Record#getKey() that returns an Optional<String>. So even
> > if you are using KeyValue, you cannot access the logical value of the
> > key.
> >
> > The idea is to use KeyValue as foundation for supporting Schema in the
> > Key (this is nothing new, but it is still not much supported in Pulsar
> > IO)
> >
> > Kafka connect users deal with Object keys and we should provide the
> > same support.
> > We already have support for Object keys in case of KeyValue.
> > The proposal I am working with my colleagues is to add Object
> > getLogicalKey() to the Record interface.
> >
> > interface Record<T> {
> >
> >       Optional<Object> getLogicalKey();
> >       Optional<Object> getLogicalValue();
> >       Schema<T> getKeySchema();
> >       Schema<T> getValueSchema();
> >
> >
> >       Optional<String> getKey();
> >       Object getValue();
> >       Schema<T> getSchema();
> > }
> >
> > For messages that are using byte[] key (Message#hasBase64EncodedKey)
> > the getLogicalKey() returns the same as Message#getKeyBytes() and
> > getKeySchema will return Schema.BYTES.
> >
> > For regular messages getKeySchema() returns Schema.STRING.
> >
> > For messages that are using the KeyValue schema the new methods will
> > return the corresponding fields and metadata.
> >
> > getLogicalKey -> the key of the KeyValue
> > getLogicalValue -> the value of KeyValue
> > getKeySchema -> the schema for the logical key
> > getValueSchema -> the schema for the logical value
> > getKey -> the key of the message, as usual, for backward compatibility
> > getSchema -> the KeyValueSchema, as usual, for backward compatibility
> > getValue -> the KeyValue object, as usual, for backward compatibility
> >
> > Please note that we already have a KVRecord interface that exposes key
> > and value schema, but it is not used while pushing data to Sinks (it
> > looks like it is only used by KafkaConnectSource):
> >
> > public interface KVRecord<K, V> extends Record {
> >    Schema<K> getKeySchema();
> >    Schema<V> getValueSchema();
> >    KeyValueEncodingType getKeyValueEncodingType();
> > }
> >
> > Problem 3:  Pulsar IO Records do not allow NULL values
> >
> > Currently there is no support for NULL values in Pulsar IO.
> > This case makes sense if you are storing information in the key as well.
> >
> > Example use case: use the topic as a changelog of a database, put the
> > PK into the key and the rest of the record into the value. Use the
> > convention that a NULL value means a DELETE operation and a non NULL
> > value is an UPSERT.
> >
> > Thoughts ?
> >
> > Enrico Olivelli
> >

Reply via email to