Why not use the following code with a BYTES producer in your case?

```java
var schema0 = Schema.AVRO(SchemaDefinition.builder()
    .withJsonDef("student with version0 json def").build();
p.newMessage(schema0).value(schema0.decode(student1)).send();
...
```

Thanks,
Yunze

On Wed, Dec 14, 2022 at 10:37 PM 丛搏 <congbobo...@gmail.com> wrote:
>
> Yunze Xu <y...@streamnative.io.invalid> 于2022年12月14日周三 20:37写道:
> >
> > > how do you can create two Student.class in one java process? and use
> > the same namespace?
> >
> > Could you give an example to show how `AUTO_PRODUCE` schema makes a 
> > difference?
>
> // this is Student use version0, may be data from kafka
> byte[] student1 = autoConsumer.receive().getData();
> // this is Student use version1, may be data from kafka
> byte[] student2 = autoConsumer.receive().getData();
> // send student with version0 schema date
> p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(SchemaDefinition.builder()
>         .withJsonDef("student with version0 json def").build())))
>         .value(student1).send();
>
> // send student with version1 schema date
> p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(SchemaDefinition.builder()
>         .withJsonDef("student with version1 json def").build())))
>         .value(student1).send();
>
> >
> > But with AUTO_PRODUCE schema, the precondition is that we have a topic
> > that has messages of these two schemas.
> >
> > For example, there is a `bytes-topic` without schema that has two messages:
> > - msg0: Serialized from `new Student("abc")` (schema v0)
> > - msg1: Serialized from `new Student("abc", 1)` (schema v1)
> >
> > Then you can consume these bytes, and send the messages to **a topic
> > that has registered a schema**.
> > - If the schema is v0, it's okay to send msg0 and msg1 to the topic.
> > But the msg1 will lose some bytes because the schema v0 doesn't have
> > the `age` field.
> > - If the schema is v1, msg0 cannot be sent because msg0 doesn't have
> > the `age` field.
> >
> > So which schema did you expect for this topic?
> if you use AUTO_PRODUCE_BYTES, the message will have the correct schema 
> version.
> link code: 
> https://github.com/apache/pulsar/blob/4129583c418dd68f8303dee601132e2910cdf8e6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L718-L746
>
> the msg0 will be sent with schema v0
> this msg1 will be sent with schema v1
> >
> > This example also shows AUTO_PRODUCE schema performs validation at
> > producer side.
> >
> > However, if we just send msg0 and msg1 to a topic without schema. Then
> > it will be consumer's responsibility to determine whether the received
> > message is valid.
> >
> > ```java
> > var bytes = consumer.receive(); // bytes
> > var student = Schema.AVRO(Student.class).decode(bytes);
> > ```
> >
> > - If the `Student` is v0, msg0 and msg1 can be decoded successfully.
> > - If the `Student` is v1, decoding msg0 will throw an exception.
> >
> > Since all messages are stored in the topic, the downstream side
> > (consumer) can catch the exception to discard the bytes without the
> > expected schema.
> >
> > But if the validation fails at the producer side, there is a chance
> > that msg0 is lost. In addition, let's see the producer and consumer
> > code in this case.
> >
> > ```
> > producer.send(msg0); // validation happens at the producer side
> > ```
> >
> > ```
> > var msg = consumer.receive();
> > var student = msg.getValue(); // validation happens again, though it
> > has already been validated before
> > ```
> >
> > Thanks,
> > Yunze
> >
> > On Wed, Dec 14, 2022 at 3:11 PM 丛搏 <congbobo...@gmail.com> wrote:
> > >
> > > >
> > > > > the user only creates one producer to send all Kafka topic data, if
> > > > using Pulsar schema, the user needs to create all schema producers in
> > > > a map
> > > >
> > > > It doesn't make sense to me. If the source topic has messages of
> > > > multiple schemas, why did you try to sink them into the same topic
> > > > with a schema? The key point of AUTO_PRODUCE schema is to download the
> > > > schema to validate the source messages. But if the schema of the topic
> > > > evolved, the left messages from the source topic could not be sent to
> > > > the topic.
> > > >
> > > Let me give you an example, AvroSchema will have multi-version,
> > > the version(0) :
> > > Student {
> > > String name;
> > > }
> > > the version(1) :
> > > Student {
> > > String name;
> > > int age;
> > > }
> > > how do you can create two Student.class in one java process? and use
> > > the same namespace?
> > > It's not only the schema type changes it also will have multi-version 
> > > schema.
> > > In this case, how do you create two producers with version(0) and 
> > > version(1)?
> > >
> > > > The most confusing part is that AUTO_PRODUCE schema will perform
> > > > message format validation before send. It's transparent to users and
> > > > intuitive. IMO, it's better to call validate explicitly like
> > > >
> > > > ```java
> > > > producer.newMessage().value(bytes).validate().sendAsync();
> > > > ```
> > > >
> > > > There are two benefits:
> > > > 1. It's clear that the message validation happens before sending.
> > > > 2. If users don't want to validate before sending, they can choose to
> > > > send the bytes directly and validate the message during consumption.
> > > It only uses `schema.validate()` is enough, data validation does not
> > > belong to the pulsar message, and we can add a usage description in
> > > the schema doc.
> > > >
> > > > The performance problem of the AUTO_PRODUCE schema is that the
> > > > validation happens twice and it cannot be controlled.
> > >
> > > Our data verification is the behavior of the client, not the behavior
> > > of the broker. Therefore, we cannot effectively verify that bytes are
> > > generated by a specific schema. I think this is something that users
> > > should consider rather than something that pulsar should guarantee
> > > because you can't control the data sent by users that is generated by
> > > this schema only for client verification. so, we don't need to verify
> > > twice. Unless we verify in the broker, but this is an overhead, we can
> > > add config to control, but is it really necessary?
> > >
> > > Thanks,
> > > Bo
> > >
> > > Yunze Xu <y...@streamnative.io.invalid> 于2022年12月14日周三 12:40写道:
> > > >
> > > > > the user only creates one producer to send all Kafka topic data, if
> > > > using Pulsar schema, the user needs to create all schema producers in
> > > > a map
> > > >
> > > > It doesn't make sense to me. If the source topic has messages of
> > > > multiple schemas, why did you try to sink them into the same topic
> > > > with a schema? The key point of AUTO_PRODUCE schema is to download the
> > > > schema to validate the source messages. But if the schema of the topic
> > > > evolved, the left messages from the source topic could not be sent to
> > > > the topic.
> > > >
> > > > The most confusing part is that AUTO_PRODUCE schema will perform
> > > > message format validation before send. It's transparent to users and
> > > > intuitive. IMO, it's better to call validate explicitly like
> > > >
> > > > ```java
> > > > producer.newMessage().value(bytes).validate().sendAsync();
> > > > ```
> > > >
> > > > There are two benefits:
> > > > 1. It's clear that the message validation happens before sending.
> > > > 2. If users don't want to validate before sending, they can choose to
> > > > send the bytes directly and validate the message during consumption.
> > > >
> > > > The performance problem of the AUTO_PRODUCE schema is that the
> > > > validation happens twice and it cannot be controlled.
> > > >
> > > > Thanks,
> > > > Yunze
> > > >
> > > > On Wed, Dec 14, 2022 at 12:01 PM 丛搏 <bog...@apache.org> wrote:
> > > > >
> > > > > Hi, Yunze:
> > > > >
> > > > > Yunze Xu <y...@streamnative.io.invalid> 于2022年12月14日周三 02:26写道:
> > > > >
> > > > > > First, how do you guarantee the schema can be used to encode the raw
> > > > > > bytes whose format is unknown?
> > > > > I think this is what the user needs to ensure that the user knows all
> > > > > the schema from the Kafka topic and the date(bytes[]) that the user
> > > > > can send with a pulsar schema
> > > > > >
> > > > > > Second, messages that cannot be encoded by the schema can only be
> > > > > > discarded, i.e. message lost.
> > > > > If the encoding fails, it proves that the user does not know how to
> > > > > convert Kafka date's schema to pulsar schema, which is the user's own
> > > > > problem.
> > > > > >
> > > > > > Third, schema in Pulsar is convenient because it can support sending
> > > > > > any object of type `T` and the Pulsar client is responsible to
> > > > > > serialize `T` to the bytes. However, when using AUTO_PRODUCE schema,
> > > > > > the producer still sends raw bytes.
> > > > > the user only creates one producer to send all Kafka topic data, if
> > > > > using Pulsar schema, the user needs to create all schema producers in
> > > > > a map, and get the schema producer to send a message.
> > > > >
> > > > >
> > > > > In my understanding, AUTO_PRODUCE mainly reduces the number of
> > > > > producers created by the client, which will bring convenience to users
> > > > > in migrating data. Instead of dealing with unknown schema data. If you
> > > > > want to use it correctly, you must know the schema of all data, which
> > > > > can be converted into a pulsar schema. Otherwise, it would be best if
> > > > > you handled it yourself using the bytes schema.
> > > > >
> > > > > Thanks,
> > > > > Bo

Reply via email to