Il giorno ven 19 mar 2021 alle ore 08:20 Jerry Peng
<jerry.boyang.p...@gmail.com> ha scritto:
>
> > But it doesn't work for Sinks
> currently if you declare a Sink<byte[]> you will see as schema Schema.BYTES.
> If we had a Schema object that is preconfigured for decoding the
> payload probably we will be done.
>
> If you declare sink as Sink<byte[]> there should be no schema.  I am not
> sure what getSchema() returns but the original design is that if the
> declared type is byte[] indicates that there is no schema.  This also done
> for backwards compatibility reasons as schema has not always been around.
>
> I still don't quite understand why you can not just implement your own
> de-serialization method in the sink? That deserializes byte[] into whatever
> you want it to be.
>
> public void write(Record<byte[]> record) throws Exception {
>     byte[] val = record.getValue();
>     SomeObject foo = my_decode_method(val);
>     ....
> }

One problem is that I do not know how to code "my_decode_method"
without Schema information.
If we had the Schema that is used for the Record then the approach of
using Sink[] may work. In fact it works perfectly for Source<byte[]>.
This is not the case, because for Sink<byte[]> record.getSchema()
returns byte[] and we are not activating Schema.OBJECT() (AutoConsume)
mechanism that downloads
and applies automatically the schema.

The second problem is about making it easy for users to build Pulsar
IO Sinks that are schema agnostic at compile time.

public class MySink implements Sink<Object> {
    public void write(Record<Object> record) {
          Object value = record.getValue(); --> automatically decoded by Pulsar
          Schema schema = record.getSchema() -> this is the actual
schema for the message (it may vary from message to message)
   }
}

Currently you have to define at compile time the type of Schema you
want to consume, this is fine for custom Sinks or for very simple
Sinks.
But it does not work for general purpose Sinks.

I am putting up a GDoc with the full list of requirements and proposals



>
> > Problem 2: Pulsar IO Record does not handle non-String keys
>
> I do not believe you need to add framework level support for this.  This
> can be done at a higher layer i.e. the compatibility layer of kafka connect
> and Pulsar IO.  The compatibility layer just needs to map a kafka connect
> key of object type into string representation.

We are already supporting non-String keys in Pulsar with KeyValue,
there is partial support for KeyValue on Sources (with KVRecord)
we just have to complete the work and allow Sinks to deal
automatically with KeyValue messages.



