> Perhaps Sijie can also chime in on why the behavior is the way it is.

I will go through the discussion tomorrow and reply here.

- Sijie

On Fri, Mar 19, 2021 at 11:10 AM Jerry Peng <jerry.boyang.p...@gmail.com>
wrote:

> Hi Enrico,
> >
> >
> > One problem is that I do not know how to code "my_decode_method"
> > without Schema information.
> > If we had the Schema that is used for the Record then the approach of
> > using Sink<byte[]> may work. In fact it works perfectly for
> > Source<byte[]>.
> > This is not the case, because for Sink<byte[]> record.getSchema()
> > returns Schema.BYTES and we are not activating the Schema.OBJECT()
> > (AutoConsume) mechanism that downloads and applies the schema
> > automatically.
> > The second problem is about making it easy for users to build Pulsar
> > IO Sinks that are schema-agnostic at compile time.
> > public class MySink implements Sink<Object> {
> >     public void write(Record<Object> record) throws Exception {
> >         // automatically decoded by Pulsar
> >         Object value = record.getValue();
> >         // the actual schema for the message (it may vary from
> >         // message to message)
> >         Schema schema = record.getSchema();
> >     }
> > }
> > Currently you have to define at compile time the type of Schema you
> > want to consume. This is fine for custom Sinks or for very simple
> > Sinks, but it does not work for general-purpose Sinks.
> > I am putting up a GDoc with the full list of requirements and
> > proposals.
>
>
> The problem seems to be with `getSchema()` in Record and the subsequent
> `getSchema()` on the actual Pulsar message. For a consumer declared as
> Consumer<byte[]>, getSchema() calls on the messages the consumer returns
> will always yield Schema.BYTES, regardless of the actual schema the
> message was produced with. The schema of the message is overridden when
> byte[] is specified as the schema for the consumer. This seems like a
> bug to me. Perhaps Sijie can also chime in on why the behavior is the
> way it is.
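>
> A minimal sketch of the behavior (the topic, subscription, and MyPojo
> schema are hypothetical):
>
> PulsarClient client = PulsarClient.builder()
>         .serviceUrl("pulsar://localhost:6650")
>         .build();
> Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
>         .topic("my-topic")
>         .subscriptionName("my-sub")
>         .subscribe();
> Message<byte[]> msg = consumer.receive();
> // Even if the producer published with, say, Schema.AVRO(MyPojo.class),
> // the declared Schema.BYTES wins here: the schema visible to this
> // consumer is BYTES and msg.getValue() is just the raw payload.
> byte[] raw = msg.getValue();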
>
>
> >
> On Fri, Mar 19, 2021 at 12:46 AM Enrico Olivelli <eolive...@gmail.com>
> wrote:
>
> > On Fri, Mar 19, 2021 at 08:20 Jerry Peng
> > <jerry.boyang.p...@gmail.com> wrote:
> > >
> > > > But it doesn't work for Sinks:
> > > > currently, if you declare a Sink<byte[]>, you will see Schema.BYTES
> > > > as the schema.
> > > > If we had a Schema object that is preconfigured for decoding the
> > > > payload, we would probably be done.
> > >
> > > If you declare a sink as Sink<byte[]> there should be no schema. I am
> > > not sure what getSchema() returns, but the original design is that a
> > > declared type of byte[] indicates that there is no schema. This was
> > > also done for backwards compatibility reasons, as schema has not
> > > always been around.
> > >
> > > I still don't quite understand why you cannot just implement your own
> > > deserialization method in the sink that deserializes byte[] into
> > > whatever you want it to be:
> > >
> > > public void write(Record<byte[]> record) throws Exception {
> > >     byte[] val = record.getValue();
> > >     SomeObject foo = my_decode_method(val);
> > >     ....
> > > }
> >
> > One problem is that I do not know how to code "my_decode_method"
> > without Schema information.
> > If we had the Schema that is used for the Record then the approach of
> > using Sink<byte[]> may work. In fact it works perfectly for
> > Source<byte[]>.
> > This is not the case, because for Sink<byte[]> record.getSchema()
> > returns Schema.BYTES and we are not activating the Schema.OBJECT()
> > (AutoConsume) mechanism that downloads and applies the schema
> > automatically.
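> >
> > For comparison, here is roughly what the AutoConsume mechanism gives
> > you on a plain consumer today (a sketch, assuming a PulsarClient named
> > "client"; the topic and subscription names are hypothetical):
> >
> > Consumer<GenericRecord> consumer = client
> >         .newConsumer(Schema.AUTO_CONSUME())
> >         .topic("my-topic")
> >         .subscriptionName("my-sub")
> >         .subscribe();
> > Message<GenericRecord> msg = consumer.receive();
> > // The schema is fetched from the registry and applied per message,
> > // so the decoded value tracks whatever schema the producer used.
> > GenericRecord decoded = msg.getValue();
> >
> > This is exactly what Sink<byte[]> does not activate today.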
> >
> > The second problem is about making it easy for users to build Pulsar
> > IO Sinks that are schema-agnostic at compile time.
> >
> > public class MySink implements Sink<Object> {
> >     public void write(Record<Object> record) throws Exception {
> >         // automatically decoded by Pulsar
> >         Object value = record.getValue();
> >         // the actual schema for the message (it may vary from
> >         // message to message)
> >         Schema schema = record.getSchema();
> >     }
> > }
> >
> > Currently you have to define at compile time the type of Schema you
> > want to consume. This is fine for custom Sinks or for very simple
> > Sinks, but it does not work for general-purpose Sinks.
> >
> > I am putting up a GDoc with the full list of requirements and
> > proposals.
> >
> >
> >
> > >
> > > > Problem 2: Pulsar IO Record does not handle non-String keys
> > >
> > > I do not believe you need to add framework-level support for this.
> > > This can be done at a higher layer, i.e. the compatibility layer
> > > between Kafka Connect and Pulsar IO. The compatibility layer just
> > > needs to map a Kafka Connect key of object type into a string
> > > representation.
> >
> > We are already supporting non-String keys in Pulsar with KeyValue, and
> > there is partial support for KeyValue on Sources (with KVRecord);
> > we just have to complete the work and allow Sinks to deal
> > automatically with KeyValue messages.
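> >
> > For example, here is a sketch of what a Source can already emit today
> > via KVRecord (the key/value types and values are hypothetical):
> >
> > KVRecord<String, Integer> record = new KVRecord<String, Integer>() {
> >     public Schema<String> getKeySchema() { return Schema.STRING; }
> >     public Schema<Integer> getValueSchema() { return Schema.INT32; }
> >     public KeyValueEncodingType getKeyValueEncodingType() {
> >         return KeyValueEncodingType.SEPARATED;
> >     }
> >     public KeyValue<String, Integer> getValue() {
> >         return new KeyValue<>("pk-1", 42);
> >     }
> > };
> >
> > Sinks have no equivalent automatic treatment yet.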
> >
> >
> >
> > >
> > > I am glad we are trying to build a compatibility layer so Kafka
> > > Connect connectors can run on Pulsar IO, but in doing so, let us not
> > > just change Pulsar IO to resemble Kafka Connect. In general, we
> > > should be extremely cautious about introducing new APIs at the
> > > framework layer. If something can be implemented at a higher layer
> > > then we should implement it there and not try to push it down to the
> > > framework layer.
> > >
> > > Best,
> > >
> > > Jerry
> > >
> > > On Thu, Mar 18, 2021 at 1:38 AM Enrico Olivelli <eolive...@gmail.com>
> > wrote:
> > >
> > > > Jerry
> > > >
> > > > On Thu, Mar 18, 2021 at 03:07 Jerry Peng
> > > > <jerry.boyang.p...@gmail.com> wrote:
> > > > >
> > > > > Hi Enrico,
> > > > >
> > > > > Thanks for taking the initiative for improvements on Pulsar IO!
> > > > >
> > > > > I have questions regarding the following statements:
> > > > >
> > > > >
> > > > > *Problem 1: Pulsar Sinks must declare at compile time the data
> > > > > type they support*
> > > > > > So I would like to see Sink<Object> and let the implementation
> > > > > > deal with Record<Object> that will give access to the decoded
> > > > > > payload and to the Schema.
> > > > >
> > > > > What is the point of having a schema for a generic POJO object?
> > > > > The point of a schema is to provide boundaries on what the
> > > > > structure of the data can be. However, having a schema that
> > > > > supports a generic POJO that can have any structure seems
> > > > > paradoxical to me.
> > > > >
> > > > > Currently, the input schema and output schema for sources, sinks,
> > > > > and functions can be undefined, i.e. there is no schema, which
> > > > > means that the source/sink/function will either write, read and
> > > > > write, or just read byte[], which is the most generic data
> > > > > representation there can be. Why is this not sufficient for your
> > > > > use case? Can the Pulsar IO wrapper code for Kafka Connect just
> > > > > read or write byte[] and let the higher-level abstractions
> > > > > determine how to deserialize it?
> > > >
> > > > For Sources you can have a Source<byte[]> and you can write records
> > > > with any schema.
> > > > This is good and it works well.
> > > >
> > > > But it doesn't work for Sinks:
> > > > currently, if you declare a Sink<byte[]>, you will see Schema.BYTES
> > > > as the schema.
> > > > If we had a Schema object that is preconfigured for decoding the
> > > > payload, we would probably be done.
> > > > Generic sinks will be Sink<byte[]> and they will decode the payload
> > > > with:
> > > >
> > > > public void write(Record<byte[]> record) throws Exception {
> > > >     Schema<?> schema = record.getSchema();
> > > >     SchemaType type = schema.getSchemaInfo().getType();
> > > >     Object decoded = schema.decode(record.getValue());
> > > > }
> > > >
> > > > but this does not work, because we have a strongly typed system and
> > > > you MUST write:
> > > >
> > > > public void write(Record<byte[]> record) throws Exception {
> > > >     Schema<byte[]> schema = record.getSchema();
> > > >     SchemaType type = schema.getSchemaInfo().getType();
> > > >     Object decoded = record.getValue();
> > > > }
> > > >
> > > > if I have Sink<Object> it becomes:
> > > >
> > > > public void write(Record<Object> record) throws Exception {
> > > >     Schema<Object> schema = record.getSchema();
> > > >     SchemaType type = schema.getSchemaInfo().getType();
> > > >     Object decoded = record.getValue();
> > > >
> > > >     // switch on SchemaType ....
> > > > }
> > > >
> > > >
> > > > >
> > > > > *Problem 2: Pulsar IO Record does not handle non-String keys*
> > > > >
> > > > > The reason why Pulsar IO Record only supports specifying a key as
> > > > > a string is because Pulsar only supports specifying keys as
> > > > > strings. The key specified in Pulsar IO Record is also used as the
> > > > > partition key to determine which partition a message will be
> > > > > published into. If the key returned from a Kafka Connect connector
> > > > > also needs to be used as the partition key then the method you
> > > > > specified will not work. You will still need to translate the key
> > > > > back into a string or implement your own custom message router.
> > > >
> > > > At the low level this is true, but we already have
> > > > KeyValue#SEPARATED, which brings these features:
> > > > - you have a schema for the Key
> > > > - you have a schema for the Value
> > > > - your Key is stored in the Message key, and you can think of it as
> > > > an Object (it is encoded to byte[] using the KeySchema and stored
> > > > base64-encoded as a string in the message key)
> > > >
> > > > It is not super efficient, due to the base64 encoding, but it is a
> > > > good tradeoff and it is already supported by Pulsar; see the sketch
> > > > below.
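> > > >
> > > > A minimal sketch of the producer side (assuming a PulsarClient
> > > > named "client" and hypothetical MyKey/MyValue POJOs):
> > > >
> > > > Producer<KeyValue<MyKey, MyValue>> producer = client
> > > >         .newProducer(Schema.KeyValue(
> > > >                 Schema.JSON(MyKey.class),
> > > >                 Schema.JSON(MyValue.class),
> > > >                 KeyValueEncodingType.SEPARATED))
> > > >         .topic("my-topic")
> > > >         .create();
> > > >
> > > > // With SEPARATED encoding the encoded key bytes travel in the
> > > > // message key (base64-encoded) and only the value bytes go into
> > > > // the payload, so the key keeps its own schema.
> > > > producer.send(new KeyValue<>(new MyKey(), new MyValue()));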
> > > >
> > > > >
> > > > > Can you also explain how the additional methods you are suggesting
> > > > > to add will be used? What entities will be implementing these
> > > > > methods and setting the values? What entities will use those
> > > > > values? And how will those values be used?
> > > >
> > > > If you are using KeyValue (especially in the SEPARATED flavour) you
> > > > will essentially see a record as a key-value pair of <Object,
> > > > Object>, with the key stored in the message key.
> > > >
> > > > Most of these ideas come from porting connectors from Kafka Connect
> > > > to Pulsar IO.
> > > > I would like to start a separate thread to discuss a convention for
> > > > the mapping, but the short version is:
> > > > when you come from Kafka you have a KeyValue<Object, Object> in
> > > > SEPARATED encoding (this may be simplified in the case of String or
> > > > byte[] keys, but let's stick to the most complicated case).
> > > > So your producer (probably a KafkaSource or a KafkaConnectSource)
> > > > writes a KeyValue to Pulsar and uses KeyValueSchema with SEPARATED
> > > > encoding.
> > > >
> > > > On the Sink side you have your connector that (in Kafka) is used to
> > > > dealing with an Object key and an Object value.
> > > > With those new methods you can use the logical key and the logical
> > > > value (and the keySchema and the valueSchema), and you have a 1-1
> > > > mapping with what you did in Kafka, as in the sketch below.
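> > > >
> > > > A hypothetical sketch of that 1-1 mapping, assuming the proposed
> > > > getLogicalKey()/getLogicalValue() accessors from this thread:
> > > >
> > > > public void write(Record<Object> record) throws Exception {
> > > >     // the same key/value pair the connector saw in Kafka Connect
> > > >     Object key = record.getLogicalKey().orElse(null);
> > > >     Object value = record.getLogicalValue().orElse(null);
> > > >     Schema<?> keySchema = record.getKeySchema();
> > > >     Schema<?> valueSchema = record.getValueSchema();
> > > >     // hand off to the unchanged Kafka Connect sink code ...
> > > > }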
> > > >
> > > > This is particularly important if you have users that are trying to
> > > > migrate from Kafka to Pulsar, as there is currently no easy,
> > > > standard way to do it.
> > > >
> > > > >
> > > > >
> > > > > *Problem 3: Pulsar IO Records do not allow NULL values*
> > > > > I don't have a problem with this. The use case you provided makes
> > > > > sense.
> > > >
> > > > I will send a patch, thanks
> > > >
> > > > Enrico
> > > >
> > > > >
> > > > > On Wed, Mar 17, 2021 at 2:04 AM Enrico Olivelli
> > > > > <eolive...@gmail.com> wrote:
> > > > >
> > > > > > Hello everyone,
> > > > > >
> > > > > > I would like to share some proposals about Pulsar IO. Based on
> > > > > > my experience porting a nontrivial connector for Cassandra from
> > > > > > Kafka Connect to Pulsar IO, I would like to identify some places
> > > > > > where we can improve the Pulsar IO API, both for native Pulsar
> > > > > > connectors and to improve feature parity with Kafka Connect.
> > > > > > Ultimately, I would like to provide a Kafka Connect
> > > > > > compatibility library so that people can use arbitrary Kafka
> > > > > > connectors with Pulsar without having to port each one.
> > > > > >
> > > > > > Problem 1: Pulsar Sinks must declare at compile time the data
> > > > > > type they support
> > > > > >
> > > > > > You can have a Sink<byte[]>, a Sink<GenericRecord>, or a
> > > > > > Sink<KeyValue>, but you cannot have a single Sink implementation
> > > > > > that deals with every kind of schema.
> > > > > >
> > > > > > In Kafka Connect you are used to dealing with Object at compile
> > > > > > time and handling data types and Schemas in the implementation.
> > > > > >
> > > > > > So I would like to see Sink<Object> and let the implementation
> > > > > > deal with Record<Object>, which will give access to the decoded
> > > > > > payload and to the Schema.
> > > > > >
> > > > > > I have started work to support Schema<Object>, which is the
> > > > > > first step in the direction of supporting Sink<Object>:
> > > > > > https://github.com/apache/pulsar/pull/9895
> > > > > >
> > > > > >
> > > > > > Problem 2: Pulsar IO Record does not handle non-String keys
> > > > > >
> > > > > > We only have Record#getKey(), which returns an Optional<String>.
> > > > > > So even if you are using KeyValue, you cannot access the logical
> > > > > > value of the key.
> > > > > >
> > > > > > The idea is to use KeyValue as the foundation for supporting
> > > > > > Schema in the Key (this is nothing new, but it is still not well
> > > > > > supported in Pulsar IO).
> > > > > >
> > > > > > Kafka Connect users deal with Object keys and we should provide
> > > > > > the same support.
> > > > > > We already have support for Object keys in the case of KeyValue.
> > > > > > The proposal I am working on with my colleagues is to add an
> > > > > > Object getLogicalKey() method (and related accessors) to the
> > > > > > Record interface:
> > > > > >
> > > > > > interface Record<T> {
> > > > > >
> > > > > >     // proposed new methods
> > > > > >     Optional<Object> getLogicalKey();
> > > > > >     Optional<Object> getLogicalValue();
> > > > > >     Schema<?> getKeySchema();
> > > > > >     Schema<?> getValueSchema();
> > > > > >
> > > > > >     // existing methods, kept as-is
> > > > > >     Optional<String> getKey();
> > > > > >     T getValue();
> > > > > >     Schema<T> getSchema();
> > > > > > }
> > > > > >
> > > > > > For messages that use a byte[] key
> > > > > > (Message#hasBase64EncodedKey), getLogicalKey() returns the same
> > > > > > as Message#getKeyBytes() and getKeySchema() will return
> > > > > > Schema.BYTES.
> > > > > >
> > > > > > For regular messages getKeySchema() returns Schema.STRING.
> > > > > >
> > > > > > For messages that use the KeyValue schema, the new methods will
> > > > > > return the corresponding fields and metadata (see the sketch
> > > > > > after this list):
> > > > > >
> > > > > > getLogicalKey -> the key of the KeyValue
> > > > > > getLogicalValue -> the value of the KeyValue
> > > > > > getKeySchema -> the schema for the logical key
> > > > > > getValueSchema -> the schema for the logical value
> > > > > > getKey -> the key of the message, as usual, for backward
> > > > > > compatibility
> > > > > > getSchema -> the KeyValueSchema, as usual, for backward
> > > > > > compatibility
> > > > > > getValue -> the KeyValue object, as usual, for backward
> > > > > > compatibility
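> > > > > >
> > > > > > A sketch of how a sink would see a KeyValue message through the
> > > > > > proposed accessors (assuming the message was produced with
> > > > > > Schema.KeyValue(Schema.STRING, Schema.INT32); none of this is
> > > > > > implemented yet):
> > > > > >
> > > > > > public void write(Record<Object> record) throws Exception {
> > > > > >     Object logicalKey = record.getLogicalKey().get();     // a String
> > > > > >     Object logicalValue = record.getLogicalValue().get(); // an Integer
> > > > > >     Schema<?> keySchema = record.getKeySchema();     // Schema.STRING
> > > > > >     Schema<?> valueSchema = record.getValueSchema(); // Schema.INT32
> > > > > > }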
> > > > > >
> > > > > > Please note that we already have a KVRecord interface that
> > > > > > exposes the key and value schemas, but it is not used while
> > > > > > pushing data to Sinks (it looks like it is only used by
> > > > > > KafkaConnectSource):
> > > > > >
> > > > > > public interface KVRecord<K, V> extends Record {
> > > > > >     Schema<K> getKeySchema();
> > > > > >     Schema<V> getValueSchema();
> > > > > >     KeyValueEncodingType getKeyValueEncodingType();
> > > > > > }
> > > > > >
> > > > > > Problem 3: Pulsar IO Records do not allow NULL values
> > > > > >
> > > > > > Currently there is no support for NULL values in Pulsar IO.
> > > > > > This case makes sense if you are storing information in the key
> > > > > > as well.
> > > > > >
> > > > > > Example use case: use the topic as a changelog of a database,
> > > > > > put the PK into the key and the rest of the record into the
> > > > > > value. Use the convention that a NULL value means a DELETE
> > > > > > operation and a non-NULL value means an UPSERT, as in the sketch
> > > > > > below.
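> > > > > >
> > > > > > A minimal sketch of that convention in a sink (deleteRow and
> > > > > > upsertRow are hypothetical helpers, and getLogicalKey() is the
> > > > > > accessor proposed under Problem 2):
> > > > > >
> > > > > > public void write(Record<Object> record) throws Exception {
> > > > > >     Object pk = record.getLogicalKey()
> > > > > >             .orElseThrow(() -> new IllegalStateException(
> > > > > >                     "changelog records need a key"));
> > > > > >     Object value = record.getValue();
> > > > > >     if (value == null) {
> > > > > >         deleteRow(pk);        // NULL value -> DELETE (tombstone)
> > > > > >     } else {
> > > > > >         upsertRow(pk, value); // non-NULL value -> UPSERT
> > > > > >     }
> > > > > >     record.ack();
> > > > > > }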
> > > > > >
> > > > > > Thoughts?
> > > > > >
> > > > > > Enrico Olivelli
> > > > > >
> > > >
> >
>
