Hi Enrico,

Thanks for taking the initiative for improvements on Pulsar IO!

I have questions in regards to the following statements

*Problem 1: Pulsar Sinks must declare at compile time the data type they
> So I would like to see Sink<Object> and let the implementation deal
with Record<Object> that will give access to the decoded payload and
to the Schema.

What is the point of having a schema for a generic POJO object?  The point
of a schema is to provide boundaries on what the structure of the data can
be.  However, having a schema that supports a generic POJO that can have
any structure seems paradoxical to me.

Currently, the input schema and output schema for source, sink, and
functions can be undefined i.e. there is no schema, which means that the
source/sink/function will either write, read and write, or just read byte[]
which is the most generic data representation there can be.  Why is this
not sufficient for your use case?  Can the Pulsar IO wrapper code for Kafka
connect just read or write byte[] and let the high up abstractions
determine how to deserialize it?

*Problem 2: Pulsar IO Record does not handle non-String keys*

The reason why Pulsar IO Record only supports specifying a key as a string
is because Pulsar only supports specifying keys as strings. The key
specified in Pulsar IO Record is also used as the partition key to
determine which partition a message will be published into.  If the key
returned from Kafka connect connector also needs to be used as the
partition key then the method you specified will not work.  You will still
need to translate the key back into a string or implement your own custom
message router.

Can you also explain how the additional methods you are suggesting to add
will be used?  What entities will be implementing methods and setting the
value? What entities will use those values? And how those values will be

*Problem 3:  Pulsar IO Records do not allow NULL values*
Don't have a problem with this.  The use case you provided makes sense.

On Wed, Mar 17, 2021 at 2:04 AM Enrico Olivelli <eolive...@gmail.com> wrote:

> Hello everyone,
> I would like to share some proposals about Pulsar IO.  Based on my
> experience porting a nontrivial connector for Cassandra from Kafka
> Connect to Pulsar IO, I would like to identify some places where we
> can improve the Pulsar IO API for both native Pulsar connectors and to
> improve feature parity with Kafka Connect.  Ultimately, I would like
> to provide a Kafka Connect compatibility library so that people can
> use arbitrary Kafka connectors with Pulsar without having to port each
> one.
> Problem 1: Pulsar Sinks must declare at compile time the data type they
> support
> You can have a Sink<byte[]>, Sink<GenericRecord>, Sink<KeyValue>, but
> you cannot have only one single Sink implementation that deals with
> every kind of schema.
> In Kafka Connect you are used to dealing with Object at compile time
> and deal with data types and Schemas in the implementation.
> So I would like to see Sink<Object> and let the implementation deal
> with Record<Object> that will give access to the decoded payload and
> to the Schema.
> I have started a work to support Schema<Object> here, that is the
> first step in the direction of supporting Sink<Object>
> https://github.com/apache/pulsar/pull/9895
> Problem 2: Pulsar IO Record does not handle non-String keys
> We only have Record#getKey() that returns an Optional<String>. So even
> if you are using KeyValue, you cannot access the logical value of the
> key.
> The idea is to use KeyValue as foundation for supporting Schema in the
> Key (this is nothing new, but it is still not much supported in Pulsar
> IO)
> Kafka connect users deal with Object keys and we should provide the
> same support.
> We already have support for Object keys in case of KeyValue.
> The proposal I am working with my colleagues is to add Object
> getLogicalKey() to the Record interface.
> interface Record<T> {
>       Optional<Object> getLogicalKey();
>       Optional<Object> getLogicalValue();
>       Schema<T> getKeySchema();
>       Schema<T> getValueSchema();
>       Optional<String> getKey();
>       Object getValue();
>       Schema<T> getSchema();
> }
> For messages that are using byte[] key (Message#hasBase64EncodedKey)
> the getLogicalKey() returns the same as Message#getKeyBytes() and
> getKeySchema will return Schema.BYTES.
> For regular messages getKeySchema() returns Schema.STRING.
> For messages that are using the KeyValue schema the new methods will
> return the corresponding fields and metadata.
> getLogicalKey -> the key of the KeyValue
> getLogicalValue -> the value of KeyValue
> getKeySchema -> the schema for the logical key
> getValueSchema -> the schema for the logical value
> getKey -> the key of the message, as usual, for backward compatibility
> getSchema -> the KeyValueSchema, as usual, for backward compatibility
> getValue -> the KeyValue object, as usual, for backward compatibility
> Please note that we already have a KVRecord interface that exposes key
> and value schema, but it is not used while pushing data to Sinks (it
> looks like it is only used by KafkaConnectSource):
> public interface KVRecord<K, V> extends Record {
>    Schema<K> getKeySchema();
>    Schema<V> getValueSchema();
>    KeyValueEncodingType getKeyValueEncodingType();
> }
> Problem 3:  Pulsar IO Records do not allow NULL values
> Currently there is no support for NULL values in Pulsar IO.
> This case makes sense if you are storing information in the key as well.
> Example use case: use the topic as a changelog of a database, put the
> PK into the key and the rest of the record into the value. Use the
> convention that a NULL value means a DELETE operation and a non NULL
> value is an UPSERT.
> Thoughts ?
> Enrico Olivelli

Reply via email to