We also can use BYTES producer, but in BYTES schema, do not use .newMessage(schema0), the message will not carry the schema version. the consumer will not decode correctly.
and BYTES schema can't validate the data schema. if the data is empty bytes array, It does not make sense to send it to the broker. It is irresponsible behavior of the producer to leave everything to the consumer. I think AUTO_PRODUCER simplifies the data validation process for users. I think what we need to do is describe the document clearly and distinguish it from BYTES rather than delete or deprecate it. Thanks, Bo Yunze Xu <y...@streamnative.io.invalid> 于2022年12月14日周三 23:36写道: > > Why not use the following code with a BYTES producer in your case? > > ```java > var schema0 = Schema.AVRO(SchemaDefinition.builder() > .withJsonDef("student with version0 json def").build(); > p.newMessage(schema0).value(schema0.decode(student1)).send(); > ... > ``` > > Thanks, > Yunze > > On Wed, Dec 14, 2022 at 10:37 PM 丛搏 <congbobo...@gmail.com> wrote: > > > > Yunze Xu <y...@streamnative.io.invalid> 于2022年12月14日周三 20:37写道: > > > > > > > how do you can create two Student.class in one java process? and use > > > the same namespace? > > > > > > Could you give an example to show how `AUTO_PRODUCE` schema makes a > > > difference? > > > > // this is Student use version0, may be data from kafka > > byte[] student1 = autoConsumer.receive().getData(); > > // this is Student use version1, may be data from kafka > > byte[] student2 = autoConsumer.receive().getData(); > > // send student with version0 schema date > > p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(SchemaDefinition.builder() > > .withJsonDef("student with version0 json def").build()))) > > .value(student1).send(); > > > > // send student with version1 schema date > > p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(SchemaDefinition.builder() > > .withJsonDef("student with version1 json def").build()))) > > .value(student1).send(); > > > > > > > > But with AUTO_PRODUCE schema, the precondition is that we have a topic > > > that has messages of these two schemas. > > > > > > For example, there is a `bytes-topic` without schema that has two > > > messages: > > > - msg0: Serialized from `new Student("abc")` (schema v0) > > > - msg1: Serialized from `new Student("abc", 1)` (schema v1) > > > > > > Then you can consume these bytes, and send the messages to **a topic > > > that has registered a schema**. > > > - If the schema is v0, it's okay to send msg0 and msg1 to the topic. > > > But the msg1 will lose some bytes because the schema v0 doesn't have > > > the `age` field. > > > - If the schema is v1, msg0 cannot be sent because msg0 doesn't have > > > the `age` field. > > > > > > So which schema did you expect for this topic? > > if you use AUTO_PRODUCE_BYTES, the message will have the correct schema > > version. > > link code: > > https://github.com/apache/pulsar/blob/4129583c418dd68f8303dee601132e2910cdf8e6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L718-L746 > > > > the msg0 will be sent with schema v0 > > this msg1 will be sent with schema v1 > > > > > > This example also shows AUTO_PRODUCE schema performs validation at > > > producer side. > > > > > > However, if we just send msg0 and msg1 to a topic without schema. Then > > > it will be consumer's responsibility to determine whether the received > > > message is valid. > > > > > > ```java > > > var bytes = consumer.receive(); // bytes > > > var student = Schema.AVRO(Student.class).decode(bytes); > > > ``` > > > > > > - If the `Student` is v0, msg0 and msg1 can be decoded successfully. > > > - If the `Student` is v1, decoding msg0 will throw an exception. > > > > > > Since all messages are stored in the topic, the downstream side > > > (consumer) can catch the exception to discard the bytes without the > > > expected schema. > > > > > > But if the validation fails at the producer side, there is a chance > > > that msg0 is lost. In addition, let's see the producer and consumer > > > code in this case. > > > > > > ``` > > > producer.send(msg0); // validation happens at the producer side > > > ``` > > > > > > ``` > > > var msg = consumer.receive(); > > > var student = msg.getValue(); // validation happens again, though it > > > has already been validated before > > > ``` > > > > > > Thanks, > > > Yunze > > > > > > On Wed, Dec 14, 2022 at 3:11 PM 丛搏 <congbobo...@gmail.com> wrote: > > > > > > > > > > > > > > > the user only creates one producer to send all Kafka topic data, if > > > > > using Pulsar schema, the user needs to create all schema producers in > > > > > a map > > > > > > > > > > It doesn't make sense to me. If the source topic has messages of > > > > > multiple schemas, why did you try to sink them into the same topic > > > > > with a schema? The key point of AUTO_PRODUCE schema is to download the > > > > > schema to validate the source messages. But if the schema of the topic > > > > > evolved, the left messages from the source topic could not be sent to > > > > > the topic. > > > > > > > > > Let me give you an example, AvroSchema will have multi-version, > > > > the version(0) : > > > > Student { > > > > String name; > > > > } > > > > the version(1) : > > > > Student { > > > > String name; > > > > int age; > > > > } > > > > how do you can create two Student.class in one java process? and use > > > > the same namespace? > > > > It's not only the schema type changes it also will have multi-version > > > > schema. > > > > In this case, how do you create two producers with version(0) and > > > > version(1)? > > > > > > > > > The most confusing part is that AUTO_PRODUCE schema will perform > > > > > message format validation before send. It's transparent to users and > > > > > intuitive. IMO, it's better to call validate explicitly like > > > > > > > > > > ```java > > > > > producer.newMessage().value(bytes).validate().sendAsync(); > > > > > ``` > > > > > > > > > > There are two benefits: > > > > > 1. It's clear that the message validation happens before sending. > > > > > 2. If users don't want to validate before sending, they can choose to > > > > > send the bytes directly and validate the message during consumption. > > > > It only uses `schema.validate()` is enough, data validation does not > > > > belong to the pulsar message, and we can add a usage description in > > > > the schema doc. > > > > > > > > > > The performance problem of the AUTO_PRODUCE schema is that the > > > > > validation happens twice and it cannot be controlled. > > > > > > > > Our data verification is the behavior of the client, not the behavior > > > > of the broker. Therefore, we cannot effectively verify that bytes are > > > > generated by a specific schema. I think this is something that users > > > > should consider rather than something that pulsar should guarantee > > > > because you can't control the data sent by users that is generated by > > > > this schema only for client verification. so, we don't need to verify > > > > twice. Unless we verify in the broker, but this is an overhead, we can > > > > add config to control, but is it really necessary? > > > > > > > > Thanks, > > > > Bo > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> 于2022年12月14日周三 12:40写道: > > > > > > > > > > > the user only creates one producer to send all Kafka topic data, if > > > > > using Pulsar schema, the user needs to create all schema producers in > > > > > a map > > > > > > > > > > It doesn't make sense to me. If the source topic has messages of > > > > > multiple schemas, why did you try to sink them into the same topic > > > > > with a schema? The key point of AUTO_PRODUCE schema is to download the > > > > > schema to validate the source messages. But if the schema of the topic > > > > > evolved, the left messages from the source topic could not be sent to > > > > > the topic. > > > > > > > > > > The most confusing part is that AUTO_PRODUCE schema will perform > > > > > message format validation before send. It's transparent to users and > > > > > intuitive. IMO, it's better to call validate explicitly like > > > > > > > > > > ```java > > > > > producer.newMessage().value(bytes).validate().sendAsync(); > > > > > ``` > > > > > > > > > > There are two benefits: > > > > > 1. It's clear that the message validation happens before sending. > > > > > 2. If users don't want to validate before sending, they can choose to > > > > > send the bytes directly and validate the message during consumption. > > > > > > > > > > The performance problem of the AUTO_PRODUCE schema is that the > > > > > validation happens twice and it cannot be controlled. > > > > > > > > > > Thanks, > > > > > Yunze > > > > > > > > > > On Wed, Dec 14, 2022 at 12:01 PM 丛搏 <bog...@apache.org> wrote: > > > > > > > > > > > > Hi, Yunze: > > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> 于2022年12月14日周三 02:26写道: > > > > > > > > > > > > > First, how do you guarantee the schema can be used to encode the > > > > > > > raw > > > > > > > bytes whose format is unknown? > > > > > > I think this is what the user needs to ensure that the user knows > > > > > > all > > > > > > the schema from the Kafka topic and the date(bytes[]) that the user > > > > > > can send with a pulsar schema > > > > > > > > > > > > > > Second, messages that cannot be encoded by the schema can only be > > > > > > > discarded, i.e. message lost. > > > > > > If the encoding fails, it proves that the user does not know how to > > > > > > convert Kafka date's schema to pulsar schema, which is the user's > > > > > > own > > > > > > problem. > > > > > > > > > > > > > > Third, schema in Pulsar is convenient because it can support > > > > > > > sending > > > > > > > any object of type `T` and the Pulsar client is responsible to > > > > > > > serialize `T` to the bytes. However, when using AUTO_PRODUCE > > > > > > > schema, > > > > > > > the producer still sends raw bytes. > > > > > > the user only creates one producer to send all Kafka topic data, if > > > > > > using Pulsar schema, the user needs to create all schema producers > > > > > > in > > > > > > a map, and get the schema producer to send a message. > > > > > > > > > > > > > > > > > > In my understanding, AUTO_PRODUCE mainly reduces the number of > > > > > > producers created by the client, which will bring convenience to > > > > > > users > > > > > > in migrating data. Instead of dealing with unknown schema data. If > > > > > > you > > > > > > want to use it correctly, you must know the schema of all data, > > > > > > which > > > > > > can be converted into a pulsar schema. Otherwise, it would be best > > > > > > if > > > > > > you handled it yourself using the bytes schema. > > > > > > > > > > > > Thanks, > > > > > > Bo