Hello everyone,

I would like to share some proposals about Pulsar IO.  Based on my
experience porting a nontrivial connector for Cassandra from Kafka
Connect to Pulsar IO, I would like to identify some places where we
can improve the Pulsar IO API, both for native Pulsar connectors and
for feature parity with Kafka Connect.  Ultimately, I would like
to provide a Kafka Connect compatibility library so that people can
use arbitrary Kafka connectors with Pulsar without having to port each
one.

Problem 1: Pulsar Sinks must declare at compile time the data type they support

You can have a Sink<byte[]>, Sink<GenericRecord>, or Sink<KeyValue>,
but you cannot have a single Sink implementation that deals with
every kind of schema.

In Kafka Connect you deal with Object at compile time and handle
data types and Schemas in the implementation.

So I would like to see Sink<Object> and let the implementation deal
with Record<Object> that will give access to the decoded payload and
to the Schema.

I have started work on supporting Schema<Object> here, which is the
first step towards supporting Sink<Object>:
https://github.com/apache/pulsar/pull/9895
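
To make the idea concrete, here is a minimal sketch of a single sink
that accepts any payload and inspects it at runtime. SketchRecord,
SketchSink and AnyTypeSink are hypothetical stand-ins written only for
this example; they are not the Pulsar IO interfaces and they leave out
the Schema access that the real Record<Object> would provide.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration only; NOT the Pulsar IO types.
interface SketchRecord<T> {
    T getValue();
}

interface SketchSink<T> {
    void write(SketchRecord<T> record);
}

// One implementation typed on Object that decides how to handle
// each payload when the record arrives, not at compile time.
final class AnyTypeSink implements SketchSink<Object> {
    // Collected descriptions of what was written, so the behaviour is observable.
    final List<String> written = new ArrayList<>();

    @Override
    public void write(SketchRecord<Object> record) {
        Object value = record.getValue();
        if (value == null) {
            written.add("null");
        } else if (value instanceof byte[]) {
            written.add("bytes:" + ((byte[]) value).length);
        } else if (value instanceof Map) {
            written.add("struct:" + ((Map<?, ?>) value).size() + " fields");
        } else {
            written.add("primitive:" + value);
        }
    }
}
```

With something like this, the same sink instance can be fed String,
byte[] or structured payloads, which is what a Kafka Connect style
connector expects.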


Problem 2: Pulsar IO Record does not handle non-String keys

We only have Record#getKey() that returns an Optional<String>. So even
if you are using KeyValue, you cannot access the logical value of the
key.

The idea is to use KeyValue as the foundation for supporting a Schema
for the key (this is nothing new, but it is still poorly supported in
Pulsar IO).

Kafka Connect users deal with Object keys and we should provide the
same support.
We already have support for Object keys in the case of KeyValue.
The proposal I am working on with my colleagues is to add a
getLogicalKey() method to the Record interface.

interface Record<T> {

      // New methods in this proposal
      Optional<Object> getLogicalKey();
      Optional<Object> getLogicalValue();
      Schema<T> getKeySchema();
      Schema<T> getValueSchema();

      // Existing methods, kept for backward compatibility
      Optional<String> getKey();
      Object getValue();
      Schema<T> getSchema();
}

For messages that use a byte[] key (Message#hasBase64EncodedKey),
getLogicalKey() returns the same bytes as Message#getKeyBytes() and
getKeySchema() returns Schema.BYTES.

For regular messages getKeySchema() returns Schema.STRING.

For messages that are using the KeyValue schema the new methods will
return the corresponding fields and metadata.

getLogicalKey -> the key of the KeyValue
getLogicalValue -> the value of KeyValue
getKeySchema -> the schema for the logical key
getValueSchema -> the schema for the logical value
getKey -> the key of the message, as usual, for backward compatibility
getSchema -> the KeyValueSchema, as usual, for backward compatibility
getValue -> the KeyValue object, as usual, for backward compatibility
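
The three cases above (byte[] key, regular String key, KeyValue) could
be resolved roughly as in the sketch below. SketchMessage,
SketchKeyValue and LogicalKeys are hypothetical stand-ins, not Pulsar
classes. Checking the KeyValue case first, and carrying a byte[] key
as Base64 text, are assumptions of this sketch and not a description
of the actual implementation.

```java
import java.util.Base64;
import java.util.Optional;

// Hypothetical stand-in for a decoded key/value pair; NOT Pulsar's KeyValue class.
final class SketchKeyValue {
    final Object key;
    final Object value;

    SketchKeyValue(Object key, Object value) {
        this.key = key;
        this.value = value;
    }
}

// Hypothetical stand-in holding only the message fields this sketch needs.
final class SketchMessage {
    final String key;               // raw key; Base64 text when base64EncodedKey is true; may be null
    final boolean base64EncodedKey; // mirrors the idea of Message#hasBase64EncodedKey
    final Object value;             // decoded payload, possibly a SketchKeyValue

    SketchMessage(String key, boolean base64EncodedKey, Object value) {
        this.key = key;
        this.base64EncodedKey = base64EncodedKey;
        this.value = value;
    }
}

final class LogicalKeys {
    private LogicalKeys() {
    }

    // Resolves the logical key for the three cases described in the proposal.
    static Optional<Object> logicalKey(SketchMessage message) {
        if (message.value instanceof SketchKeyValue) {
            // KeyValue payload: the logical key is the key half of the pair.
            return Optional.ofNullable(((SketchKeyValue) message.value).key);
        }
        if (message.key == null) {
            return Optional.empty();
        }
        if (message.base64EncodedKey) {
            // byte[] key: decode the Base64 text back to the original bytes.
            return Optional.of(Base64.getDecoder().decode(message.key));
        }
        // Regular message: the logical key is the String key itself.
        return Optional.of(message.key);
    }

    // Name of the key schema a sink would see; plain strings keep the sketch dependency-free.
    static String keySchemaName(SketchMessage message) {
        if (message.value instanceof SketchKeyValue) {
            return "KEY_SCHEMA_OF_KEY_VALUE";
        }
        return message.base64EncodedKey ? "BYTES" : "STRING";
    }
}
```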

Please note that we already have a KVRecord interface that exposes key
and value schema, but it is not used while pushing data to Sinks (it
looks like it is only used by KafkaConnectSource):

public interface KVRecord<K, V> extends Record {
   Schema<K> getKeySchema();
   Schema<V> getValueSchema();
   KeyValueEncodingType getKeyValueEncodingType();
}

Problem 3: Pulsar IO Records do not allow NULL values

Currently there is no support for NULL values in Pulsar IO.
A NULL value makes sense when you are storing information in the key
as well.

Example use case: use the topic as a changelog of a database, put the
PK into the key and the rest of the record into the value. Use the
convention that a NULL value means a DELETE operation and a non-NULL
value is an UPSERT.
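
As a rough illustration of that convention, the sketch below applies
changelog entries to an in-memory table. ChangelogTable is a
hypothetical class written for this example, and the HashMap stands in
for the target database.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory target; a real sink would write to a database instead.
final class ChangelogTable {
    final Map<Object, Object> rows = new HashMap<>();

    // Applies one changelog entry: a null value deletes the row, anything else upserts it.
    void apply(Object primaryKey, Object value) {
        if (value == null) {
            rows.remove(primaryKey);
        } else {
            rows.put(primaryKey, value);
        }
    }
}
```

For example, apply("pk1", "a") followed by apply("pk1", null) leaves
no row for pk1, which is only expressible if the sink is allowed to
receive a record whose value is NULL.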

Thoughts?

Enrico Olivelli