>
> I am glad we are trying to build a compatibility layer so Kafka connect
> connectors can run on Pulsar IO, but in doing so, let us not just change
> Pulsar IO to resemble Kafka connect. In general, we should be extremely
> cautious about introducing new APIs at the framework layer.  If something
> can be implemented at a higher layer then we should implement it there and
> not try to push it down to the framework layer.
>
> Best,
>
> Jerry
>
> On Thu, Mar 18, 2021 at 1:38 AM Enrico Olivelli <eolive...@gmail.com> wrote:
>
> > Jerry
> >
> > Il giorno gio 18 mar 2021 alle ore 03:07 Jerry Peng
> > <jerry.boyang.p...@gmail.com> ha scritto:
> > >
> > > Hi Enrico,
> > >
> > > Thanks for taking the initiative for improvements on Pulsar IO!
> > >
> > > I have questions in regards to the following statements
> > >
> > >
> > > *Problem 1: Pulsar Sinks must declare at compile time the data type they
> > > support*
> > > > So I would like to see Sink<Object> and let the implementation deal
> > > with Record<Object> that will give access to the decoded payload and
> > > to the Schema.
> > >
> > > What is the point of having a schema for a generic POJO object?  The
> > point
> > > of a schema is to provide boundaries on what the structure of the data
> > can
> > > be.  However, having a schema that supports a generic POJO that can have
> > > any structure seems paradoxical to me.
> > >
> > > Currently, the input schema and output schema for source, sink, and
> > > functions can be undefined i.e. there is no schema, which means that the
> > > source/sink/function will either write, read and write, or just read
> > byte[]
> > > which is the most generic data representation there can be.  Why is this
> > > not sufficient for your use case?  Can the Pulsar IO wrapper code for
> > Kafka
> > > connect just read or write byte[] and let the high up abstractions
> > > determine how to deserialize it?
> >
> > For Sources you can have a Source<byte[]> and you can write records
> > with any schema.
> > This is good and it works well.
> >
> > But it doesn't work for Sinks
> > currently if you declare a Sink<byte[]> you will see as schema
> > Schema.BYTES.
> > If we had a Schema object that is preconfigured for decoding the
> > payload probably we will be done.
> > Generic sinks will be Sink<byte[]> and they will decode the payload with:
> >
> > public void write(Record<byte[]> record) throws Exception {
> >     Schema<?> schema = record.getSchema();
> >    SchemaType type = schema.getSchemaInfo().getType();
> >    Object decoded = schema.decode(record.getValue());
> > }
> >
> > but this does not work, because we have a strongly typed system and
> > you MUST write:
> >
> > public void write(Record<byte[]> record) throws Exception {
> >     Schema<byte[]> schema = record.getSchema();
> >    SchemaType type = schema.getSchemaInfo().getType();
> >    Object decoded = record.getValue();
> > }
> >
> > if I have Sink<Object> it becomes
> >
> > public void write(Object record) throws Exception {
> >     Schema<Object> schema = record.getSchema();
> >    SchemaType type = schema.getSchemaInfo().getType();
> >    Object decoded = record.getValue();
> >
> >    // switch on SchemaType ....
> > }
> >
> >
> > >
> > > *Problem 2: Pulsar IO Record does not handle non-String keys*
> > >
> > > The reason why Pulsar IO Record only supports specifying a key as a
> > string
> > > is because Pulsar only supports specifying keys as strings. The key
> > > specified in Pulsar IO Record is also used as the partition key to
> > > determine which partition a message will be published into.  If the key
> > > returned from Kafka connect connector also needs to be used as the
> > > partition key then the method you specified will not work.  You will
> > still
> > > need to translate the key back into a string or implement your own custom
> > > message router.
> >
> > At the low level this is true, but we already have KeyValue#SEPARATED
> > that brings these features:
> > - you have a schema for the Key
> > - you have a schema for the Value
> > - your Key is stored in the Message key, and you can think it as an
> > Object (it is encoded to byte[] using the KeySchema and it is stored
> > in base64 as string into the message key)
> >
> > It is not super efficient, due to the base64 encoding, but it is a
> > good tradeoff and it is already supported by Pulsar.
> >
> > >
> > > Can you also explain how the additional methods you are suggesting to add
> > > will be used?  What entities will be implementing methods and setting the
> > > value? What entities will use those values? And how those values will be
> > > used.
> >
> > If you are using KeyValue (especially in SEPARATED flavour) you
> > essentially will see a record as a key value pair of <Object, Object>,
> > with the key stored in the message key.
> >
> > Most of these ideas come from porting connectors from Kafka Connect to
> > Pulsar IO.
> > I would like to start a separate thread about discussing a convention
> > for the mapping but the short version is:
> > When you come from Kafka you have a KeyValue<Object, Object>  in
> > SEPARATED encoding (this may be simplified in case of String or byte[]
> > keys, but let's stick to the most complicated case).
> > So your producer (probably a KafkaSource or a KafkaConnectSource)
> > writes to Pulsar a KeyValue and uses KeyValueSchema with SEPARATED
> > encoding.
> >
> > On the Sink side you have your connector that (in Kafka) is used to
> > deal with an Object key and an Object value.
> > With those new methods you can use the logical key and the logical
> > value (and you can use the keySchema and the valueSchema)
> > and you have 1-1 mapping with what you did in Kafka.
> >
> > This is particularly important if you have users that are trying to
> > migrate from Kafka to Pulsar, and there is no easy and no standard way
> > to do it
> >
> > >
> > >
> > > *Problem 3:  Pulsar IO Records do not allow NULL values*
> > > Don't have a problem with this.  The use case you provided makes sense.
> >
> > I will send a patch, thanks
> >
> > Enrico
> >
> > >
> > > On Wed, Mar 17, 2021 at 2:04 AM Enrico Olivelli <eolive...@gmail.com>
> > wrote:
> > >
> > > > Hello everyone,
> > > >
> > > > I would like to share some proposals about Pulsar IO.  Based on my
> > > > experience porting a nontrivial connector for Cassandra from Kafka
> > > > Connect to Pulsar IO, I would like to identify some places where we
> > > > can improve the Pulsar IO API for both native Pulsar connectors and to
> > > > improve feature parity with Kafka Connect.  Ultimately, I would like
> > > > to provide a Kafka Connect compatibility library so that people can
> > > > use arbitrary Kafka connectors with Pulsar without having to port each
> > > > one.
> > > >
> > > > Problem 1: Pulsar Sinks must declare at compile time the data type they
> > > > support
> > > >
> > > > You can have a Sink<byte[]>, Sink<GenericRecord>, Sink<KeyValue>, but
> > > > you cannot have only one single Sink implementation that deals with
> > > > every kind of schema.
> > > >
> > > > In Kafka Connect you are used to dealing with Object at compile time
> > > > and deal with data types and Schemas in the implementation.
> > > >
> > > > So I would like to see Sink<Object> and let the implementation deal
> > > > with Record<Object> that will give access to the decoded payload and
> > > > to the Schema.
> > > >
> > > > I have started a work to support Schema<Object> here, that is the
> > > > first step in the direction of supporting Sink<Object>
> > > > https://github.com/apache/pulsar/pull/9895
> > > >
> > > >
> > > > Problem 2: Pulsar IO Record does not handle non-String keys
> > > >
> > > > We only have Record#getKey() that returns an Optional<String>. So even
> > > > if you are using KeyValue, you cannot access the logical value of the
> > > > key.
> > > >
> > > > The idea is to use KeyValue as foundation for supporting Schema in the
> > > > Key (this is nothing new, but it is still not much supported in Pulsar
> > > > IO)
> > > >
> > > > Kafka connect users deal with Object keys and we should provide the
> > > > same support.
> > > > We already have support for Object keys in case of KeyValue.
> > > > The proposal I am working with my colleagues is to add Object
> > > > getLogicalKey() to the Record interface.
> > > >
> > > > interface Record<T> {
> > > >
> > > >       Optional<Object> getLogicalKey();
> > > >       Optional<Object> getLogicalValue();
> > > >       Schema<T> getKeySchema();
> > > >       Schema<T> getValueSchema();
> > > >
> > > >
> > > >       Optional<String> getKey();
> > > >       Object getValue();
> > > >       Schema<T> getSchema();
> > > > }
> > > >
> > > > For messages that are using byte[] key (Message#hasBase64EncodedKey)
> > > > the getLogicalKey() returns the same as Message#getKeyBytes() and
> > > > getKeySchema will return Schema.BYTES.
> > > >
> > > > For regular messages getKeySchema() returns Schema.STRING.
> > > >
> > > > For messages that are using the KeyValue schema the new methods will
> > > > return the corresponding fields and metadata.
> > > >
> > > > getLogicalKey -> the key of the KeyValue
> > > > getLogicalValue -> the value of KeyValue
> > > > getKeySchema -> the schema for the logical key
> > > > getValueSchema -> the schema for the logical value
> > > > getKey -> the key of the message, as usual, for backward compatibility
> > > > getSchema -> the KeyValueSchema, as usual, for backward compatibility
> > > > getValue -> the KeyValue object, as usual, for backward compatibility
> > > >
> > > > Please note that we already have a KVRecord interface that exposes key
> > > > and value schema, but it is not used while pushing data to Sinks (it
> > > > looks like it is only used by KafkaConnectSource):
> > > >
> > > > public interface KVRecord<K, V> extends Record {
> > > >    Schema<K> getKeySchema();
> > > >    Schema<V> getValueSchema();
> > > >    KeyValueEncodingType getKeyValueEncodingType();
> > > > }
> > > >
> > > > Problem 3:  Pulsar IO Records do not allow NULL values
> > > >
> > > > Currently there is no support for NULL values in Pulsar IO.
> > > > This case makes sense if you are storing information in the key as
> > well.
> > > >
> > > > Example use case: use the topic as a changelog of a database, put the
> > > > PK into the key and the rest of the record into the value. Use the
> > > > convention that a NULL value means a DELETE operation and a non NULL
> > > > value is an UPSERT.
> > > >
> > > > Thoughts ?
> > > >
> > > > Enrico Olivelli
> > > >
> >

Reply via email to