Hi Enrico, In general, I would suggest separating the general-purpose sink from the schema object. I know they are related. But it unnecessarily makes the conversation much complicated. I would actually suggest keeping the conversation around the generic "auto consumes" schema first. Once that is figured out, the other conversations will be easier.
Regarding the generic auto_consuem schema, I don't think approach 2 in the google doc reflects what we have discussed in the previous community meeting. You are writing a proposal that is in favor of approach 1 which dismissed the discussion in the community meeting and misled the community. My suggestion was to continue enhancing AUTO_SCHEMA without introducing a new OBJECT schema to confuse the users and make the maintenance become hard. Because we already introduced the "getNativeRecord" method recently to allow people to access the underlying "object"/"record" that can be interpreted based on its schema. With that being said, the "GenericRecord" is becoming a wrapper over the actual object deserialized from a schema. The changes I suggested in the community were: - Deprecated "getFields" in the `GenericRecord` interface as they are useless after we introduced the `getNativeRecord` method. This change will make `GenericRecord` become a generic container wrapping the object serialized from the actual schema. - Rename `getNativeRecord` to `getNativeObject` to return the underlying object. `getNativeRecord` was introduced in a recent PR which has NOT been released yet. With these changes, 1) It maintains backward compatibility without introducing a new ambiguous schema that does similar things as AUTO_CONSUME. 2) It provides consistent behavior across structure schemas and primitive schemas. Because you always use `getNativeObject` to get the actual object deserialized from the actual schema implementation. 3) It also provides extensibility than `Object`. Because it makes `GenericRecord` a root object for representing objects deserialized from a log record. Thanks, Sijie On Wed, Mar 24, 2021 at 9:43 AM Enrico Olivelli <eolive...@gmail.com> wrote: > Il giorno lun 22 mar 2021 alle ore 11:44 Enrico Olivelli > <eolive...@gmail.com> ha scritto: > > > > Sijjie, Jerry, Matteo > > I have created this GDoc that explains the problem, and the points we > > raised in the various PRs and in the Community meeting. > > > > I hope that it clarifies my need for Schema.OBJECT(). > > > https://docs.google.com/document/d/117gr6hG3N46MlkqENRKRtLejb_e_K0cZagQZv3jbwvQ/edit?usp=sharing > > For reference this is the full patch, that allows users to compile and > run a Sink<Object> > > https://github.com/apache/pulsar/pull/10034 > > Enrico > > > > > > The document is open for comments to everyone who has the link > > > > This is the patch > > https://github.com/apache/pulsar/pull/9895 > > > > Enrico > > > > Il giorno lun 22 mar 2021 alle ore 08:46 Sijie Guo > > <guosi...@gmail.com> ha scritto: > > > > > > > Perhaps Sijie can also chime in on why the behavior is such. > > > > > > I will go through the discussion here tomorrow and reply here. > > > > > > - Sijie > > > > > > On Fri, Mar 19, 2021 at 11:10 AM Jerry Peng < > jerry.boyang.p...@gmail.com> > > > wrote: > > > > > > > HI Enrico, > > > > > > > > > > > > > > > One problem is that I do not know how to code "my_decode_method" > > > > > without Schema information. > > > > > If we had the Schema that is used for the Record then the approach > of > > > > > using Sink[] may work. In fact it works perfectly for > Source<byte[]>. > > > > > This is not the case, because for Sink<byte[]> record.getSchema() > > > > > returns byte[] and we are not activating Schema.OBJECT() > (AutoConsume) > > > > > mechanism that downloads > > > > > and applies automatically the schema. > > > > > The second problem is about making it easy for users to build > Pulsar > > > > > IO Sinks that are schema agnostic at compile time. > > > > > public class MySink implements Sink<Object> { > > > > > public void write(Record<Object> record) { > > > > > Object value = record.getValue(); --> automatically > decoded by > > > > > Pulsar > > > > > Schema schema = record.getSchema() -> this is the actual > > > > > schema for the message (it may vary from message to message) > > > > > } > > > > > } > > > > > Currently you have to define at compile time the type of Schema you > > > > > want to consume, this is fine for custom Sinks or for very simple > > > > > Sinks. > > > > > But it does not work for general purpose Sinks. > > > > > I am putting up a GDoc with the full list of requirements and > proposals > > > > > > > > > > > > The problem seems to be with `getSchema()` in Record and the > subsequent > > > > `getSchema()` on the actual pulsar message. For consumer declared > like > > > > Consumer<byte[]>, getSchema() calls on messages the consumer returns > will > > > > always be SCHEMA.BYTES, regardless of what the actual schema the > message > > > > was produced with. The schema of message is overridden when byte[] > is > > > > specified as the schema for the consumer. This seems like a bug to > me. > > > > Perhaps Sijie can also chime in on why the behavior is such. > > > > > > > > > > > > > > > > > On Fri, Mar 19, 2021 at 12:46 AM Enrico Olivelli < > eolive...@gmail.com> > > > > wrote: > > > > > > > > > Il giorno ven 19 mar 2021 alle ore 08:20 Jerry Peng > > > > > <jerry.boyang.p...@gmail.com> ha scritto: > > > > > > > > > > > > > But it doesn't work for Sinks > > > > > > currently if you declare a Sink<byte[]> you will see as schema > > > > > Schema.BYTES. > > > > > > If we had a Schema object that is preconfigured for decoding the > > > > > > payload probably we will be done. > > > > > > > > > > > > If you declare sink as Sink<byte[]> there should be no schema. > I am > > > > not > > > > > > sure what getSchema() returns but the original design is that if > the > > > > > > declared type is byte[] indicates that there is no schema. This > also > > > > > done > > > > > > for backwards compatibility reasons as schema has not always been > > > > around. > > > > > > > > > > > > I still don't quite understand why you can not just implement > your own > > > > > > de-serialization method in the sink? That deserializes byte[] > into > > > > > whatever > > > > > > you want it to be. > > > > > > > > > > > > public void write(Record<byte[]> record) throws Exception { > > > > > > byte[] val = record.getValue(); > > > > > > SomeObject foo = my_decode_method(val); > > > > > > .... > > > > > > } > > > > > > > > > > One problem is that I do not know how to code "my_decode_method" > > > > > without Schema information. > > > > > If we had the Schema that is used for the Record then the approach > of > > > > > using Sink[] may work. In fact it works perfectly for > Source<byte[]>. > > > > > This is not the case, because for Sink<byte[]> record.getSchema() > > > > > returns byte[] and we are not activating Schema.OBJECT() > (AutoConsume) > > > > > mechanism that downloads > > > > > and applies automatically the schema. > > > > > > > > > > The second problem is about making it easy for users to build > Pulsar > > > > > IO Sinks that are schema agnostic at compile time. > > > > > > > > > > public class MySink implements Sink<Object> { > > > > > public void write(Record<Object> record) { > > > > > Object value = record.getValue(); --> automatically > decoded by > > > > > Pulsar > > > > > Schema schema = record.getSchema() -> this is the actual > > > > > schema for the message (it may vary from message to message) > > > > > } > > > > > } > > > > > > > > > > Currently you have to define at compile time the type of Schema you > > > > > want to consume, this is fine for custom Sinks or for very simple > > > > > Sinks. > > > > > But it does not work for general purpose Sinks. > > > > > > > > > > I am putting up a GDoc with the full list of requirements and > proposals > > > > > > > > > > > > > > > > > > > > > > > > > > > > Problem 2: Pulsar IO Record does not handle non-String keys > > > > > > > > > > > > I do not believe you need to add framework level support for > this. > > > > This > > > > > > can be done at a higher layer i.e. the compatibility layer of > kafka > > > > > connect > > > > > > and Pulsar IO. The compatibility layer just needs to map a kafka > > > > connect > > > > > > key of object type into string representation. > > > > > > > > > > We are already supporting non-String keys in Pulsar with KeyValue, > > > > > there is partial support for KeyValue on Sources (with KVRecord) > > > > > we just have to complete the work and allow Sinks to deal > > > > > automatically with KeyValue messages. > > > > > > > > > > > > > > > > > > > > > > > > > > > I am glad we are trying to build a compatibility layer so Kafka > connect > > > > > > connectors can run on Pulsar IO, but in doing so, let us not just > > > > change > > > > > > Pulsar IO to resemble Kafka connect. In general, we should be > extremely > > > > > > cautious about introducing new APIs at the framework layer. If > > > > something > > > > > > can be implemented at a higher layer then we should implement it > there > > > > > and > > > > > > not try to push it down to the framework layer. > > > > > > > > > > > > Best, > > > > > > > > > > > > Jerry > > > > > > > > > > > > On Thu, Mar 18, 2021 at 1:38 AM Enrico Olivelli < > eolive...@gmail.com> > > > > > wrote: > > > > > > > > > > > > > Jerry > > > > > > > > > > > > > > Il giorno gio 18 mar 2021 alle ore 03:07 Jerry Peng > > > > > > > <jerry.boyang.p...@gmail.com> ha scritto: > > > > > > > > > > > > > > > > Hi Enrico, > > > > > > > > > > > > > > > > Thanks for taking the initiative for improvements on Pulsar > IO! > > > > > > > > > > > > > > > > I have questions in regards to the following statements > > > > > > > > > > > > > > > > > > > > > > > > *Problem 1: Pulsar Sinks must declare at compile time the > data type > > > > > they > > > > > > > > support* > > > > > > > > > So I would like to see Sink<Object> and let the > implementation > > > > deal > > > > > > > > with Record<Object> that will give access to the decoded > payload > > > > and > > > > > > > > to the Schema. > > > > > > > > > > > > > > > > What is the point of having a schema for a generic POJO > object? > > > > The > > > > > > > point > > > > > > > > of a schema is to provide boundaries on what the structure > of the > > > > > data > > > > > > > can > > > > > > > > be. However, having a schema that supports a generic POJO > that can > > > > > have > > > > > > > > any structure seems paradoxical to me. > > > > > > > > > > > > > > > > Currently, the input schema and output schema for source, > sink, and > > > > > > > > functions can be undefined i.e. there is no schema, which > means > > > > that > > > > > the > > > > > > > > source/sink/function will either write, read and write, or > just > > > > read > > > > > > > byte[] > > > > > > > > which is the most generic data representation there can be. > Why is > > > > > this > > > > > > > > not sufficient for your use case? Can the Pulsar IO wrapper > code > > > > for > > > > > > > Kafka > > > > > > > > connect just read or write byte[] and let the high up > abstractions > > > > > > > > determine how to deserialize it? > > > > > > > > > > > > > > For Sources you can have a Source<byte[]> and you can write > records > > > > > > > with any schema. > > > > > > > This is good and it works well. > > > > > > > > > > > > > > But it doesn't work for Sinks > > > > > > > currently if you declare a Sink<byte[]> you will see as schema > > > > > > > Schema.BYTES. > > > > > > > If we had a Schema object that is preconfigured for decoding > the > > > > > > > payload probably we will be done. > > > > > > > Generic sinks will be Sink<byte[]> and they will decode the > payload > > > > > with: > > > > > > > > > > > > > > public void write(Record<byte[]> record) throws Exception { > > > > > > > Schema<?> schema = record.getSchema(); > > > > > > > SchemaType type = schema.getSchemaInfo().getType(); > > > > > > > Object decoded = schema.decode(record.getValue()); > > > > > > > } > > > > > > > > > > > > > > but this does not work, because we have a strongly typed > system and > > > > > > > you MUST write: > > > > > > > > > > > > > > public void write(Record<byte[]> record) throws Exception { > > > > > > > Schema<byte[]> schema = record.getSchema(); > > > > > > > SchemaType type = schema.getSchemaInfo().getType(); > > > > > > > Object decoded = record.getValue(); > > > > > > > } > > > > > > > > > > > > > > if I have Sink<Object> it becomes > > > > > > > > > > > > > > public void write(Object record) throws Exception { > > > > > > > Schema<Object> schema = record.getSchema(); > > > > > > > SchemaType type = schema.getSchemaInfo().getType(); > > > > > > > Object decoded = record.getValue(); > > > > > > > > > > > > > > // switch on SchemaType .... > > > > > > > } > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > *Problem 2: Pulsar IO Record does not handle non-String keys* > > > > > > > > > > > > > > > > The reason why Pulsar IO Record only supports specifying a > key as a > > > > > > > string > > > > > > > > is because Pulsar only supports specifying keys as strings. > The key > > > > > > > > specified in Pulsar IO Record is also used as the partition > key to > > > > > > > > determine which partition a message will be published into. > If the > > > > > key > > > > > > > > returned from Kafka connect connector also needs to be used > as the > > > > > > > > partition key then the method you specified will not work. > You > > > > will > > > > > > > still > > > > > > > > need to translate the key back into a string or implement > your own > > > > > custom > > > > > > > > message router. > > > > > > > > > > > > > > At the low level this is true, but we already have > KeyValue#SEPARATED > > > > > > > that brings these features: > > > > > > > - you have a schema for the Key > > > > > > > - you have a schema for the Value > > > > > > > - your Key is stored in the Message key, and you can think it > as an > > > > > > > Object (it is encoded to byte[] using the KeySchema and it is > stored > > > > > > > in base64 as string into the message key) > > > > > > > > > > > > > > It is not super efficient, due to the base64 encoding, but it > is a > > > > > > > good tradeoff and it is already supported by Pulsar. > > > > > > > > > > > > > > > > > > > > > > > Can you also explain how the additional methods you are > suggesting > > > > > to add > > > > > > > > will be used? What entities will be implementing methods and > > > > > setting the > > > > > > > > value? What entities will use those values? And how those > values > > > > > will be > > > > > > > > used. > > > > > > > > > > > > > > If you are using KeyValue (especially in SEPARATED flavour) you > > > > > > > essentially will see a record as a key value pair of <Object, > > > > Object>, > > > > > > > with the key stored in the message key. > > > > > > > > > > > > > > Most of these ideas come from porting connectors from Kafka > Connect > > > > to > > > > > > > Pulsar IO. > > > > > > > I would like to start a separate thread about discussing a > convention > > > > > > > for the mapping but the short version is: > > > > > > > When you come from Kafka you have a KeyValue<Object, Object> > in > > > > > > > SEPARATED encoding (this may be simplified in case of String or > > > > byte[] > > > > > > > keys, but let's stick to the most complicated case). > > > > > > > So your producer (probably a KafkaSource or a > KafkaConnectSource) > > > > > > > writes to Pulsar a KeyValue and uses KeyValueSchema with > SEPARATED > > > > > > > encoding. > > > > > > > > > > > > > > On the Sink side you have your connector that (in Kafka) is > used to > > > > > > > deal with an Object key and an Object value. > > > > > > > With those new methods you can use the logical key and the > logical > > > > > > > value (and you can use the keySchema and the valueSchema) > > > > > > > and you have 1-1 mapping with what you did in Kafka. > > > > > > > > > > > > > > This is particularly important if you have users that are > trying to > > > > > > > migrate from Kafka to Pulsar, and there is no easy and no > standard > > > > way > > > > > > > to do it > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > *Problem 3: Pulsar IO Records do not allow NULL values* > > > > > > > > Don't have a problem with this. The use case you provided > makes > > > > > sense. > > > > > > > > > > > > > > I will send a patch, thanks > > > > > > > > > > > > > > Enrico > > > > > > > > > > > > > > > > > > > > > > > On Wed, Mar 17, 2021 at 2:04 AM Enrico Olivelli < > > > > eolive...@gmail.com > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hello everyone, > > > > > > > > > > > > > > > > > > I would like to share some proposals about Pulsar IO. > Based on > > > > my > > > > > > > > > experience porting a nontrivial connector for Cassandra > from > > > > Kafka > > > > > > > > > Connect to Pulsar IO, I would like to identify some places > where > > > > we > > > > > > > > > can improve the Pulsar IO API for both native Pulsar > connectors > > > > > and to > > > > > > > > > improve feature parity with Kafka Connect. Ultimately, I > would > > > > > like > > > > > > > > > to provide a Kafka Connect compatibility library so that > people > > > > can > > > > > > > > > use arbitrary Kafka connectors with Pulsar without having > to port > > > > > each > > > > > > > > > one. > > > > > > > > > > > > > > > > > > Problem 1: Pulsar Sinks must declare at compile time the > data > > > > type > > > > > they > > > > > > > > > support > > > > > > > > > > > > > > > > > > You can have a Sink<byte[]>, Sink<GenericRecord>, > Sink<KeyValue>, > > > > > but > > > > > > > > > you cannot have only one single Sink implementation that > deals > > > > with > > > > > > > > > every kind of schema. > > > > > > > > > > > > > > > > > > In Kafka Connect you are used to dealing with Object at > compile > > > > > time > > > > > > > > > and deal with data types and Schemas in the implementation. > > > > > > > > > > > > > > > > > > So I would like to see Sink<Object> and let the > implementation > > > > deal > > > > > > > > > with Record<Object> that will give access to the decoded > payload > > > > > and > > > > > > > > > to the Schema. > > > > > > > > > > > > > > > > > > I have started a work to support Schema<Object> here, that > is the > > > > > > > > > first step in the direction of supporting Sink<Object> > > > > > > > > > https://github.com/apache/pulsar/pull/9895 > > > > > > > > > > > > > > > > > > > > > > > > > > > Problem 2: Pulsar IO Record does not handle non-String keys > > > > > > > > > > > > > > > > > > We only have Record#getKey() that returns an > Optional<String>. So > > > > > even > > > > > > > > > if you are using KeyValue, you cannot access the logical > value of > > > > > the > > > > > > > > > key. > > > > > > > > > > > > > > > > > > The idea is to use KeyValue as foundation for supporting > Schema > > > > in > > > > > the > > > > > > > > > Key (this is nothing new, but it is still not much > supported in > > > > > Pulsar > > > > > > > > > IO) > > > > > > > > > > > > > > > > > > Kafka connect users deal with Object keys and we should > provide > > > > the > > > > > > > > > same support. > > > > > > > > > We already have support for Object keys in case of > KeyValue. > > > > > > > > > The proposal I am working with my colleagues is to add > Object > > > > > > > > > getLogicalKey() to the Record interface. > > > > > > > > > > > > > > > > > > interface Record<T> { > > > > > > > > > > > > > > > > > > Optional<Object> getLogicalKey(); > > > > > > > > > Optional<Object> getLogicalValue(); > > > > > > > > > Schema<T> getKeySchema(); > > > > > > > > > Schema<T> getValueSchema(); > > > > > > > > > > > > > > > > > > > > > > > > > > > Optional<String> getKey(); > > > > > > > > > Object getValue(); > > > > > > > > > Schema<T> getSchema(); > > > > > > > > > } > > > > > > > > > > > > > > > > > > For messages that are using byte[] key > > > > > (Message#hasBase64EncodedKey) > > > > > > > > > the getLogicalKey() returns the same as > Message#getKeyBytes() and > > > > > > > > > getKeySchema will return Schema.BYTES. > > > > > > > > > > > > > > > > > > For regular messages getKeySchema() returns Schema.STRING. > > > > > > > > > > > > > > > > > > For messages that are using the KeyValue schema the new > methods > > > > > will > > > > > > > > > return the corresponding fields and metadata. > > > > > > > > > > > > > > > > > > getLogicalKey -> the key of the KeyValue > > > > > > > > > getLogicalValue -> the value of KeyValue > > > > > > > > > getKeySchema -> the schema for the logical key > > > > > > > > > getValueSchema -> the schema for the logical value > > > > > > > > > getKey -> the key of the message, as usual, for backward > > > > > compatibility > > > > > > > > > getSchema -> the KeyValueSchema, as usual, for backward > > > > > compatibility > > > > > > > > > getValue -> the KeyValue object, as usual, for backward > > > > > compatibility > > > > > > > > > > > > > > > > > > Please note that we already have a KVRecord interface that > > > > exposes > > > > > key > > > > > > > > > and value schema, but it is not used while pushing data to > Sinks > > > > > (it > > > > > > > > > looks like it is only used by KafkaConnectSource): > > > > > > > > > > > > > > > > > > public interface KVRecord<K, V> extends Record { > > > > > > > > > Schema<K> getKeySchema(); > > > > > > > > > Schema<V> getValueSchema(); > > > > > > > > > KeyValueEncodingType getKeyValueEncodingType(); > > > > > > > > > } > > > > > > > > > > > > > > > > > > Problem 3: Pulsar IO Records do not allow NULL values > > > > > > > > > > > > > > > > > > Currently there is no support for NULL values in Pulsar IO. > > > > > > > > > This case makes sense if you are storing information in > the key > > > > as > > > > > > > well. > > > > > > > > > > > > > > > > > > Example use case: use the topic as a changelog of a > database, put > > > > > the > > > > > > > > > PK into the key and the rest of the record into the value. > Use > > > > the > > > > > > > > > convention that a NULL value means a DELETE operation and > a non > > > > > NULL > > > > > > > > > value is an UPSERT. > > > > > > > > > > > > > > > > > > Thoughts ? > > > > > > > > > > > > > > > > > > Enrico Olivelli > > > > > > > > > > > > > > > > > > > > > > > > > >