On Fri, Mar 26, 2021 at 7:50 AM Enrico Olivelli <eolive...@gmail.com> wrote:
>
> Sijie,
> I have a proposal that fits your points and my needs.
>
> 1. Introduce a new PulsarObject interface that is a wrapper for any data type:
>
>     interface PulsarObject {
>         Schema/SchemaType getSchemaType();
>         Object getPayload();
>     }
>
> 2. Make GenericRecord extend PulsarObject:
>
>     interface GenericRecord extends PulsarObject {
>         ... getNativeRecord renamed to getPayload()
>         ... implement PrimitiveRecord
>     }
>
> 3. Do not introduce ObjectSchema, but only enhance AutoConsumeSchema to
> work with every Schema type. For non-Struct objects we return
> PrimitiveRecord, as you proposed.
>
> 4. Allow users of Pulsar IO to write Sink<PulsarObject> or Sink<Object>,
> binding it to AutoConsumeSchema. New users can use a general-purpose API,
> and they just have to handle PulsarObject.
>
> This way:
> - Sink<PulsarObject> will be the "general purpose sink" I want to see
> - No new ObjectSchema, only AutoConsumeSchema -> no new maintenance costs
> - No need to deprecate GenericRecord methods, only rename getNativeRecord
>   to getPayload()
>
> How does this proposal sound to you?
> I believe it answers all of your points and it fixes my problem in a very
> elegant way.
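For illustration, here is a minimal sketch of the "general purpose sink" that
point 4 of the quoted proposal would enable, written against the proposed
PulsarObject interface. It assumes getSchemaType() returns the SchemaType enum
(the proposal leaves "Schema/SchemaType" open), and the handle* methods are
hypothetical placeholders for downstream logic; Sink, Record and SinkContext
are the existing Pulsar IO interfaces:

    import java.util.Map;
    import org.apache.pulsar.client.api.schema.GenericRecord;
    import org.apache.pulsar.functions.api.Record;
    import org.apache.pulsar.io.core.Sink;
    import org.apache.pulsar.io.core.SinkContext;

    // A schema-agnostic sink: the payload type is discovered per record at
    // runtime from its schema type, instead of being fixed at compile time.
    public class GeneralPurposeSink implements Sink<PulsarObject> {

        @Override
        public void open(Map<String, Object> config, SinkContext context) {
            // connect to the downstream system here
        }

        @Override
        public void write(Record<PulsarObject> record) {
            PulsarObject object = record.getValue();
            switch (object.getSchemaType()) {
                case STRING:
                    handleString((String) object.getPayload());
                    break;
                case AVRO:
                case JSON:
                    // struct types still surface as GenericRecord
                    handleStruct((GenericRecord) object.getPayload());
                    break;
                default:
                    handleOther(object.getPayload());
            }
            record.ack();
        }

        @Override
        public void close() {
        }

        // hypothetical placeholders for downstream logic
        private void handleString(String text) { }
        private void handleStruct(GenericRecord struct) { }
        private void handleOther(Object payload) { }
    }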
Probably the better name is GenericObject instead of PulsarObject.
I have sent a new PR: https://github.com/apache/pulsar/pull/10057
Please take a look.

Enrico

>
> Enrico
>
> On Thu, Mar 25, 2021 at 11:50 PM Sijie Guo <guosi...@gmail.com> wrote:
> >
> > Hi Enrico,
> >
> > In general, I would suggest separating the general-purpose sink from the
> > schema object. I know they are related, but it unnecessarily makes the
> > conversation much more complicated. I would actually suggest keeping the
> > conversation around the generic "auto consume" schema first. Once that
> > is figured out, the other conversations will be easier.
> >
> > Regarding the generic AUTO_CONSUME schema, I don't think approach 2 in
> > the Google doc reflects what we discussed in the previous community
> > meeting. You are writing a proposal in favor of approach 1, which
> > dismisses the discussion in the community meeting and misleads the
> > community.
> >
> > My suggestion was to continue enhancing AUTO_CONSUME without introducing
> > a new OBJECT schema that would confuse users and make maintenance harder.
> >
> > We already introduced the "getNativeRecord" method recently to allow
> > people to access the underlying "object"/"record", which can be
> > interpreted based on its schema. With that being said, "GenericRecord"
> > is becoming a wrapper over the actual object deserialized from a schema.
> > The changes I suggested in the community meeting were:
> >
> > - Deprecate "getFields" in the `GenericRecord` interface, as it is
> > useless now that we have the `getNativeRecord` method. This change will
> > make `GenericRecord` become a generic container wrapping the object
> > deserialized with the actual schema.
> > - Rename `getNativeRecord` to `getNativeObject` to return the underlying
> > object. `getNativeRecord` was introduced in a recent PR which has NOT
> > been released yet.
> >
> > With these changes,
> >
> > 1) It maintains backward compatibility without introducing a new
> > ambiguous schema that does similar things as AUTO_CONSUME.
> > 2) It provides consistent behavior across struct schemas and primitive
> > schemas, because you always use `getNativeObject` to get the actual
> > object deserialized by the actual schema implementation.
> > 3) It also provides more extensibility than `Object`, because it makes
> > `GenericRecord` a root object for representing objects deserialized
> > from a log record.
> >
> > Thanks,
> > Sijie
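As a concrete picture of the end state Sijie describes, here is a minimal
consumer-side sketch, assuming the renamed getNativeObject() accessor (at the
time of this thread only getNativeRecord() exists, and it is unreleased) and
assuming AUTO_CONSUME has been enhanced to cover primitive schemas as
discussed; the service URL, topic and subscription names are illustrative:

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.api.schema.GenericRecord;

    public class AutoConsumeExample {
        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build();

            // AUTO_CONSUME downloads the topic schema and decodes each
            // message; GenericRecord is just the generic container.
            Consumer<GenericRecord> consumer = client
                    .newConsumer(Schema.AUTO_CONSUME())
                    .topic("my-topic")
                    .subscriptionName("my-subscription")
                    .subscribe();

            Message<GenericRecord> msg = consumer.receive();

            // Proposed accessor: returns the object deserialized by the
            // actual schema (a String for STRING topics, an Avro record
            // for AVRO topics, and so on).
            Object nativeObject = msg.getValue().getNativeObject();
            System.out.println("decoded: " + nativeObject);

            consumer.close();
            client.close();
        }
    }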
> > On Wed, Mar 24, 2021 at 9:43 AM Enrico Olivelli <eolive...@gmail.com> wrote:
> >
> > > On Mon, Mar 22, 2021 at 11:44 AM Enrico Olivelli
> > > <eolive...@gmail.com> wrote:
> > > >
> > > > Sijie, Jerry, Matteo,
> > > > I have created this GDoc that explains the problem and the points we
> > > > raised in the various PRs and in the community meeting.
> > > >
> > > > I hope that it clarifies my need for Schema.OBJECT().
> > > >
> > > > https://docs.google.com/document/d/117gr6hG3N46MlkqENRKRtLejb_e_K0cZagQZv3jbwvQ/edit?usp=sharing
> > >
> > > For reference, this is the full patch that allows users to compile and
> > > run a Sink<Object>:
> > >
> > > https://github.com/apache/pulsar/pull/10034
> > >
> > > Enrico
> > >
> > > > The document is open for comments to everyone who has the link.
> > > >
> > > > This is the patch:
> > > > https://github.com/apache/pulsar/pull/9895
> > > >
> > > > Enrico
> > > >
> > > > On Mon, Mar 22, 2021 at 8:46 AM Sijie Guo
> > > > <guosi...@gmail.com> wrote:
> > > > >
> > > > > > Perhaps Sijie can also chime in on why the behavior is such.
> > > > >
> > > > > I will go through the discussion here tomorrow and reply here.
> > > > >
> > > > > - Sijie
> > > > >
> > > > > On Fri, Mar 19, 2021 at 11:10 AM Jerry Peng
> > > > > <jerry.boyang.p...@gmail.com> wrote:
> > > > > >
> > > > > > Hi Enrico,
> > > > > >
> > > > > > > One problem is that I do not know how to code "my_decode_method"
> > > > > > > without Schema information.
> > > > > > > If we had the Schema that is used for the Record, then the
> > > > > > > approach of using Sink<byte[]> may work. In fact it works
> > > > > > > perfectly for Source<byte[]>.
> > > > > > > This is not the case, because for Sink<byte[]> record.getSchema()
> > > > > > > returns byte[] and we are not activating the Schema.OBJECT()
> > > > > > > (AutoConsume) mechanism that automatically downloads and applies
> > > > > > > the schema.
> > > > > > > The second problem is about making it easy for users to build
> > > > > > > Pulsar IO Sinks that are schema-agnostic at compile time.
> > > > > > >
> > > > > > > public class MySink implements Sink<Object> {
> > > > > > >     public void write(Record<Object> record) {
> > > > > > >         Object value = record.getValue();  // automatically decoded by Pulsar
> > > > > > >         Schema schema = record.getSchema(); // the actual schema for the message
> > > > > > >                                             // (it may vary from message to message)
> > > > > > >     }
> > > > > > > }
> > > > > > >
> > > > > > > Currently you have to define at compile time the type of Schema
> > > > > > > you want to consume. This is fine for custom Sinks or for very
> > > > > > > simple Sinks, but it does not work for general-purpose Sinks.
> > > > > > > I am putting up a GDoc with the full list of requirements and
> > > > > > > proposals.
> > > > > >
> > > > > > The problem seems to be with `getSchema()` in Record and the
> > > > > > subsequent `getSchema()` on the actual Pulsar message. For a
> > > > > > consumer declared as Consumer<byte[]>, getSchema() calls on the
> > > > > > messages the consumer returns will always return Schema.BYTES,
> > > > > > regardless of the actual schema the message was produced with.
> > > > > > The schema of the message is overridden when byte[] is specified
> > > > > > as the schema for the consumer. This seems like a bug to me.
> > > > > > Perhaps Sijie can also chime in on why the behavior is such.
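To spell out the behavior Jerry describes, a hedged sketch of the symptom
(service URL, topic and subscription names are illustrative; the BYTES
outcome is the reported behavior under discussion, not documented semantics):

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;

    public class BytesConsumerExample {
        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build();

            // The topic was produced with a real schema (STRING, AVRO, ...),
            // but the consumer declares byte[]:
            Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                    .topic("my-topic")
                    .subscriptionName("my-subscription")
                    .subscribe();

            Message<byte[]> msg = consumer.receive();
            byte[] raw = msg.getValue();
            // Reported behavior: the schema surfaced for this message is
            // BYTES, not the schema it was produced with, so `raw` cannot
            // be decoded from schema information alone.

            consumer.close();
            client.close();
        }
    }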
> > > > > >
> > > > > > On Fri, Mar 19, 2021 at 12:46 AM Enrico Olivelli <eolive...@gmail.com> wrote:
> > > > > > >
> > > > > > > On Fri, Mar 19, 2021 at 8:20 AM Jerry Peng
> > > > > > > <jerry.boyang.p...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > But it doesn't work for Sinks:
> > > > > > > > > currently, if you declare a Sink<byte[]>, you will see
> > > > > > > > > Schema.BYTES as the schema.
> > > > > > > > > If we had a Schema object that is preconfigured for decoding
> > > > > > > > > the payload, we would probably be done.
> > > > > > > >
> > > > > > > > If you declare a sink as Sink<byte[]>, there should be no
> > > > > > > > schema. I am not sure what getSchema() returns, but the
> > > > > > > > original design is that a declared type of byte[] indicates
> > > > > > > > that there is no schema. This is also done for
> > > > > > > > backwards-compatibility reasons, as schemas have not always
> > > > > > > > been around.
> > > > > > > >
> > > > > > > > I still don't quite understand why you cannot just implement
> > > > > > > > your own de-serialization method in the sink that deserializes
> > > > > > > > byte[] into whatever you want it to be:
> > > > > > > >
> > > > > > > > public void write(Record<byte[]> record) throws Exception {
> > > > > > > >     byte[] val = record.getValue();
> > > > > > > >     SomeObject foo = my_decode_method(val);
> > > > > > > >     ....
> > > > > > > > }
> > > > > > >
> > > > > > > One problem is that I do not know how to code "my_decode_method"
> > > > > > > without Schema information.
> > > > > > > If we had the Schema that is used for the Record, then the
> > > > > > > approach of using Sink<byte[]> may work. In fact it works
> > > > > > > perfectly for Source<byte[]>.
> > > > > > > This is not the case, because for Sink<byte[]> record.getSchema()
> > > > > > > returns byte[] and we are not activating the Schema.OBJECT()
> > > > > > > (AutoConsume) mechanism that automatically downloads and applies
> > > > > > > the schema.
> > > > > > >
> > > > > > > The second problem is about making it easy for users to build
> > > > > > > Pulsar IO Sinks that are schema-agnostic at compile time.
> > > > > > >
> > > > > > > public class MySink implements Sink<Object> {
> > > > > > >     public void write(Record<Object> record) {
> > > > > > >         Object value = record.getValue();  // automatically decoded by Pulsar
> > > > > > >         Schema schema = record.getSchema(); // the actual schema for the message
> > > > > > >                                             // (it may vary from message to message)
> > > > > > >     }
> > > > > > > }
> > > > > > >
> > > > > > > Currently you have to define at compile time the type of Schema
> > > > > > > you want to consume. This is fine for custom Sinks or for very
> > > > > > > simple Sinks, but it does not work for general-purpose Sinks.
> > > > > > >
> > > > > > > I am putting up a GDoc with the full list of requirements and
> > > > > > > proposals.
> > > > > > >
> > > > > > > > > Problem 2: Pulsar IO Record does not handle non-String keys
> > > > > > > >
> > > > > > > > I do not believe you need to add framework-level support for
> > > > > > > > this. This can be done at a higher layer, i.e. the
> > > > > > > > compatibility layer between Kafka Connect and Pulsar IO. The
> > > > > > > > compatibility layer just needs to map a Kafka Connect key of
> > > > > > > > object type into a string representation.
> > > > > > >
> > > > > > > We are already supporting non-String keys in Pulsar with
> > > > > > > KeyValue. There is partial support for KeyValue on Sources
> > > > > > > (with KVRecord); we just have to complete the work and allow
> > > > > > > Sinks to deal automatically with KeyValue messages.
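For readers following along, a minimal sketch of the existing non-String-key
support just mentioned, using the KeyValue schema with SEPARATED encoding
(the MyKey/MyValue POJOs, service URL and topic name are illustrative):

    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.common.schema.KeyValue;
    import org.apache.pulsar.common.schema.KeyValueEncodingType;

    public class SeparatedKeyValueExample {
        // illustrative POJOs standing in for real key/value types
        public static class MyKey { public String id; }
        public static class MyValue { public String payload; }

        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build();

            // SEPARATED: the key is encoded with its own schema and stored
            // in the message key; the value goes in the message payload.
            Producer<KeyValue<MyKey, MyValue>> producer = client.newProducer(
                    Schema.KeyValue(Schema.JSON(MyKey.class),
                                    Schema.JSON(MyValue.class),
                                    KeyValueEncodingType.SEPARATED))
                    .topic("my-topic")
                    .create();

            MyKey key = new MyKey();
            key.id = "pk-1";
            MyValue value = new MyValue();
            value.payload = "hello";

            producer.send(new KeyValue<>(key, value));
            producer.close();
            client.close();
        }
    }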
> > > > > > > >
> > > > > > > > I am glad we are trying to build a compatibility layer so that
> > > > > > > > Kafka Connect connectors can run on Pulsar IO, but in doing
> > > > > > > > so, let us not just change Pulsar IO to resemble Kafka
> > > > > > > > Connect. In general, we should be extremely cautious about
> > > > > > > > introducing new APIs at the framework layer. If something can
> > > > > > > > be implemented at a higher layer, then we should implement it
> > > > > > > > there and not try to push it down to the framework layer.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jerry
> > > > > > > >
> > > > > > > > On Thu, Mar 18, 2021 at 1:38 AM Enrico Olivelli <eolive...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > Jerry,
> > > > > > > > >
> > > > > > > > > On Thu, Mar 18, 2021 at 3:07 AM Jerry Peng
> > > > > > > > > <jerry.boyang.p...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > Hi Enrico,
> > > > > > > > > >
> > > > > > > > > > Thanks for taking the initiative for improvements on
> > > > > > > > > > Pulsar IO!
> > > > > > > > > >
> > > > > > > > > > I have questions in regards to the following statements:
> > > > > > > > > >
> > > > > > > > > > *Problem 1: Pulsar Sinks must declare at compile time the
> > > > > > > > > > data type they support*
> > > > > > > > > > > So I would like to see Sink<Object> and let the
> > > > > > > > > > > implementation deal with Record<Object> that will give
> > > > > > > > > > > access to the decoded payload and to the Schema.
> > > > > > > > > >
> > > > > > > > > > What is the point of having a schema for a generic POJO
> > > > > > > > > > object? The point of a schema is to provide boundaries on
> > > > > > > > > > what the structure of the data can be. However, having a
> > > > > > > > > > schema that supports a generic POJO that can have any
> > > > > > > > > > structure seems paradoxical to me.
> > > > > > > > > >
> > > > > > > > > > Currently, the input schema and output schema for a
> > > > > > > > > > source, sink, or function can be undefined, i.e. there is
> > > > > > > > > > no schema, which means that the source/sink/function will
> > > > > > > > > > write, read, or read and write byte[], which is the most
> > > > > > > > > > generic data representation there can be. Why is this not
> > > > > > > > > > sufficient for your use case? Can the Pulsar IO wrapper
> > > > > > > > > > code for Kafka Connect just read or write byte[] and let
> > > > > > > > > > the higher-up abstractions determine how to deserialize
> > > > > > > > > > it?
> > > > > > > > >
> > > > > > > > > For Sources you can have a Source<byte[]> and you can write
> > > > > > > > > records with any schema.
> > > > > > > > > This is good and it works well.
> > > > > > > > >
> > > > > > > > > But it doesn't work for Sinks:
> > > > > > > > > currently, if you declare a Sink<byte[]>, you will see
> > > > > > > > > Schema.BYTES as the schema.
> > > > > > > > > If we had a Schema object that is preconfigured for decoding
> > > > > > > > > the payload, we would probably be done.
> > > > > > > > >
> > > > > > > > > Generic sinks will be Sink<byte[]> and they will decode the
> > > > > > > > > payload with:
> > > > > > > > >
> > > > > > > > > public void write(Record<byte[]> record) throws Exception {
> > > > > > > > >     Schema<?> schema = record.getSchema();
> > > > > > > > >     SchemaType type = schema.getSchemaInfo().getType();
> > > > > > > > >     Object decoded = schema.decode(record.getValue());
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > but this does not work, because we have a strongly typed
> > > > > > > > > system and you MUST write:
> > > > > > > > >
> > > > > > > > > public void write(Record<byte[]> record) throws Exception {
> > > > > > > > >     Schema<byte[]> schema = record.getSchema();
> > > > > > > > >     SchemaType type = schema.getSchemaInfo().getType();
> > > > > > > > >     Object decoded = record.getValue();
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > If I have Sink<Object> it becomes:
> > > > > > > > >
> > > > > > > > > public void write(Record<Object> record) throws Exception {
> > > > > > > > >     Schema<Object> schema = record.getSchema();
> > > > > > > > >     SchemaType type = schema.getSchemaInfo().getType();
> > > > > > > > >     Object decoded = record.getValue();
> > > > > > > > >
> > > > > > > > >     // switch on SchemaType ....
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > > *Problem 2: Pulsar IO Record does not handle non-String
> > > > > > > > > > keys*
> > > > > > > > > >
> > > > > > > > > > The reason why Pulsar IO Record only supports specifying a
> > > > > > > > > > key as a string is that Pulsar only supports specifying
> > > > > > > > > > keys as strings. The key specified in a Pulsar IO Record
> > > > > > > > > > is also used as the partition key to determine which
> > > > > > > > > > partition a message will be published into. If the key
> > > > > > > > > > returned from the Kafka Connect connector also needs to be
> > > > > > > > > > used as the partition key, then the method you specified
> > > > > > > > > > will not work. You will still need to translate the key
> > > > > > > > > > back into a string or implement your own custom message
> > > > > > > > > > router.
> > > > > > > > >
> > > > > > > > > At the low level this is true, but we already have
> > > > > > > > > KeyValue#SEPARATED, which brings these features:
> > > > > > > > > - you have a schema for the Key
> > > > > > > > > - you have a schema for the Value
> > > > > > > > > - your Key is stored in the Message key, and you can think
> > > > > > > > >   of it as an Object (it is encoded to byte[] using the key
> > > > > > > > >   Schema and stored as a base64 string in the message key)
> > > > > > > > >
> > > > > > > > > It is not super efficient, due to the base64 encoding, but
> > > > > > > > > it is a good tradeoff and it is already supported by Pulsar.
> > > > > > > > >
> > > > > > > > > > Can you also explain how the additional methods you are
> > > > > > > > > > suggesting will be used? What entities will be
> > > > > > > > > > implementing these methods and setting the values? What
> > > > > > > > > > entities will use those values? And how will those values
> > > > > > > > > > be used?
> > > > > > > > >
> > > > > > > > > If you are using KeyValue (especially in the SEPARATED
> > > > > > > > > flavour), you will essentially see a record as a key-value
> > > > > > > > > pair of <Object, Object>, with the key stored in the
> > > > > > > > > message key.
> > > > > > > > >
> > > > > > > > > Most of these ideas come from porting connectors from Kafka
> > > > > > > > > Connect to Pulsar IO.
> > > > > > > > > I would like to start a separate thread to discuss a
> > > > > > > > > convention for the mapping, but the short version is:
> > > > > > > > > when you come from Kafka you have a KeyValue<Object, Object>
> > > > > > > > > in SEPARATED encoding (this may be simplified in the case
> > > > > > > > > of String or byte[] keys, but let's stick to the most
> > > > > > > > > complicated case).
> > > > > > > > > So your producer (probably a KafkaSource or a
> > > > > > > > > KafkaConnectSource) writes a KeyValue to Pulsar and uses
> > > > > > > > > KeyValueSchema with SEPARATED encoding.
> > > > > > > > >
> > > > > > > > > On the Sink side you have your connector that (in Kafka) is
> > > > > > > > > used to dealing with an Object key and an Object value.
> > > > > > > > > With those new methods you can use the logical key and the
> > > > > > > > > logical value (and you can use the keySchema and the
> > > > > > > > > valueSchema), and you have a 1-1 mapping with what you did
> > > > > > > > > in Kafka.
> > > > > > > > >
> > > > > > > > > This is particularly important if you have users that are
> > > > > > > > > trying to migrate from Kafka to Pulsar, as there is
> > > > > > > > > currently no easy, standard way to do it.
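To illustrate that 1-1 mapping, a hedged sketch of what a sink ported from
Kafka Connect could look like, written against the Record accessors proposed
later in this thread (getLogicalKey, getLogicalValue, getKeySchema and
getValueSchema do not exist yet; the upsert call is a hypothetical
placeholder):

    import org.apache.pulsar.functions.api.Record;

    // Hypothetical: assumes the proposed Record accessors, so a ported
    // Kafka Connect sink keeps its Object-keyed programming model.
    public void write(Record<Object> record) throws Exception {
        // For KeyValue messages these are the decoded key and value; for
        // plain messages the logical key falls back to the message key.
        Object key = record.getLogicalKey().orElse(null);
        Object value = record.getLogicalValue().orElse(null);

        // The same shape a Kafka Connect SinkTask sees: (key, value) plus
        // their schemas, resolved per record at runtime.
        upsert(key, value, record.getKeySchema(), record.getValueSchema());
    }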
> > > > > > > > >
> > > > > > > > > > *Problem 3: Pulsar IO Records do not allow NULL values*
> > > > > > > > > >
> > > > > > > > > > Don't have a problem with this. The use case you provided
> > > > > > > > > > makes sense.
> > > > > > > > >
> > > > > > > > > I will send a patch, thanks.
> > > > > > > > >
> > > > > > > > > Enrico
> > > > > > > > >
> > > > > > > > > > On Wed, Mar 17, 2021 at 2:04 AM Enrico Olivelli <eolive...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hello everyone,
> > > > > > > > > > >
> > > > > > > > > > > I would like to share some proposals about Pulsar IO.
> > > > > > > > > > > Based on my experience porting a nontrivial connector
> > > > > > > > > > > for Cassandra from Kafka Connect to Pulsar IO, I would
> > > > > > > > > > > like to identify some places where we can improve the
> > > > > > > > > > > Pulsar IO API, both for native Pulsar connectors and to
> > > > > > > > > > > improve feature parity with Kafka Connect. Ultimately,
> > > > > > > > > > > I would like to provide a Kafka Connect compatibility
> > > > > > > > > > > library so that people can use arbitrary Kafka
> > > > > > > > > > > connectors with Pulsar without having to port each one.
> > > > > > > > > > >
> > > > > > > > > > > Problem 1: Pulsar Sinks must declare at compile time
> > > > > > > > > > > the data type they support
> > > > > > > > > > >
> > > > > > > > > > > You can have a Sink<byte[]>, Sink<GenericRecord>, or
> > > > > > > > > > > Sink<KeyValue>, but you cannot have one single Sink
> > > > > > > > > > > implementation that deals with every kind of schema.
> > > > > > > > > > >
> > > > > > > > > > > In Kafka Connect you are used to dealing with Object at
> > > > > > > > > > > compile time and dealing with data types and Schemas in
> > > > > > > > > > > the implementation.
> > > > > > > > > > >
> > > > > > > > > > > So I would like to see Sink<Object> and let the
> > > > > > > > > > > implementation deal with Record<Object>, which will
> > > > > > > > > > > give access to the decoded payload and to the Schema.
> > > > > > > > > > >
> > > > > > > > > > > I have started work to support Schema<Object> here; it
> > > > > > > > > > > is the first step in the direction of supporting
> > > > > > > > > > > Sink<Object>:
> > > > > > > > > > > https://github.com/apache/pulsar/pull/9895
> > > > > > > > > > >
> > > > > > > > > > > Problem 2: Pulsar IO Record does not handle non-String
> > > > > > > > > > > keys
> > > > > > > > > > >
> > > > > > > > > > > We only have Record#getKey(), which returns an
> > > > > > > > > > > Optional<String>. So even if you are using KeyValue,
> > > > > > > > > > > you cannot access the logical value of the key.
> > > > > > > > > > >
> > > > > > > > > > > The idea is to use KeyValue as the foundation for
> > > > > > > > > > > supporting Schema in the Key (this is nothing new, but
> > > > > > > > > > > it is still not well supported in Pulsar IO).
> > > > > > > > > > >
> > > > > > > > > > > Kafka Connect users deal with Object keys and we should
> > > > > > > > > > > provide the same support. We already have support for
> > > > > > > > > > > Object keys in the case of KeyValue.
> > > > > > > > > > > The proposal I am working on with my colleagues is to
> > > > > > > > > > > add Object getLogicalKey() to the Record interface:
> > > > > > > > > > >
> > > > > > > > > > > interface Record<T> {
> > > > > > > > > > >     Optional<Object> getLogicalKey();
> > > > > > > > > > >     Optional<Object> getLogicalValue();
> > > > > > > > > > >     Schema<T> getKeySchema();
> > > > > > > > > > >     Schema<T> getValueSchema();
> > > > > > > > > > >
> > > > > > > > > > >     Optional<String> getKey();
> > > > > > > > > > >     Object getValue();
> > > > > > > > > > >     Schema<T> getSchema();
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > For messages that use a byte[] key
> > > > > > > > > > > (Message#hasBase64EncodedKey), getLogicalKey() returns
> > > > > > > > > > > the same as Message#getKeyBytes() and getKeySchema()
> > > > > > > > > > > returns Schema.BYTES.
> > > > > > > > > > >
> > > > > > > > > > > For regular messages, getKeySchema() returns
> > > > > > > > > > > Schema.STRING.
> > > > > > > > > > >
> > > > > > > > > > > For messages that use the KeyValue schema, the new
> > > > > > > > > > > methods will return the corresponding fields and
> > > > > > > > > > > metadata:
> > > > > > > > > > >
> > > > > > > > > > > getLogicalKey   -> the key of the KeyValue
> > > > > > > > > > > getLogicalValue -> the value of the KeyValue
> > > > > > > > > > > getKeySchema    -> the schema for the logical key
> > > > > > > > > > > getValueSchema  -> the schema for the logical value
> > > > > > > > > > > getKey    -> the key of the message, as usual, for backward compatibility
> > > > > > > > > > > getSchema -> the KeyValueSchema, as usual, for backward compatibility
> > > > > > > > > > > getValue  -> the KeyValue object, as usual, for backward compatibility
> > > > > > > > > > >
> > > > > > > > > > > Please note that we already have a KVRecord interface
> > > > > > > > > > > that exposes the key and value schemas, but it is not
> > > > > > > > > > > used while pushing data to Sinks (it looks like it is
> > > > > > > > > > > only used by KafkaConnectSource):
> > > > > > > > > > >
> > > > > > > > > > > public interface KVRecord<K, V> extends Record {
> > > > > > > > > > >     Schema<K> getKeySchema();
> > > > > > > > > > >     Schema<V> getValueSchema();
> > > > > > > > > > >     KeyValueEncodingType getKeyValueEncodingType();
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > Problem 3: Pulsar IO Records do not allow NULL values
> > > > > > > > > > >
> > > > > > > > > > > Currently there is no support for NULL values in Pulsar
> > > > > > > > > > > IO. This case makes sense if you are storing
> > > > > > > > > > > information in the key as well.
> > > > > > > > > > >
> > > > > > > > > > > Example use case: use the topic as a changelog of a
> > > > > > > > > > > database, put the PK into the key and the rest of the
> > > > > > > > > > > record into the value. Use the convention that a NULL
> > > > > > > > > > > value means a DELETE operation and a non-NULL value is
> > > > > > > > > > > an UPSERT.
> > > > > > > > > > >
> > > > > > > > > > > Thoughts?
> > > > > > > > > > >
> > > > > > > > > > > Enrico Olivelli
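For illustration, a hedged sketch of how a sink could apply the changelog
convention described above, once Pulsar IO can deliver NULL values (which is
exactly the gap this proposal addresses); MyKey, MyRow and the database calls
are hypothetical placeholders:

    import org.apache.pulsar.common.schema.KeyValue;
    import org.apache.pulsar.functions.api.Record;

    // Illustrative: a database sink consuming a changelog topic keyed by
    // the primary key. A NULL value means DELETE, a non-NULL value UPSERT.
    public void write(Record<KeyValue<MyKey, MyRow>> record) throws Exception {
        KeyValue<MyKey, MyRow> kv = record.getValue();
        MyKey pk = kv.getKey();
        MyRow row = kv.getValue(); // would be null for a DELETE marker

        if (row == null) {
            database.delete(pk);      // hypothetical DAO call
        } else {
            database.upsert(pk, row); // hypothetical DAO call
        }
        record.ack();
    }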