Sijie, I have a proposal that fits your points and my needs.

1. Introduce a new PulsarObject interface that is a wrapper for any data type:

   interface PulsarObject {
       Schema/SchemaType getSchemaType();
       Object getPayload();
   }

2. Make GenericRecord extend PulsarObject:

   interface GenericRecord extends PulsarObject {
       .......
       getNativeRecord renamed to getPayload();
       .......
       implement PrimitiveRecord
   }

3. Do not introduce ObjectSchema, but only enhance AutoConsumeSchema to work with every Schema type. For non-Struct objects we return PrimitiveRecord, as you proposed.

4. Allow users of Pulsar IO to write Sink<PulsarObject> or Sink<Object>, binding it to AutoConsumeSchema. New users can use a general-purpose API, and they just have to handle PulsarObject.

This way:
- Sink<PulsarObject> will be the "general purpose sink" I want to see
- No new ObjectSchema, only AutoConsumeSchema -> no new maintenance costs
- No need to deprecate GenericRecord methods, only rename getNativeRecord to getPayload()

How does this proposal sound to you? I believe it answers all of your points and it fixes my problem in a very elegant way.
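To make it concrete, here is a minimal sketch of the proposed interfaces and of the kind of sink they would enable (names and signatures are illustrative, nothing here is final):

   // sketch only: the proposed wrapper for any decoded payload
   public interface PulsarObject {
       SchemaType getSchemaType(); // or Schema<?>, still to be decided
       Object getPayload();        // the decoded value: POJO, GenericRecord, primitive...
   }

   // a sink written once for every schema type, bound to AutoConsumeSchema
   public class PrintSink implements Sink<PulsarObject> {
       @Override
       public void open(Map<String, Object> config, SinkContext context) throws Exception {
       }

       @Override
       public void write(Record<PulsarObject> record) throws Exception {
           PulsarObject value = record.getValue();
           // dispatch on the schema type at runtime, no compile-time binding needed
           System.out.println(value.getSchemaType() + " -> " + value.getPayload());
       }

       @Override
       public void close() throws Exception {
       }
   }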
Enrico

Il giorno gio 25 mar 2021 alle ore 23:50 Sijie Guo <guosi...@gmail.com> ha scritto:
>
> Hi Enrico,
>
> In general, I would suggest separating the general-purpose sink from the
> schema object. I know they are related, but it unnecessarily makes the
> conversation much more complicated. I would actually suggest keeping the
> conversation around the generic "auto consume" schema first. Once that is
> figured out, the other conversations will be easier.
>
> Regarding the generic auto_consume schema, I don't think approach 2 in the
> google doc reflects what we have discussed in the previous community
> meeting. You are writing a proposal that is in favor of approach 1, which
> dismissed the discussion in the community meeting and misled the community.
>
> My suggestion was to continue enhancing AUTO_SCHEMA without introducing a
> new OBJECT schema that confuses the users and makes maintenance hard.
>
> We already introduced the "getNativeRecord" method recently to allow
> people to access the underlying "object"/"record" that can be interpreted
> based on its schema. With that being said, the "GenericRecord" is becoming
> a wrapper over the actual object deserialized from a schema. The changes I
> suggested in the community meeting were:
>
> - Deprecate "getFields" in the `GenericRecord` interface, as it is
>   useless after we introduced the `getNativeRecord` method. This change
>   will make `GenericRecord` become a generic container wrapping the object
>   deserialized with the actual schema.
> - Rename `getNativeRecord` to `getNativeObject` to return the underlying
>   object. `getNativeRecord` was introduced in a recent PR which has NOT
>   been released yet.
>
> With these changes:
>
> 1) It maintains backward compatibility without introducing a new ambiguous
>    schema that does similar things as AUTO_CONSUME.
> 2) It provides consistent behavior across struct schemas and primitive
>    schemas, because you always use `getNativeObject` to get the actual
>    object deserialized by the actual schema implementation.
> 3) It provides more extensibility than `Object`, because it makes
>    `GenericRecord` a root object for representing objects deserialized
>    from a log record.
>
> Thanks,
> Sijie
>
> On Wed, Mar 24, 2021 at 9:43 AM Enrico Olivelli <eolive...@gmail.com> wrote:
>
> > Il giorno lun 22 mar 2021 alle ore 11:44 Enrico Olivelli
> > <eolive...@gmail.com> ha scritto:
> > >
> > > Sijie, Jerry, Matteo,
> > > I have created this GDoc that explains the problem and the points we
> > > raised in the various PRs and in the community meeting.
> > >
> > > I hope that it clarifies my need for Schema.OBJECT().
> > > https://docs.google.com/document/d/117gr6hG3N46MlkqENRKRtLejb_e_K0cZagQZv3jbwvQ/edit?usp=sharing
> >
> > For reference, this is the full patch that allows users to compile and
> > run a Sink<Object>:
> > https://github.com/apache/pulsar/pull/10034
> >
> > Enrico
> >
> > > The document is open for comments to everyone who has the link.
> > >
> > > This is the patch:
> > > https://github.com/apache/pulsar/pull/9895
> > >
> > > Enrico
> > >
> > > Il giorno lun 22 mar 2021 alle ore 08:46 Sijie Guo
> > > <guosi...@gmail.com> ha scritto:
> > > >
> > > > > Perhaps Sijie can also chime in on why the behavior is such.
> > > >
> > > > I will go through the discussion here tomorrow and reply here.
> > > >
> > > > - Sijie
> > > >
> > > > On Fri, Mar 19, 2021 at 11:10 AM Jerry Peng <jerry.boyang.p...@gmail.com> wrote:
> > > > >
> > > > > Hi Enrico,
> > > > >
> > > > > > One problem is that I do not know how to code "my_decode_method"
> > > > > > without Schema information.
> > > > > > If we had the Schema that is used for the Record, then the approach of
> > > > > > using Sink<byte[]> may work. In fact it works perfectly for Source<byte[]>.
> > > > > > This is not the case, because for Sink<byte[]> record.getSchema()
> > > > > > returns Schema.BYTES and we are not activating the Schema.OBJECT()
> > > > > > (AutoConsume) mechanism that downloads and automatically applies the
> > > > > > schema.
> > > > > > The second problem is about making it easy for users to build Pulsar
> > > > > > IO Sinks that are schema agnostic at compile time:
> > > > > >
> > > > > > public class MySink implements Sink<Object> {
> > > > > >     public void write(Record<Object> record) {
> > > > > >         Object value = record.getValue();   // automatically decoded by Pulsar
> > > > > >         Schema schema = record.getSchema(); // the actual schema for the message
> > > > > >                                             // (it may vary from message to message)
> > > > > >     }
> > > > > > }
> > > > > >
> > > > > > Currently you have to define at compile time the type of Schema you
> > > > > > want to consume. This is fine for custom Sinks or for very simple
> > > > > > Sinks, but it does not work for general-purpose Sinks.
> > > > > > I am putting up a GDoc with the full list of requirements and proposals.
> > > > >
> > > > > The problem seems to be with `getSchema()` in Record and the subsequent
> > > > > `getSchema()` on the actual Pulsar message. For a consumer declared like
> > > > > Consumer<byte[]>, getSchema() on the messages the consumer returns will
> > > > > always return Schema.BYTES, regardless of the actual schema the message
> > > > > was produced with. The schema of the message is overridden when byte[]
> > > > > is specified as the schema for the consumer. This seems like a bug to me.
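> > > > >
> > > > > A minimal snippet showing what I mean (just a sketch; client is an
> > > > > existing PulsarClient, topic and subscription names are placeholders):
> > > > >
> > > > > Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
> > > > >         .topic("my-topic")          // produced with, say, an AVRO schema
> > > > >         .subscriptionName("my-sub")
> > > > >         .subscribe();
> > > > > Message<byte[]> msg = consumer.receive();
> > > > > // msg.getValue() is the raw payload: the declared Schema.BYTES wins,
> > > > > // and the schema the message was actually produced with is not surfaced.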
> > > > > Perhaps Sijie can also chime in on why the behavior is such.
> > > > >
> > > > > On Fri, Mar 19, 2021 at 12:46 AM Enrico Olivelli <eolive...@gmail.com> wrote:
> > > > >
> > > > > > Il giorno ven 19 mar 2021 alle ore 08:20 Jerry Peng
> > > > > > <jerry.boyang.p...@gmail.com> ha scritto:
> > > > > > >
> > > > > > > > But it doesn't work for Sinks:
> > > > > > > > currently if you declare a Sink<byte[]> you will see as schema
> > > > > > > > Schema.BYTES.
> > > > > > > > If we had a Schema object that is preconfigured for decoding the
> > > > > > > > payload, probably we would be done.
> > > > > > >
> > > > > > > If you declare a sink as Sink<byte[]> there should be no schema. I am
> > > > > > > not sure what getSchema() returns, but the original design is that if
> > > > > > > the declared type is byte[] it indicates that there is no schema. This
> > > > > > > was also done for backwards compatibility reasons, as schema has not
> > > > > > > always been around.
> > > > > > >
> > > > > > > I still don't quite understand why you can not just implement your own
> > > > > > > de-serialization method in the sink, one that deserializes byte[] into
> > > > > > > whatever you want it to be:
> > > > > > >
> > > > > > > public void write(Record<byte[]> record) throws Exception {
> > > > > > >     byte[] val = record.getValue();
> > > > > > >     SomeObject foo = my_decode_method(val);
> > > > > > >     ....
> > > > > > > }
> > > > > >
> > > > > > One problem is that I do not know how to code "my_decode_method"
> > > > > > without Schema information.
> > > > > > If we had the Schema that is used for the Record, then the approach of
> > > > > > using Sink<byte[]> may work. In fact it works perfectly for Source<byte[]>.
> > > > > > This is not the case, because for Sink<byte[]> record.getSchema()
> > > > > > returns Schema.BYTES and we are not activating the Schema.OBJECT()
> > > > > > (AutoConsume) mechanism that downloads and automatically applies the
> > > > > > schema.
> > > > > >
> > > > > > The second problem is about making it easy for users to build Pulsar
> > > > > > IO Sinks that are schema agnostic at compile time:
> > > > > >
> > > > > > public class MySink implements Sink<Object> {
> > > > > >     public void write(Record<Object> record) {
> > > > > >         Object value = record.getValue();   // automatically decoded by Pulsar
> > > > > >         Schema schema = record.getSchema(); // the actual schema for the message
> > > > > >                                             // (it may vary from message to message)
> > > > > >     }
> > > > > > }
> > > > > >
> > > > > > Currently you have to define at compile time the type of Schema you
> > > > > > want to consume. This is fine for custom Sinks or for very simple
> > > > > > Sinks, but it does not work for general-purpose Sinks.
> > > > > >
> > > > > > I am putting up a GDoc with the full list of requirements and proposals.
> > > > > >
> > > > > > > > Problem 2: Pulsar IO Record does not handle non-String keys
> > > > > > >
> > > > > > > I do not believe you need to add framework-level support for this. This
> > > > > > > can be done at a higher layer, i.e. the compatibility layer between
> > > > > > > Kafka Connect and Pulsar IO. The compatibility layer just needs to map
> > > > > > > a Kafka Connect key of object type into a string representation.
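> > > > > > >
> > > > > > > For example (just a sketch, the helper name is made up), the layer
> > > > > > > could render any Connect key as a string before handing it to Pulsar IO:
> > > > > > >
> > > > > > > static String toPulsarKey(Object connectKey) {
> > > > > > >     if (connectKey == null) {
> > > > > > >         return null; // no key
> > > > > > >     }
> > > > > > >     if (connectKey instanceof byte[]) {
> > > > > > >         return Base64.getEncoder().encodeToString((byte[]) connectKey);
> > > > > > >     }
> > > > > > >     // structured keys could be rendered as JSON instead of toString()
> > > > > > >     return connectKey.toString();
> > > > > > > }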
> > > > > >
> > > > > > We are already supporting non-String keys in Pulsar with KeyValue.
> > > > > > There is partial support for KeyValue on Sources (with KVRecord);
> > > > > > we just have to complete the work and allow Sinks to deal
> > > > > > automatically with KeyValue messages.
> > > > > >
> > > > > > > I am glad we are trying to build a compatibility layer so Kafka Connect
> > > > > > > connectors can run on Pulsar IO, but in doing so, let us not just change
> > > > > > > Pulsar IO to resemble Kafka Connect. In general, we should be extremely
> > > > > > > cautious about introducing new APIs at the framework layer. If something
> > > > > > > can be implemented at a higher layer, then we should implement it there
> > > > > > > and not try to push it down to the framework layer.
> > > > > > >
> > > > > > > Best,
> > > > > > >
> > > > > > > Jerry
> > > > > > >
> > > > > > > On Thu, Mar 18, 2021 at 1:38 AM Enrico Olivelli <eolive...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Jerry,
> > > > > > > >
> > > > > > > > Il giorno gio 18 mar 2021 alle ore 03:07 Jerry Peng
> > > > > > > > <jerry.boyang.p...@gmail.com> ha scritto:
> > > > > > > > >
> > > > > > > > > Hi Enrico,
> > > > > > > > >
> > > > > > > > > Thanks for taking the initiative for improvements on Pulsar IO!
> > > > > > > > >
> > > > > > > > > I have questions in regards to the following statements:
> > > > > > > > >
> > > > > > > > > > *Problem 1: Pulsar Sinks must declare at compile time the data type
> > > > > > > > > > they support*
> > > > > > > > > > So I would like to see Sink<Object> and let the implementation deal
> > > > > > > > > > with Record<Object> that will give access to the decoded payload and
> > > > > > > > > > to the Schema.
> > > > > > > > >
> > > > > > > > > What is the point of having a schema for a generic POJO object? The
> > > > > > > > > point of a schema is to provide boundaries on what the structure of the
> > > > > > > > > data can be. However, having a schema that supports a generic POJO that
> > > > > > > > > can have any structure seems paradoxical to me.
> > > > > > > > >
> > > > > > > > > Currently, the input schema and output schema for sources, sinks, and
> > > > > > > > > functions can be undefined, i.e. there is no schema, which means that
> > > > > > > > > the source/sink/function will either write, read and write, or just
> > > > > > > > > read byte[], which is the most generic data representation there can
> > > > > > > > > be. Why is this not sufficient for your use case? Can the Pulsar IO
> > > > > > > > > wrapper code for Kafka Connect just read or write byte[] and let the
> > > > > > > > > higher-up abstractions determine how to deserialize it?
> > > > > > > >
> > > > > > > > For Sources you can have a Source<byte[]> and you can write records
> > > > > > > > with any schema.
> > > > > > > > This is good and it works well.
> > > > > > > >
> > > > > > > > But it doesn't work for Sinks:
> > > > > > > > currently if you declare a Sink<byte[]> you will see as schema
> > > > > > > > Schema.BYTES.
> > > > > > > > If we had a Schema object that is preconfigured for decoding the
> > > > > > > > payload, probably we would be done.
> > > > > > > > Generic sinks will be Sink<byte[]> and they will decode the payload with:
> > > > > > > >
> > > > > > > > public void write(Record<byte[]> record) throws Exception {
> > > > > > > >     Schema<?> schema = record.getSchema();
> > > > > > > >     SchemaType type = schema.getSchemaInfo().getType();
> > > > > > > >     Object decoded = schema.decode(record.getValue());
> > > > > > > > }
> > > > > > > >
> > > > > > > > but this does not work, because we have a strongly typed system and
> > > > > > > > you MUST write:
> > > > > > > >
> > > > > > > > public void write(Record<byte[]> record) throws Exception {
> > > > > > > >     Schema<byte[]> schema = record.getSchema();
> > > > > > > >     SchemaType type = schema.getSchemaInfo().getType();
> > > > > > > >     Object decoded = record.getValue();
> > > > > > > > }
> > > > > > > >
> > > > > > > > If I have Sink<Object> it becomes:
> > > > > > > >
> > > > > > > > public void write(Record<Object> record) throws Exception {
> > > > > > > >     Schema<Object> schema = record.getSchema();
> > > > > > > >     SchemaType type = schema.getSchemaInfo().getType();
> > > > > > > >     Object decoded = record.getValue();
> > > > > > > >
> > > > > > > >     // switch on SchemaType ....
> > > > > > > > }
> > > > > > > >
> > > > > > > > > > *Problem 2: Pulsar IO Record does not handle non-String keys*
> > > > > > > > >
> > > > > > > > > The reason why Pulsar IO Record only supports specifying a key as a
> > > > > > > > > string is because Pulsar only supports specifying keys as strings. The
> > > > > > > > > key specified in the Pulsar IO Record is also used as the partition key
> > > > > > > > > to determine which partition a message will be published into. If the
> > > > > > > > > key returned from a Kafka Connect connector also needs to be used as
> > > > > > > > > the partition key, then the method you specified will not work. You
> > > > > > > > > will still need to translate the key back into a string or implement
> > > > > > > > > your own custom message router.
> > > > > > > >
> > > > > > > > At the low level this is true, but we already have KeyValue#SEPARATED
> > > > > > > > that brings these features:
> > > > > > > > - you have a schema for the Key
> > > > > > > > - you have a schema for the Value
> > > > > > > > - your Key is stored in the Message key, and you can think of it as an
> > > > > > > >   Object (it is encoded to byte[] using the KeySchema and it is stored
> > > > > > > >   base64-encoded as a string in the message key)
> > > > > > > >
> > > > > > > > It is not super efficient, due to the base64 encoding, but it is a
> > > > > > > > good tradeoff and it is already supported by Pulsar.
> > > > > > > >
> > > > > > > > > Can you also explain how the additional methods you are suggesting to
> > > > > > > > > add will be used? What entities will be implementing these methods and
> > > > > > > > > setting the values? What entities will use those values? And how will
> > > > > > > > > those values be used?
> > > > > > > >
> > > > > > > > If you are using KeyValue (especially in the SEPARATED flavour) you
> > > > > > > > essentially see a record as a key-value pair of <Object, Object>,
> > > > > > > > with the key stored in the message key.
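> > > > > > > >
> > > > > > > > For instance, this is roughly how the producing side would write such
> > > > > > > > records (just a sketch; client is an existing PulsarClient, and
> > > > > > > > MyKey/MyValue and the topic name are made up):
> > > > > > > >
> > > > > > > > Schema<KeyValue<MyKey, MyValue>> schema = Schema.KeyValue(
> > > > > > > >         Schema.JSON(MyKey.class),
> > > > > > > >         Schema.JSON(MyValue.class),
> > > > > > > >         KeyValueEncodingType.SEPARATED);
> > > > > > > > Producer<KeyValue<MyKey, MyValue>> producer =
> > > > > > > >         client.newProducer(schema).topic("my-topic").create();
> > > > > > > > // the key is encoded with its own schema and travels in the message key,
> > > > > > > > // the value is encoded with its own schema and travels in the payload
> > > > > > > > producer.send(new KeyValue<>(myKey, myValue));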
> > > > > > > >
> > > > > > > > Most of these ideas come from porting connectors from Kafka Connect to
> > > > > > > > Pulsar IO.
> > > > > > > > I would like to start a separate thread to discuss a convention for the
> > > > > > > > mapping, but the short version is:
> > > > > > > > when you come from Kafka you have a KeyValue<Object, Object> in
> > > > > > > > SEPARATED encoding (this may be simplified in case of String or byte[]
> > > > > > > > keys, but let's stick to the most complicated case).
> > > > > > > > So your producer (probably a KafkaSource or a KafkaConnectSource)
> > > > > > > > writes a KeyValue to Pulsar and uses KeyValueSchema with SEPARATED
> > > > > > > > encoding.
> > > > > > > >
> > > > > > > > On the Sink side you have your connector that (in Kafka) is used to
> > > > > > > > deal with an Object key and an Object value.
> > > > > > > > With those new methods you can use the logical key and the logical
> > > > > > > > value (and you can use the keySchema and the valueSchema),
> > > > > > > > and you have a 1-1 mapping with what you did in Kafka.
> > > > > > > >
> > > > > > > > This is particularly important if you have users that are trying to
> > > > > > > > migrate from Kafka to Pulsar, because today there is no easy and no
> > > > > > > > standard way to do it.
> > > > > > > >
> > > > > > > > > > *Problem 3: Pulsar IO Records do not allow NULL values*
> > > > > > > > >
> > > > > > > > > Don't have a problem with this. The use case you provided makes sense.
> > > > > > > >
> > > > > > > > I will send a patch, thanks.
> > > > > > > >
> > > > > > > > Enrico
> > > > > > > >
> > > > > > > > > On Wed, Mar 17, 2021 at 2:04 AM Enrico Olivelli <eolive...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hello everyone,
> > > > > > > > > >
> > > > > > > > > > I would like to share some proposals about Pulsar IO. Based on my
> > > > > > > > > > experience porting a nontrivial connector for Cassandra from Kafka
> > > > > > > > > > Connect to Pulsar IO, I would like to identify some places where we
> > > > > > > > > > can improve the Pulsar IO API for both native Pulsar connectors and
> > > > > > > > > > to improve feature parity with Kafka Connect. Ultimately, I would
> > > > > > > > > > like to provide a Kafka Connect compatibility library so that people
> > > > > > > > > > can use arbitrary Kafka connectors with Pulsar without having to
> > > > > > > > > > port each one.
> > > > > > > > > >
> > > > > > > > > > Problem 1: Pulsar Sinks must declare at compile time the data type
> > > > > > > > > > they support
> > > > > > > > > >
> > > > > > > > > > You can have a Sink<byte[]>, Sink<GenericRecord>, Sink<KeyValue>, but
> > > > > > > > > > you cannot have only one single Sink implementation that deals with
> > > > > > > > > > every kind of schema.
> > > > > > > > > >
> > > > > > > > > > In Kafka Connect you are used to dealing with Object at compile time
> > > > > > > > > > and dealing with data types and Schemas in the implementation.
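> > > > > > > > > >
> > > > > > > > > > For comparison, a Kafka Connect sink task looks roughly like this
> > > > > > > > > > (just a sketch):
> > > > > > > > > >
> > > > > > > > > > public void put(Collection<SinkRecord> records) {
> > > > > > > > > >     for (SinkRecord r : records) {
> > > > > > > > > >         Object key = r.key();     // plain Object, described by r.keySchema()
> > > > > > > > > >         Object value = r.value(); // plain Object, described by r.valueSchema()
> > > > > > > > > >         // the connector dispatches on the runtime Schema, not on generics
> > > > > > > > > >     }
> > > > > > > > > > }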
> > > > > > > > > >
> > > > > > > > > > So I would like to see Sink<Object> and let the implementation deal
> > > > > > > > > > with Record<Object>, which will give access to the decoded payload
> > > > > > > > > > and to the Schema.
> > > > > > > > > >
> > > > > > > > > > I have started work to support Schema<Object> here, which is the
> > > > > > > > > > first step in the direction of supporting Sink<Object>:
> > > > > > > > > > https://github.com/apache/pulsar/pull/9895
> > > > > > > > > >
> > > > > > > > > > Problem 2: Pulsar IO Record does not handle non-String keys
> > > > > > > > > >
> > > > > > > > > > We only have Record#getKey(), which returns an Optional<String>. So
> > > > > > > > > > even if you are using KeyValue, you cannot access the logical value
> > > > > > > > > > of the key.
> > > > > > > > > >
> > > > > > > > > > The idea is to use KeyValue as the foundation for supporting Schema
> > > > > > > > > > in the Key (this is nothing new, but it is still not much supported
> > > > > > > > > > in Pulsar IO).
> > > > > > > > > >
> > > > > > > > > > Kafka Connect users deal with Object keys and we should provide the
> > > > > > > > > > same support.
> > > > > > > > > > We already have support for Object keys in the case of KeyValue.
> > > > > > > > > > The proposal I am working on with my colleagues is to add Object
> > > > > > > > > > getLogicalKey() to the Record interface:
> > > > > > > > > >
> > > > > > > > > > interface Record<T> {
> > > > > > > > > >
> > > > > > > > > >     Optional<Object> getLogicalKey();
> > > > > > > > > >     Optional<Object> getLogicalValue();
> > > > > > > > > >     Schema<T> getKeySchema();
> > > > > > > > > >     Schema<T> getValueSchema();
> > > > > > > > > >
> > > > > > > > > >     Optional<String> getKey();
> > > > > > > > > >     Object getValue();
> > > > > > > > > >     Schema<T> getSchema();
> > > > > > > > > > }
> > > > > > > > > >
> > > > > > > > > > For messages that are using a byte[] key (Message#hasBase64EncodedKey),
> > > > > > > > > > getLogicalKey() returns the same as Message#getKeyBytes() and
> > > > > > > > > > getKeySchema() will return Schema.BYTES.
> > > > > > > > > >
> > > > > > > > > > For regular messages getKeySchema() returns Schema.STRING.
> > > > > > > > > >
> > > > > > > > > > For messages that are using the KeyValue schema the new methods will
> > > > > > > > > > return the corresponding fields and metadata:
> > > > > > > > > >
> > > > > > > > > > getLogicalKey   -> the key of the KeyValue
> > > > > > > > > > getLogicalValue -> the value of the KeyValue
> > > > > > > > > > getKeySchema    -> the schema for the logical key
> > > > > > > > > > getValueSchema  -> the schema for the logical value
> > > > > > > > > > getKey          -> the key of the message, as usual, for backward compatibility
> > > > > > > > > > getSchema       -> the KeyValueSchema, as usual, for backward compatibility
> > > > > > > > > > getValue        -> the KeyValue object, as usual, for backward compatibility
> > > > > > > > > >
> > > > > > > > > > Please note that we already have a KVRecord interface that exposes
> > > > > > > > > > key and value schema, but it is not used while pushing data to Sinks
> > > > > > > > > > (it looks like it is only used by KafkaConnectSource):
> > > > > > > > > >
> > > > > > > > > > public interface KVRecord<K, V> extends Record {
> > > > > > > > > >     Schema<K> getKeySchema();
> > > > > > > > > >     Schema<V> getValueSchema();
> > > > > > > > > >     KeyValueEncodingType getKeyValueEncodingType();
> > > > > > > > > > }
> > > > > > > > > >
> > > > > > > > > > Problem 3: Pulsar IO Records do not allow NULL values
> > > > > > > > > >
> > > > > > > > > > Currently there is no support for NULL values in Pulsar IO.
> > > > > > > > > > This case makes sense if you are storing information in the key as
> > > > > > > > > > well.
> > > > > > > > > >
> > > > > > > > > > Example use case: use the topic as a changelog of a database, put
> > > > > > > > > > the PK into the key and the rest of the record into the value. Use
> > > > > > > > > > the convention that a NULL value means a DELETE operation and a
> > > > > > > > > > non-NULL value is an UPSERT.
> > > > > > > > > >
> > > > > > > > > > Thoughts?
> > > > > > > > > >
> > > > > > > > > > Enrico Olivelli