Il giorno ven 26 mar 2021 alle ore 07:50 Enrico Olivelli
<eolive...@gmail.com> ha scritto:
>
> Sijie,
> I have a proposal that fits your points and my needs
>
> 1. Introduce a new PulsarObject interface, that is a wrapper for any data type
>
> interface PulsarObject {
>      Schema/SchemaType getSchemaType();
>      Object getPayload();
> }
>
> 2. Make GenericRecord extend PulsarObject
>
> interface GenericRecord extends PulsarObject {
> ....... getNativeRecord renamed to getPayload();
> ....... implement PrimitiveRecord
> }
>
> 3. Do not introduce ObjectSchema, but only enhance AutoConsumeSchema
> to work with every Schema type.
> For non Struct objects we return PrimitiveRecord, as you proposed.
>
> 4. Allow users of Pulsar IO to write Sink<PulsarObject> or
> Sink<Object>, binding it to AutoConsumeSchema
> new users can use a general purpose API, and they just have to handle
> PulsarObject
>
> This way:
> - Sink<PulsarObject> will be the "general purpose sink" I want to see
> - No new ObjectSchema, only AutoConsumeSchema -> no new maintenance costs
> - No need to deprecate GenericRecord methods, only rename
> getNativeRecord to getPayload()
>
> How does this proposal sound to you ?
> I believe it answers all of your points and it fixes my problem in a
> very elegant way.

Probably the better name is GenericObject instead of PulsarObject

I have sent a new PR
https://github.com/apache/pulsar/pull/10057

Please take a look

Enrico


