Hello everyone,

I would like to share some proposals about Pulsar IO. Based on my experience porting a nontrivial Cassandra connector from Kafka Connect to Pulsar IO, I would like to identify some places where we can improve the Pulsar IO API, both for native Pulsar connectors and for feature parity with Kafka Connect. Ultimately, I would like to provide a Kafka Connect compatibility library, so that people can use arbitrary Kafka Connect connectors with Pulsar without having to port each one.
Problem 1: Pulsar Sinks must declare at compile time the data type they support

You can have a Sink<byte[]>, a Sink<GenericRecord> or a Sink<KeyValue>, but you cannot have a single Sink implementation that deals with every kind of schema. In Kafka Connect you deal with Object at compile time and handle the actual data types and Schemas in the implementation. So I would like to see Sink<Object>, and let the implementation deal with a Record<Object> that gives access to the decoded payload and to the Schema (the first sketch at the end of this mail shows what such a sink could look like).

I have started work on supporting Schema<Object>, which is the first step in the direction of supporting Sink<Object>:
https://github.com/apache/pulsar/pull/9895

Problem 2: Pulsar IO Record does not handle non-String keys

We only have Record#getKey(), which returns an Optional<String>. So even if you are using KeyValue, you cannot access the logical value of the key. The idea is to use KeyValue as the foundation for supporting a Schema on the key (this is nothing new, but it is still not well supported in Pulsar IO). Kafka Connect users deal with Object keys, and we should provide the same support; we already have support for Object keys in the case of KeyValue.

The proposal I am working on with my colleagues is to add Object getLogicalKey() and the related accessors to the Record interface (the second sketch at the end of this mail shows how a sink would consume them):

    interface Record<T> {
        Optional<Object> getLogicalKey();
        Optional<Object> getLogicalValue();
        Schema<T> getKeySchema();
        Schema<T> getValueSchema();
        Optional<String> getKey();
        Object getValue();
        Schema<T> getSchema();
    }

For messages with a byte[] key (Message#hasBase64EncodedKey), getLogicalKey() returns the same as Message#getKeyBytes() and getKeySchema() returns Schema.BYTES. For regular messages getKeySchema() returns Schema.STRING.

For messages that use the KeyValue schema, the new methods return the corresponding fields and metadata:

    getLogicalKey -> the key of the KeyValue
    getLogicalValue -> the value of the KeyValue
    getKeySchema -> the schema for the logical key
    getValueSchema -> the schema for the logical value
    getKey -> the key of the message, as usual, for backward compatibility
    getSchema -> the KeyValueSchema, as usual, for backward compatibility
    getValue -> the KeyValue object, as usual, for backward compatibility

Please note that we already have a KVRecord interface that exposes the key and value schemas, but it is not used while pushing data to Sinks (it looks like it is only used by KafkaConnectSource):

    public interface KVRecord<K, V> extends Record {
        Schema<K> getKeySchema();
        Schema<V> getValueSchema();
        KeyValueEncodingType getKeyValueEncodingType();
    }

Problem 3: Pulsar IO Records do not allow NULL values

Currently there is no support for NULL values in Pulsar IO. Allowing them makes sense when you store information in the key as well. Example use case: use the topic as the changelog of a database, put the primary key into the message key and the rest of the record into the value, and use the convention that a NULL value means a DELETE operation and a non-NULL value means an UPSERT (the third sketch at the end of this mail shows this convention).

Thoughts ?

Enrico Olivelli
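Sketch 1 (Problem 1): a minimal sketch of a schema-agnostic sink. The Sink<Object> type compiles today, but the runtime cannot deliver arbitrary schemas to it yet; that is exactly what this proposal asks for. The class name and the body of write() are purely illustrative:

    import java.util.Map;

    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.io.core.Sink;
    import org.apache.pulsar.io.core.SinkContext;

    /**
     * A single sink implementation that handles every schema type,
     * the way Kafka Connect sinks do. The runtime support needed to
     * feed any message into a Sink<Object> is what Problem 1 proposes.
     */
    public class UniversalSink implements Sink<Object> {

        @Override
        public void open(Map<String, Object> config, SinkContext context) {
            // connect to the target system here
        }

        @Override
        public void write(Record<Object> record) {
            // dispatch on the runtime schema instead of a compile-time type
            Schema<Object> schema = record.getSchema();
            Object value = record.getValue();
            switch (schema.getSchemaInfo().getType()) {
                case AVRO:
                case JSON:
                    // value is a GenericRecord: walk its fields
                    break;
                case BYTES:
                    // value is a raw byte[]: pass it through
                    break;
                default:
                    // primitive schemas (STRING, INT32, ...)
                    break;
            }
            record.ack();
        }

        @Override
        public void close() {
            // release resources
        }
    }

This mirrors how a Kafka Connect SinkTask receives records with runtime schemas instead of a compile-time payload type.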
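Sketch 2 (Problem 2): how a sink could consume the proposed logical-key accessors. getLogicalKey(), getLogicalValue(), getKeySchema() and getValueSchema() are the methods proposed above, so this cannot compile until they are added; the upsert() helper is a hypothetical stand-in for the target-system client:

    import java.util.Map;

    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.io.core.Sink;
    import org.apache.pulsar.io.core.SinkContext;

    public class LogicalKeySink implements Sink<Object> {

        @Override
        public void open(Map<String, Object> config, SinkContext context) {
        }

        @Override
        public void write(Record<Object> record) {
            // for a KeyValue<Integer, GenericRecord> message this is the
            // decoded Integer key, not the Base64 String that getKey() returns
            Object logicalKey = record.getLogicalKey().orElse(null);
            Object logicalValue = record.getLogicalValue().orElse(null);
            Schema<?> keySchema = record.getKeySchema();     // e.g. Schema.INT32
            Schema<?> valueSchema = record.getValueSchema(); // e.g. an AVRO schema

            upsert(logicalKey, keySchema, logicalValue, valueSchema);
            record.ack();
        }

        // hypothetical helper: map the decoded key/value pair to the target system
        private void upsert(Object key, Schema<?> keySchema,
                            Object value, Schema<?> valueSchema) {
        }

        @Override
        public void close() {
        }
    }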
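Sketch 3 (Problem 3): the changelog/tombstone convention from the example use case. It assumes both NULL-value support and the getLogicalKey() accessor from Problem 2; deleteRow() and upsertRow() are hypothetical stand-ins for the target database client:

    import org.apache.pulsar.functions.api.Record;

    public class ChangelogSink {

        // write() of a sink applying the NULL-value convention
        public void write(Record<Object> record) {
            // the primary key travels in the message key (Problem 2 accessor)
            Object pk = record.getLogicalKey()
                    .orElseThrow(() -> new IllegalStateException(
                            "a changelog record must carry a key"));
            if (record.getValue() == null) {
                deleteRow(pk);                    // NULL value -> DELETE
            } else {
                upsertRow(pk, record.getValue()); // non-NULL value -> UPSERT
            }
            record.ack();
        }

        private void deleteRow(Object pk) {
            // hypothetical: issue a DELETE against the target database
        }

        private void upsertRow(Object pk, Object row) {
            // hypothetical: issue an UPSERT against the target database
        }
    }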