Hi Enrico,
>
>
> One problem is that I do not know how to code "my_decode_method"
> without Schema information.
> If we had the Schema that is used for the Record then the approach of
> using Sink<byte[]> may work. In fact it works perfectly for Source<byte[]>.
> This is not the case, because for Sink<byte[]> record.getSchema()
> returns Schema.BYTES and we are not activating the Schema.OBJECT()
> (AutoConsume) mechanism that downloads and applies the schema
> automatically.
> The second problem is about making it easy for users to build Pulsar
> IO Sinks that are schema agnostic at compile time.
> public class MySink implements Sink<Object> {
>     public void write(Record<Object> record) {
>         // automatically decoded by Pulsar
>         Object value = record.getValue();
>         // the actual schema for the message (it may vary from message to message)
>         Schema schema = record.getSchema();
>     }
> }
> Currently you have to define at compile time the type of Schema you
> want to consume. This is fine for custom Sinks or for very simple
> Sinks, but it does not work for general-purpose Sinks.
> I am putting up a GDoc with the full list of requirements and proposals.


The problem seems to be with `getSchema()` in Record and the subsequent
`getSchema()` call on the underlying Pulsar message. For a consumer declared
as Consumer<byte[]>, `getSchema()` on the messages that consumer returns will
always be Schema.BYTES, regardless of the schema the message was actually
produced with. The message's schema is overridden when byte[] is specified
as the schema for the consumer. This seems like a bug to me.
Perhaps Sijie can also chime in on why the behavior is such.
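
For what it's worth, the schema-agnostic sink in the quoted snippet boils
down to switching on the per-message schema type. Below is a minimal sketch
of that dispatch. It uses stand-in types only: `SketchSchemaType`,
`SketchRecord` and `SchemaAgnosticSinkSketch` are hypothetical names made up
for illustration, not Pulsar classes, and the sketch assumes the framework
has already decoded the payload before calling the sink.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Stand-in types for illustration only; these are NOT Pulsar classes.
enum SketchSchemaType { STRING, INT64, BYTES }

final class SketchRecord {
    final Object value;           // payload, assumed already decoded by the framework
    final SketchSchemaType type;  // schema type carried by this particular message

    SketchRecord(Object value, SketchSchemaType type) {
        this.value = value;
        this.type = type;
    }
}

// A sink that is schema agnostic at compile time: it only sees Object and
// decides what to do by inspecting the schema type of each record.
final class SchemaAgnosticSinkSketch {
    final List<String> written = new ArrayList<>();

    void write(SketchRecord record) {
        switch (record.type) {
            case STRING:
                written.add("string:" + (String) record.value);
                break;
            case INT64:
                written.add("int64:" + (Long) record.value);
                break;
            case BYTES:
                written.add("bytes:" + ((byte[]) record.value).length);
                break;
            default:
                throw new IllegalArgumentException("unsupported type " + record.type);
        }
    }

    public static void main(String[] args) {
        SchemaAgnosticSinkSketch sink = new SchemaAgnosticSinkSketch();
        // The schema type may vary from message to message.
        sink.write(new SketchRecord("hello", SketchSchemaType.STRING));
        sink.write(new SketchRecord(42L, SketchSchemaType.INT64));
        sink.write(new SketchRecord("abc".getBytes(StandardCharsets.UTF_8),
                SketchSchemaType.BYTES));
        System.out.println(sink.written);
    }
}
```

The point of the sketch is only that the sink needs both pieces per record:
the decoded value and the schema it was produced with.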


On Fri, Mar 19, 2021 at 12:46 AM Enrico Olivelli <eolive...@gmail.com>
wrote:

> On Fri, Mar 19, 2021 at 08:20 Jerry Peng
> <jerry.boyang.p...@gmail.com> wrote:
> >
> > > But it doesn't work for Sinks: currently, if you declare a
> > > Sink<byte[]>, the schema you see is Schema.BYTES.
> > > If we had a Schema object that is preconfigured for decoding the
> > > payload we would probably be done.
> >
> > If you declare a sink as Sink<byte[]> there should be no schema.  I am not
> > sure what getSchema() returns, but the original design is that a
> > declared type of byte[] indicates that there is no schema.  This was also
> > done for backwards compatibility reasons, as schema has not always been
> > around.
> >
> > I still don't quite understand why you cannot just implement your own
> > deserialization method in the sink, one that deserializes byte[] into
> > whatever you want it to be.
> >
> > public void write(Record<byte[]> record) throws Exception {
> >     byte[] val = record.getValue();
> >     SomeObject foo = my_decode_method(val);
> >     ....
> > }
>
> One problem is that I do not know how to code "my_decode_method"
> without Schema information.
> If we had the Schema that is used for the Record then the approach of
> using Sink<byte[]> may work. In fact it works perfectly for Source<byte[]>.
> This is not the case, because for Sink<byte[]> record.getSchema()
> returns Schema.BYTES and we are not activating the Schema.OBJECT()
> (AutoConsume) mechanism that downloads and applies the schema
> automatically.
>
> The second problem is about making it easy for users to build Pulsar
> IO Sinks that are schema agnostic at compile time.
>
> public class MySink implements Sink<Object> {
>     public void write(Record<Object> record) {
>         // automatically decoded by Pulsar
>         Object value = record.getValue();
>         // the actual schema for the message (it may vary from message to message)
>         Schema schema = record.getSchema();
>     }
> }
>
> Currently you have to define at compile time the type of Schema you
> want to consume. This is fine for custom Sinks or for very simple
> Sinks, but it does not work for general-purpose Sinks.
>
> I am putting up a GDoc with the full list of requirements and proposals.
>
>
>
> >
> > > Problem 2: Pulsar IO Record does not handle non-String keys
> >
> > I do not believe you need to add framework-level support for this.  This
> > can be done at a higher layer, i.e. the compatibility layer between Kafka
> > Connect and Pulsar IO.  The compatibility layer just needs to map a Kafka
> > Connect key of object type into its string representation.
>
> We already support non-String keys in Pulsar with KeyValue, and
> there is partial support for KeyValue on Sources (with KVRecord).
> We just have to complete the work and allow Sinks to deal
> automatically with KeyValue messages.
>
>
>
> >
> > I am glad we are trying to build a compatibility layer so Kafka Connect
> > connectors can run on Pulsar IO, but in doing so, let us not just change
> > Pulsar IO to resemble Kafka Connect. In general, we should be extremely
> > cautious about introducing new APIs at the framework layer.  If something
> > can be implemented at a higher layer then we should implement it there
> > and not try to push it down to the framework layer.
> >
> > Best,
> >
> > Jerry
> >
> > On Thu, Mar 18, 2021 at 1:38 AM Enrico Olivelli <eolive...@gmail.com>
> > wrote:
> >
> > > Jerry
> > >
> > > On Thu, Mar 18, 2021 at 03:07 Jerry Peng
> > > <jerry.boyang.p...@gmail.com> wrote:
> > > >
> > > > Hi Enrico,
> > > >
> > > > Thanks for taking the initiative for improvements on Pulsar IO!
> > > >
> > > > I have questions in regards to the following statements
> > > >
> > > >
> > > > *Problem 1: Pulsar Sinks must declare at compile time the data type
> > > > they support*
> > > > > So I would like to see Sink<Object> and let the implementation deal
> > > > > with Record<Object> that will give access to the decoded payload and
> > > > > to the Schema.
> > > >
> > > > What is the point of having a schema for a generic POJO object?  The
> > > > point of a schema is to provide boundaries on what the structure of the
> > > > data can be.  However, having a schema that supports a generic POJO that
> > > > can have any structure seems paradoxical to me.
> > > >
> > > > Currently, the input schema and output schema for source, sink, and
> > > > functions can be undefined, i.e. there is no schema, which means that
> > > > the source/sink/function will either write, read and write, or just read
> > > > byte[], which is the most generic data representation there can be.  Why
> > > > is this not sufficient for your use case?  Can the Pulsar IO wrapper code
> > > > for Kafka Connect just read or write byte[] and let the higher-level
> > > > abstractions determine how to deserialize it?
> > >
> > > For Sources you can have a Source<byte[]> and you can write records
> > > with any schema.
> > > This is good and it works well.
> > >
> > > But it doesn't work for Sinks: currently, if you declare a
> > > Sink<byte[]>, the schema you see is Schema.BYTES.
> > > If we had a Schema object that is preconfigured for decoding the
> > > payload we would probably be done.
> > > Generic sinks would be Sink<byte[]> and they would decode the payload
> > > with:
> > >
> > > public void write(Record<byte[]> record) throws Exception {
> > >     Schema<?> schema = record.getSchema();
> > >     SchemaType type = schema.getSchemaInfo().getType();
> > >     Object decoded = schema.decode(record.getValue());
> > > }
> > >
> > > but this does not work, because we have a strongly typed system and
> > > you MUST write:
> > >
> > > public void write(Record<byte[]> record) throws Exception {
> > >     Schema<byte[]> schema = record.getSchema();
> > >     SchemaType type = schema.getSchemaInfo().getType();
> > >     Object decoded = record.getValue();
> > > }
> > >
> > > If I have Sink<Object> it becomes:
> > >
> > > public void write(Record<Object> record) throws Exception {
> > >     Schema<Object> schema = record.getSchema();
> > >     SchemaType type = schema.getSchemaInfo().getType();
> > >     Object decoded = record.getValue();
> > >
> > >     // switch on SchemaType ....
> > > }
> > >
> > >
> > > >
> > > > *Problem 2: Pulsar IO Record does not handle non-String keys*
> > > >
> > > > The reason why Pulsar IO Record only supports specifying a key as a
> > > > string is because Pulsar only supports specifying keys as strings. The
> > > > key specified in Pulsar IO Record is also used as the partition key to
> > > > determine which partition a message will be published into.  If the
> > > > key returned from a Kafka Connect connector also needs to be used as
> > > > the partition key then the method you specified will not work.  You
> > > > will still need to translate the key back into a string or implement
> > > > your own custom message router.
> > >
> > > At the low level this is true, but we already have KeyValue#SEPARATED
> > > that brings these features:
> > > - you have a schema for the Key
> > > - you have a schema for the Value
> > > - your Key is stored in the Message key, and you can think of it as an
> > > Object (it is encoded to byte[] using the KeySchema and it is stored
> > > as a base64 string in the message key)
> > >
> > > It is not super efficient, due to the base64 encoding, but it is a
> > > good tradeoff and it is already supported by Pulsar.
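
The base64 step described above can be sketched with the JDK alone. This is
only an illustration under assumptions: it uses the standard
`java.util.Base64` alphabet, a UTF-8 string stands in for the output of the
key schema, and `SeparatedKeySketch` and its method names are made up here
and are not Pulsar API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

// Hypothetical helper, for illustration only; not a Pulsar class.
final class SeparatedKeySketch {

    // Turn the key bytes (as produced by some key schema) into the string
    // form that can be stored in a String-only message key.
    static String toMessageKey(byte[] encodedKey) {
        return Base64.getEncoder().encodeToString(encodedKey);
    }

    // Recover the original key bytes from the string message key.
    static byte[] fromMessageKey(String messageKey) {
        return Base64.getDecoder().decode(messageKey);
    }

    public static void main(String[] args) {
        // Assumption: UTF-8 bytes stand in for the key-schema encoding.
        byte[] keyBytes = "user-42".getBytes(StandardCharsets.UTF_8);
        String messageKey = toMessageKey(keyBytes);
        byte[] roundTrip = fromMessageKey(messageKey);
        System.out.println(messageKey.length() + " chars, round-trips: "
                + Arrays.equals(keyBytes, roundTrip));
    }
}
```

Base64 emits 4 characters for every 3 input bytes (padded), which is the
overhead referred to above.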
> > >
> > > >
> > > > Can you also explain how the additional methods you are suggesting
> > > > to add will be used?  What entities will be implementing the methods
> > > > and setting the values? What entities will use those values? And how
> > > > will those values be used?
> > >
> > > If you are using KeyValue (especially in SEPARATED flavour) you
> > > essentially will see a record as a key value pair of <Object, Object>,
> > > with the key stored in the message key.
> > >
> > > Most of these ideas come from porting connectors from Kafka Connect to
> > > Pulsar IO.
> > > I would like to start a separate thread about discussing a convention
> > > for the mapping but the short version is:
> > > When you come from Kafka you have a KeyValue<Object, Object>  in
> > > SEPARATED encoding (this may be simplified in case of String or byte[]
> > > keys, but let's stick to the most complicated case).
> > > So your producer (probably a KafkaSource or a KafkaConnectSource)
> > > writes to Pulsar a KeyValue and uses KeyValueSchema with SEPARATED
> > > encoding.
> > >
> > > On the Sink side you have your connector that (in Kafka) is used to
> > > deal with an Object key and an Object value.
> > > With those new methods you can use the logical key and the logical
> > > value (and you can use the keySchema and the valueSchema)
> > > and you have 1-1 mapping with what you did in Kafka.
> > >
> > > This is particularly important if you have users that are trying to
> > > migrate from Kafka to Pulsar, and there is no easy and no standard way
> > > to do it
> > >
> > > >
> > > >
> > > > *Problem 3:  Pulsar IO Records do not allow NULL values*
> > > > Don't have a problem with this.  The use case you provided makes
> > > > sense.
> > >
> > > I will send a patch, thanks
> > >
> > > Enrico
> > >
> > > >
> > > > On Wed, Mar 17, 2021 at 2:04 AM Enrico Olivelli <eolive...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello everyone,
> > > > >
> > > > > I would like to share some proposals about Pulsar IO.  Based on my
> > > > > experience porting a nontrivial connector for Cassandra from Kafka
> > > > > Connect to Pulsar IO, I would like to identify some places where we
> > > > > can improve the Pulsar IO API, both for native Pulsar connectors and
> > > > > to improve feature parity with Kafka Connect.  Ultimately, I would
> > > > > like to provide a Kafka Connect compatibility library so that people
> > > > > can use arbitrary Kafka connectors with Pulsar without having to port
> > > > > each one.
> > > > >
> > > > > Problem 1: Pulsar Sinks must declare at compile time the data type
> > > > > they support
> > > > >
> > > > > You can have a Sink<byte[]>, Sink<GenericRecord>, Sink<KeyValue>,
> > > > > but you cannot have one single Sink implementation that deals with
> > > > > every kind of schema.
> > > > >
> > > > > In Kafka Connect you are used to dealing with Object at compile
> > > > > time and dealing with data types and Schemas in the implementation.
> > > > >
> > > > > So I would like to see Sink<Object> and let the implementation deal
> > > > > with Record<Object> that will give access to the decoded payload
> > > > > and to the Schema.
> > > > >
> > > > > I have started work to support Schema<Object> here, which is the
> > > > > first step in the direction of supporting Sink<Object>:
> > > > > https://github.com/apache/pulsar/pull/9895
> > > > >
> > > > >
> > > > > Problem 2: Pulsar IO Record does not handle non-String keys
> > > > >
> > > > > We only have Record#getKey() that returns an Optional<String>. So
> > > > > even if you are using KeyValue, you cannot access the logical value
> > > > > of the key.
> > > > >
> > > > > The idea is to use KeyValue as the foundation for supporting Schema
> > > > > in the Key (this is nothing new, but it is still not well supported
> > > > > in Pulsar IO).
> > > > >
> > > > > Kafka Connect users deal with Object keys and we should provide the
> > > > > same support.
> > > > > We already have support for Object keys in case of KeyValue.
> > > > > The proposal I am working on with my colleagues is to add Object
> > > > > getLogicalKey() to the Record interface.
> > > > >
> > > > > interface Record<T> {
> > > > >
> > > > >       Optional<Object> getLogicalKey();
> > > > >       Optional<Object> getLogicalValue();
> > > > >       Schema<T> getKeySchema();
> > > > >       Schema<T> getValueSchema();
> > > > >
> > > > >
> > > > >       Optional<String> getKey();
> > > > >       Object getValue();
> > > > >       Schema<T> getSchema();
> > > > > }
> > > > >
> > > > > For messages that are using a byte[] key
> > > > > (Message#hasBase64EncodedKey), getLogicalKey() returns the same as
> > > > > Message#getKeyBytes() and getKeySchema() returns Schema.BYTES.
> > > > >
> > > > > For regular messages getKeySchema() returns Schema.STRING.
> > > > >
> > > > > For messages that are using the KeyValue schema the new methods
> > > > > will return the corresponding fields and metadata.
> > > > >
> > > > > getLogicalKey -> the key of the KeyValue
> > > > > getLogicalValue -> the value of the KeyValue
> > > > > getKeySchema -> the schema for the logical key
> > > > > getValueSchema -> the schema for the logical value
> > > > > getKey -> the key of the message, as usual, for backward compatibility
> > > > > getSchema -> the KeyValueSchema, as usual, for backward compatibility
> > > > > getValue -> the KeyValue object, as usual, for backward compatibility
> > > > >
> > > > > Please note that we already have a KVRecord interface that exposes
> > > > > key and value schema, but it is not used while pushing data to Sinks
> > > > > (it looks like it is only used by KafkaConnectSource):
> > > > >
> > > > > public interface KVRecord<K, V> extends Record {
> > > > >    Schema<K> getKeySchema();
> > > > >    Schema<V> getValueSchema();
> > > > >    KeyValueEncodingType getKeyValueEncodingType();
> > > > > }
> > > > >
> > > > > Problem 3:  Pulsar IO Records do not allow NULL values
> > > > >
> > > > > Currently there is no support for NULL values in Pulsar IO.
> > > > > This case makes sense if you are storing information in the key as
> > > > > well.
> > > > >
> > > > > Example use case: use the topic as a changelog of a database, put
> > > > > the PK into the key and the rest of the record into the value. Use the
> > > > > convention that a NULL value means a DELETE operation and a non-NULL
> > > > > value is an UPSERT.
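
As a sketch of the changelog convention above: a consumer of such a topic
would treat a NULL value as a DELETE and anything else as an UPSERT.
`ChangelogSketch` is a hypothetical in-memory stand-in for the target
table, made up for illustration; it is not a Pulsar IO class, and plain
String keys and values are an assumption to keep the example short.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory "table" fed by a changelog; not a Pulsar class.
final class ChangelogSketch {
    private final Map<String, String> table = new LinkedHashMap<>();

    // Apply one changelog entry: the primary key travels in the message
    // key, the rest of the row in the value. A null value means DELETE,
    // a non-null value means UPSERT.
    void apply(String primaryKey, String value) {
        if (value == null) {
            table.remove(primaryKey);
        } else {
            table.put(primaryKey, value);
        }
    }

    // Copy of the current table contents.
    Map<String, String> snapshot() {
        return new LinkedHashMap<>(table);
    }

    public static void main(String[] args) {
        ChangelogSketch log = new ChangelogSketch();
        log.apply("1", "alice");     // insert
        log.apply("2", "bob");       // insert
        log.apply("1", "alice-v2");  // update of an existing key
        log.apply("2", null);        // delete
        System.out.println(log.snapshot()); // prints {1=alice-v2}
    }
}
```

Without NULL values there is no way to express the delete in the last step
using the value alone.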
> > > > >
> > > > > Thoughts ?
> > > > >
> > > > > Enrico Olivelli
> > > > >
> > >
>
