Yunze Xu <y...@streamnative.io.invalid> wrote on Wed, Dec 14, 2022 at 20:37:
>
> > how can you create two Student.class in one Java process and use
> > the same namespace?
>
> Could you give an example to show how `AUTO_PRODUCE` schema makes a
> difference?

// This is a Student serialized with schema version 0, e.g. data from Kafka
byte[] student1 = autoConsumer.receive().getData();
// This is a Student serialized with schema version 1, e.g. data from Kafka
byte[] student2 = autoConsumer.receive().getData();

// Send the student data with the version 0 schema
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(SchemaDefinition.builder()
        .withJsonDef("student with version0 json def").build())))
        .value(student1).send();
// Send the student data with the version 1 schema
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(SchemaDefinition.builder()
        .withJsonDef("student with version1 json def").build())))
        .value(student2).send();

>
> But with AUTO_PRODUCE schema, the precondition is that we have a topic
> that has messages of these two schemas.
>
> For example, there is a `bytes-topic` without schema that has two messages:
> - msg0: Serialized from `new Student("abc")` (schema v0)
> - msg1: Serialized from `new Student("abc", 1)` (schema v1)
>
> Then you can consume these bytes, and send the messages to
> **a topic that has registered a schema**.
> - If the schema is v0, it's okay to send msg0 and msg1 to the topic.
>   But the msg1 will lose some bytes because the schema v0 doesn't have
>   the `age` field.
> - If the schema is v1, msg0 cannot be sent because msg0 doesn't have
>   the `age` field.
>
> So which schema did you expect for this topic?

If you use AUTO_PRODUCE_BYTES, each message is sent with the correct
schema version. See the code:
https://github.com/apache/pulsar/blob/4129583c418dd68f8303dee601132e2910cdf8e6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L718-L746
msg0 will be sent with schema v0, and msg1 will be sent with schema v1.

>
> This example also shows AUTO_PRODUCE schema performs validation at
> producer side.
>
> However, if we just send msg0 and msg1 to a topic without schema. Then
> it will be consumer's responsibility to determine whether the received
> message is valid.
>
> ```java
> var bytes = consumer.receive().getData(); // raw bytes
> var student = Schema.AVRO(Student.class).decode(bytes);
> ```
>
> - If the `Student` is v0, msg0 and msg1 can be decoded successfully.
> - If the `Student` is v1, decoding msg0 will throw an exception.
>
> Since all messages are stored in the topic, the downstream side
> (consumer) can catch the exception to discard the bytes without the
> expected schema.
>
> But if the validation fails at the producer side, there is a chance
> that msg0 is lost. In addition, let's see the producer and consumer
> code in this case.
>
> ```
> producer.send(msg0); // validation happens at the producer side
> ```
>
> ```
> var msg = consumer.receive();
> // validation happens again, though it has already been validated before
> var student = msg.getValue();
> ```
>
> Thanks,
> Yunze
>
> On Wed, Dec 14, 2022 at 3:11 PM 丛搏 <congbobo...@gmail.com> wrote:
> >
> > >
> > > > the user only creates one producer to send all Kafka topic data, if
> > > > using Pulsar schema, the user needs to create all schema producers in
> > > > a map
> > >
> > > It doesn't make sense to me. If the source topic has messages of
> > > multiple schemas, why did you try to sink them into the same topic
> > > with a schema? The key point of AUTO_PRODUCE schema is to download the
> > > schema to validate the source messages. But if the schema of the topic
> > > evolved, the remaining messages from the source topic could not be sent
> > > to the topic.
> > >
> > Let me give you an example. An AvroSchema can have multiple versions.
> > Version 0:
> > Student {
> >     String name;
> > }
> > Version 1:
> > Student {
> >     String name;
> >     int age;
> > }
> > How can you create two Student.class in one Java process and use
> > the same namespace?
> > It is not only the schema type that changes; a schema can also have
> > multiple versions.
> > In this case, how do you create two producers with version 0 and
> > version 1?
> >
> > > The most confusing part is that AUTO_PRODUCE schema will perform
> > > message format validation before send. It's transparent to users and
> > > intuitive. IMO, it's better to call validate explicitly like
> > >
> > > ```java
> > > producer.newMessage().value(bytes).validate().sendAsync();
> > > ```
> > >
> > > There are two benefits:
> > > 1. It's clear that the message validation happens before sending.
> > > 2. If users don't want to validate before sending, they can choose to
> > >    send the bytes directly and validate the message during consumption.
> > Using `schema.validate()` alone is enough. Data validation does not
> > belong to the Pulsar message, and we can add a usage description in
> > the schema doc.
> > >
> > > The performance problem of the AUTO_PRODUCE schema is that the
> > > validation happens twice and it cannot be controlled.
> >
> > Our data verification is the behavior of the client, not the behavior
> > of the broker. Therefore, we cannot effectively verify that bytes are
> > generated by a specific schema. I think this is something that users
> > should consider rather than something that Pulsar should guarantee,
> > because client-side verification alone cannot control whether the data
> > sent by users was really generated by this schema. So we don't need to
> > verify twice, unless we verify in the broker. That would add overhead;
> > we could add a config to control it, but is it really necessary?
> >
> > Thanks,
> > Bo
> >
> > Yunze Xu <y...@streamnative.io.invalid> wrote on Wed, Dec 14, 2022 at 12:40:
> > >
> > > > the user only creates one producer to send all Kafka topic data, if
> > > > using Pulsar schema, the user needs to create all schema producers in
> > > > a map
> > >
> > > It doesn't make sense to me. If the source topic has messages of
> > > multiple schemas, why did you try to sink them into the same topic
> > > with a schema? The key point of AUTO_PRODUCE schema is to download the
> > > schema to validate the source messages.
> > > But if the schema of the topic
> > > evolved, the remaining messages from the source topic could not be sent
> > > to the topic.
> > >
> > > The most confusing part is that AUTO_PRODUCE schema will perform
> > > message format validation before send. It's transparent to users and
> > > intuitive. IMO, it's better to call validate explicitly like
> > >
> > > ```java
> > > producer.newMessage().value(bytes).validate().sendAsync();
> > > ```
> > >
> > > There are two benefits:
> > > 1. It's clear that the message validation happens before sending.
> > > 2. If users don't want to validate before sending, they can choose to
> > >    send the bytes directly and validate the message during consumption.
> > >
> > > The performance problem of the AUTO_PRODUCE schema is that the
> > > validation happens twice and it cannot be controlled.
> > >
> > > Thanks,
> > > Yunze
> > >
> > > On Wed, Dec 14, 2022 at 12:01 PM 丛搏 <bog...@apache.org> wrote:
> > > >
> > > > Hi, Yunze:
> > > >
> > > > Yunze Xu <y...@streamnative.io.invalid> wrote on Wed, Dec 14, 2022 at 02:26:
> > > > >
> > > > > First, how do you guarantee the schema can be used to encode the raw
> > > > > bytes whose format is unknown?
> > > > I think this is what the user needs to ensure: the user knows all
> > > > the schemas from the Kafka topic, and that the data (byte[]) can be
> > > > sent with a Pulsar schema.
> > > > >
> > > > > Second, messages that cannot be encoded by the schema can only be
> > > > > discarded, i.e. message lost.
> > > > If the encoding fails, it proves that the user does not know how to
> > > > convert the Kafka data's schema to a Pulsar schema, which is the
> > > > user's own problem.
> > > > >
> > > > > Third, schema in Pulsar is convenient because it can support sending
> > > > > any object of type `T` and the Pulsar client is responsible to
> > > > > serialize `T` to the bytes. However, when using AUTO_PRODUCE schema,
> > > > > the producer still sends raw bytes.
> > > > the user only creates one producer to send all Kafka topic data, if
> > > > using Pulsar schema, the user needs to create all schema producers in
> > > > a map, and get the schema producer to send a message.
> > > >
> > > >
> > > > In my understanding, AUTO_PRODUCE mainly reduces the number of
> > > > producers created by the client, which will bring convenience to users
> > > > in migrating data. It is not meant for dealing with data of unknown
> > > > schema. If you want to use it correctly, you must know the schema of
> > > > all data, which can be converted into a Pulsar schema. Otherwise, it
> > > > would be best if you handled it yourself using the bytes schema.
> > > >
> > > > Thanks,
> > > > Bo
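
For readers following the thread, the v0/v1 decoding behaviour described in the `bytes-topic` example can be reproduced with a small self-contained sketch. It assumes a toy wire format (a length-prefixed name, optionally followed by a 4-byte age) written with `java.io.DataOutputStream`. It is not Avro and does not use the Pulsar client; the class `ToyStudentCodec` and all of its methods are hypothetical names introduced only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical toy codec: NOT Avro and NOT the Pulsar client API.
final class ToyStudentCodec {

    // "Schema v0" writer: Student { String name; }
    static byte[] encodeV0(String name) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeUTF(name);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // "Schema v1" writer: Student { String name; int age; }
    static byte[] encodeV1(String name, int age) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeUTF(name);
            out.writeInt(age);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // "Schema v0" reader: reads the name and ignores any trailing bytes,
    // so the age written by a v1 producer is silently dropped.
    static String decodeV0(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // "Schema v1" reader: requires both fields and fails if age is missing.
    static String decodeV1(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            String name = in.readUTF();
            int age = in.readInt(); // throws EOFException for v0 payloads
            return name + ":" + age;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Validation in the sense used in the thread: can the bytes be decoded?
    static boolean canDecodeV1(byte[] bytes) {
        try {
            decodeV1(bytes);
            return true;
        } catch (UncheckedIOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] msg0 = encodeV0("abc");    // like new Student("abc")
        byte[] msg1 = encodeV1("abc", 1); // like new Student("abc", 1)
        System.out.println("v0 reader, msg0: " + decodeV0(msg0));
        System.out.println("v0 reader, msg1: " + decodeV0(msg1));
        System.out.println("v1 reader, msg1: " + decodeV1(msg1));
        System.out.println("v1 reader accepts msg0: " + canDecodeV1(msg0));
    }
}
```

In this toy model a v0 reader accepts both messages (dropping `age` from msg1), while a v1 reader rejects msg0. Real Avro schema resolution is richer (for example, it supports field defaults), so this only illustrates the shape of the argument, not Pulsar's actual behaviour.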