>
> Enrico
>
>
> Il giorno gio 25 mar 2021 alle ore 23:50 Sijie Guo
> <guosi...@gmail.com> ha scritto:
> >
> > Hi Enrico,
> >
> > In general, I would suggest separating the general-purpose sink from the
> > schema object. I know they are related. But it unnecessarily makes the
> > conversation much complicated. I would actually suggest keeping the
> > conversation around the generic "auto consumes" schema first. Once that is
> > figured out, the other conversations will be easier.
> >
> > Regarding the generic auto_consuem schema, I don't think approach 2 in the
> > google doc reflects what we have discussed in the previous community
> > meeting. You are writing a proposal that is in favor of approach 1 which
> > dismissed the discussion in the community meeting and misled the community.
> >
> > My suggestion was to continue enhancing AUTO_SCHEMA without introducing a
> > new OBJECT schema to confuse the users and make the maintenance become hard.
> >
> > Because we already introduced the "getNativeRecord" method recently to
> > allow people to access the underlying "object"/"record" that can be
> > interpreted based on its schema. With that being said, the "GenericRecord"
> > is becoming a wrapper over the actual object deserialized from a schema.
> > The changes I suggested in the community were:
> >
> > - Deprecated "getFields" in the `GenericRecord` interface as they are
> > useless after we introduced the `getNativeRecord` method. This change will
> > make `GenericRecord` become a generic container wrapping the object
> > serialized from the actual schema.
> > - Rename `getNativeRecord` to `getNativeObject` to return the underlying
> > object. `getNativeRecord` was introduced in a recent PR which has NOT been
> > released yet.
> >
> > With these changes,
> >
> > 1) It maintains backward compatibility without introducing a new ambiguous
> > schema that does similar things as AUTO_CONSUME.
> > 2) It provides consistent behavior across structure schemas and primitive
> > schemas. Because you always use `getNativeObject` to get the actual object
> > deserialized from the actual schema implementation.
> > 3) It also provides extensibility than `Object`. Because it makes
> > `GenericRecord` a root object for representing objects deserialized from a
> > log record.
> >
> > Thanks,
> > Sijie
> >
> >
> >
> >
> >
> > On Wed, Mar 24, 2021 at 9:43 AM Enrico Olivelli <eolive...@gmail.com> wrote:
> >
> > > Il giorno lun 22 mar 2021 alle ore 11:44 Enrico Olivelli
> > > <eolive...@gmail.com> ha scritto:
> > > >
> > > > Sijjie, Jerry, Matteo
> > > > I have created this GDoc that explains the problem, and the points we
> > > > raised in the various PRs and in the Community meeting.
> > > >
> > > > I hope that it clarifies my need for Schema.OBJECT().
> > > >
> > > https://docs.google.com/document/d/117gr6hG3N46MlkqENRKRtLejb_e_K0cZagQZv3jbwvQ/edit?usp=sharing
> > >
> > > For reference this is the full patch, that allows users to compile and
> > > run a Sink<Object>
> > >
> > > https://github.com/apache/pulsar/pull/10034
> > >
> > > Enrico
> > >
> > >
> > > >
> > > > The document is open for comments to everyone who has the link
> > > >
> > > > This is the patch
> > > > https://github.com/apache/pulsar/pull/9895
> > > >
> > > > Enrico
> > > >
> > > > Il giorno lun 22 mar 2021 alle ore 08:46 Sijie Guo
> > > > <guosi...@gmail.com> ha scritto:
> > > > >
> > > > > > Perhaps Sijie can also chime in on why the behavior is such.
> > > > >
> > > > > I will go through the discussion here tomorrow and reply here.
> > > > >
> > > > > - Sijie
> > > > >
> > > > > On Fri, Mar 19, 2021 at 11:10 AM Jerry Peng <
> > > jerry.boyang.p...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > HI Enrico,
> > > > > > >
> > > > > > >
> > > > > > > One problem is that I do not know how to code "my_decode_method"
> > > > > > > without Schema information.
> > > > > > > If we had the Schema that is used for the Record then the approach
> > > of
> > > > > > > using Sink[] may work. In fact it works perfectly for
> > > Source<byte[]>.
> > > > > > > This is not the case, because for Sink<byte[]> record.getSchema()
> > > > > > > returns byte[] and we are not activating Schema.OBJECT()
> > > (AutoConsume)
> > > > > > > mechanism that downloads
> > > > > > > and applies automatically the schema.
> > > > > > > The second problem is about making it easy for users to build
> > > Pulsar
> > > > > > > IO Sinks that are schema agnostic at compile time.
> > > > > > > public class MySink implements Sink<Object> {
> > > > > > >     public void write(Record<Object> record) {
> > > > > > >           Object value = record.getValue(); --> automatically
> > > decoded by
> > > > > > > Pulsar
> > > > > > >           Schema schema = record.getSchema() -> this is the actual
> > > > > > > schema for the message (it may vary from message to message)
> > > > > > >    }
> > > > > > > }
> > > > > > > Currently you have to define at compile time the type of Schema 
> > > > > > > you
> > > > > > > want to consume, this is fine for custom Sinks or for very simple
> > > > > > > Sinks.
> > > > > > > But it does not work for general purpose Sinks.
> > > > > > > I am putting up a GDoc with the full list of requirements and
> > > proposals
> > > > > >
> > > > > >
> > > > > > The problem seems to be with `getSchema()`  in Record and the
> > > subsequent
> > > > > >  `getSchema()` on the actual pulsar message.  For consumer declared
> > > like
> > > > > > Consumer<byte[]>, getSchema() calls on messages the consumer returns
> > > will
> > > > > > always be SCHEMA.BYTES, regardless of what the actual schema the
> > > message
> > > > > > was produced with.   The schema of message is overridden when byte[]
> > > is
> > > > > > specified as the schema for the consumer. This seems like a bug to
> > > me.
> > > > > > Perhaps Sijie can also chime in on why the behavior is such.
> > > > > >
> > > > > >
> > > > > > >
> > > > > > On Fri, Mar 19, 2021 at 12:46 AM Enrico Olivelli <
> > > eolive...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Il giorno ven 19 mar 2021 alle ore 08:20 Jerry Peng
> > > > > > > <jerry.boyang.p...@gmail.com> ha scritto:
> > > > > > > >
> > > > > > > > > But it doesn't work for Sinks
> > > > > > > > currently if you declare a Sink<byte[]> you will see as schema
> > > > > > > Schema.BYTES.
> > > > > > > > If we had a Schema object that is preconfigured for decoding the
> > > > > > > > payload probably we will be done.
> > > > > > > >
> > > > > > > > If you declare sink as Sink<byte[]> there should be no schema.
> > > I am
> > > > > > not
> > > > > > > > sure what getSchema() returns but the original design is that if
> > > the
> > > > > > > > declared type is byte[] indicates that there is no schema.  This
> > > also
> > > > > > > done
> > > > > > > > for backwards compatibility reasons as schema has not always 
> > > > > > > > been
> > > > > > around.
> > > > > > > >
> > > > > > > > I still don't quite understand why you can not just implement
> > > your own
> > > > > > > > de-serialization method in the sink? That deserializes byte[]
> > > into
> > > > > > > whatever
> > > > > > > > you want it to be.
> > > > > > > >
> > > > > > > > public void write(Record<byte[]> record) throws Exception {
> > > > > > > >     byte[] val = record.getValue();
> > > > > > > >     SomeObject foo = my_decode_method(val);
> > > > > > > >     ....
> > > > > > > > }
> > > > > > >
> > > > > > > One problem is that I do not know how to code "my_decode_method"
> > > > > > > without Schema information.
> > > > > > > If we had the Schema that is used for the Record then the approach
> > > of
> > > > > > > using Sink[] may work. In fact it works perfectly for
> > > Source<byte[]>.
> > > > > > > This is not the case, because for Sink<byte[]> record.getSchema()
> > > > > > > returns byte[] and we are not activating Schema.OBJECT()
> > > (AutoConsume)
> > > > > > > mechanism that downloads
> > > > > > > and applies automatically the schema.
> > > > > > >
> > > > > > > The second problem is about making it easy for users to build
> > > Pulsar
> > > > > > > IO Sinks that are schema agnostic at compile time.
> > > > > > >
> > > > > > > public class MySink implements Sink<Object> {
> > > > > > >     public void write(Record<Object> record) {
> > > > > > >           Object value = record.getValue(); --> automatically
> > > decoded by
> > > > > > > Pulsar
> > > > > > >           Schema schema = record.getSchema() -> this is the actual
> > > > > > > schema for the message (it may vary from message to message)
> > > > > > >    }
> > > > > > > }
> > > > > > >
> > > > > > > Currently you have to define at compile time the type of Schema 
> > > > > > > you
> > > > > > > want to consume, this is fine for custom Sinks or for very simple
> > > > > > > Sinks.
> > > > > > > But it does not work for general purpose Sinks.
> > > > > > >
> > > > > > > I am putting up a GDoc with the full list of requirements and
> > > proposals
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > >
> > > > > > > > > Problem 2: Pulsar IO Record does not handle non-String keys
> > > > > > > >
> > > > > > > > I do not believe you need to add framework level support for
> > > this.
> > > > > > This
> > > > > > > > can be done at a higher layer i.e. the compatibility layer of
> > > kafka
> > > > > > > connect
> > > > > > > > and Pulsar IO.  The compatibility layer just needs to map a 
> > > > > > > > kafka
> > > > > > connect
> > > > > > > > key of object type into string representation.
> > > > > > >
> > > > > > > We are already supporting non-String keys in Pulsar with KeyValue,
> > > > > > > there is partial support for KeyValue on Sources (with KVRecord)
> > > > > > > we just have to complete the work and allow Sinks to deal
> > > > > > > automatically with KeyValue messages.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > >
> > > > > > > > I am glad we are trying to build a compatibility layer so Kafka
> > > connect
> > > > > > > > connectors can run on Pulsar IO, but in doing so, let us not 
> > > > > > > > just
> > > > > > change
> > > > > > > > Pulsar IO to resemble Kafka connect. In general, we should be
> > > extremely
> > > > > > > > cautious about introducing new APIs at the framework layer.  If
> > > > > > something
> > > > > > > > can be implemented at a higher layer then we should implement it
> > > there
> > > > > > > and
> > > > > > > > not try to push it down to the framework layer.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > >
> > > > > > > > Jerry
> > > > > > > >
> > > > > > > > On Thu, Mar 18, 2021 at 1:38 AM Enrico Olivelli <
> > > eolive...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Jerry
> > > > > > > > >
> > > > > > > > > Il giorno gio 18 mar 2021 alle ore 03:07 Jerry Peng
> > > > > > > > > <jerry.boyang.p...@gmail.com> ha scritto:
> > > > > > > > > >
> > > > > > > > > > Hi Enrico,
> > > > > > > > > >
> > > > > > > > > > Thanks for taking the initiative for improvements on Pulsar
> > > IO!
> > > > > > > > > >
> > > > > > > > > > I have questions in regards to the following statements
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > *Problem 1: Pulsar Sinks must declare at compile time the
> > > data type
> > > > > > > they
> > > > > > > > > > support*
> > > > > > > > > > > So I would like to see Sink<Object> and let the
> > > implementation
> > > > > > deal
> > > > > > > > > > with Record<Object> that will give access to the decoded
> > > payload
> > > > > > and
> > > > > > > > > > to the Schema.
> > > > > > > > > >
> > > > > > > > > > What is the point of having a schema for a generic POJO
> > > object?
> > > > > > The
> > > > > > > > > point
> > > > > > > > > > of a schema is to provide boundaries on what the structure
> > > of the
> > > > > > > data
> > > > > > > > > can
> > > > > > > > > > be.  However, having a schema that supports a generic POJO
> > > that can
> > > > > > > have
> > > > > > > > > > any structure seems paradoxical to me.
> > > > > > > > > >
> > > > > > > > > > Currently, the input schema and output schema for source,
> > > sink, and
> > > > > > > > > > functions can be undefined i.e. there is no schema, which
> > > means
> > > > > > that
> > > > > > > the
> > > > > > > > > > source/sink/function will either write, read and write, or
> > > just
> > > > > > read
> > > > > > > > > byte[]
> > > > > > > > > > which is the most generic data representation there can be.
> > > Why is
> > > > > > > this
> > > > > > > > > > not sufficient for your use case?  Can the Pulsar IO wrapper
> > > code
> > > > > > for
> > > > > > > > > Kafka
> > > > > > > > > > connect just read or write byte[] and let the high up
> > > abstractions
> > > > > > > > > > determine how to deserialize it?
> > > > > > > > >
> > > > > > > > > For Sources you can have a Source<byte[]> and you can write
> > > records
> > > > > > > > > with any schema.
> > > > > > > > > This is good and it works well.
> > > > > > > > >
> > > > > > > > > But it doesn't work for Sinks
> > > > > > > > > currently if you declare a Sink<byte[]> you will see as schema
> > > > > > > > > Schema.BYTES.
> > > > > > > > > If we had a Schema object that is preconfigured for decoding
> > > the
> > > > > > > > > payload probably we will be done.
> > > > > > > > > Generic sinks will be Sink<byte[]> and they will decode the
> > > payload
> > > > > > > with:
> > > > > > > > >
> > > > > > > > > public void write(Record<byte[]> record) throws Exception {
> > > > > > > > >     Schema<?> schema = record.getSchema();
> > > > > > > > >    SchemaType type = schema.getSchemaInfo().getType();
> > > > > > > > >    Object decoded = schema.decode(record.getValue());
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > but this does not work, because we have a strongly typed
> > > system and
> > > > > > > > > you MUST write:
> > > > > > > > >
> > > > > > > > > public void write(Record<byte[]> record) throws Exception {
> > > > > > > > >     Schema<byte[]> schema = record.getSchema();
> > > > > > > > >    SchemaType type = schema.getSchemaInfo().getType();
> > > > > > > > >    Object decoded = record.getValue();
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > if I have Sink<Object> it becomes
> > > > > > > > >
> > > > > > > > > public void write(Object record) throws Exception {
> > > > > > > > >     Schema<Object> schema = record.getSchema();
> > > > > > > > >    SchemaType type = schema.getSchemaInfo().getType();
> > > > > > > > >    Object decoded = record.getValue();
> > > > > > > > >
> > > > > > > > >    // switch on SchemaType ....
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > *Problem 2: Pulsar IO Record does not handle non-String 
> > > > > > > > > > keys*
> > > > > > > > > >
> > > > > > > > > > The reason why Pulsar IO Record only supports specifying a
> > > key as a
> > > > > > > > > string
> > > > > > > > > > is because Pulsar only supports specifying keys as strings.
> > > The key
> > > > > > > > > > specified in Pulsar IO Record is also used as the partition
> > > key to
> > > > > > > > > > determine which partition a message will be published into.
> > > If the
> > > > > > > key
> > > > > > > > > > returned from Kafka connect connector also needs to be used
> > > as the
> > > > > > > > > > partition key then the method you specified will not work.
> > > You
> > > > > > will
> > > > > > > > > still
> > > > > > > > > > need to translate the key back into a string or implement
> > > your own
> > > > > > > custom
> > > > > > > > > > message router.
> > > > > > > > >
> > > > > > > > > At the low level this is true, but we already have
> > > KeyValue#SEPARATED
> > > > > > > > > that brings these features:
> > > > > > > > > - you have a schema for the Key
> > > > > > > > > - you have a schema for the Value
> > > > > > > > > - your Key is stored in the Message key, and you can think it
> > > as an
> > > > > > > > > Object (it is encoded to byte[] using the KeySchema and it is
> > > stored
> > > > > > > > > in base64 as string into the message key)
> > > > > > > > >
> > > > > > > > > It is not super efficient, due to the base64 encoding, but it
> > > is a
> > > > > > > > > good tradeoff and it is already supported by Pulsar.
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Can you also explain how the additional methods you are
> > > suggesting
> > > > > > > to add
> > > > > > > > > > will be used?  What entities will be implementing methods 
> > > > > > > > > > and
> > > > > > > setting the
> > > > > > > > > > value? What entities will use those values? And how those
> > > values
> > > > > > > will be
> > > > > > > > > > used.
> > > > > > > > >
> > > > > > > > > If you are using KeyValue (especially in SEPARATED flavour) 
> > > > > > > > > you
> > > > > > > > > essentially will see a record as a key value pair of <Object,
> > > > > > Object>,
> > > > > > > > > with the key stored in the message key.
> > > > > > > > >
> > > > > > > > > Most of these ideas come from porting connectors from Kafka
> > > Connect
> > > > > > to
> > > > > > > > > Pulsar IO.
> > > > > > > > > I would like to start a separate thread about discussing a
> > > convention
> > > > > > > > > for the mapping but the short version is:
> > > > > > > > > When you come from Kafka you have a KeyValue<Object, Object>
> > > in
> > > > > > > > > SEPARATED encoding (this may be simplified in case of String 
> > > > > > > > > or
> > > > > > byte[]
> > > > > > > > > keys, but let's stick to the most complicated case).
> > > > > > > > > So your producer (probably a KafkaSource or a
> > > KafkaConnectSource)
> > > > > > > > > writes to Pulsar a KeyValue and uses KeyValueSchema with
> > > SEPARATED
> > > > > > > > > encoding.
> > > > > > > > >
> > > > > > > > > On the Sink side you have your connector that (in Kafka) is
> > > used to
> > > > > > > > > deal with an Object key and an Object value.
> > > > > > > > > With those new methods you can use the logical key and the
> > > logical
> > > > > > > > > value (and you can use the keySchema and the valueSchema)
> > > > > > > > > and you have 1-1 mapping with what you did in Kafka.
> > > > > > > > >
> > > > > > > > > This is particularly important if you have users that are
> > > trying to
> > > > > > > > > migrate from Kafka to Pulsar, and there is no easy and no
> > > standard
> > > > > > way
> > > > > > > > > to do it
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > *Problem 3:  Pulsar IO Records do not allow NULL values*
> > > > > > > > > > Don't have a problem with this.  The use case you provided
> > > makes
> > > > > > > sense.
> > > > > > > > >
> > > > > > > > > I will send a patch, thanks
> > > > > > > > >
> > > > > > > > > Enrico
> > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Wed, Mar 17, 2021 at 2:04 AM Enrico Olivelli <
> > > > > > eolive...@gmail.com
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hello everyone,
> > > > > > > > > > >
> > > > > > > > > > > I would like to share some proposals about Pulsar IO.
> > > Based on
> > > > > > my
> > > > > > > > > > > experience porting a nontrivial connector for Cassandra
> > > from
> > > > > > Kafka
> > > > > > > > > > > Connect to Pulsar IO, I would like to identify some places
> > > where
> > > > > > we
> > > > > > > > > > > can improve the Pulsar IO API for both native Pulsar
> > > connectors
> > > > > > > and to
> > > > > > > > > > > improve feature parity with Kafka Connect.  Ultimately, I
> > > would
> > > > > > > like
> > > > > > > > > > > to provide a Kafka Connect compatibility library so that
> > > people
> > > > > > can
> > > > > > > > > > > use arbitrary Kafka connectors with Pulsar without having
> > > to port
> > > > > > > each
> > > > > > > > > > > one.
> > > > > > > > > > >
> > > > > > > > > > > Problem 1: Pulsar Sinks must declare at compile time the
> > > data
> > > > > > type
> > > > > > > they
> > > > > > > > > > > support
> > > > > > > > > > >
> > > > > > > > > > > You can have a Sink<byte[]>, Sink<GenericRecord>,
> > > Sink<KeyValue>,
> > > > > > > but
> > > > > > > > > > > you cannot have only one single Sink implementation that
> > > deals
> > > > > > with
> > > > > > > > > > > every kind of schema.
> > > > > > > > > > >
> > > > > > > > > > > In Kafka Connect you are used to dealing with Object at
> > > compile
> > > > > > > time
> > > > > > > > > > > and deal with data types and Schemas in the 
> > > > > > > > > > > implementation.
> > > > > > > > > > >
> > > > > > > > > > > So I would like to see Sink<Object> and let the
> > > implementation
> > > > > > deal
> > > > > > > > > > > with Record<Object> that will give access to the decoded
> > > payload
> > > > > > > and
> > > > > > > > > > > to the Schema.
> > > > > > > > > > >
> > > > > > > > > > > I have started a work to support Schema<Object> here, that
> > > is the
> > > > > > > > > > > first step in the direction of supporting Sink<Object>
> > > > > > > > > > > https://github.com/apache/pulsar/pull/9895
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Problem 2: Pulsar IO Record does not handle non-String 
> > > > > > > > > > > keys
> > > > > > > > > > >
> > > > > > > > > > > We only have Record#getKey() that returns an
> > > Optional<String>. So
> > > > > > > even
> > > > > > > > > > > if you are using KeyValue, you cannot access the logical
> > > value of
> > > > > > > the
> > > > > > > > > > > key.
> > > > > > > > > > >
> > > > > > > > > > > The idea is to use KeyValue as foundation for supporting
> > > Schema
> > > > > > in
> > > > > > > the
> > > > > > > > > > > Key (this is nothing new, but it is still not much
> > > supported in
> > > > > > > Pulsar
> > > > > > > > > > > IO)
> > > > > > > > > > >
> > > > > > > > > > > Kafka connect users deal with Object keys and we should
> > > provide
> > > > > > the
> > > > > > > > > > > same support.
> > > > > > > > > > > We already have support for Object keys in case of
> > > KeyValue.
> > > > > > > > > > > The proposal I am working with my colleagues is to add
> > > Object
> > > > > > > > > > > getLogicalKey() to the Record interface.
> > > > > > > > > > >
> > > > > > > > > > > interface Record<T> {
> > > > > > > > > > >
> > > > > > > > > > >       Optional<Object> getLogicalKey();
> > > > > > > > > > >       Optional<Object> getLogicalValue();
> > > > > > > > > > >       Schema<T> getKeySchema();
> > > > > > > > > > >       Schema<T> getValueSchema();
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >       Optional<String> getKey();
> > > > > > > > > > >       Object getValue();
> > > > > > > > > > >       Schema<T> getSchema();
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > For messages that are using byte[] key
> > > > > > > (Message#hasBase64EncodedKey)
> > > > > > > > > > > the getLogicalKey() returns the same as
> > > Message#getKeyBytes() and
> > > > > > > > > > > getKeySchema will return Schema.BYTES.
> > > > > > > > > > >
> > > > > > > > > > > For regular messages getKeySchema() returns Schema.STRING.
> > > > > > > > > > >
> > > > > > > > > > > For messages that are using the KeyValue schema the new
> > > methods
> > > > > > > will
> > > > > > > > > > > return the corresponding fields and metadata.
> > > > > > > > > > >
> > > > > > > > > > > getLogicalKey -> the key of the KeyValue
> > > > > > > > > > > getLogicalValue -> the value of KeyValue
> > > > > > > > > > > getKeySchema -> the schema for the logical key
> > > > > > > > > > > getValueSchema -> the schema for the logical value
> > > > > > > > > > > getKey -> the key of the message, as usual, for backward
> > > > > > > compatibility
> > > > > > > > > > > getSchema -> the KeyValueSchema, as usual, for backward
> > > > > > > compatibility
> > > > > > > > > > > getValue -> the KeyValue object, as usual, for backward
> > > > > > > compatibility
> > > > > > > > > > >
> > > > > > > > > > > Please note that we already have a KVRecord interface that
> > > > > > exposes
> > > > > > > key
> > > > > > > > > > > and value schema, but it is not used while pushing data to
> > > Sinks
> > > > > > > (it
> > > > > > > > > > > looks like it is only used by KafkaConnectSource):
> > > > > > > > > > >
> > > > > > > > > > > public interface KVRecord<K, V> extends Record {
> > > > > > > > > > >    Schema<K> getKeySchema();
> > > > > > > > > > >    Schema<V> getValueSchema();
> > > > > > > > > > >    KeyValueEncodingType getKeyValueEncodingType();
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > Problem 3:  Pulsar IO Records do not allow NULL values
> > > > > > > > > > >
> > > > > > > > > > > Currently there is no support for NULL values in Pulsar 
> > > > > > > > > > > IO.
> > > > > > > > > > > This case makes sense if you are storing information in
> > > the key
> > > > > > as
> > > > > > > > > well.
> > > > > > > > > > >
> > > > > > > > > > > Example use case: use the topic as a changelog of a
> > > database, put
> > > > > > > the
> > > > > > > > > > > PK into the key and the rest of the record into the value.
> > > Use
> > > > > > the
> > > > > > > > > > > convention that a NULL value means a DELETE operation and
> > > a non
> > > > > > > NULL
> > > > > > > > > > > value is an UPSERT.
> > > > > > > > > > >
> > > > > > > > > > > Thoughts ?
> > > > > > > > > > >
> > > > > > > > > > > Enrico Olivelli
> > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > >

Reply via email to