It's not related to the schema itself. When an AUTO_CONSUME consumer
subscribes to a topic, the option tells the broker that it is an
AUTO_CONSUME consumer, so the broker should not treat it as an
active consumer when performing the schema compatibility check. If
another kind of consumer also needs to skip the schema compatibility
check in the future, this option can be reused.
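
To make the intent concrete, here is a minimal sketch of how a broker could
use such an option. It is not the actual Pulsar code: `ConsumerInfo`,
`autoConsume`, `countActiveConsumers`, and `shouldCheckCompatibility` are
hypothetical names, and the condition is only modeled on the
`addSchemaIfIdleOrCheckCompatible` snippet quoted further down in this thread.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model for illustration only; none of these types exist in Pulsar.
final class ActiveConsumerSketch {

    // A consumer plus the (assumed) flag its subscribe request carried.
    static final class ConsumerInfo {
        final String name;
        final boolean autoConsume;

        ConsumerInfo(String name, boolean autoConsume) {
            this.name = name;
            this.autoConsume = autoConsume;
        }
    }

    // Consumers flagged as AUTO_CONSUME are not counted as active.
    static long countActiveConsumers(List<ConsumerInfo> consumers) {
        return consumers.stream().filter(c -> !c.autoConsume).count();
    }

    // True when the topic is not idle, so a new consumer schema must be
    // checked for compatibility instead of simply being added.
    static boolean shouldCheckCompatibility(boolean hasSchema, int producerCount,
                                            List<ConsumerInfo> consumers, long ledgerSize) {
        return hasSchema
                || producerCount > 0
                || countActiveConsumers(consumers) > 0
                || ledgerSize > 0;
    }

    public static void main(String[] args) {
        List<ConsumerInfo> consumers = Arrays.asList(new ConsumerInfo("auto", true));
        // Only an AUTO_CONSUME consumer is attached, so the topic still counts as idle.
        System.out.println(shouldCheckCompatibility(false, 0, consumers, 0L));
    }
}
```

With only flagged consumers attached, the topic is still treated as idle, so
the first consumer with a real schema can register it.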

The other important reason is that carrying the schema info for an
AUTO_CONSUME consumer is a breaking change. (See my explanations on
GitHub and the mailing list.) If the consumer connects to an old version
broker, the schema could be uploaded into the registry and other
clients would be affected. So we should continue not to carry the schema
info in CommandSubscribe for an AUTO_CONSUME consumer.
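
As a rough illustration of the client side, the sketch below builds a
subscribe request that leaves the schema unset for an AUTO_CONSUME consumer
and sets a flag instead. `SubscribeRequest`, `SchemaKind`, and `build` are
hypothetical stand-ins rather than the real `PulsarApi` classes, and the
sketch assumes an old broker simply ignores an optional field it does not know.

```java
// Hypothetical stand-ins for the protocol types; not the real PulsarApi classes.
final class SubscribeRequestSketch {

    enum SchemaKind { AVRO, JSON, AUTO_CONSUME }

    static final class SubscribeRequest {
        final String schemaDefinition; // null means no schema info is carried
        final boolean autoConsume;     // assumed optional flag, false by default

        SubscribeRequest(String schemaDefinition, boolean autoConsume) {
            this.schemaDefinition = schemaDefinition;
            this.autoConsume = autoConsume;
        }
    }

    static SubscribeRequest build(SchemaKind kind, String schemaDefinition) {
        if (kind == SchemaKind.AUTO_CONSUME) {
            // Leave the schema unset so there is nothing for a broker to
            // persist; only the flag marks the consumer as AUTO_CONSUME.
            return new SubscribeRequest(null, true);
        }
        // Other schemas are carried as before and the flag keeps its default.
        return new SubscribeRequest(schemaDefinition, false);
    }

    public static void main(String[] args) {
        SubscribeRequest request = build(SchemaKind.AUTO_CONSUME, "ignored");
        System.out.println(request.schemaDefinition == null && request.autoConsume);
    }
}
```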

Thanks,
Yunze

On Thu, Jan 5, 2023 at 11:55 AM SiNan Liu <liusinan1...@gmail.com> wrote:
>
> I modified the PIP issue and title last night. Yunze, do you mean that in
> PulsarApi.proto, `optional bool is_auto_consume_schema = 6 [default = false];`
> should go in CommandSubscribe instead of Schema? But shouldn't
> schema-related stuff be in Schema?
>
> Thanks,
> Sinan
>
> On Thu, Jan 5, 2023 at 12:31 AM Yunze Xu <y...@streamnative.io.invalid> wrote:
>
> > I found a similar compatibility problem with my closed PR. We should
> > not set the `Schema` field for AUTO_CONSUME schema. More explanations
> > can be found here [1].
> >
> > Instead, we can add an optional field into CommandSubscribe to
> > indicate the schema compatibility check is skipped.
> >
> > ```protobuf
> > optional bool check_schema_compatibility = 20 [default = true];
> > ```
> >
> > Then we can add a corresponding parameter here:
> >
> > ```java
> > CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema, boolean checkSchemaCompatibility);
> > ```
> >
> >
> > [1] https://github.com/apache/pulsar/pull/17449#issuecomment-1371135700
> >
> > Thanks,
> > Yunze
> >
> > On Wed, Jan 4, 2023 at 11:49 PM Yunze Xu <y...@streamnative.io> wrote:
> > >
> > > Could you also update the PIP issue? This solution is totally
> > > different from your original proposal. Since it still introduces
> > > changes to `PulsarApi.proto`, it also requires a PIP (we can reuse
> > > this one).
> > >
> > > ----
> > >
> > > BTW, I tested carrying the SchemaInfo in the CommandSubscribe
> > > request again. It could break compatibility. Consider the
> > > following code, run against Pulsar standalone 2.8.4:
> > >
> > > ```java
> > >         PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
> > >         Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
> > >                 .topic(topic)
> > >                 .subscriptionName("sub")
> > >                 .subscriptionType(SubscriptionType.Shared)
> > >                 .subscribe();
> > >         Producer<User> producer = client.newProducer(Schema.AVRO(User.class))
> > >                 .topic(topic)
> > >                 .create();
> > > ```
> > >
> > > - If the schema type is 0 in CommandSubscribe, the NONE schema will be
> > > persisted and the producer will fail to create due to the schema
> > > compatibility check.
> > > - If the schema type is -3 (AUTO_CONSUME), it will fail at subscribe()
> > > with the following error:
> > >
> > > ```
> > > 23:49:10.978 [pulsar-io-18-13] WARN org.apache.pulsar.broker.service.ServerCnx - [/172.23.160.1:5921] Got exception java.lang.IllegalStateException: Some required fields are missing
> > >         at org.apache.pulsar.common.api.proto.Schema.checkRequiredFields(Schema.java:337)
> > >         at org.apache.pulsar.common.api.proto.Schema.parseFrom(Schema.java:332)
> > >         at org.apache.pulsar.common.api.proto.CommandSubscribe.parseFrom(CommandSubscribe.java:785)
> > >         at org.apache.pulsar.common.api.proto.BaseCommand.parseFrom(BaseCommand.java:2397)
> > > ```
> > >
> > > Thanks,
> > > Yunze
> > >
> > >
> > > On Wed, Jan 4, 2023 at 10:34 PM SiNan Liu <liusinan1...@gmail.com> wrote:
> > > >
> > > > I just implemented adding an optional field to the subscribe request, and
> > > > compatibility seems to be fine. You can have a look at my PR
> > > > (https://github.com/apache/pulsar/pull/17449).
> > > >
> > > > On Wed, Jan 4, 2023 at 21:31 Yunze Xu <y...@streamnative.io.invalid> wrote:
> > > >
> > > > > > Why can't we upload negative schema types?
> > > > >
> > > > > I want to avoid the changes to existing methods like
> > > > > Commands#getSchemaType, which converts all negative schema types to
> > > > > NONE:
> > > > >
> > > > > ```java
> > > > > private static Schema.Type getSchemaType(SchemaType type) {
> > > > >     if (type.getValue() < 0) {
> > > > >         return Schema.Type.None;
> > > > >     } else {
> > > > >         return Schema.Type.valueOf(type.getValue());
> > > > >     }
> > > > > }
> > > > > ```
> > > > >
> > > > > I guess the above code was written because:
> > > > > 1. NONE schema type means it's not uploaded into the registry. (See
> > > > > #3940 [1])
> > > > > 2. There is no existing schema that uses NONE as its schema type, i.e.
> > > > > NONE schema is used as something special.
> > > > >
> > > > > > every different language client will code the special logic.
> > > > >
> > > > > If other clients follow the behavior of the Java client, they should
> > > > > also convert negative schemas to NONE currently. Therefore, changes
> > > > > cannot be avoided. Whether or not the semantics of `setSchemaType`
> > > > > change, they should follow the Java implementation as well.
> > > > >
> > > > > > This will change the meaning of the schema data field
> > > > >
> > > > > The existing definition only defines its meaning for the AVRO and JSON
> > > > > schemas. But from a more general view, the schema data should be
> > > > > something associated with the current schema. Giving it more meaning
> > > > > for other schema types is acceptable IMO. For example, the schema data
> > > > > field represents the serialized Protobuf descriptor in the Protobuf Native
> > > > > schema, see `ProtobufNativeSchema#of`:
> > > > >
> > > > > ```java
> > > > > .schema(ProtobufNativeSchemaUtils.serialize(descriptor))
> > > > > ```
> > > > >
> > > > > [1] https://github.com/apache/pulsar/pull/3940
> > > > >
> > > > > Thanks,
> > > > > Yunze
> > > > >
> > > > > On Wed, Jan 4, 2023 at 8:27 PM 丛搏 <bog...@apache.org> wrote:
> > > > > >
> > > > > > > It does not affect the public API so it can be cherry-picked into old
> > > > > > > branches. The main difference with this proposal is that my solution
> > > > > > > carries the identity info (i.e. `AUTO_CONSUME`) in the schema data,
> > > > > > > which is a byte array. The negative schema types should not be exposed
> > > > > > > to users. Adding a field to the subscribe request might be okay but it
> > > > > > > could be unnecessary to cover such a corner case.
> > > > > >
> > > > > > This will change the meaning of the schema data field and couple the
> > > > > > schema type and schema data. Having `schema type = NONE` and `schema data =
> > > > > > "AUTO_CONSUME"` represent `AUTO_CONSUME` seems weird to me. Why
> > > > > > can't we upload negative schema types?
> > > > > >
> > > > > > > It does not affect the public API
> > > > > > Uploading negative schema types only changes the proto. If we use `schema
> > > > > > type = NONE` and `schema data = "AUTO_CONSUME"`, every different
> > > > > > language client will code the special logic. This special logic can
> > > > > > easily be overlooked.
> > > > > >
> > > > > > Thanks,
> > > > > > Bo
> > > > > >
> > > > > > On Wed, Jan 4, 2023 at 17:02 Yunze Xu <y...@streamnative.io.invalid> wrote:
> > > > > > >
> > > > > > > I opened a PR to fix this issue: https://github.com/apache/pulsar/pull/19128
> > > > > > >
> > > > > > > It does not affect the public API so it can be cherry-picked into old
> > > > > > > branches. The main difference with this proposal is that my solution
> > > > > > > carries the identity info (i.e. `AUTO_CONSUME`) in the schema data,
> > > > > > > which is a byte array. The negative schema types should not be exposed
> > > > > > > to users. Adding a field to the subscribe request might be okay but it
> > > > > > > could be unnecessary to cover such a corner case.
> > > > > > >
> > > > > > > It might be controversial if schema data should be used in such a way,
> > > > > > > because the original purpose is to represent the AVRO or JSON
> > > > > > > definition. However, this semantic is defined just for AVRO or JSON
> > > > > > > schema. IMO, the data field of other schemas is never used well.
> > > > > > >
> > > > > > > Another solution is to make use of the name field of schema, which
> > > > > > > might be more natural. I think we can continue the discussion in my
> > > > > > > PR.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Yunze
> > > > > > >
> > > > > > > On Wed, Jan 4, 2023 at 11:07 AM Yunze Xu <y...@streamnative.io> wrote:
> > > > > > > >
> > > > > > > > Modifying the subscribe request is better than exposing AUTO_CONSUME
> > > > > > > > schema type IMO. The negative value of a schema type, like BYTES,
> > > > > > > > AUTO_PRODUCE, means this schema type should only be used internally.
> > > > > > > > Adding the negative enum value to the Schema definition in
> > > > > > > > PulsarApi.proto looks very weird.
> > > > > > > >
> > > > > > > > But I'm still wondering if we can avoid the API changes. I will look
> > > > > > > > deeper into this issue.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Yunze
> > > > > > > >
> > > > > > > > On Wed, Jan 4, 2023 at 12:12 AM Enrico Olivelli <eolive...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > On Tue, Jan 3, 2023 at 14:37 Yunze Xu <y...@streamnative.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Bo,
> > > > > > > > > >
> > > > > > > > > > I got it now. The PIP title sounds ambiguous. Using the term "Upload
> > > > > > > > > > xxx SchemaType" sounds like uploading the schema into the registry.
> > > > > > > > > > Instead, it should be "carrying schema in the request when subscribing
> > > > > > > > > > with AUTO_CONSUME schema".
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > I agree that we should change the naming and we should probably not use a
> > > > > > > > > new Schema type but add an optional field in the subscribe request (and do
> > > > > > > > > not send it if the broker is an old version)
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Enrico
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Yunze
> > > > > > > > > >
> > > > > > > > > > On Tue, Jan 3, 2023 at 4:56 PM 丛搏 <bog...@apache.org> wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hi, Yunze
> > > > > > > > > > > > What I am concerned about is that if the old clients with other
> > > > > > > > > > > > schemas (i.e. schema is neither null nor AUTO_CONSUME) subscribe to
> > > > > > > > > > > > the topic with AUTO_CONSUME schema, what will happen?
> > > > > > > > > > >
> > > > > > > > > > > The AUTO_CONSUME schema will not be stored in `SchemaRegistryServiceImpl`. It
> > > > > > > > > > > only indicates that a consumer with the AUTO_CONSUME schema subscribes to
> > > > > > > > > > > a topic. If old clients with other schemas subscribe to this topic,
> > > > > > > > > > > their behavior will not be changed by this PIP.
> > > > > > > > > > >
> > > > > > > > > > > > What's the schema compatibility check rule on a topic with
> > > > > > > > > > > > AUTO_CONSUME schema?
> > > > > > > > > > >
> > > > > > > > > > > It's only the consumer schema compatibility check, not a check on the
> > > > > > > > > > > topic. A consumer with the AUTO_CONSUME schema will not do any
> > > > > > > > > > > compatibility check.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Bo
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Jan 3, 2023 at 10:16 Yunze Xu <y...@streamnative.io.invalid> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > What I am concerned about is that if the old clients with other
> > > > > > > > > > > > schemas (i.e. schema is neither null nor AUTO_CONSUME) subscribe to
> > > > > > > > > > > > the topic with AUTO_CONSUME schema, what will happen? What's the
> > > > > > > > > > > > schema compatibility check rule on a topic with AUTO_CONSUME schema?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Yunze
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Jan 2, 2023 at 12:38 AM SiNan Liu <liusinan1...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > 1. Schema.Type and org.apache.pulsar.common.schema.SchemaType values should
> > > > > > > > > > > > > be the same.
> > > > > > > > > > > > > 2. These changes do not affect producers and only affect consumer
> > > > > > > > > > > > > subscribe behavior.
> > > > > > > > > > > > > 3. Backward compatibility:
> > > > > > > > > > > > > (1) In org.apache.pulsar.broker.service.ServerCnx#handleSubscribe:
> > > > > > > > > > > > >
> > > > > > > > > > > > >     if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
> > > > > > > > > > > > >         return topic.addSchemaIfIdleOrCheckCompatible(schema)
> > > > > > > > > > > > >                 .thenCompose(v -> topic.subscribe(option));
> > > > > > > > > > > > >     } else {
> > > > > > > > > > > > >         return topic.subscribe(option);
> > > > > > > > > > > > >     }
> > > > > > > > > > > > >
> > > > > > > > > > > > > For the older Pulsar client, the schema is null if an AUTO_CONSUME consumer
> > > > > > > > > > > > > subscribes to the topic.
> > > > > > > > > > > > > For the new Pulsar client, if an AUTO_CONSUME consumer subscribes to the topic,
> > > > > > > > > > > > > then the schema is not null and schema.getType() == SchemaType.AUTO_CONSUME.
> > > > > > > > > > > > > For both new and old Pulsar clients consuming the topic, the broker will
> > > > > > > > > > > > > return topic.subscribe(option).
> > > > > > > > > > > > >
> > > > > > > > > > > > > (2) In org.apache.pulsar.broker.service.persistent.PersistentTopic#addSchemaIfIdleOrCheckCompatible:
> > > > > > > > > > > > >
> > > > > > > > > > > > >     @Override
> > > > > > > > > > > > >     public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
> > > > > > > > > > > > >         return hasSchema().thenCompose((hasSchema) -> {
> > > > > > > > > > > > >             int numActiveConsumersWithoutAutoSchema = subscriptions.values().stream()
> > > > > > > > > > > > >                     .mapToInt(subscription -> subscription.getConsumers().stream()
> > > > > > > > > > > > >                             .filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME)
> > > > > > > > > > > > >                             .toList().size())
> > > > > > > > > > > > >                     .sum();
> > > > > > > > > > > > >             if (hasSchema
> > > > > > > > > > > > >                     || (!producers.isEmpty())
> > > > > > > > > > > > >                     || (numActiveConsumersWithoutAutoSchema != 0)
> > > > > > > > > > > > >                     || (ledger.getTotalSize() != 0)) {
> > > > > > > > > > > > >                 return checkSchemaCompatibleForConsumer(schema);
> > > > > > > > > > > > >             } else {
> > > > > > > > > > > > >                 return addSchema(schema).thenCompose(schemaVersion ->
> > > > > > > > > > > > >                         CompletableFuture.completedFuture(null));
> > > > > > > > > > > > >             }
> > > > > > > > > > > > >         });
> > > > > > > > > > > > >     }
> > > > > > > > > > > > >
> > > > > > > > > > > > > There will be a bug in only one case.
> > > > > > > > > > > > > First, an old Pulsar client consumes the empty topic with an AUTO_CONSUME
> > > > > > > > > > > > > consumer schema, and then a new or old Pulsar client with another schema
> > > > > > > > > > > > > (i.e. the schema is AVRO) consumes the topic.
> > > > > > > > > > > > > The broker will return an IncompatibleSchemaException ("Topic does not have
> > > > > > > > > > > > > a schema to check"). The bug in issue 17354 is not fixed in this case.
> > > > > > > > > > > > > All the other cases will be normal.
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Sat, Dec 31, 2022 at 20:23 Yunze Xu <y...@streamnative.io.invalid> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Defining `AutoConsume` as -3 is somewhat strange. Could you clarify if
> > > > > > > > > > > > > > backward compatibility is guaranteed? i.e. if the new Pulsar client
> > > > > > > > > > > > > > uploaded the AUTO_CONSUME schema to the broker, can the old Pulsar
> > > > > > > > > > > > > > clients produce or consume the same topic anymore?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > Yunze
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Dec 30, 2022 at 11:32 PM 思楠刘 <liusinan1...@gmail.com> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I made a PIP to discuss: https://github.com/apache/pulsar/issues/19113.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Sinan
> > > > > > > > > > > > > >
> > > > > > > > > >
> > > > >
> >
