Is there any problem with using a positive value for it? I think there is no compatibility issue because the enum value is never used on the broker side. Making it positive makes AUTO_CONSUME different with other implicit schema types like BYTES, AUTO and AUTO_PUBLISH.
Thanks, Yunze On Mon, Jan 16, 2023 at 9:36 AM PengHui Li <peng...@apache.org> wrote: > > > This design also has serious compatibility problems between old and new > pulsar clients and new and old brokers. > > Could you please explain more details of the compatibility issue if we > leverage > the protocol version? > > > We should not use a negative enum number in PulsarApi.proto. It's > unnatural. If we decide to carry the AUTO_CONSUME schema in a > CommandSubscribe, it should not be treated as a negative schema type. > > IMO, the protocol is defined as Enum. Users are developing based on the > Enum, not the value of the Enum. We need to make sure the value > of the Enum is immutable. It is not required that he must be a positive > number. > Maybe it looks ugly. > > And the protocol is just the API definition, not about which schema will be > persistent. > As I understand from the protocol definition, the Schema in the subscribe > command is > used to pass the used schema of the consumer. We just make it absent before > for > AUTO_CONSUME schema. We just thought we could make it absent if the consumer > is using AUTO_CONSUME schema. But apparently, this is a problem for now. > > I think the easier-to-understand way is for the client to set the schema > used when > subscribing or creating the producer. Rather than which ones need to be set > and which > ones do not need to be set. > > Thanks, > Penghui > > On Mon, Jan 9, 2023 at 11:32 AM SiNan Liu <liusinan1...@gmail.com> wrote: > > > This design also has serious compatibility problems between old and new > > pulsar clients and new and old brokers. > > > > > > Thanks, > > Sinan > > > > > > PengHui Li <peng...@apache.org> 于 2023年1月9日周一 上午9:51写道: > > > > > Sorry for the late reply. > > > > > > We can leverage the `ProtocolVersion` [1] to handle the compatibility > > > issue. > > > It looks like only if the protocol_version >= 21, subscribe with the > > > auto_consume schema > > > > > > IMO, both the new key-value of the subscribe command, and a specific > > > representative are > > > API changes. It's just that some have modified the definition of the API, > > > and some have modified the behavior of the API > > > > > > I prefer the intuitive way. And from the perspective of API-based > > > developers, we should > > > try to provide a simple and clear API with no hidden rules. The client > > just > > > uploads the schema > > > that it has except the byte[] schema. The broker knows how to handle the > > > different schemas, > > > such as AUTO_PRODUCE, and AUTO_CONSUME. And this should not be the burden > > > of the > > > client developer to learn the details of the schema implementation. They > > > should work according > > > to the API spec. > > > > > > If we can resolve the compatibility issue with uploading the AUTO_CONSUME > > > schema with > > > subscribe command, do you see any apparent cons? > > > > > > Best, > > > Penghui > > > > > > [1] > > > > > > > > https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/proto/PulsarApi.proto#L241-L266 > > > > > > On Fri, Jan 6, 2023 at 10:31 PM SiNan Liu <liusinan1...@gmail.com> > > wrote: > > > > > > > Ok, I will update the PIP issue later. > > > > > > > > About my current design. > > > > When the consumer with AUTO_CONSUME schema subscribed to an "empty" > > > topic, > > > > the schemaInfo will be null. > > > > ``` > > > > public SchemaInfo getSchemaInfo(byte[] schemaVersion) { > > > > SchemaVersion sv = getSchemaVersion(schemaVersion); > > > > if (schemaMap.containsKey(sv)) { > > > > return schemaMap.get(sv).getSchemaInfo(); > > > > } > > > > return null; > > > > > > > > } > > > > > > > > ``` > > > > And checkSchemaCompatibility must be set in > > > > `org.apache.pulsar.common.protocol.Commands#newSubscribe`, > > > > and we need to know that this is an AUTO_CONSUME consumer subscribing. > > So > > > > we should set a "*default*" schemaInfo(schemaType = AUTO_CONSUME) for > > > > AutoConsumeSchema, > > > > this is because schemaInfo is also null when `si.getType` is > > > > SchemaType.BYTES or SchemaType.NONE. > > > > And checkSchemaCompatibility can be set in > > > > `org.apache.pulsar.common.protocol.Commands#newSubscribe`. The most > > > > important thing is clearSchema, which does not carry the wrong schema > > to > > > > the broker. > > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> 于2023年1月6日周五 12:57写道: > > > > > > > > > You only need to describe what's added to the PulsarApi.proto, i.e. > > > > > you don't need to paste all definitions of `CommandSubscribe` in the > > > > > proposal. Take PIP-54 [1] for example, it only pastes the new field > > > > > `ack_set` and does not paste the whole `MessageIdData` definition. > > > > > > > > > > The implementations section involves too much code and just looks > > like > > > > > an actual PR. Take PIP-194 [2] for example, you should only talk > > about > > > > > the implementations from a high level. > > > > > > > > > > Let's talk back to your current design, when the schema type is > > > > > AUTO_CONSUME, you clear the schema in CommandSubscribe. It seems that > > > > > adding a SchemaInfo to the AutoConsumeSchema is meaningless. > > > > > > > > > > [1] > > > > > > > > > > > > > > https://github.com/apache/pulsar/wiki/PIP-54:-Support-acknowledgment-at-batch-index-level#changes > > > > > [2] https://github.com/apache/pulsar/issues/16757 > > > > > > > > > > On Fri, Jan 6, 2023 at 12:17 AM SiNan Liu <liusinan1...@gmail.com> > > > > wrote: > > > > > > > > > > > > I just updated the PIP issue and title, you guys can have a look. > > > > > issue19113 > > > > > > <https://github.com/apache/pulsar/issues/19113> > > > > > > I added `check_schema_compatibility` in CommandSubscribe, and I > > also > > > > made > > > > > > many other changes. > > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> 于2023年1月5日周四 14:33写道: > > > > > > > > > > > > > It's not related to the schema itself. When an AUTO_CONSUME > > > consumer > > > > > > > subscribes to a topic, the option tells the broker that it's an > > > > > > > AUTO_CONSUME consumer so that the broker should not treat it as > > an > > > > > > > active consumer when performing schema compatibility check. If > > > there > > > > > > > is a consumer that also wants to ignore the schema compatibility > > > > check > > > > > > > in future, this option can be reused. > > > > > > > > > > > > > > The other important reason is the breaking change by carrying the > > > > > > > schema info on an AUTO_CONSUMER consumer. (See my explanations in > > > > > > > GitHub and the mail list) If the consumer serves an old version > > > > > > > consumer, the schema could be uploaded into the registry and > > other > > > > > > > clients would be affected. So we should keep not carrying the > > > schema > > > > > > > info in CommandSubscribe for an AUTO_CONSUMER consumer. > > > > > > > > > > > > > > Thanks, > > > > > > > Yunze > > > > > > > > > > > > > > On Thu, Jan 5, 2023 at 11:55 AM SiNan Liu < > > liusinan1...@gmail.com> > > > > > wrote: > > > > > > > > > > > > > > > > I have modified pip issue and title last night. Yunze. You mean > > > > that > > > > > in > > > > > > > > PulsarApi.proto, take `optional bool is_auto_consume_schema = 6 > > > > > [default > > > > > > > = > > > > > > > > false]; ` in CommandSubscribe instead of Schema? But shouldn't > > > > > > > > schema-related stuff be in Schema? > > > > > > > > > > > > > > > > Thanks, > > > > > > > > Sinan > > > > > > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> 于 2023年1月5日周四 > > 上午12:31写道: > > > > > > > > > > > > > > > > > I found a similar compatibility problem with my closed PR. We > > > > > should > > > > > > > > > not set the `Schema` field for AUTO_CONSUME schema. More > > > > > explanations > > > > > > > > > can be found here [1]. > > > > > > > > > > > > > > > > > > Instead, we can add an optional field into CommandSubscribe > > to > > > > > > > > > indicate the schema compatibility check is skipped. > > > > > > > > > > > > > > > > > > ```protobuf > > > > > > > > > optional bool check_schema_compatibility = 20 [default = > > true] > > > > > > > > > ``` > > > > > > > > > > > > > > > > > > Then we can add a relative parameter here: > > > > > > > > > > > > > > > > > > ```java > > > > > > > > > CompletableFuture<Void> > > > > addSchemaIfIdleOrCheckCompatible(SchemaData > > > > > > > > > schema, boolean checkSchemaCompatibility); > > > > > > > > > ``` > > > > > > > > > > > > > > > > > > > > > > > > > > > [1] > > > > > > > > > > https://github.com/apache/pulsar/pull/17449#issuecomment-1371135700 > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > Yunze > > > > > > > > > > > > > > > > > > On Wed, Jan 4, 2023 at 11:49 PM Yunze Xu < > > y...@streamnative.io > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > Could you also update the PIP issue? This solution is > > totally > > > > > > > > > > different from your original proposal. Since it still > > > > introduces > > > > > > > > > > changes to `PulsarApi.proto`, it also requires a PIP (we > > can > > > > > reuse > > > > > > > > > > this one). > > > > > > > > > > > > > > > > > > > > ---- > > > > > > > > > > > > > > > > > > > > BTW, I tested again about carrying the SchemaInfo in the > > > > > > > > > > CommandSubscribe request. It could break compatibility. > > Given > > > > the > > > > > > > > > > following code run against Pulsar standalone 2.8.4: > > > > > > > > > > > > > > > > > > > > ```java > > > > > > > > > > PulsarClient client = > > > > > > > > > > PulsarClient.builder().serviceUrl(serviceUrl).build(); > > > > > > > > > > Consumer<GenericRecord> consumer = > > > > > > > > > > client.newConsumer(Schema.AUTO_CONSUME()) > > > > > > > > > > .topic(topic) > > > > > > > > > > .subscriptionName("sub") > > > > > > > > > > .subscriptionType(SubscriptionType.Shared) > > > > > > > > > > .subscribe(); > > > > > > > > > > Producer<User> producer = > > > > > > > > > client.newProducer(Schema.AVRO(User.class)) > > > > > > > > > > .topic(topic) > > > > > > > > > > .create(); > > > > > > > > > > ``` > > > > > > > > > > > > > > > > > > > > - If the schema type is 0 in CommandSubscribe, the NONE > > > schema > > > > > will > > > > > > > be > > > > > > > > > > persisted and the producer will fail to create due to the > > > > schema > > > > > > > > > > compatibility check. > > > > > > > > > > - If the schema type is -3 (AUTO_CONSUME), it will fail at > > > > > > > subscribe() > > > > > > > > > > with the following error: > > > > > > > > > > > > > > > > > > > > ``` > > > > > > > > > > 23:49:10.978 [pulsar-io-18-13] WARN > > > > > > > > > > org.apache.pulsar.broker.service.ServerCnx - [/ > > > > 172.23.160.1:5921 > > > > > ] > > > > > > > Got > > > > > > > > > > exception java.lang.IllegalStateException: Some required > > > fields > > > > > are > > > > > > > > > > missing > > > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.pulsar.common.api.proto.Schema.checkRequiredFields(Schema.java:337) > > > > > > > > > > at > > > > > > > > > > > > > > org.apache.pulsar.common.api.proto.Schema.parseFrom(Schema.java:332) > > > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.pulsar.common.api.proto.CommandSubscribe.parseFrom(CommandSubscribe.java:785) > > > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.pulsar.common.api.proto.BaseCommand.parseFrom(BaseCommand.java:2397) > > > > > > > > > > ``` > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > Yunze > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Jan 4, 2023 at 10:34 PM SiNan Liu < > > > > > liusinan1...@gmail.com> > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > I just implemented add an optional field in the subscribe > > > > > request > > > > > > > and > > > > > > > > > > > compatibility seems to be fine. You guys can have a look > > at > > > > my > > > > > PR ( > > > > > > > > > > > https://github.com/apache/pulsar/pull/17449). > > > > > > > > > > > > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> 于2023年1月4日周三 > > > > 21:31写道: > > > > > > > > > > > > > > > > > > > > > > > > Why can't we upload negative schema types? > > > > > > > > > > > > > > > > > > > > > > > > I want to avoid the changes to existing methods like > > > > > > > > > > > > Commands#getSchemaType, which converts all negative > > > schema > > > > > types > > > > > > > to > > > > > > > > > > > > NONE: > > > > > > > > > > > > > > > > > > > > > > > > ```java > > > > > > > > > > > > private static Schema.Type getSchemaType(SchemaType > > > type) { > > > > > > > > > > > > if (type.getValue() < 0) { > > > > > > > > > > > > return Schema.Type.None; > > > > > > > > > > > > } else { > > > > > > > > > > > > return Schema.Type.valueOf(type.getValue()); > > > > > > > > > > > > } > > > > > > > > > > > > } > > > > > > > > > > > > ``` > > > > > > > > > > > > > > > > > > > > > > > > I guess the above code was written because: > > > > > > > > > > > > 1. NONE schema type means it's not uploaded into the > > > > > registry. > > > > > > > (See > > > > > > > > > #3940 > > > > > > > > > > > > [1]) > > > > > > > > > > > > 2. There is no existing schema that uses NONE as its > > > schema > > > > > type, > > > > > > > > > i.e. > > > > > > > > > > > > NONE schema is used as something special. > > > > > > > > > > > > > > > > > > > > > > > > > every different language client will code the special > > > > > logic. > > > > > > > > > > > > > > > > > > > > > > > > If other clients follow the behavior of the Java > > client, > > > > they > > > > > > > should > > > > > > > > > > > > also convert negative schemas to NONE currently. > > > Therefore, > > > > > > > changes > > > > > > > > > > > > cannot be avoided. No matter if the semantic of > > > > > `setSchemaType` > > > > > > > is > > > > > > > > > > > > changed, they should follow the Java implementation as > > > > well. > > > > > > > > > > > > > > > > > > > > > > > > > This will change the meaning of the schema data field > > > > > > > > > > > > > > > > > > > > > > > > The existing definition only defines its meaning to the > > > > AVRO > > > > > and > > > > > > > JSON > > > > > > > > > > > > schema. But from a more general view, the schema data > > > > should > > > > > be > > > > > > > > > > > > something associated with the current schema. Giving it > > > > more > > > > > > > meaning > > > > > > > > > > > > for other schema types is acceptable IMO. For example, > > > the > > > > > schema > > > > > > > > > data > > > > > > > > > > > > field represents the serialized Protobuf descriptor in > > > > > Protobuf > > > > > > > > > Native > > > > > > > > > > > > schema, see `ProtobufNativeSchema#of`: > > > > > > > > > > > > > > > > > > > > > > > > ```java > > > > > > > > > > > > > > .schema(ProtobufNativeSchemaUtils.serialize(descriptor)) > > > > > > > > > > > > ``` > > > > > > > > > > > > > > > > > > > > > > > > [1] https://github.com/apache/pulsar/pull/3940 > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > Yunze > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Jan 4, 2023 at 8:27 PM 丛搏 <bog...@apache.org> > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > It does not affect the public API so it can be > > > > > cherry-picked > > > > > > > > > into old > > > > > > > > > > > > > > branches. The main difference with this proposal is > > > > that > > > > > my > > > > > > > > > solution > > > > > > > > > > > > > > carries the identity info (i.e. `AUTO_CONSUME`) in > > > the > > > > > schema > > > > > > > > > data, > > > > > > > > > > > > > > which is a byte array. The negative schema types > > > should > > > > > not > > > > > > > be > > > > > > > > > exposed > > > > > > > > > > > > > > to users. Adding a field to the subscribe request > > > might > > > > > be > > > > > > > okay > > > > > > > > > but it > > > > > > > > > > > > > > could be unnecessary to cover such a corner case. > > > > > > > > > > > > > > > > > > > > > > > > > > This will change the meaning of the schema data field > > > and > > > > > > > couple > > > > > > > > > the > > > > > > > > > > > > > schema type and schema data. `schema type = NONE` and > > > > > `schema > > > > > > > data > > > > > > > > > = > > > > > > > > > > > > > "AUTO_CONSUME" ` represent `AUTO_ CONSUME`, I think > > > it's > > > > > > > weird. Why > > > > > > > > > > > > > can't we upload negative schema types? > > > > > > > > > > > > > > > > > > > > > > > > > > > It does not affect the public API > > > > > > > > > > > > > upload negative schema types only changes the proto, > > if > > > > > using > > > > > > > > > `schema > > > > > > > > > > > > > type = NONE` and `schema data = "AUTO_CONSUME" `, > > every > > > > > > > different > > > > > > > > > > > > > language client will code the special logic. This > > > special > > > > > > > logic can > > > > > > > > > > > > > easily be ignored. > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > Bo > > > > > > > > > > > > > > > > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> 于2023年1月4日周三 > > > > > 17:02写道: > > > > > > > > > > > > > > > > > > > > > > > > > > > > I opened a PR to fix this issue: > > > > > > > > > > > > https://github.com/apache/pulsar/pull/19128 > > > > > > > > > > > > > > > > > > > > > > > > > > > > It does not affect the public API so it can be > > > > > cherry-picked > > > > > > > > > into old > > > > > > > > > > > > > > branches. The main difference with this proposal is > > > > that > > > > > my > > > > > > > > > solution > > > > > > > > > > > > > > carries the identity info (i.e. `AUTO_CONSUME`) in > > > the > > > > > schema > > > > > > > > > data, > > > > > > > > > > > > > > which is a byte array. The negative schema types > > > should > > > > > not > > > > > > > be > > > > > > > > > exposed > > > > > > > > > > > > > > to users. Adding a field to the subscribe request > > > might > > > > > be > > > > > > > okay > > > > > > > > > but it > > > > > > > > > > > > > > could be unnecessary to cover such a corner case. > > > > > > > > > > > > > > > > > > > > > > > > > > > > It might be controversial if schema data should be > > > used > > > > > in > > > > > > > such > > > > > > > > > a way, > > > > > > > > > > > > > > because the original purpose is to represent the > > AVRO > > > > or > > > > > JSON > > > > > > > > > > > > > > definition. However, this semantic is defined just > > > for > > > > > AVRO > > > > > > > or > > > > > > > > > JSON > > > > > > > > > > > > > > schema. IMO, the data field of other schemas is > > never > > > > > used > > > > > > > well. > > > > > > > > > > > > > > > > > > > > > > > > > > > > Another solution is to make use of the name field > > of > > > > > schema, > > > > > > > > > which > > > > > > > > > > > > > > might be more natural. I think we can continue the > > > > > > > discussion in > > > > > > > > > my > > > > > > > > > > > > > > PR. > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > Yunze > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Jan 4, 2023 at 11:07 AM Yunze Xu < > > > > > > > y...@streamnative.io> > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Modifying the subscribe request is better than > > > > exposing > > > > > > > > > AUTO_CONSUME > > > > > > > > > > > > > > > schema type IMO. The negative value of a schema > > > type, > > > > > like > > > > > > > > > BYTES, > > > > > > > > > > > > > > > AUTO_PRODUCE, means this schema type should only > > be > > > > > used > > > > > > > > > internally. > > > > > > > > > > > > > > > Adding the negative enum value to the Schema > > > > > definition in > > > > > > > > > > > > > > > PulsarApi.proto looks very weird. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > But I'm still wondering if we can avoid the API > > > > > changes. I > > > > > > > > > will look > > > > > > > > > > > > > > > deeper into this issue. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > Yunze > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Jan 4, 2023 at 12:12 AM Enrico Olivelli < > > > > > > > > > eolive...@gmail.com> > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Il Mar 3 Gen 2023, 14:37 Yunze Xu > > > > > > > > > <y...@streamnative.io.invalid> > > > > > > > > > > > > ha scritto: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Bo, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I got it now. The PIP title sounds ambiguous. > > > > > Using the > > > > > > > > > term > > > > > > > > > > > > "Upload > > > > > > > > > > > > > > > > > xxx SchemaType" sounds like uploading the > > > schema > > > > > into > > > > > > > the > > > > > > > > > > > > registry. > > > > > > > > > > > > > > > > > Instead, it should be "carrying schema in the > > > > > request > > > > > > > when > > > > > > > > > > > > subscribing > > > > > > > > > > > > > > > > > with AUTO_CONSUME schema". > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I agree that we should change the naming and we > > > > > should > > > > > > > > > probably > > > > > > > > > > > > not use a > > > > > > > > > > > > > > > > new Schema type but add an optional field in > > the > > > > > > > subscribe > > > > > > > > > request > > > > > > > > > > > > (and do > > > > > > > > > > > > > > > > not send it if the broker is an old version) > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Enrico > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > Yunze > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Jan 3, 2023 at 4:56 PM 丛搏 < > > > > > bog...@apache.org> > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi, Yunze > > > > > > > > > > > > > > > > > > > What I am concerned about is that if the > > > old > > > > > > > clients > > > > > > > > > with > > > > > > > > > > > > other > > > > > > > > > > > > > > > > > > > schemas (i.e. schema is neither null nor > > > > > > > AUTO_CONSUME) > > > > > > > > > > > > subscribe to > > > > > > > > > > > > > > > > > > > the topic with AUTO_CONSUME schema, what > > > will > > > > > > > happen? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > AUTO_CONSUME schema will not store in > > > > > > > > > > > > `SchemaRegistryServiceImpl`, it > > > > > > > > > > > > > > > > > > only represents one consumer with > > > AUTO_CONSUME > > > > > > > schema to > > > > > > > > > > > > subscribe to > > > > > > > > > > > > > > > > > > a topic. If old clients with other schemas > > > > > subscribe > > > > > > > to > > > > > > > > > this > > > > > > > > > > > > topic, > > > > > > > > > > > > > > > > > > Its behavior will not be changed by this > > PIP. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > What's the schema compatibility check > > rule > > > > on a > > > > > > > topic > > > > > > > > > with > > > > > > > > > > > > > > > > > AUTO_CONSUME schema? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > it's only the consumer schema compatibility > > > > > check, > > > > > > > not on > > > > > > > > > > > > topic. if a > > > > > > > > > > > > > > > > > > consume with AUTO_CONSUME schema will do > > any > > > > > > > > > compatibility > > > > > > > > > > > > check > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > Bo > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> > > > > > 于2023年1月3日周二 > > > > > > > > > 10:16写道: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > What I am concerned about is that if the > > > old > > > > > > > clients > > > > > > > > > with > > > > > > > > > > > > other > > > > > > > > > > > > > > > > > > > schemas (i.e. schema is neither null nor > > > > > > > AUTO_CONSUME) > > > > > > > > > > > > subscribe to > > > > > > > > > > > > > > > > > > > the topic with AUTO_CONSUME schema, what > > > will > > > > > > > happen? > > > > > > > > > What's > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > > > schema compatibility check rule on a > > topic > > > > with > > > > > > > > > AUTO_CONSUME > > > > > > > > > > > > schema? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > Yunze > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Mon, Jan 2, 2023 at 12:38 AM SiNan > > Liu < > > > > > > > > > > > > liusinan1...@gmail.com> > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1.Schema.Type and > > > > > > > > > > > > org.apache.pulsar.common.schema.SchemaType value > > > > > > > > > > > > > > > > > should > > > > > > > > > > > > > > > > > > > > be the same. > > > > > > > > > > > > > > > > > > > > 2.These changes do not affect produce > > and > > > > are > > > > > > > only > > > > > > > > > affect > > > > > > > > > > > > consumer > > > > > > > > > > > > > > > > > > > > subscribe behavior. > > > > > > > > > > > > > > > > > > > > 3.backward compatibility: > > > > > > > > > > > > > > > > > > > > (1)In > > > > > > > > > > > > > > > org.apache.pulsar.broker.service.ServerCnx#handleSubscribe. > > > > > > > > > > > > > > > > > > > > if (schema != null && schema.getType() > > != > > > > > > > > > > > > SchemaType.AUTO_CONSUME) { > > > > > > > > > > > > > > > > > > > > return > > > > > > > topic.addSchemaIfIdleOrCheckCompatible(schema) > > > > > > > > > > > > > > > > > > > > .thenCompose(v -> > > > topic.subscribe(option)); > > > > > > > > > > > > > > > > > > > > } else { > > > > > > > > > > > > > > > > > > > > return topic.subscribe(option); > > > > > > > > > > > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > For the older pulsar client, the schema > > > is > > > > > null > > > > > > > if > > > > > > > > > > > > AUTO_CONSUME > > > > > > > > > > > > > > > > > consumer > > > > > > > > > > > > > > > > > > > > subscribe to the Topic. > > > > > > > > > > > > > > > > > > > > For the new pulsar client, if > > > AUTO_CONSUME > > > > > > > consumer > > > > > > > > > > > > subscribe the > > > > > > > > > > > > > > > > > Topic, > > > > > > > > > > > > > > > > > > > > then schema is not null and > > > > schema.getType() > > > > > = > > > > > > > > > > > > > > > > > SchemaType.AUTO_CONSUME. > > > > > > > > > > > > > > > > > > > > Both new and old pulsar clients consume > > > the > > > > > > > topic, > > > > > > > > > will > > > > > > > > > > > > return topic. > > > > > > > > > > > > > > > > > > > > subscribe(option). > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > (2)In > > > > > > > > > > > > > > > org.apache.pulsar.broker.service.persistent.PersistentTopic > > > > > > > > > > > > > > > > > > > > #addSchemaIfIdleOrCheckCompatible. > > > > > > > > > > > > > > > > > > > > @Override > > > > > > > > > > > > > > > > > > > > public CompletableFuture<Void> > > > > > > > > > > > > > > > > > addSchemaIfIdleOrCheckCompatible(SchemaData > > > > > > > > > > > > > > > > > > > > schema) { > > > > > > > > > > > > > > > > > > > > return > > > hasSchema().thenCompose((hasSchema) > > > > > -> { > > > > > > > > > > > > > > > > > > > > int > > numActiveConsumersWithoutAutoSchema = > > > > > > > > > > > > > > > > > subscriptions.values().stream() > > > > > > > > > > > > > > > > > > > > .mapToInt(subscription -> > > > > > > > > > > > > subscription.getConsumers().stream() > > > > > > > > > > > > > > > > > > > > .filter(consumer -> > > > > consumer.getSchemaType() > > > > > != > > > > > > > > > > > > > > > > > SchemaType.AUTO_CONSUME) > > > > > > > > > > > > > > > > > > > > .toList().size()) > > > > > > > > > > > > > > > > > > > > .sum(); > > > > > > > > > > > > > > > > > > > > if (hasSchema > > > > > > > > > > > > > > > > > > > > || (!producers.isEmpty()) > > > > > > > > > > > > > > > > > > > > || (numActiveConsumersWithoutAutoSchema > > > != > > > > 0) > > > > > > > > > > > > > > > > > > > > || (ledger.getTotalSize() != 0)) { > > > > > > > > > > > > > > > > > > > > return > > > > > checkSchemaCompatibleForConsumer(schema); > > > > > > > > > > > > > > > > > > > > } else { > > > > > > > > > > > > > > > > > > > > return > > > > > > > addSchema(schema).thenCompose(schemaVersion -> > > > > > > > > > > > > > > > > > > > > > > CompletableFuture.completedFuture(null)); > > > > > > > > > > > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > }); > > > > > > > > > > > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > Only in one case will there be a bug. > > > > > > > > > > > > > > > > > > > > First, the old pulsar client consume > > the > > > > > empty > > > > > > > > > topic, the > > > > > > > > > > > > consumer > > > > > > > > > > > > > > > > > schema > > > > > > > > > > > > > > > > > > > > is AUTO_CONSUME, and then whether the > > new > > > > or > > > > > old > > > > > > > > > pulsar > > > > > > > > > > > > client > > > > > > > > > > > > > > > > > consume(i.e. > > > > > > > > > > > > > > > > > > > > schema is AVRO) the topic. > > > > > > > > > > > > > > > > > > > > The broker will return the error > > message > > > as > > > > > > > > > > > > > > > > > IncompatibleSchemaException (" > > > > > > > > > > > > > > > > > > > > Topic does not have a schema to check > > "). > > > > The > > > > > > > bug at > > > > > > > > > > > > issue17354 is > > > > > > > > > > > > > > > > > not > > > > > > > > > > > > > > > > > > > > fixed in this case. > > > > > > > > > > > > > > > > > > > > All the other cases will be normal. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid > > > > > > > > > > > > 于2022年12月31日周六 > > > > > > > > > > > > 20:23写道: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Defining `AutoConsume` as -3 is > > somehow > > > > > > > strange. > > > > > > > > > Could > > > > > > > > > > > > you clarify > > > > > > > > > > > > > > > > > if > > > > > > > > > > > > > > > > > > > > > backward compatibility is guaranteed? > > > > i.e. > > > > > if > > > > > > > the > > > > > > > > > new > > > > > > > > > > > > Pulsar client > > > > > > > > > > > > > > > > > > > > > uploaded the AUTO_CONSUME schema to > > the > > > > > broker, > > > > > > > > > can the > > > > > > > > > > > > old Pulsar > > > > > > > > > > > > > > > > > > > > > clients produce or consume the same > > > topic > > > > > > > anymore? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > > Yunze > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Dec 30, 2022 at 11:32 PM 思楠刘 > > < > > > > > > > > > > > > liusinan1...@gmail.com> > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I made a PIP to discuss: > > > > > > > > > > > > > > > > > > > https://github.com/apache/pulsar/issues/19113. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > > > Sinan > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >