Hi Enrico,

In general, I would suggest separating the general-purpose sink from the
schema object. I know they are related. But it unnecessarily makes the
conversation much complicated. I would actually suggest keeping the
conversation around the generic "auto consumes" schema first. Once that is
figured out, the other conversations will be easier.

Regarding the generic auto_consuem schema, I don't think approach 2 in the
google doc reflects what we have discussed in the previous community
meeting. You are writing a proposal that is in favor of approach 1 which
dismissed the discussion in the community meeting and misled the community.

My suggestion was to continue enhancing AUTO_SCHEMA without introducing a
new OBJECT schema to confuse the users and make the maintenance become hard.

Because we already introduced the "getNativeRecord" method recently to
allow people to access the underlying "object"/"record" that can be
interpreted based on its schema. With that being said, the "GenericRecord"
is becoming a wrapper over the actual object deserialized from a schema.
The changes I suggested in the community were:

- Deprecated "getFields" in the `GenericRecord` interface as they are
useless after we introduced the `getNativeRecord` method. This change will
make `GenericRecord` become a generic container wrapping the object
serialized from the actual schema.
- Rename `getNativeRecord` to `getNativeObject` to return the underlying
object. `getNativeRecord` was introduced in a recent PR which has NOT been
released yet.

With these changes,

1) It maintains backward compatibility without introducing a new ambiguous
schema that does similar things as AUTO_CONSUME.
2) It provides consistent behavior across structure schemas and primitive
schemas. Because you always use `getNativeObject` to get the actual object
deserialized from the actual schema implementation.
3) It also provides extensibility than `Object`. Because it makes
`GenericRecord` a root object for representing objects deserialized from a
log record.

Thanks,
Sijie





On Wed, Mar 24, 2021 at 9:43 AM Enrico Olivelli <eolive...@gmail.com> wrote:

> Il giorno lun 22 mar 2021 alle ore 11:44 Enrico Olivelli
> <eolive...@gmail.com> ha scritto:
> >
> > Sijjie, Jerry, Matteo
> > I have created this GDoc that explains the problem, and the points we
> > raised in the various PRs and in the Community meeting.
> >
> > I hope that it clarifies my need for Schema.OBJECT().
> >
> https://docs.google.com/document/d/117gr6hG3N46MlkqENRKRtLejb_e_K0cZagQZv3jbwvQ/edit?usp=sharing
>
> For reference this is the full patch, that allows users to compile and
> run a Sink<Object>
>
> https://github.com/apache/pulsar/pull/10034
>
> Enrico
>
>
> >
> > The document is open for comments to everyone who has the link
> >
> > This is the patch
> > https://github.com/apache/pulsar/pull/9895
> >
> > Enrico
> >
> > Il giorno lun 22 mar 2021 alle ore 08:46 Sijie Guo
> > <guosi...@gmail.com> ha scritto:
> > >
> > > > Perhaps Sijie can also chime in on why the behavior is such.
> > >
> > > I will go through the discussion here tomorrow and reply here.
> > >
> > > - Sijie
> > >
> > > On Fri, Mar 19, 2021 at 11:10 AM Jerry Peng <
> jerry.boyang.p...@gmail.com>
> > > wrote:
> > >
> > > > HI Enrico,
> > > > >
> > > > >
> > > > > One problem is that I do not know how to code "my_decode_method"
> > > > > without Schema information.
> > > > > If we had the Schema that is used for the Record then the approach
> of
> > > > > using Sink[] may work. In fact it works perfectly for
> Source<byte[]>.
> > > > > This is not the case, because for Sink<byte[]> record.getSchema()
> > > > > returns byte[] and we are not activating Schema.OBJECT()
> (AutoConsume)
> > > > > mechanism that downloads
> > > > > and applies automatically the schema.
> > > > > The second problem is about making it easy for users to build
> Pulsar
> > > > > IO Sinks that are schema agnostic at compile time.
> > > > > public class MySink implements Sink<Object> {
> > > > >     public void write(Record<Object> record) {
> > > > >           Object value = record.getValue(); --> automatically
> decoded by
> > > > > Pulsar
> > > > >           Schema schema = record.getSchema() -> this is the actual
> > > > > schema for the message (it may vary from message to message)
> > > > >    }
> > > > > }
> > > > > Currently you have to define at compile time the type of Schema you
> > > > > want to consume, this is fine for custom Sinks or for very simple
> > > > > Sinks.
> > > > > But it does not work for general purpose Sinks.
> > > > > I am putting up a GDoc with the full list of requirements and
> proposals
> > > >
> > > >
> > > > The problem seems to be with `getSchema()`  in Record and the
> subsequent
> > > >  `getSchema()` on the actual pulsar message.  For consumer declared
> like
> > > > Consumer<byte[]>, getSchema() calls on messages the consumer returns
> will
> > > > always be SCHEMA.BYTES, regardless of what the actual schema the
> message
> > > > was produced with.   The schema of message is overridden when byte[]
> is
> > > > specified as the schema for the consumer. This seems like a bug to
> me.
> > > > Perhaps Sijie can also chime in on why the behavior is such.
> > > >
> > > >
> > > > >
> > > > On Fri, Mar 19, 2021 at 12:46 AM Enrico Olivelli <
> eolive...@gmail.com>
> > > > wrote:
> > > >
> > > > > Il giorno ven 19 mar 2021 alle ore 08:20 Jerry Peng
> > > > > <jerry.boyang.p...@gmail.com> ha scritto:
> > > > > >
> > > > > > > But it doesn't work for Sinks
> > > > > > currently if you declare a Sink<byte[]> you will see as schema
> > > > > Schema.BYTES.
> > > > > > If we had a Schema object that is preconfigured for decoding the
> > > > > > payload probably we will be done.
> > > > > >
> > > > > > If you declare sink as Sink<byte[]> there should be no schema.
> I am
> > > > not
> > > > > > sure what getSchema() returns but the original design is that if
> the
> > > > > > declared type is byte[] indicates that there is no schema.  This
> also
> > > > > done
> > > > > > for backwards compatibility reasons as schema has not always been
> > > > around.
> > > > > >
> > > > > > I still don't quite understand why you can not just implement
> your own
> > > > > > de-serialization method in the sink? That deserializes byte[]
> into
> > > > > whatever
> > > > > > you want it to be.
> > > > > >
> > > > > > public void write(Record<byte[]> record) throws Exception {
> > > > > >     byte[] val = record.getValue();
> > > > > >     SomeObject foo = my_decode_method(val);
> > > > > >     ....
> > > > > > }
> > > > >
> > > > > One problem is that I do not know how to code "my_decode_method"
> > > > > without Schema information.
> > > > > If we had the Schema that is used for the Record then the approach
> of
> > > > > using Sink[] may work. In fact it works perfectly for
> Source<byte[]>.
> > > > > This is not the case, because for Sink<byte[]> record.getSchema()
> > > > > returns byte[] and we are not activating Schema.OBJECT()
> (AutoConsume)
> > > > > mechanism that downloads
> > > > > and applies automatically the schema.
> > > > >
> > > > > The second problem is about making it easy for users to build
> Pulsar
> > > > > IO Sinks that are schema agnostic at compile time.
> > > > >
> > > > > public class MySink implements Sink<Object> {
> > > > >     public void write(Record<Object> record) {
> > > > >           Object value = record.getValue(); --> automatically
> decoded by
> > > > > Pulsar
> > > > >           Schema schema = record.getSchema() -> this is the actual
> > > > > schema for the message (it may vary from message to message)
> > > > >    }
> > > > > }
> > > > >
> > > > > Currently you have to define at compile time the type of Schema you
> > > > > want to consume, this is fine for custom Sinks or for very simple
> > > > > Sinks.
> > > > > But it does not work for general purpose Sinks.
> > > > >
> > > > > I am putting up a GDoc with the full list of requirements and
> proposals
> > > > >
> > > > >
> > > > >
> > > > > >
> > > > > > > Problem 2: Pulsar IO Record does not handle non-String keys
> > > > > >
> > > > > > I do not believe you need to add framework level support for
> this.
> > > > This
> > > > > > can be done at a higher layer i.e. the compatibility layer of
> kafka
> > > > > connect
> > > > > > and Pulsar IO.  The compatibility layer just needs to map a kafka
> > > > connect
> > > > > > key of object type into string representation.
> > > > >
> > > > > We are already supporting non-String keys in Pulsar with KeyValue,
> > > > > there is partial support for KeyValue on Sources (with KVRecord)
> > > > > we just have to complete the work and allow Sinks to deal
> > > > > automatically with KeyValue messages.
> > > > >
> > > > >
> > > > >
> > > > > >
> > > > > > I am glad we are trying to build a compatibility layer so Kafka
> connect
> > > > > > connectors can run on Pulsar IO, but in doing so, let us not just
> > > > change
> > > > > > Pulsar IO to resemble Kafka connect. In general, we should be
> extremely
> > > > > > cautious about introducing new APIs at the framework layer.  If
> > > > something
> > > > > > can be implemented at a higher layer then we should implement it
> there
> > > > > and
> > > > > > not try to push it down to the framework layer.
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > Jerry
> > > > > >
> > > > > > On Thu, Mar 18, 2021 at 1:38 AM Enrico Olivelli <
> eolive...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Jerry
> > > > > > >
> > > > > > > Il giorno gio 18 mar 2021 alle ore 03:07 Jerry Peng
> > > > > > > <jerry.boyang.p...@gmail.com> ha scritto:
> > > > > > > >
> > > > > > > > Hi Enrico,
> > > > > > > >
> > > > > > > > Thanks for taking the initiative for improvements on Pulsar
> IO!
> > > > > > > >
> > > > > > > > I have questions in regards to the following statements
> > > > > > > >
> > > > > > > >
> > > > > > > > *Problem 1: Pulsar Sinks must declare at compile time the
> data type
> > > > > they
> > > > > > > > support*
> > > > > > > > > So I would like to see Sink<Object> and let the
> implementation
> > > > deal
> > > > > > > > with Record<Object> that will give access to the decoded
> payload
> > > > and
> > > > > > > > to the Schema.
> > > > > > > >
> > > > > > > > What is the point of having a schema for a generic POJO
> object?
> > > > The
> > > > > > > point
> > > > > > > > of a schema is to provide boundaries on what the structure
> of the
> > > > > data
> > > > > > > can
> > > > > > > > be.  However, having a schema that supports a generic POJO
> that can
> > > > > have
> > > > > > > > any structure seems paradoxical to me.
> > > > > > > >
> > > > > > > > Currently, the input schema and output schema for source,
> sink, and
> > > > > > > > functions can be undefined i.e. there is no schema, which
> means
> > > > that
> > > > > the
> > > > > > > > source/sink/function will either write, read and write, or
> just
> > > > read
> > > > > > > byte[]
> > > > > > > > which is the most generic data representation there can be.
> Why is
> > > > > this
> > > > > > > > not sufficient for your use case?  Can the Pulsar IO wrapper
> code
> > > > for
> > > > > > > Kafka
> > > > > > > > connect just read or write byte[] and let the high up
> abstractions
> > > > > > > > determine how to deserialize it?
> > > > > > >
> > > > > > > For Sources you can have a Source<byte[]> and you can write
> records
> > > > > > > with any schema.
> > > > > > > This is good and it works well.
> > > > > > >
> > > > > > > But it doesn't work for Sinks
> > > > > > > currently if you declare a Sink<byte[]> you will see as schema
> > > > > > > Schema.BYTES.
> > > > > > > If we had a Schema object that is preconfigured for decoding
> the
> > > > > > > payload probably we will be done.
> > > > > > > Generic sinks will be Sink<byte[]> and they will decode the
> payload
> > > > > with:
> > > > > > >
> > > > > > > public void write(Record<byte[]> record) throws Exception {
> > > > > > >     Schema<?> schema = record.getSchema();
> > > > > > >    SchemaType type = schema.getSchemaInfo().getType();
> > > > > > >    Object decoded = schema.decode(record.getValue());
> > > > > > > }
> > > > > > >
> > > > > > > but this does not work, because we have a strongly typed
> system and
> > > > > > > you MUST write:
> > > > > > >
> > > > > > > public void write(Record<byte[]> record) throws Exception {
> > > > > > >     Schema<byte[]> schema = record.getSchema();
> > > > > > >    SchemaType type = schema.getSchemaInfo().getType();
> > > > > > >    Object decoded = record.getValue();
> > > > > > > }
> > > > > > >
> > > > > > > if I have Sink<Object> it becomes
> > > > > > >
> > > > > > > public void write(Object record) throws Exception {
> > > > > > >     Schema<Object> schema = record.getSchema();
> > > > > > >    SchemaType type = schema.getSchemaInfo().getType();
> > > > > > >    Object decoded = record.getValue();
> > > > > > >
> > > > > > >    // switch on SchemaType ....
> > > > > > > }
> > > > > > >
> > > > > > >
> > > > > > > >
> > > > > > > > *Problem 2: Pulsar IO Record does not handle non-String keys*
> > > > > > > >
> > > > > > > > The reason why Pulsar IO Record only supports specifying a
> key as a
> > > > > > > string
> > > > > > > > is because Pulsar only supports specifying keys as strings.
> The key
> > > > > > > > specified in Pulsar IO Record is also used as the partition
> key to
> > > > > > > > determine which partition a message will be published into.
> If the
> > > > > key
> > > > > > > > returned from Kafka connect connector also needs to be used
> as the
> > > > > > > > partition key then the method you specified will not work.
> You
> > > > will
> > > > > > > still
> > > > > > > > need to translate the key back into a string or implement
> your own
> > > > > custom
> > > > > > > > message router.
> > > > > > >
> > > > > > > At the low level this is true, but we already have
> KeyValue#SEPARATED
> > > > > > > that brings these features:
> > > > > > > - you have a schema for the Key
> > > > > > > - you have a schema for the Value
> > > > > > > - your Key is stored in the Message key, and you can think it
> as an
> > > > > > > Object (it is encoded to byte[] using the KeySchema and it is
> stored
> > > > > > > in base64 as string into the message key)
> > > > > > >
> > > > > > > It is not super efficient, due to the base64 encoding, but it
> is a
> > > > > > > good tradeoff and it is already supported by Pulsar.
> > > > > > >
> > > > > > > >
> > > > > > > > Can you also explain how the additional methods you are
> suggesting
> > > > > to add
> > > > > > > > will be used?  What entities will be implementing methods and
> > > > > setting the
> > > > > > > > value? What entities will use those values? And how those
> values
> > > > > will be
> > > > > > > > used.
> > > > > > >
> > > > > > > If you are using KeyValue (especially in SEPARATED flavour) you
> > > > > > > essentially will see a record as a key value pair of <Object,
> > > > Object>,
> > > > > > > with the key stored in the message key.
> > > > > > >
> > > > > > > Most of these ideas come from porting connectors from Kafka
> Connect
> > > > to
> > > > > > > Pulsar IO.
> > > > > > > I would like to start a separate thread about discussing a
> convention
> > > > > > > for the mapping but the short version is:
> > > > > > > When you come from Kafka you have a KeyValue<Object, Object>
> in
> > > > > > > SEPARATED encoding (this may be simplified in case of String or
> > > > byte[]
> > > > > > > keys, but let's stick to the most complicated case).
> > > > > > > So your producer (probably a KafkaSource or a
> KafkaConnectSource)
> > > > > > > writes to Pulsar a KeyValue and uses KeyValueSchema with
> SEPARATED
> > > > > > > encoding.
> > > > > > >
> > > > > > > On the Sink side you have your connector that (in Kafka) is
> used to
> > > > > > > deal with an Object key and an Object value.
> > > > > > > With those new methods you can use the logical key and the
> logical
> > > > > > > value (and you can use the keySchema and the valueSchema)
> > > > > > > and you have 1-1 mapping with what you did in Kafka.
> > > > > > >
> > > > > > > This is particularly important if you have users that are
> trying to
> > > > > > > migrate from Kafka to Pulsar, and there is no easy and no
> standard
> > > > way
> > > > > > > to do it
> > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > *Problem 3:  Pulsar IO Records do not allow NULL values*
> > > > > > > > Don't have a problem with this.  The use case you provided
> makes
> > > > > sense.
> > > > > > >
> > > > > > > I will send a patch, thanks
> > > > > > >
> > > > > > > Enrico
> > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Mar 17, 2021 at 2:04 AM Enrico Olivelli <
> > > > eolive...@gmail.com
> > > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hello everyone,
> > > > > > > > >
> > > > > > > > > I would like to share some proposals about Pulsar IO.
> Based on
> > > > my
> > > > > > > > > experience porting a nontrivial connector for Cassandra
> from
> > > > Kafka
> > > > > > > > > Connect to Pulsar IO, I would like to identify some places
> where
> > > > we
> > > > > > > > > can improve the Pulsar IO API for both native Pulsar
> connectors
> > > > > and to
> > > > > > > > > improve feature parity with Kafka Connect.  Ultimately, I
> would
> > > > > like
> > > > > > > > > to provide a Kafka Connect compatibility library so that
> people
> > > > can
> > > > > > > > > use arbitrary Kafka connectors with Pulsar without having
> to port
> > > > > each
> > > > > > > > > one.
> > > > > > > > >
> > > > > > > > > Problem 1: Pulsar Sinks must declare at compile time the
> data
> > > > type
> > > > > they
> > > > > > > > > support
> > > > > > > > >
> > > > > > > > > You can have a Sink<byte[]>, Sink<GenericRecord>,
> Sink<KeyValue>,
> > > > > but
> > > > > > > > > you cannot have only one single Sink implementation that
> deals
> > > > with
> > > > > > > > > every kind of schema.
> > > > > > > > >
> > > > > > > > > In Kafka Connect you are used to dealing with Object at
> compile
> > > > > time
> > > > > > > > > and deal with data types and Schemas in the implementation.
> > > > > > > > >
> > > > > > > > > So I would like to see Sink<Object> and let the
> implementation
> > > > deal
> > > > > > > > > with Record<Object> that will give access to the decoded
> payload
> > > > > and
> > > > > > > > > to the Schema.
> > > > > > > > >
> > > > > > > > > I have started a work to support Schema<Object> here, that
> is the
> > > > > > > > > first step in the direction of supporting Sink<Object>
> > > > > > > > > https://github.com/apache/pulsar/pull/9895
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Problem 2: Pulsar IO Record does not handle non-String keys
> > > > > > > > >
> > > > > > > > > We only have Record#getKey() that returns an
> Optional<String>. So
> > > > > even
> > > > > > > > > if you are using KeyValue, you cannot access the logical
> value of
> > > > > the
> > > > > > > > > key.
> > > > > > > > >
> > > > > > > > > The idea is to use KeyValue as foundation for supporting
> Schema
> > > > in
> > > > > the
> > > > > > > > > Key (this is nothing new, but it is still not much
> supported in
> > > > > Pulsar
> > > > > > > > > IO)
> > > > > > > > >
> > > > > > > > > Kafka connect users deal with Object keys and we should
> provide
> > > > the
> > > > > > > > > same support.
> > > > > > > > > We already have support for Object keys in case of
> KeyValue.
> > > > > > > > > The proposal I am working with my colleagues is to add
> Object
> > > > > > > > > getLogicalKey() to the Record interface.
> > > > > > > > >
> > > > > > > > > interface Record<T> {
> > > > > > > > >
> > > > > > > > >       Optional<Object> getLogicalKey();
> > > > > > > > >       Optional<Object> getLogicalValue();
> > > > > > > > >       Schema<T> getKeySchema();
> > > > > > > > >       Schema<T> getValueSchema();
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >       Optional<String> getKey();
> > > > > > > > >       Object getValue();
> > > > > > > > >       Schema<T> getSchema();
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > For messages that are using byte[] key
> > > > > (Message#hasBase64EncodedKey)
> > > > > > > > > the getLogicalKey() returns the same as
> Message#getKeyBytes() and
> > > > > > > > > getKeySchema will return Schema.BYTES.
> > > > > > > > >
> > > > > > > > > For regular messages getKeySchema() returns Schema.STRING.
> > > > > > > > >
> > > > > > > > > For messages that are using the KeyValue schema the new
> methods
> > > > > will
> > > > > > > > > return the corresponding fields and metadata.
> > > > > > > > >
> > > > > > > > > getLogicalKey -> the key of the KeyValue
> > > > > > > > > getLogicalValue -> the value of KeyValue
> > > > > > > > > getKeySchema -> the schema for the logical key
> > > > > > > > > getValueSchema -> the schema for the logical value
> > > > > > > > > getKey -> the key of the message, as usual, for backward
> > > > > compatibility
> > > > > > > > > getSchema -> the KeyValueSchema, as usual, for backward
> > > > > compatibility
> > > > > > > > > getValue -> the KeyValue object, as usual, for backward
> > > > > compatibility
> > > > > > > > >
> > > > > > > > > Please note that we already have a KVRecord interface that
> > > > exposes
> > > > > key
> > > > > > > > > and value schema, but it is not used while pushing data to
> Sinks
> > > > > (it
> > > > > > > > > looks like it is only used by KafkaConnectSource):
> > > > > > > > >
> > > > > > > > > public interface KVRecord<K, V> extends Record {
> > > > > > > > >    Schema<K> getKeySchema();
> > > > > > > > >    Schema<V> getValueSchema();
> > > > > > > > >    KeyValueEncodingType getKeyValueEncodingType();
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > Problem 3:  Pulsar IO Records do not allow NULL values
> > > > > > > > >
> > > > > > > > > Currently there is no support for NULL values in Pulsar IO.
> > > > > > > > > This case makes sense if you are storing information in
> the key
> > > > as
> > > > > > > well.
> > > > > > > > >
> > > > > > > > > Example use case: use the topic as a changelog of a
> database, put
> > > > > the
> > > > > > > > > PK into the key and the rest of the record into the value.
> Use
> > > > the
> > > > > > > > > convention that a NULL value means a DELETE operation and
> a non
> > > > > NULL
> > > > > > > > > value is an UPSERT.
> > > > > > > > >
> > > > > > > > > Thoughts ?
> > > > > > > > >
> > > > > > > > > Enrico Olivelli
> > > > > > > > >
> > > > > > >
> > > > >
> > > >
>

Reply via email to