Sijie, Jerry, Matteo,
I have created this GDoc that explains the problem and the points we raised in the various PRs and in the community meeting.
I hope that it clarifies my need for Schema.OBJECT().

https://docs.google.com/document/d/117gr6hG3N46MlkqENRKRtLejb_e_K0cZagQZv3jbwvQ/edit?usp=sharing

The document is open for comments to everyone who has the link.

This is the patch:
https://github.com/apache/pulsar/pull/9895

Enrico

On Mon, Mar 22, 2021 at 08:46, Sijie Guo <guosi...@gmail.com> wrote:
>
> > Perhaps Sijie can also chime in on why the behavior is such.
>
> I will go through the discussion here tomorrow and reply here.
>
> - Sijie
>
> On Fri, Mar 19, 2021 at 11:10 AM Jerry Peng <jerry.boyang.p...@gmail.com>
> wrote:
>
> > Hi Enrico,
> >
> > > One problem is that I do not know how to code "my_decode_method"
> > > without Schema information.
> > > If we had the Schema that is used for the Record then the approach of
> > > using Sink<byte[]> may work. In fact it works perfectly for Source<byte[]>.
> > > This is not the case, because for Sink<byte[]> record.getSchema()
> > > returns Schema.BYTES and we are not activating the Schema.OBJECT()
> > > (AutoConsume) mechanism that downloads
> > > and applies the schema automatically.
> > > The second problem is about making it easy for users to build Pulsar
> > > IO Sinks that are schema agnostic at compile time.
> > >
> > > public class MySink implements Sink<Object> {
> > >     public void write(Record<Object> record) {
> > >         // automatically decoded by Pulsar
> > >         Object value = record.getValue();
> > >         // the actual schema for the message
> > >         // (it may vary from message to message)
> > >         Schema schema = record.getSchema();
> > >     }
> > > }
> > >
> > > Currently you have to define at compile time the type of Schema you
> > > want to consume, this is fine for custom Sinks or for very simple
> > > Sinks.
> > > But it does not work for general purpose Sinks.
> > > I am putting up a GDoc with the full list of requirements and proposals
> >
> > The problem seems to be with `getSchema()` in Record and the subsequent
> > `getSchema()` on the actual pulsar message.
> > For consumers declared like
> > Consumer<byte[]>, getSchema() calls on messages the consumer returns will
> > always be Schema.BYTES, regardless of the actual schema the message
> > was produced with. The schema of the message is overridden when byte[] is
> > specified as the schema for the consumer. This seems like a bug to me.
> > Perhaps Sijie can also chime in on why the behavior is such.
> >
> > On Fri, Mar 19, 2021 at 12:46 AM Enrico Olivelli <eolive...@gmail.com>
> > wrote:
> >
> > > On Fri, Mar 19, 2021 at 08:20, Jerry Peng
> > > <jerry.boyang.p...@gmail.com> wrote:
> > > >
> > > > > But it doesn't work for Sinks
> > > > > currently if you declare a Sink<byte[]> you will see as schema
> > > > > Schema.BYTES.
> > > > > If we had a Schema object that is preconfigured for decoding the
> > > > > payload probably we will be done.
> > > >
> > > > If you declare sink as Sink<byte[]> there should be no schema. I am not
> > > > sure what getSchema() returns but the original design is that a
> > > > declared type of byte[] indicates that there is no schema. This was also
> > > > done for backwards compatibility reasons as schema has not always been
> > > > around.
> > > >
> > > > I still don't quite understand why you can not just implement your own
> > > > de-serialization method in the sink? That deserializes byte[] into
> > > > whatever you want it to be.
> > > >
> > > > public void write(Record<byte[]> record) throws Exception {
> > > >     byte[] val = record.getValue();
> > > >     SomeObject foo = my_decode_method(val);
> > > >     ....
> > > > }
> > >
> > > One problem is that I do not know how to code "my_decode_method"
> > > without Schema information.
> > > If we had the Schema that is used for the Record then the approach of
> > > using Sink<byte[]> may work. In fact it works perfectly for Source<byte[]>.
> > > This is not the case, because for Sink<byte[]> record.getSchema()
> > > returns Schema.BYTES and we are not activating the Schema.OBJECT()
> > > (AutoConsume) mechanism that downloads
> > > and applies the schema automatically.
> > >
> > > The second problem is about making it easy for users to build Pulsar
> > > IO Sinks that are schema agnostic at compile time.
> > >
> > > public class MySink implements Sink<Object> {
> > >     public void write(Record<Object> record) {
> > >         // automatically decoded by Pulsar
> > >         Object value = record.getValue();
> > >         // the actual schema for the message
> > >         // (it may vary from message to message)
> > >         Schema schema = record.getSchema();
> > >     }
> > > }
> > >
> > > Currently you have to define at compile time the type of Schema you
> > > want to consume, this is fine for custom Sinks or for very simple
> > > Sinks.
> > > But it does not work for general purpose Sinks.
> > >
> > > I am putting up a GDoc with the full list of requirements and proposals
> > >
> > > > Problem 2: Pulsar IO Record does not handle non-String keys
> > > >
> > > > I do not believe you need to add framework level support for this. This
> > > > can be done at a higher layer i.e. the compatibility layer of kafka
> > > > connect and Pulsar IO. The compatibility layer just needs to map a kafka
> > > > connect key of object type into string representation.
> > >
> > > We are already supporting non-String keys in Pulsar with KeyValue,
> > > there is partial support for KeyValue on Sources (with KVRecord)
> > > we just have to complete the work and allow Sinks to deal
> > > automatically with KeyValue messages.
> > >
> > > > I am glad we are trying to build a compatibility layer so Kafka connect
> > > > connectors can run on Pulsar IO, but in doing so, let us not just change
> > > > Pulsar IO to resemble Kafka connect.
> > > > In general, we should be extremely
> > > > cautious about introducing new APIs at the framework layer. If something
> > > > can be implemented at a higher layer then we should implement it there
> > > > and not try to push it down to the framework layer.
> > > >
> > > > Best,
> > > >
> > > > Jerry
> > > >
> > > > On Thu, Mar 18, 2021 at 1:38 AM Enrico Olivelli <eolive...@gmail.com>
> > > > wrote:
> > > >
> > > > > Jerry
> > > > >
> > > > > On Thu, Mar 18, 2021 at 03:07, Jerry Peng
> > > > > <jerry.boyang.p...@gmail.com> wrote:
> > > > > >
> > > > > > Hi Enrico,
> > > > > >
> > > > > > Thanks for taking the initiative for improvements on Pulsar IO!
> > > > > >
> > > > > > I have questions in regards to the following statements
> > > > > >
> > > > > > *Problem 1: Pulsar Sinks must declare at compile time the data type
> > > > > > they support*
> > > > > > > So I would like to see Sink<Object> and let the implementation deal
> > > > > > > with Record<Object> that will give access to the decoded payload and
> > > > > > > to the Schema.
> > > > > >
> > > > > > What is the point of having a schema for a generic POJO object? The
> > > > > > point of a schema is to provide boundaries on what the structure of the
> > > > > > data can be. However, having a schema that supports a generic POJO that
> > > > > > can have any structure seems paradoxical to me.
> > > > > >
> > > > > > Currently, the input schema and output schema for source, sink, and
> > > > > > functions can be undefined i.e. there is no schema, which means that
> > > > > > the source/sink/function will either write, read and write, or just
> > > > > > read byte[] which is the most generic data representation there can be.
> > > > > > Why is this not sufficient for your use case?
> > > > > > Can the Pulsar IO wrapper code for Kafka
> > > > > > connect just read or write byte[] and let the higher-up abstractions
> > > > > > determine how to deserialize it?
> > > > >
> > > > > For Sources you can have a Source<byte[]> and you can write records
> > > > > with any schema.
> > > > > This is good and it works well.
> > > > >
> > > > > But it doesn't work for Sinks
> > > > > currently if you declare a Sink<byte[]> you will see as schema
> > > > > Schema.BYTES.
> > > > > If we had a Schema object that is preconfigured for decoding the
> > > > > payload probably we will be done.
> > > > > Generic sinks will be Sink<byte[]> and they will decode the payload
> > > > > with:
> > > > >
> > > > > public void write(Record<byte[]> record) throws Exception {
> > > > >     Schema<?> schema = record.getSchema();
> > > > >     SchemaType type = schema.getSchemaInfo().getType();
> > > > >     Object decoded = schema.decode(record.getValue());
> > > > > }
> > > > >
> > > > > but this does not work, because we have a strongly typed system and
> > > > > you MUST write:
> > > > >
> > > > > public void write(Record<byte[]> record) throws Exception {
> > > > >     Schema<byte[]> schema = record.getSchema();
> > > > >     SchemaType type = schema.getSchemaInfo().getType();
> > > > >     Object decoded = record.getValue();
> > > > > }
> > > > >
> > > > > if I have Sink<Object> it becomes
> > > > >
> > > > > public void write(Record<Object> record) throws Exception {
> > > > >     Schema<Object> schema = record.getSchema();
> > > > >     SchemaType type = schema.getSchemaInfo().getType();
> > > > >     Object decoded = record.getValue();
> > > > >
> > > > >     // switch on SchemaType ....
> > > > > }
> > > > >
> > > > > > *Problem 2: Pulsar IO Record does not handle non-String keys*
> > > > > >
> > > > > > The reason why Pulsar IO Record only supports specifying a key as a
> > > > > > string is because Pulsar only supports specifying keys as strings.
> > > > > > The key
> > > > > > specified in Pulsar IO Record is also used as the partition key to
> > > > > > determine which partition a message will be published into. If the key
> > > > > > returned from Kafka connect connector also needs to be used as the
> > > > > > partition key then the method you specified will not work. You will
> > > > > > still need to translate the key back into a string or implement your
> > > > > > own custom message router.
> > > > >
> > > > > At the low level this is true, but we already have KeyValue#SEPARATED
> > > > > that brings these features:
> > > > > - you have a schema for the Key
> > > > > - you have a schema for the Value
> > > > > - your Key is stored in the Message key, and you can think it as an
> > > > > Object (it is encoded to byte[] using the KeySchema and it is stored
> > > > > in base64 as string into the message key)
> > > > >
> > > > > It is not super efficient, due to the base64 encoding, but it is a
> > > > > good tradeoff and it is already supported by Pulsar.
> > > > >
> > > > > > Can you also explain how the additional methods you are suggesting to
> > > > > > add will be used? What entities will be implementing methods and
> > > > > > setting the value? What entities will use those values? And how those
> > > > > > values will be used.
> > > > >
> > > > > If you are using KeyValue (especially in SEPARATED flavour) you
> > > > > essentially will see a record as a key value pair of <Object, Object>,
> > > > > with the key stored in the message key.
> > > > >
> > > > > Most of these ideas come from porting connectors from Kafka Connect to
> > > > > Pulsar IO.
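
As a rough illustration of the SEPARATED key handling described above (key encoded to byte[] by the key schema, then stored as a base64 string in the message key), here is a minimal sketch in plain Java. SeparatedKeyCodec and its method names are hypothetical and are not Pulsar APIs; the UTF-8 bytes only stand in for whatever the real key schema would produce.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for illustration only; these names are not Pulsar APIs.
public class SeparatedKeyCodec {

    // Turns key bytes (already produced by some key schema) into the
    // string form that would be stored as the message key.
    public static String toMessageKey(byte[] encodedKey) {
        return Base64.getEncoder().encodeToString(encodedKey);
    }

    // Recovers the original key bytes from the string message key.
    public static byte[] fromMessageKey(String messageKey) {
        return Base64.getDecoder().decode(messageKey);
    }

    public static void main(String[] args) {
        // Assumption: UTF-8 bytes stand in for keySchema.encode(key).
        byte[] keyBytes = "user-42".getBytes(StandardCharsets.UTF_8);
        String messageKey = toMessageKey(keyBytes);
        byte[] roundTrip = fromMessageKey(messageKey);

        System.out.println("message key: " + messageKey);
        System.out.println("decoded key: " + new String(roundTrip, StandardCharsets.UTF_8));
    }
}
```

The base64 step is where the inefficiency mentioned above comes from: every 3 key bytes become 4 characters in the message key.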
> > > > > I would like to start a separate thread about discussing a convention
> > > > > for the mapping but the short version is:
> > > > > When you come from Kafka you have a KeyValue<Object, Object> in
> > > > > SEPARATED encoding (this may be simplified in case of String or byte[]
> > > > > keys, but let's stick to the most complicated case).
> > > > > So your producer (probably a KafkaSource or a KafkaConnectSource)
> > > > > writes to Pulsar a KeyValue and uses KeyValueSchema with SEPARATED
> > > > > encoding.
> > > > >
> > > > > On the Sink side you have your connector that (in Kafka) is used to
> > > > > deal with an Object key and an Object value.
> > > > > With those new methods you can use the logical key and the logical
> > > > > value (and you can use the keySchema and the valueSchema)
> > > > > and you have 1-1 mapping with what you did in Kafka.
> > > > >
> > > > > This is particularly important if you have users that are trying to
> > > > > migrate from Kafka to Pulsar, and there is no easy and no standard way
> > > > > to do it
> > > > >
> > > > > > *Problem 3: Pulsar IO Records do not allow NULL values*
> > > > > > Don't have a problem with this. The use case you provided makes
> > > > > > sense.
> > > > >
> > > > > I will send a patch, thanks
> > > > >
> > > > > Enrico
> > > > >
> > > > > > On Wed, Mar 17, 2021 at 2:04 AM Enrico Olivelli <eolive...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hello everyone,
> > > > > > >
> > > > > > > I would like to share some proposals about Pulsar IO. Based on my
> > > > > > > experience porting a nontrivial connector for Cassandra from Kafka
> > > > > > > Connect to Pulsar IO, I would like to identify some places where we
> > > > > > > can improve the Pulsar IO API for both native Pulsar connectors and to
> > > > > > > improve feature parity with Kafka Connect.
> > > > > > > Ultimately, I would like
> > > > > > > to provide a Kafka Connect compatibility library so that people can
> > > > > > > use arbitrary Kafka connectors with Pulsar without having to port each
> > > > > > > one.
> > > > > > >
> > > > > > > Problem 1: Pulsar Sinks must declare at compile time the data type they
> > > > > > > support
> > > > > > >
> > > > > > > You can have a Sink<byte[]>, Sink<GenericRecord>, Sink<KeyValue>, but
> > > > > > > you cannot have only one single Sink implementation that deals with
> > > > > > > every kind of schema.
> > > > > > >
> > > > > > > In Kafka Connect you are used to dealing with Object at compile time
> > > > > > > and deal with data types and Schemas in the implementation.
> > > > > > >
> > > > > > > So I would like to see Sink<Object> and let the implementation deal
> > > > > > > with Record<Object> that will give access to the decoded payload and
> > > > > > > to the Schema.
> > > > > > >
> > > > > > > I have started a work to support Schema<Object> here, that is the
> > > > > > > first step in the direction of supporting Sink<Object>
> > > > > > > https://github.com/apache/pulsar/pull/9895
> > > > > > >
> > > > > > > Problem 2: Pulsar IO Record does not handle non-String keys
> > > > > > >
> > > > > > > We only have Record#getKey() that returns an Optional<String>. So even
> > > > > > > if you are using KeyValue, you cannot access the logical value of the
> > > > > > > key.
> > > > > > >
> > > > > > > The idea is to use KeyValue as foundation for supporting Schema in the
> > > > > > > Key (this is nothing new, but it is still not much supported in Pulsar
> > > > > > > IO)
> > > > > > >
> > > > > > > Kafka connect users deal with Object keys and we should provide the
> > > > > > > same support.
> > > > > > > We already have support for Object keys in case of KeyValue.
> > > > > > > The proposal I am working on with my colleagues is to add Object
> > > > > > > getLogicalKey() to the Record interface.
> > > > > > >
> > > > > > > interface Record<T> {
> > > > > > >
> > > > > > >     Optional<Object> getLogicalKey();
> > > > > > >     Optional<Object> getLogicalValue();
> > > > > > >     Schema<T> getKeySchema();
> > > > > > >     Schema<T> getValueSchema();
> > > > > > >
> > > > > > >     Optional<String> getKey();
> > > > > > >     Object getValue();
> > > > > > >     Schema<T> getSchema();
> > > > > > > }
> > > > > > >
> > > > > > > For messages that are using byte[] key (Message#hasBase64EncodedKey)
> > > > > > > the getLogicalKey() returns the same as Message#getKeyBytes() and
> > > > > > > getKeySchema will return Schema.BYTES.
> > > > > > >
> > > > > > > For regular messages getKeySchema() returns Schema.STRING.
> > > > > > >
> > > > > > > For messages that are using the KeyValue schema the new methods will
> > > > > > > return the corresponding fields and metadata.
> > > > > > >
> > > > > > > getLogicalKey -> the key of the KeyValue
> > > > > > > getLogicalValue -> the value of KeyValue
> > > > > > > getKeySchema -> the schema for the logical key
> > > > > > > getValueSchema -> the schema for the logical value
> > > > > > > getKey -> the key of the message, as usual, for backward compatibility
> > > > > > > getSchema -> the KeyValueSchema, as usual, for backward compatibility
> > > > > > > getValue -> the KeyValue object, as usual, for backward compatibility
> > > > > > >
> > > > > > > Please note that we already have a KVRecord interface that exposes key
> > > > > > > and value schema, but it is not used while pushing data to Sinks (it
> > > > > > > looks like it is only used by KafkaConnectSource):
> > > > > > >
> > > > > > > public interface KVRecord<K, V> extends Record {
> > > > > > >     Schema<K> getKeySchema();
> > > > > > >     Schema<V> getValueSchema();
> > > > > > >     KeyValueEncodingType getKeyValueEncodingType();
> > > > > > > }
> > > > > > >
> > > > > > > Problem 3: Pulsar IO Records do not allow NULL values
> > > > > > >
> > > > > > > Currently there is no support for NULL values in Pulsar IO.
> > > > > > > This case makes sense if you are storing information in the key as
> > > > > > > well.
> > > > > > >
> > > > > > > Example use case: use the topic as a changelog of a database, put the
> > > > > > > PK into the key and the rest of the record into the value. Use the
> > > > > > > convention that a NULL value means a DELETE operation and a non NULL
> > > > > > > value is an UPSERT.
> > > > > > >
> > > > > > > Thoughts ?
> > > > > > >
> > > > > > > Enrico Olivelli
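
For illustration only, the NULL-means-DELETE convention from Problem 3 could look roughly like the sketch below on the consuming side. ChangelogTable and its methods are hypothetical names, not part of Pulsar IO, and an in-memory map stands in for the target database; a real Sink would read the key and value from the Record instead of taking plain strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a changelog-consuming sink; not a Pulsar IO class.
public class ChangelogTable {

    // Assumption: an in-memory map plays the role of the target table.
    private final Map<String, String> rows = new LinkedHashMap<>();

    // Applies one changelog entry: a null value is a DELETE for that
    // primary key, any non-null value is an UPSERT.
    public void apply(String primaryKey, String value) {
        if (value == null) {
            rows.remove(primaryKey);
        } else {
            rows.put(primaryKey, value);
        }
    }

    // Returns a copy of the current table contents.
    public Map<String, String> snapshot() {
        return new LinkedHashMap<>(rows);
    }

    public static void main(String[] args) {
        ChangelogTable table = new ChangelogTable();
        table.apply("user-1", "{\"name\":\"Ada\"}"); // UPSERT
        table.apply("user-2", "{\"name\":\"Bob\"}"); // UPSERT
        table.apply("user-1", null);                 // DELETE
        System.out.println(table.snapshot().keySet()); // prints [user-2]
    }
}
```

The sketch only works because the primary key travels separately from the value, which is why this use case depends on storing information in the key as well.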