Hi Bo,

I don't think it's a bad implementation. Instead, it's a natural
thought that if a schema info won't be persisted into the schema
registry, there is no need to carry the schema info in
CommandSubscribe.

Actually, I think the bad implementation is the AUTO_XXX schema
itself. From Avro's definition [1] we can see

> This schema describes the fields allowed in the value, along with their data 
> types.

However, in Pulsar, AUTO_XXX schemas are not real "schema"s. They
don't represent any data type. Instead, they represents how to handle
the schemas. The AUTO_XXX schemas themselves are intuitive. I think
they were introduced mainly because of the limitations of Pulsar API.
For example, I don't see any confusing AUTO_XXX schema types in
Confluent Schema [2] as well. They were designed to be negative and
not added into PulsarApi.proto just to avoid confusion to users.

Now you tried to expose these "fake" schemas just to tell brokers to
handle this consumer specially. And you tried to use the schema type
to describe the attribute of a consumer. I don't think it's simpler
and more acceptable.

[1] 
https://docs.oracle.com/cd/E26161_02/html/GettingStartedGuide/avroschemas.html
[2] https://docs.confluent.io/platform/current/schema-registry/index.html

Thanks,
Yunze



On Fri, Jan 6, 2023 at 2:15 PM 丛搏 <bog...@apache.org> wrote:
>
> Hi Yunze,
>
> > It's a good idea to use `ProtocolVersion` to control. But adding a
> > negative schema type still looks weird.
>
> negative schema type is a history problem(include `NONE` schema type).
> I don't think it is a good implementation, It adds too much
> complexity. the broker can control any schema-type behavior. like this
> problem, don't create sub command with `AUTO_CONSUME` and bring this
> problem. We have written a lot of complicated code to solve this
> historical problem, If you add new fields it will be more unacceptable
> than negative schema type. We should choose a simpler and more direct
> way to deal with this matter instead of making it more complicated
>
> Thanks,
> Bo
>
> Yunze Xu <y...@streamnative.io.invalid> 于2023年1月6日周五 13:18写道:
> >
> > Hi Bo,
> >
> > >  the old server compatibility can add `ProtocolVersion` to control.
> >
> > It's a good idea to use `ProtocolVersion` to control. But adding a
> > negative schema type still looks weird. You can find the following
> > description in SchemaType.java, which was added in
> > https://github.com/apache/pulsar/pull/3940:
> >
> > ```java
> > // Schemas that don't have schema info. the value should be negative.
> > ```
> >
> > If you expose the negative schema type in PulsarApi.proto, how could
> > you explain to users that the "new" schema type is a negative integer?
> > And for developers, the negative schema types should not have the
> > schema info, but you create a schema info for it.
> >
> > Thanks,
> > Yunze
> >
> > On Fri, Jan 6, 2023 at 1:07 PM 丛搏 <bog...@apache.org> wrote:
> > >
> > > > Instead, we can add an optional field into CommandSubscribe to
> > > > indicate the schema compatibility check is skipped.
> > > > ```protobuf
> > > > optional bool check_schema_compatibility = 20 [default = true]
> > > > ```
> > > `check_ schema_ Compatibility 'contains too many meanings. I think
> > > this change will make the code more uncontrollable.
> > > I still suggest uploading the `AUTO_CONSUME` type directly. the old
> > > server compatibility can add `ProtocolVersion` to control. Adding any
> > > other fields in proto or uploading directly ` AUTO_ CONSUME ` type
> > > makes no difference. Other modifications may lead to ambiguity.
> > >
> > > Thanks,
> > > Bo
> > >
> > > SiNan Liu <liusinan1...@gmail.com> 于2023年1月6日周五 00:17写道:
> > > >
> > > > I just updated the PIP issue and title, you guys can have a look. 
> > > > issue19113
> > > > <https://github.com/apache/pulsar/issues/19113>
> > > > I added `check_schema_compatibility` in CommandSubscribe, and I also 
> > > > made
> > > > many other changes.
> > > >
> > > > Yunze Xu <y...@streamnative.io.invalid> 于2023年1月5日周四 14:33写道:
> > > >
> > > > > It's not related to the schema itself. When an AUTO_CONSUME consumer
> > > > > subscribes to a topic, the option tells the broker that it's an
> > > > > AUTO_CONSUME consumer so that the broker should not treat it as an
> > > > > active consumer when performing schema compatibility check. If there
> > > > > is a consumer that also wants to ignore the schema compatibility check
> > > > > in future, this option can be reused.
> > > > >
> > > > > The other important reason is the breaking change by carrying the
> > > > > schema info on an AUTO_CONSUMER consumer. (See my explanations in
> > > > > GitHub and the mail list) If the consumer serves an old version
> > > > > consumer, the schema could be uploaded into the registry and other
> > > > > clients would be affected. So we should keep not carrying the schema
> > > > > info in CommandSubscribe for an AUTO_CONSUMER consumer.
> > > > >
> > > > > Thanks,
> > > > > Yunze
> > > > >
> > > > > On Thu, Jan 5, 2023 at 11:55 AM SiNan Liu <liusinan1...@gmail.com> 
> > > > > wrote:
> > > > > >
> > > > > > I have modified pip issue and title last night. Yunze. You mean 
> > > > > > that in
> > > > > > PulsarApi.proto, take `optional bool is_auto_consume_schema = 6 
> > > > > > [default
> > > > > =
> > > > > > false]; ` in CommandSubscribe instead of Schema? But shouldn't
> > > > > > schema-related stuff be in Schema?
> > > > > >
> > > > > > Thanks,
> > > > > > Sinan
> > > > > >
> > > > > > Yunze Xu <y...@streamnative.io.invalid> 于 2023年1月5日周四 上午12:31写道:
> > > > > >
> > > > > > > I found a similar compatibility problem with my closed PR. We 
> > > > > > > should
> > > > > > > not set the `Schema` field for AUTO_CONSUME schema. More 
> > > > > > > explanations
> > > > > > > can be found here [1].
> > > > > > >
> > > > > > > Instead, we can add an optional field into CommandSubscribe to
> > > > > > > indicate the schema compatibility check is skipped.
> > > > > > >
> > > > > > > ```protobuf
> > > > > > > optional bool check_schema_compatibility = 20 [default = true]
> > > > > > > ```
> > > > > > >
> > > > > > > Then we can add a relative parameter here:
> > > > > > >
> > > > > > > ```java
> > > > > > > CompletableFuture<Void> 
> > > > > > > addSchemaIfIdleOrCheckCompatible(SchemaData
> > > > > > > schema, boolean checkSchemaCompatibility);
> > > > > > > ```
> > > > > > >
> > > > > > >
> > > > > > > [1]
> > > > > https://github.com/apache/pulsar/pull/17449#issuecomment-1371135700
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Yunze
> > > > > > >
> > > > > > > On Wed, Jan 4, 2023 at 11:49 PM Yunze Xu <y...@streamnative.io> 
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Could you also update the PIP issue? This solution is totally
> > > > > > > > different from your original proposal. Since it still introduces
> > > > > > > > changes to `PulsarApi.proto`, it also requires a PIP (we can 
> > > > > > > > reuse
> > > > > > > > this one).
> > > > > > > >
> > > > > > > > ----
> > > > > > > >
> > > > > > > > BTW, I tested again about carrying the SchemaInfo in the
> > > > > > > > CommandSubscribe request. It could break compatibility. Given 
> > > > > > > > the
> > > > > > > > following code run against Pulsar standalone 2.8.4:
> > > > > > > >
> > > > > > > > ```java
> > > > > > > >         PulsarClient client =
> > > > > > > > PulsarClient.builder().serviceUrl(serviceUrl).build();
> > > > > > > >         Consumer<GenericRecord> consumer =
> > > > > > > > client.newConsumer(Schema.AUTO_CONSUME())
> > > > > > > >                 .topic(topic)
> > > > > > > >                 .subscriptionName("sub")
> > > > > > > >                 .subscriptionType(SubscriptionType.Shared)
> > > > > > > >                 .subscribe();
> > > > > > > >         Producer<User> producer =
> > > > > > > client.newProducer(Schema.AVRO(User.class))
> > > > > > > >                 .topic(topic)
> > > > > > > >                 .create();
> > > > > > > > ```
> > > > > > > >
> > > > > > > > - If the schema type is 0 in CommandSubscribe, the NONE schema 
> > > > > > > > will
> > > > > be
> > > > > > > > persisted and the producer will fail to create due to the schema
> > > > > > > > compatibility check.
> > > > > > > > - If the schema type is -3 (AUTO_CONSUME), it will fail at
> > > > > subscribe()
> > > > > > > > with the following error:
> > > > > > > >
> > > > > > > > ```
> > > > > > > > 23:49:10.978 [pulsar-io-18-13] WARN
> > > > > > > > org.apache.pulsar.broker.service.ServerCnx - 
> > > > > > > > [/172.23.160.1:5921]
> > > > > Got
> > > > > > > > exception java.lang.IllegalStateException: Some required fields 
> > > > > > > > are
> > > > > > > > missing
> > > > > > > >         at
> > > > > > >
> > > > > org.apache.pulsar.common.api.proto.Schema.checkRequiredFields(Schema.java:337)
> > > > > > > >         at
> > > > > > > org.apache.pulsar.common.api.proto.Schema.parseFrom(Schema.java:332)
> > > > > > > >         at
> > > > > > >
> > > > > org.apache.pulsar.common.api.proto.CommandSubscribe.parseFrom(CommandSubscribe.java:785)
> > > > > > > >         at
> > > > > > >
> > > > > org.apache.pulsar.common.api.proto.BaseCommand.parseFrom(BaseCommand.java:2397)
> > > > > > > > ```
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Yunze
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Jan 4, 2023 at 10:34 PM SiNan Liu 
> > > > > > > > <liusinan1...@gmail.com>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > I just implemented add an optional field in the subscribe 
> > > > > > > > > request
> > > > > and
> > > > > > > > > compatibility seems to be fine. You guys can have a look at 
> > > > > > > > > my PR (
> > > > > > > > > https://github.com/apache/pulsar/pull/17449).
> > > > > > > > >
> > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> 于2023年1月4日周三 21:31写道:
> > > > > > > > >
> > > > > > > > > > > Why can't we upload negative schema types?
> > > > > > > > > >
> > > > > > > > > > I want to avoid the changes to existing methods like
> > > > > > > > > > Commands#getSchemaType, which converts all negative schema 
> > > > > > > > > > types
> > > > > to
> > > > > > > > > > NONE:
> > > > > > > > > >
> > > > > > > > > > ```java
> > > > > > > > > > private static Schema.Type getSchemaType(SchemaType type) {
> > > > > > > > > >     if (type.getValue() < 0) {
> > > > > > > > > >         return Schema.Type.None;
> > > > > > > > > >     } else {
> > > > > > > > > >         return Schema.Type.valueOf(type.getValue());
> > > > > > > > > >     }
> > > > > > > > > > }
> > > > > > > > > > ```
> > > > > > > > > >
> > > > > > > > > > I guess the above code was written because:
> > > > > > > > > > 1. NONE schema type means it's not uploaded into the 
> > > > > > > > > > registry.
> > > > > (See
> > > > > > > #3940
> > > > > > > > > > [1])
> > > > > > > > > > 2. There is no existing schema that uses NONE as its schema 
> > > > > > > > > > type,
> > > > > > > i.e.
> > > > > > > > > > NONE schema is used as something special.
> > > > > > > > > >
> > > > > > > > > > > every different language client will code the special 
> > > > > > > > > > > logic.
> > > > > > > > > >
> > > > > > > > > > If other clients follow the behavior of the Java client, 
> > > > > > > > > > they
> > > > > should
> > > > > > > > > > also convert negative schemas to NONE currently. Therefore,
> > > > > changes
> > > > > > > > > > cannot be avoided. No matter if the semantic of 
> > > > > > > > > > `setSchemaType`
> > > > > is
> > > > > > > > > > changed, they should follow the Java implementation as well.
> > > > > > > > > >
> > > > > > > > > > > This will change the meaning of the schema data field
> > > > > > > > > >
> > > > > > > > > > The existing definition only defines its meaning to the 
> > > > > > > > > > AVRO and
> > > > > JSON
> > > > > > > > > > schema. But from a more general view, the schema data 
> > > > > > > > > > should be
> > > > > > > > > > something associated with the current schema. Giving it more
> > > > > meaning
> > > > > > > > > > for other schema types is acceptable IMO. For example, the 
> > > > > > > > > > schema
> > > > > > > data
> > > > > > > > > > field represents the serialized Protobuf descriptor in 
> > > > > > > > > > Protobuf
> > > > > > > Native
> > > > > > > > > > schema, see `ProtobufNativeSchema#of`:
> > > > > > > > > >
> > > > > > > > > > ```java
> > > > > > > > > > .schema(ProtobufNativeSchemaUtils.serialize(descriptor))
> > > > > > > > > > ```
> > > > > > > > > >
> > > > > > > > > > [1] https://github.com/apache/pulsar/pull/3940
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Yunze
> > > > > > > > > >
> > > > > > > > > > On Wed, Jan 4, 2023 at 8:27 PM 丛搏 <bog...@apache.org> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > It does not affect the public API so it can be 
> > > > > > > > > > > > cherry-picked
> > > > > > > into old
> > > > > > > > > > > > branches. The main difference with this proposal is 
> > > > > > > > > > > > that my
> > > > > > > solution
> > > > > > > > > > > > carries the identity info (i.e. `AUTO_CONSUME`) in the 
> > > > > > > > > > > > schema
> > > > > > > data,
> > > > > > > > > > > > which is a byte array. The negative schema types should 
> > > > > > > > > > > > not
> > > > > be
> > > > > > > exposed
> > > > > > > > > > > > to users. Adding a field to the subscribe request might 
> > > > > > > > > > > > be
> > > > > okay
> > > > > > > but it
> > > > > > > > > > > > could be unnecessary to cover such a corner case.
> > > > > > > > > > >
> > > > > > > > > > > This will change the meaning of the schema data field and
> > > > > couple
> > > > > > > the
> > > > > > > > > > > schema type and schema data. `schema type = NONE` and 
> > > > > > > > > > > `schema
> > > > > data
> > > > > > > =
> > > > > > > > > > > "AUTO_CONSUME" ` represent `AUTO_ CONSUME`, I think it's
> > > > > weird. Why
> > > > > > > > > > > can't we upload negative schema types?
> > > > > > > > > > >
> > > > > > > > > > > > It does not affect the public API
> > > > > > > > > > > upload negative schema types only changes the proto, if 
> > > > > > > > > > > using
> > > > > > > `schema
> > > > > > > > > > > type = NONE` and `schema data = "AUTO_CONSUME" `, every
> > > > > different
> > > > > > > > > > > language client will code the special logic. This special
> > > > > logic can
> > > > > > > > > > > easily be ignored.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Bo
> > > > > > > > > > >
> > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> 于2023年1月4日周三 
> > > > > > > > > > > 17:02写道:
> > > > > > > > > > > >
> > > > > > > > > > > > I opened a PR to fix this issue:
> > > > > > > > > > https://github.com/apache/pulsar/pull/19128
> > > > > > > > > > > >
> > > > > > > > > > > > It does not affect the public API so it can be 
> > > > > > > > > > > > cherry-picked
> > > > > > > into old
> > > > > > > > > > > > branches. The main difference with this proposal is 
> > > > > > > > > > > > that my
> > > > > > > solution
> > > > > > > > > > > > carries the identity info (i.e. `AUTO_CONSUME`) in the 
> > > > > > > > > > > > schema
> > > > > > > data,
> > > > > > > > > > > > which is a byte array. The negative schema types should 
> > > > > > > > > > > > not
> > > > > be
> > > > > > > exposed
> > > > > > > > > > > > to users. Adding a field to the subscribe request might 
> > > > > > > > > > > > be
> > > > > okay
> > > > > > > but it
> > > > > > > > > > > > could be unnecessary to cover such a corner case.
> > > > > > > > > > > >
> > > > > > > > > > > > It might be controversial if schema data should be used 
> > > > > > > > > > > > in
> > > > > such
> > > > > > > a way,
> > > > > > > > > > > > because the original purpose is to represent the AVRO 
> > > > > > > > > > > > or JSON
> > > > > > > > > > > > definition. However, this semantic is defined just for 
> > > > > > > > > > > > AVRO
> > > > > or
> > > > > > > JSON
> > > > > > > > > > > > schema. IMO, the data field of other schemas is never 
> > > > > > > > > > > > used
> > > > > well.
> > > > > > > > > > > >
> > > > > > > > > > > > Another solution is to make use of the name field of 
> > > > > > > > > > > > schema,
> > > > > > > which
> > > > > > > > > > > > might be more natural. I think we can continue the
> > > > > discussion in
> > > > > > > my
> > > > > > > > > > > > PR.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Yunze
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Jan 4, 2023 at 11:07 AM Yunze Xu <
> > > > > y...@streamnative.io>
> > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > Modifying the subscribe request is better than 
> > > > > > > > > > > > > exposing
> > > > > > > AUTO_CONSUME
> > > > > > > > > > > > > schema type IMO. The negative value of a schema type, 
> > > > > > > > > > > > > like
> > > > > > > BYTES,
> > > > > > > > > > > > > AUTO_PRODUCE, means this schema type should only be 
> > > > > > > > > > > > > used
> > > > > > > internally.
> > > > > > > > > > > > > Adding the negative enum value to the Schema 
> > > > > > > > > > > > > definition in
> > > > > > > > > > > > > PulsarApi.proto looks very weird.
> > > > > > > > > > > > >
> > > > > > > > > > > > > But I'm still wondering if we can avoid the API 
> > > > > > > > > > > > > changes. I
> > > > > > > will look
> > > > > > > > > > > > > deeper into this issue.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Yunze
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Jan 4, 2023 at 12:12 AM Enrico Olivelli <
> > > > > > > eolive...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Il Mar 3 Gen 2023, 14:37 Yunze Xu
> > > > > > > <y...@streamnative.io.invalid>
> > > > > > > > > > ha scritto:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Bo,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I got it now. The PIP title sounds ambiguous. 
> > > > > > > > > > > > > > > Using the
> > > > > > > term
> > > > > > > > > > "Upload
> > > > > > > > > > > > > > > xxx SchemaType" sounds like uploading the schema 
> > > > > > > > > > > > > > > into
> > > > > the
> > > > > > > > > > registry.
> > > > > > > > > > > > > > > Instead, it should be "carrying schema in the 
> > > > > > > > > > > > > > > request
> > > > > when
> > > > > > > > > > subscribing
> > > > > > > > > > > > > > > with AUTO_CONSUME schema".
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I agree that we should change the naming and we 
> > > > > > > > > > > > > > should
> > > > > > > probably
> > > > > > > > > > not use a
> > > > > > > > > > > > > > new Schema type but add an optional field in the
> > > > > subscribe
> > > > > > > request
> > > > > > > > > > (and do
> > > > > > > > > > > > > > not send it if the broker is an old version)
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Enrico
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Yunze
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Tue, Jan 3, 2023 at 4:56 PM 丛搏 
> > > > > > > > > > > > > > > <bog...@apache.org>
> > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi, Yunze
> > > > > > > > > > > > > > > > > What I am concerned about is that if the old
> > > > > clients
> > > > > > > with
> > > > > > > > > > other
> > > > > > > > > > > > > > > > > schemas (i.e. schema is neither null nor
> > > > > AUTO_CONSUME)
> > > > > > > > > > subscribe to
> > > > > > > > > > > > > > > > > the topic with AUTO_CONSUME schema, what will
> > > > > happen?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > AUTO_CONSUME schema will not store in
> > > > > > > > > > `SchemaRegistryServiceImpl`, it
> > > > > > > > > > > > > > > > only represents one consumer with AUTO_CONSUME
> > > > > schema to
> > > > > > > > > > subscribe to
> > > > > > > > > > > > > > > > a topic. If old clients with other schemas 
> > > > > > > > > > > > > > > > subscribe
> > > > > to
> > > > > > > this
> > > > > > > > > > topic,
> > > > > > > > > > > > > > > > Its behavior will not be changed by this PIP.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > What's the schema compatibility check rule on 
> > > > > > > > > > > > > > > > > a
> > > > > topic
> > > > > > > with
> > > > > > > > > > > > > > > AUTO_CONSUME schema?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > it's only the consumer schema compatibility 
> > > > > > > > > > > > > > > > check,
> > > > > not on
> > > > > > > > > > topic. if a
> > > > > > > > > > > > > > > > consume with AUTO_CONSUME schema will do any
> > > > > > > compatibility
> > > > > > > > > > check
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > Bo
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> 
> > > > > > > > > > > > > > > > 于2023年1月3日周二
> > > > > > > 10:16写道:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > What I am concerned about is that if the old
> > > > > clients
> > > > > > > with
> > > > > > > > > > other
> > > > > > > > > > > > > > > > > schemas (i.e. schema is neither null nor
> > > > > AUTO_CONSUME)
> > > > > > > > > > subscribe to
> > > > > > > > > > > > > > > > > the topic with AUTO_CONSUME schema, what will
> > > > > happen?
> > > > > > > What's
> > > > > > > > > > the
> > > > > > > > > > > > > > > > > schema compatibility check rule on a topic 
> > > > > > > > > > > > > > > > > with
> > > > > > > AUTO_CONSUME
> > > > > > > > > > schema?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > Yunze
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Mon, Jan 2, 2023 at 12:38 AM SiNan Liu <
> > > > > > > > > > liusinan1...@gmail.com>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 1.Schema.Type and
> > > > > > > > > > org.apache.pulsar.common.schema.SchemaType value
> > > > > > > > > > > > > > > should
> > > > > > > > > > > > > > > > > > be the same.
> > > > > > > > > > > > > > > > > > 2.These changes do not affect produce and 
> > > > > > > > > > > > > > > > > > are
> > > > > only
> > > > > > > affect
> > > > > > > > > > consumer
> > > > > > > > > > > > > > > > > > subscribe behavior.
> > > > > > > > > > > > > > > > > > 3.backward compatibility:
> > > > > > > > > > > > > > > > > > (1)In
> > > > > > > > > > org.apache.pulsar.broker.service.ServerCnx#handleSubscribe.
> > > > > > > > > > > > > > > > > > if (schema != null && schema.getType() !=
> > > > > > > > > > SchemaType.AUTO_CONSUME) {
> > > > > > > > > > > > > > > > > > return
> > > > > topic.addSchemaIfIdleOrCheckCompatible(schema)
> > > > > > > > > > > > > > > > > > .thenCompose(v -> topic.subscribe(option));
> > > > > > > > > > > > > > > > > > } else {
> > > > > > > > > > > > > > > > > > return topic.subscribe(option);
> > > > > > > > > > > > > > > > > > }
> > > > > > > > > > > > > > > > > > For the older pulsar client, the schema is 
> > > > > > > > > > > > > > > > > > null
> > > > > if
> > > > > > > > > > AUTO_CONSUME
> > > > > > > > > > > > > > > consumer
> > > > > > > > > > > > > > > > > > subscribe to the Topic.
> > > > > > > > > > > > > > > > > > For the new pulsar client, if AUTO_CONSUME
> > > > > consumer
> > > > > > > > > > subscribe the
> > > > > > > > > > > > > > > Topic,
> > > > > > > > > > > > > > > > > > then schema is not null and 
> > > > > > > > > > > > > > > > > > schema.getType() =
> > > > > > > > > > > > > > > SchemaType.AUTO_CONSUME.
> > > > > > > > > > > > > > > > > > Both new and old pulsar clients consume the
> > > > > topic,
> > > > > > > will
> > > > > > > > > > return topic.
> > > > > > > > > > > > > > > > > > subscribe(option).
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > (2)In
> > > > > > > > > > org.apache.pulsar.broker.service.persistent.PersistentTopic
> > > > > > > > > > > > > > > > > > #addSchemaIfIdleOrCheckCompatible.
> > > > > > > > > > > > > > > > > > @Override
> > > > > > > > > > > > > > > > > > public CompletableFuture<Void>
> > > > > > > > > > > > > > > addSchemaIfIdleOrCheckCompatible(SchemaData
> > > > > > > > > > > > > > > > > > schema) {
> > > > > > > > > > > > > > > > > > return hasSchema().thenCompose((hasSchema) 
> > > > > > > > > > > > > > > > > > -> {
> > > > > > > > > > > > > > > > > > int numActiveConsumersWithoutAutoSchema =
> > > > > > > > > > > > > > > subscriptions.values().stream()
> > > > > > > > > > > > > > > > > > .mapToInt(subscription ->
> > > > > > > > > > subscription.getConsumers().stream()
> > > > > > > > > > > > > > > > > > .filter(consumer -> 
> > > > > > > > > > > > > > > > > > consumer.getSchemaType() !=
> > > > > > > > > > > > > > > SchemaType.AUTO_CONSUME)
> > > > > > > > > > > > > > > > > > .toList().size())
> > > > > > > > > > > > > > > > > > .sum();
> > > > > > > > > > > > > > > > > > if (hasSchema
> > > > > > > > > > > > > > > > > > || (!producers.isEmpty())
> > > > > > > > > > > > > > > > > > || (numActiveConsumersWithoutAutoSchema != 
> > > > > > > > > > > > > > > > > > 0)
> > > > > > > > > > > > > > > > > > || (ledger.getTotalSize() != 0)) {
> > > > > > > > > > > > > > > > > > return 
> > > > > > > > > > > > > > > > > > checkSchemaCompatibleForConsumer(schema);
> > > > > > > > > > > > > > > > > > } else {
> > > > > > > > > > > > > > > > > > return
> > > > > addSchema(schema).thenCompose(schemaVersion ->
> > > > > > > > > > > > > > > > > > CompletableFuture.completedFuture(null));
> > > > > > > > > > > > > > > > > > }
> > > > > > > > > > > > > > > > > > });
> > > > > > > > > > > > > > > > > > }
> > > > > > > > > > > > > > > > > > Only in one case will there be a bug.
> > > > > > > > > > > > > > > > > > First, the old pulsar client consume the 
> > > > > > > > > > > > > > > > > > empty
> > > > > > > topic, the
> > > > > > > > > > consumer
> > > > > > > > > > > > > > > schema
> > > > > > > > > > > > > > > > > > is AUTO_CONSUME, and then whether the new 
> > > > > > > > > > > > > > > > > > or old
> > > > > > > pulsar
> > > > > > > > > > client
> > > > > > > > > > > > > > > consume(i.e.
> > > > > > > > > > > > > > > > > > schema is AVRO) the topic.
> > > > > > > > > > > > > > > > > > The broker will return the error message as
> > > > > > > > > > > > > > > IncompatibleSchemaException ("
> > > > > > > > > > > > > > > > > > Topic does not have a schema to check "). 
> > > > > > > > > > > > > > > > > > The
> > > > > bug at
> > > > > > > > > > issue17354 is
> > > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > > > fixed in this case.
> > > > > > > > > > > > > > > > > > All the other cases will be normal.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid>
> > > > > > > 于2022年12月31日周六
> > > > > > > > > > 20:23写道:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Defining `AutoConsume` as -3 is somehow
> > > > > strange.
> > > > > > > Could
> > > > > > > > > > you clarify
> > > > > > > > > > > > > > > if
> > > > > > > > > > > > > > > > > > > backward compatibility is guaranteed? 
> > > > > > > > > > > > > > > > > > > i.e. if
> > > > > the
> > > > > > > new
> > > > > > > > > > Pulsar client
> > > > > > > > > > > > > > > > > > > uploaded the AUTO_CONSUME schema to the 
> > > > > > > > > > > > > > > > > > > broker,
> > > > > > > can the
> > > > > > > > > > old Pulsar
> > > > > > > > > > > > > > > > > > > clients produce or consume the same topic
> > > > > anymore?
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > Yunze
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Fri, Dec 30, 2022 at 11:32 PM 思楠刘 <
> > > > > > > > > > liusinan1...@gmail.com>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > I made a PIP to discuss:
> > > > > > > > > > > > > > > https://github.com/apache/pulsar/issues/19113.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > Sinan
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > >
> > > > > > >
> > > > >

Reply via email to