Yunze Xu <y...@streamnative.io.invalid> wrote on Wed, Dec 14, 2022 at 20:37:
>
> > How can you create two Student.class versions in one Java process and use
> > the same namespace?
>
> Could you give an example to show how `AUTO_PRODUCE` schema makes a 
> difference?

// student1 is a Student serialized with schema version 0, e.g. data read from Kafka
byte[] student1 = autoConsumer.receive().getData();
// student2 is a Student serialized with schema version 1, e.g. data read from Kafka
byte[] student2 = autoConsumer.receive().getData();
// send student1 with the version 0 schema
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(SchemaDefinition.builder()
        .withJsonDef("student with version0 json def").build())))
        .value(student1).send();

// send student2 with the version 1 schema
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(SchemaDefinition.builder()
        .withJsonDef("student with version1 json def").build())))
        .value(student2).send();

>
> But with AUTO_PRODUCE schema, the precondition is that we have a topic
> that has messages of these two schemas.
>
> For example, there is a `bytes-topic` without schema that has two messages:
> - msg0: Serialized from `new Student("abc")` (schema v0)
> - msg1: Serialized from `new Student("abc", 1)` (schema v1)
>
> Then you can consume these bytes, and send the messages to **a topic
> that has registered a schema**.
> - If the schema is v0, it's okay to send msg0 and msg1 to the topic.
> But the msg1 will lose some bytes because the schema v0 doesn't have
> the `age` field.
> - If the schema is v1, msg0 cannot be sent because msg0 doesn't have
> the `age` field.
>
> So which schema did you expect for this topic?
If you use AUTO_PRODUCE_BYTES, each message is sent with the correct schema version.
See the code:
https://github.com/apache/pulsar/blob/4129583c418dd68f8303dee601132e2910cdf8e6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L718-L746

msg0 will be sent with schema v0.
msg1 will be sent with schema v1.
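
To illustrate the idea, here is a minimal stdlib-only sketch. It is not the actual ProducerImpl code: `Registry`, `Producer`, and `Message` are hypothetical names, and the registry is an in-memory map that I assume hands out versions 0, 1, 2, ... in registration order. It only shows how one producer can attach a schema version to each message by caching the version returned for each schema definition.

```java
import java.util.HashMap;
import java.util.Map;

class SchemaVersionSketch {

    /** Hypothetical in-memory stand-in for the schema registry of one topic. */
    static final class Registry {
        private final Map<String, Long> versions = new HashMap<>();

        /** Returns the version of a known definition, or registers it as the next version. */
        long getOrRegister(String jsonDef) {
            Long version = versions.get(jsonDef);
            if (version == null) {
                version = (long) versions.size();
                versions.put(jsonDef, version);
            }
            return version;
        }
    }

    /** A payload together with the schema version it was sent with. */
    static final class Message {
        final byte[] payload;
        final long schemaVersion;

        Message(byte[] payload, long schemaVersion) {
            this.payload = payload;
            this.schemaVersion = schemaVersion;
        }
    }

    /** A single producer that resolves the schema version per message. */
    static final class Producer {
        private final Registry registry;
        // Local cache so the registry is asked only once per schema definition.
        private final Map<String, Long> schemaCache = new HashMap<>();

        Producer(Registry registry) {
            this.registry = registry;
        }

        Message send(String jsonDef, byte[] payload) {
            long version = schemaCache.computeIfAbsent(jsonDef, registry::getOrRegister);
            return new Message(payload, version);
        }
    }

    public static void main(String[] args) {
        Producer p = new Producer(new Registry());
        Message m0 = p.send("student with version0 json def", new byte[] {1});
        Message m1 = p.send("student with version1 json def", new byte[] {2});
        System.out.println("msg0 -> v" + m0.schemaVersion + ", msg1 -> v" + m1.schemaVersion);
    }
}
```

With this shape, the user keeps one producer and passes the schema with each message, instead of keeping a map of producers keyed by schema.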
>
> This example also shows AUTO_PRODUCE schema performs validation at
> producer side.
>
> However, if we just send msg0 and msg1 to a topic without schema. Then
> it will be consumer's responsibility to determine whether the received
> message is valid.
>
> ```java
> var bytes = consumer.receive(); // bytes
> var student = Schema.AVRO(Student.class).decode(bytes);
> ```
>
> - If the `Student` is v0, msg0 and msg1 can be decoded successfully.
> - If the `Student` is v1, decoding msg0 will throw an exception.
>
> Since all messages are stored in the topic, the downstream side
> (consumer) can catch the exception to discard the bytes without the
> expected schema.
>
> But if the validation fails at the producer side, there is a chance
> that msg0 is lost. In addition, let's see the producer and consumer
> code in this case.
>
> ```
> producer.send(msg0); // validation happens at the producer side
> ```
>
> ```
> var msg = consumer.receive();
> // validation happens again, though it has already been validated before
> var student = msg.getValue();
> ```
>
> Thanks,
> Yunze
>
> On Wed, Dec 14, 2022 at 3:11 PM 丛搏 <congbobo...@gmail.com> wrote:
> >
> > >
> > > > the user only creates one producer to send all Kafka topic data, if
> > > > using Pulsar schema, the user needs to create all schema producers in
> > > > a map
> > >
> > > It doesn't make sense to me. If the source topic has messages of
> > > multiple schemas, why did you try to sink them into the same topic
> > > with a schema? The key point of AUTO_PRODUCE schema is to download the
> > > schema to validate the source messages. But if the schema of the topic
> > > evolved, the left messages from the source topic could not be sent to
> > > the topic.
> > >
> > Let me give you an example. An AvroSchema can have multiple versions.
> > Version 0:
> > Student {
> > String name;
> > }
> > Version 1:
> > Student {
> > String name;
> > int age;
> > }
> > How can you create two Student.class versions in one Java process and use
> > the same namespace?
> > It's not only the schema type that changes; a schema can also have
> > multiple versions.
> > In this case, how do you create two producers with version 0 and
> > version 1?
> >
> > > The most confusing part is that AUTO_PRODUCE schema will perform
> > > message format validation before send. It's transparent to users and
> > > intuitive. IMO, it's better to call validate explicitly like
> > >
> > > ```java
> > > producer.newMessage().value(bytes).validate().sendAsync();
> > > ```
> > >
> > > There are two benefits:
> > > 1. It's clear that the message validation happens before sending.
> > > 2. If users don't want to validate before sending, they can choose to
> > > send the bytes directly and validate the message during consumption.
> > Just using `schema.validate()` is enough. Data validation does not
> > belong to the Pulsar message, and we can add a usage description to
> > the schema doc.
> > >
> > > The performance problem of the AUTO_PRODUCE schema is that the
> > > validation happens twice and it cannot be controlled.
> >
> > Data verification is the behavior of the client, not the behavior
> > of the broker. Therefore, we cannot effectively verify that the bytes
> > were generated by a specific schema. I think this is something that users
> > should consider rather than something that Pulsar should guarantee,
> > because the check is client-side only and you can't control whether the
> > data users send was really generated by this schema. So we don't need to
> > verify twice, unless we verify in the broker. That adds overhead; we
> > could add a config to control it, but is it really necessary?
> >
> > Thanks,
> > Bo
> >
> > Yunze Xu <y...@streamnative.io.invalid> wrote on Wed, Dec 14, 2022 at 12:40:
> > >
> > > > the user only creates one producer to send all Kafka topic data, if
> > > > using Pulsar schema, the user needs to create all schema producers in
> > > > a map
> > >
> > > It doesn't make sense to me. If the source topic has messages of
> > > multiple schemas, why did you try to sink them into the same topic
> > > with a schema? The key point of AUTO_PRODUCE schema is to download the
> > > schema to validate the source messages. But if the schema of the topic
> > > evolved, the left messages from the source topic could not be sent to
> > > the topic.
> > >
> > > The most confusing part is that AUTO_PRODUCE schema will perform
> > > message format validation before send. It's transparent to users and
> > > intuitive. IMO, it's better to call validate explicitly like
> > >
> > > ```java
> > > producer.newMessage().value(bytes).validate().sendAsync();
> > > ```
> > >
> > > There are two benefits:
> > > 1. It's clear that the message validation happens before sending.
> > > 2. If users don't want to validate before sending, they can choose to
> > > send the bytes directly and validate the message during consumption.
> > >
> > > The performance problem of the AUTO_PRODUCE schema is that the
> > > validation happens twice and it cannot be controlled.
> > >
> > > Thanks,
> > > Yunze
> > >
> > > On Wed, Dec 14, 2022 at 12:01 PM 丛搏 <bog...@apache.org> wrote:
> > > >
> > > > Hi, Yunze:
> > > >
> > > > Yunze Xu <y...@streamnative.io.invalid> wrote on Wed, Dec 14, 2022 at 02:26:
> > > >
> > > > > First, how do you guarantee the schema can be used to encode the raw
> > > > > bytes whose format is unknown?
> > > > I think this is what the user needs to ensure: the user knows all
> > > > the schemas from the Kafka topic, and the data (byte[]) can be sent
> > > > with a Pulsar schema.
> > > > >
> > > > > Second, messages that cannot be encoded by the schema can only be
> > > > > discarded, i.e. message lost.
> > > > If the encoding fails, it shows that the user does not know how to
> > > > convert the Kafka data's schema to a Pulsar schema, which is the user's
> > > > own problem.
> > > > >
> > > > > Third, schema in Pulsar is convenient because it can support sending
> > > > > any object of type `T` and the Pulsar client is responsible to
> > > > > serialize `T` to the bytes. However, when using AUTO_PRODUCE schema,
> > > > > the producer still sends raw bytes.
> > > > The user only creates one producer to send all the Kafka topic data. If
> > > > using Pulsar schemas, the user needs to create all the schema producers
> > > > in a map and look up the right schema producer to send each message.
> > > >
> > > >
> > > > In my understanding, AUTO_PRODUCE mainly reduces the number of
> > > > producers created by the client, which makes it convenient for users
> > > > migrating data. It is not meant for dealing with data of unknown
> > > > schema. To use it correctly, you must know the schemas of all the data,
> > > > and they must be convertible into Pulsar schemas. Otherwise, it would
> > > > be best to handle it yourself using the bytes schema.
> > > >
> > > > Thanks,
> > > > Bo
