> It is irresponsible behavior of the producer to leave everything to
> the consumer.
I agree now.

> I think what we need to do is describe it clearly in the documentation

IMO, it's a code problem, because there is no exception signature for
`TypedMessageBuilder#value` and `Message#getValue`. The application
users should catch the exception. It would be better if the exception
were thrown during `send` or `receive` and wrapped into a
`PulsarClientException`. When the input messages are raw bytes, we
cannot guarantee that validation always succeeds, because the schema
might change. The exception is actually thrown in
`TypedMessageBuilder#value`. But since these APIs are stable, we can
only fix it by adding documentation that describes in which cases
`TypedMessageBuilder#value` and `Message#getValue` can throw
exceptions.

Thanks,
Yunze
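For illustration, here is a minimal sketch of the pattern described
above: catching the unchecked exception thrown by
`TypedMessageBuilder#value`. The topic name is hypothetical, and it
assumes the validation failure surfaces as
`SchemaSerializationException`.

```java
import org.apache.pulsar.client.api.*;

public class AutoProduceCatchSketch {
    // With AUTO_PRODUCE_BYTES, validation runs inside value(), so the
    // try block must cover value(), not just send().
    public static void sendValidated(PulsarClient client, byte[] rawBytes)
            throws PulsarClientException {
        Producer<byte[]> producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
                .topic("schema-topic") // hypothetical topic with a registered schema
                .create();
        try {
            producer.newMessage().value(rawBytes).send();
        } catch (SchemaSerializationException e) {
            // The raw bytes failed validation against the topic's schema.
            // Unchecked, since the API has no exception signature.
        }
    }
}
```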
On Thu, Dec 15, 2022 at 12:48 PM 丛搏 <congbobo...@gmail.com> wrote:
>
> We can also use a BYTES producer, but with the BYTES schema, if we do
> not use `.newMessage(schema0)`, the message will not carry the schema
> version, and the consumer will not decode it correctly.
>
> Also, the BYTES schema can't validate the data against a schema. If
> the data is an empty byte array, it does not make sense to send it to
> the broker.
>
> It is irresponsible behavior of the producer to leave everything to
> the consumer. I think AUTO_PRODUCE simplifies the data validation
> process for users.
>
> I think what we need to do is describe it clearly in the documentation
> and distinguish it from BYTES, rather than delete or deprecate it.
>
> Thanks,
> Bo
>
> Yunze Xu <y...@streamnative.io.invalid> wrote on Wed, Dec 14, 2022 at 23:36:
> >
> > Why not use the following code with a BYTES producer in your case?
> >
> > ```java
> > var schema0 = Schema.AVRO(SchemaDefinition.builder()
> >     .withJsonDef("student with version0 json def").build());
> > p.newMessage(schema0).value(schema0.decode(student1)).send();
> > ...
> > ```
> >
> > Thanks,
> > Yunze
> >
> > On Wed, Dec 14, 2022 at 10:37 PM 丛搏 <congbobo...@gmail.com> wrote:
> > >
> > > Yunze Xu <y...@streamnative.io.invalid> wrote on Wed, Dec 14, 2022 at 20:37:
> > > >
> > > > > How can you create two Student classes in one Java process, and
> > > > > use the same namespace?
> > > >
> > > > Could you give an example to show how `AUTO_PRODUCE` schema makes a
> > > > difference?
> > >
> > > ```java
> > > // this is a Student with schema version0; the data may come from Kafka
> > > byte[] student1 = autoConsumer.receive().getData();
> > > // this is a Student with schema version1; the data may come from Kafka
> > > byte[] student2 = autoConsumer.receive().getData();
> > >
> > > // send the student with version0 schema data
> > > p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(SchemaDefinition.builder()
> > >         .withJsonDef("student with version0 json def").build())))
> > >         .value(student1).send();
> > >
> > > // send the student with version1 schema data
> > > p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(SchemaDefinition.builder()
> > >         .withJsonDef("student with version1 json def").build())))
> > >         .value(student2).send();
> > > ```
> > >
> > > > But with AUTO_PRODUCE schema, the precondition is that we have a
> > > > topic that has messages of these two schemas.
> > > >
> > > > For example, there is a `bytes-topic` without schema that has two
> > > > messages:
> > > > - msg0: serialized from `new Student("abc")` (schema v0)
> > > > - msg1: serialized from `new Student("abc", 1)` (schema v1)
> > > >
> > > > Then you can consume these bytes and send the messages to **a topic
> > > > that has registered a schema**.
> > > > - If the schema is v0, it's okay to send msg0 and msg1 to the topic,
> > > >   but msg1 will lose some bytes because schema v0 doesn't have the
> > > >   `age` field.
> > > > - If the schema is v1, msg0 cannot be sent because msg0 doesn't have
> > > >   the `age` field.
> > > >
> > > > So which schema did you expect for this topic?
> > >
> > > If you use AUTO_PRODUCE_BYTES, the message will carry the correct
> > > schema version. See this code:
> > > https://github.com/apache/pulsar/blob/4129583c418dd68f8303dee601132e2910cdf8e6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L718-L746
> > >
> > > msg0 will be sent with schema v0, and msg1 will be sent with schema v1.
> > >
> > > > This example also shows that AUTO_PRODUCE schema performs validation
> > > > at the producer side.
> > > >
> > > > However, if we just send msg0 and msg1 to a topic without schema,
> > > > then it will be the consumer's responsibility to determine whether
> > > > the received message is valid.
> > > >
> > > > ```java
> > > > var bytes = consumer.receive().getValue(); // raw bytes
> > > > var student = Schema.AVRO(Student.class).decode(bytes);
> > > > ```
> > > >
> > > > - If the `Student` is v0, msg0 and msg1 can be decoded successfully.
> > > > - If the `Student` is v1, decoding msg0 will throw an exception.
> > > >
> > > > Since all messages are stored in the topic, the downstream side
> > > > (consumer) can catch the exception to discard the bytes without the
> > > > expected schema.
> > > >
> > > > But if the validation fails at the producer side, there is a chance
> > > > that msg0 is lost. In addition, let's look at the producer and
> > > > consumer code in this case.
> > > >
> > > > ```java
> > > > producer.send(msg0); // validation happens at the producer side
> > > > ```
> > > >
> > > > ```java
> > > > var msg = consumer.receive();
> > > > var student = msg.getValue(); // validation happens again, though it
> > > >                               // has already been validated before
> > > > ```
> > > >
> > > > Thanks,
> > > > Yunze
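Yunze's consumer-side alternative can be sketched concretely. The topic
and subscription names are hypothetical, `Student` stands in for the v1
POJO, and the sketch assumes the Avro decode failure surfaces as
`SchemaSerializationException`.

```java
import org.apache.pulsar.client.api.*;

public class ConsumerSideValidationSketch {
    public static class Student { // stands in for the v1 POJO
        public String name;
        public int age;
    }

    public static void consume(PulsarClient client) throws PulsarClientException {
        Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                .topic("bytes-topic")          // hypothetical topic without schema
                .subscriptionName("validator") // hypothetical subscription name
                .subscribe();
        Schema<Student> v1 = Schema.AVRO(Student.class);
        while (true) {
            Message<byte[]> msg = consumer.receive();
            try {
                Student student = v1.decode(msg.getValue());
                // ... process the valid student ...
                consumer.acknowledge(msg);
            } catch (SchemaSerializationException e) {
                // msg0 (schema v0) lands here: acknowledge and discard it
                // instead of losing it at the producer side.
                consumer.acknowledge(msg);
            }
        }
    }
}
```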
> > > >
> > > > On Wed, Dec 14, 2022 at 3:11 PM 丛搏 <congbobo...@gmail.com> wrote:
> > > > >
> > > > > > > the user only creates one producer to send all Kafka topic data,
> > > > > > > if using Pulsar schema, the user needs to create all schema
> > > > > > > producers in a map
> > > > > >
> > > > > > It doesn't make sense to me. If the source topic has messages of
> > > > > > multiple schemas, why did you try to sink them into the same topic
> > > > > > with a schema? The key point of AUTO_PRODUCE schema is to download
> > > > > > the schema to validate the source messages. But if the schema of
> > > > > > the topic evolved, the remaining messages from the source topic
> > > > > > could not be sent to the topic.
> > > > >
> > > > > Let me give you an example. An AvroSchema can have multiple versions.
> > > > > Version 0:
> > > > >
> > > > > Student {
> > > > >     String name;
> > > > > }
> > > > >
> > > > > Version 1:
> > > > >
> > > > > Student {
> > > > >     String name;
> > > > >     int age;
> > > > > }
> > > > >
> > > > > How can you create two Student classes in one Java process, and use
> > > > > the same namespace? It is not only the schema type that can change;
> > > > > a schema can also have multiple versions. In this case, how do you
> > > > > create two producers with version 0 and version 1?
> > > > >
> > > > > > The most confusing part is that AUTO_PRODUCE schema will perform
> > > > > > message format validation before send. It's transparent to users
> > > > > > and not intuitive. IMO, it's better to call validate explicitly,
> > > > > > like
> > > > > >
> > > > > > ```java
> > > > > > producer.newMessage().value(bytes).validate().sendAsync();
> > > > > > ```
> > > > > >
> > > > > > There are two benefits:
> > > > > > 1. It's clear that the message validation happens before sending.
> > > > > > 2. If users don't want to validate before sending, they can choose
> > > > > >    to send the bytes directly and validate the message during
> > > > > >    consumption.
> > > > >
> > > > > Using `schema.validate()` is enough. Data validation does not belong
> > > > > to the Pulsar message, and we can add a usage description in the
> > > > > schema doc.
> > > > >
> > > > > > The performance problem of the AUTO_PRODUCE schema is that the
> > > > > > validation happens twice and it cannot be controlled.
> > > > >
> > > > > Our data verification is client-side behavior, not broker-side
> > > > > behavior. Therefore, we cannot effectively verify that the bytes
> > > > > were generated by a specific schema. I think this is something that
> > > > > users should consider rather than something that Pulsar should
> > > > > guarantee, because you can't control whether the data sent by users
> > > > > was actually generated by this schema; it is only client-side
> > > > > verification. So we don't need to verify twice. We could verify in
> > > > > the broker instead, but that is an overhead; we could add a config
> > > > > to control it, but is it really necessary?
> > > > >
> > > > > Thanks,
> > > > > Bo
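Bo's two `Student` versions can be pinned without two Java classes by
spelling out each version's Avro definition, which is what the
`withJsonDef` placeholders in the snippets above stand for. A hedged
sketch follows; the JSON definitions and names are illustrative, and
`p` is assumed to be a `Producer<byte[]>` created with
`Schema.AUTO_PRODUCE_BYTES()`.

```java
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.SchemaDefinition;

public class MultiVersionProduceSketch {
    // Illustrative Avro record definitions for the two Student versions.
    static final String STUDENT_V0 =
            "{\"type\":\"record\",\"name\":\"Student\",\"namespace\":\"example\","
            + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
    static final String STUDENT_V1 =
            "{\"type\":\"record\",\"name\":\"Student\",\"namespace\":\"example\","
            + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\"}]}";

    public static void send(Producer<byte[]> p, byte[] v0Bytes, byte[] v1Bytes)
            throws PulsarClientException {
        Schema<byte[]> v0 = Schema.AUTO_PRODUCE_BYTES(
                Schema.AVRO(SchemaDefinition.builder().withJsonDef(STUDENT_V0).build()));
        Schema<byte[]> v1 = Schema.AUTO_PRODUCE_BYTES(
                Schema.AVRO(SchemaDefinition.builder().withJsonDef(STUDENT_V1).build()));
        p.newMessage(v0).value(v0Bytes).send(); // carries schema version v0
        p.newMessage(v1).value(v1Bytes).send(); // carries schema version v1
    }
}
```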
> > > > >
> > > > > Yunze Xu <y...@streamnative.io.invalid> wrote on Wed, Dec 14, 2022 at 12:40:
> > > > > > On Wed, Dec 14, 2022 at 12:01 PM 丛搏 <bog...@apache.org> wrote:
> > > > > > >
> > > > > > > Hi, Yunze:
> > > > > > >
> > > > > > > Yunze Xu <y...@streamnative.io.invalid> wrote on Wed, Dec 14, 2022 at 02:26:
> > > > > > >
> > > > > > > > First, how do you guarantee the schema can be used to encode
> > > > > > > > the raw bytes whose format is unknown?
> > > > > > >
> > > > > > > I think this is something the user needs to ensure: the user
> > > > > > > knows all the schemas from the Kafka topic and which data
> > > > > > > (byte[]) they can send with a Pulsar schema.
> > > > > > >
> > > > > > > > Second, messages that cannot be encoded by the schema can only
> > > > > > > > be discarded, i.e. message lost.
> > > > > > >
> > > > > > > If the encoding fails, it proves that the user does not know how
> > > > > > > to convert the Kafka data's schema to a Pulsar schema, which is
> > > > > > > the user's own problem.
> > > > > > >
> > > > > > > > Third, schema in Pulsar is convenient because it can support
> > > > > > > > sending any object of type `T` and the Pulsar client is
> > > > > > > > responsible to serialize `T` to the bytes. However, when using
> > > > > > > > AUTO_PRODUCE schema, the producer still sends raw bytes.
> > > > > > >
> > > > > > > The user only creates one producer to send all Kafka topic data.
> > > > > > > If using a Pulsar schema, the user needs to create all the schema
> > > > > > > producers in a map, and get the matching schema producer to send
> > > > > > > each message.
> > > > > > >
> > > > > > > In my understanding, AUTO_PRODUCE mainly reduces the number of
> > > > > > > producers created by the client, which brings convenience to
> > > > > > > users migrating data, rather than dealing with unknown schema
> > > > > > > data. If you want to use it correctly, you must know the schema
> > > > > > > of all the data, which can be converted into a Pulsar schema.
> > > > > > > Otherwise, it would be best to handle it yourself using the
> > > > > > > BYTES schema.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Bo
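Finally, for contrast with the single AUTO_PRODUCE producer, here is a
rough sketch of the map-of-producers approach Bo describes. All names
are hypothetical; the per-version `Schema<byte[]>` entries could be,
for example, `AUTO_PRODUCE_BYTES` wrappers around specific Avro
definitions.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.pulsar.client.api.*;

public class ProducerPerSchemaSketch {
    // Without a single AUTO_PRODUCE producer, one producer per known
    // schema version is kept in a map and looked up per message.
    public static Map<String, Producer<byte[]>> createProducers(
            PulsarClient client, Map<String, Schema<byte[]>> schemasByVersion)
            throws PulsarClientException {
        Map<String, Producer<byte[]>> producers = new HashMap<>();
        for (Map.Entry<String, Schema<byte[]>> e : schemasByVersion.entrySet()) {
            producers.put(e.getKey(), client.newProducer(e.getValue())
                    .topic("sink-topic") // hypothetical sink topic
                    .create());
        }
        return producers;
    }

    public static void send(Map<String, Producer<byte[]>> producers,
                            String version, byte[] data) throws PulsarClientException {
        // The caller must already know which schema version the bytes use.
        producers.get(version).newMessage().value(data).send();
    }
}
```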