I found a similar compatibility problem with my closed PR. We should not set the `Schema` field for AUTO_CONSUME schema. More explanations can be found here [1].
Instead, we can add an optional field into CommandSubscribe to indicate the schema compatibility check is skipped. ```protobuf optional bool check_schema_compatibility = 20 [default = true] ``` Then we can add a relative parameter here: ```java CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema, boolean checkSchemaCompatibility); ``` [1] https://github.com/apache/pulsar/pull/17449#issuecomment-1371135700 Thanks, Yunze On Wed, Jan 4, 2023 at 11:49 PM Yunze Xu <y...@streamnative.io> wrote: > > Could you also update the PIP issue? This solution is totally > different from your original proposal. Since it still introduces > changes to `PulsarApi.proto`, it also requires a PIP (we can reuse > this one). > > ---- > > BTW, I tested again about carrying the SchemaInfo in the > CommandSubscribe request. It could break compatibility. Given the > following code run against Pulsar standalone 2.8.4: > > ```java > PulsarClient client = > PulsarClient.builder().serviceUrl(serviceUrl).build(); > Consumer<GenericRecord> consumer = > client.newConsumer(Schema.AUTO_CONSUME()) > .topic(topic) > .subscriptionName("sub") > .subscriptionType(SubscriptionType.Shared) > .subscribe(); > Producer<User> producer = client.newProducer(Schema.AVRO(User.class)) > .topic(topic) > .create(); > ``` > > - If the schema type is 0 in CommandSubscribe, the NONE schema will be > persisted and the producer will fail to create due to the schema > compatibility check. > - If the schema type is -3 (AUTO_CONSUME), it will fail at subscribe() > with the following error: > > ``` > 23:49:10.978 [pulsar-io-18-13] WARN > org.apache.pulsar.broker.service.ServerCnx - [/172.23.160.1:5921] Got > exception java.lang.IllegalStateException: Some required fields are > missing > at > org.apache.pulsar.common.api.proto.Schema.checkRequiredFields(Schema.java:337) > at > org.apache.pulsar.common.api.proto.Schema.parseFrom(Schema.java:332) > at > org.apache.pulsar.common.api.proto.CommandSubscribe.parseFrom(CommandSubscribe.java:785) > at > org.apache.pulsar.common.api.proto.BaseCommand.parseFrom(BaseCommand.java:2397) > ``` > > Thanks, > Yunze > > > On Wed, Jan 4, 2023 at 10:34 PM SiNan Liu <liusinan1...@gmail.com> wrote: > > > > I just implemented add an optional field in the subscribe request and > > compatibility seems to be fine. You guys can have a look at my PR ( > > https://github.com/apache/pulsar/pull/17449). > > > > Yunze Xu <y...@streamnative.io.invalid> 于2023年1月4日周三 21:31写道: > > > > > > Why can't we upload negative schema types? > > > > > > I want to avoid the changes to existing methods like > > > Commands#getSchemaType, which converts all negative schema types to > > > NONE: > > > > > > ```java > > > private static Schema.Type getSchemaType(SchemaType type) { > > > if (type.getValue() < 0) { > > > return Schema.Type.None; > > > } else { > > > return Schema.Type.valueOf(type.getValue()); > > > } > > > } > > > ``` > > > > > > I guess the above code was written because: > > > 1. NONE schema type means it's not uploaded into the registry. (See #3940 > > > [1]) > > > 2. There is no existing schema that uses NONE as its schema type, i.e. > > > NONE schema is used as something special. > > > > > > > every different language client will code the special logic. > > > > > > If other clients follow the behavior of the Java client, they should > > > also convert negative schemas to NONE currently. Therefore, changes > > > cannot be avoided. No matter if the semantic of `setSchemaType` is > > > changed, they should follow the Java implementation as well. > > > > > > > This will change the meaning of the schema data field > > > > > > The existing definition only defines its meaning to the AVRO and JSON > > > schema. But from a more general view, the schema data should be > > > something associated with the current schema. Giving it more meaning > > > for other schema types is acceptable IMO. For example, the schema data > > > field represents the serialized Protobuf descriptor in Protobuf Native > > > schema, see `ProtobufNativeSchema#of`: > > > > > > ```java > > > .schema(ProtobufNativeSchemaUtils.serialize(descriptor)) > > > ``` > > > > > > [1] https://github.com/apache/pulsar/pull/3940 > > > > > > Thanks, > > > Yunze > > > > > > On Wed, Jan 4, 2023 at 8:27 PM 丛搏 <bog...@apache.org> wrote: > > > > > > > > > It does not affect the public API so it can be cherry-picked into old > > > > > branches. The main difference with this proposal is that my solution > > > > > carries the identity info (i.e. `AUTO_CONSUME`) in the schema data, > > > > > which is a byte array. The negative schema types should not be exposed > > > > > to users. Adding a field to the subscribe request might be okay but it > > > > > could be unnecessary to cover such a corner case. > > > > > > > > This will change the meaning of the schema data field and couple the > > > > schema type and schema data. `schema type = NONE` and `schema data = > > > > "AUTO_CONSUME" ` represent `AUTO_ CONSUME`, I think it's weird. Why > > > > can't we upload negative schema types? > > > > > > > > > It does not affect the public API > > > > upload negative schema types only changes the proto, if using `schema > > > > type = NONE` and `schema data = "AUTO_CONSUME" `, every different > > > > language client will code the special logic. This special logic can > > > > easily be ignored. > > > > > > > > Thanks, > > > > Bo > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> 于2023年1月4日周三 17:02写道: > > > > > > > > > > I opened a PR to fix this issue: > > > https://github.com/apache/pulsar/pull/19128 > > > > > > > > > > It does not affect the public API so it can be cherry-picked into old > > > > > branches. The main difference with this proposal is that my solution > > > > > carries the identity info (i.e. `AUTO_CONSUME`) in the schema data, > > > > > which is a byte array. The negative schema types should not be exposed > > > > > to users. Adding a field to the subscribe request might be okay but it > > > > > could be unnecessary to cover such a corner case. > > > > > > > > > > It might be controversial if schema data should be used in such a way, > > > > > because the original purpose is to represent the AVRO or JSON > > > > > definition. However, this semantic is defined just for AVRO or JSON > > > > > schema. IMO, the data field of other schemas is never used well. > > > > > > > > > > Another solution is to make use of the name field of schema, which > > > > > might be more natural. I think we can continue the discussion in my > > > > > PR. > > > > > > > > > > Thanks, > > > > > Yunze > > > > > > > > > > On Wed, Jan 4, 2023 at 11:07 AM Yunze Xu <y...@streamnative.io> wrote: > > > > > > > > > > > > Modifying the subscribe request is better than exposing AUTO_CONSUME > > > > > > schema type IMO. The negative value of a schema type, like BYTES, > > > > > > AUTO_PRODUCE, means this schema type should only be used internally. > > > > > > Adding the negative enum value to the Schema definition in > > > > > > PulsarApi.proto looks very weird. > > > > > > > > > > > > But I'm still wondering if we can avoid the API changes. I will look > > > > > > deeper into this issue. > > > > > > > > > > > > Thanks, > > > > > > Yunze > > > > > > > > > > > > On Wed, Jan 4, 2023 at 12:12 AM Enrico Olivelli > > > > > > <eolive...@gmail.com> > > > wrote: > > > > > > > > > > > > > > Il Mar 3 Gen 2023, 14:37 Yunze Xu <y...@streamnative.io.invalid> > > > ha scritto: > > > > > > > > > > > > > > > Hi Bo, > > > > > > > > > > > > > > > > I got it now. The PIP title sounds ambiguous. Using the term > > > "Upload > > > > > > > > xxx SchemaType" sounds like uploading the schema into the > > > registry. > > > > > > > > Instead, it should be "carrying schema in the request when > > > subscribing > > > > > > > > with AUTO_CONSUME schema". > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I agree that we should change the naming and we should probably > > > not use a > > > > > > > new Schema type but add an optional field in the subscribe request > > > (and do > > > > > > > not send it if the broker is an old version) > > > > > > > > > > > > > > > > > > > > > Enrico > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > Yunze > > > > > > > > > > > > > > > > On Tue, Jan 3, 2023 at 4:56 PM 丛搏 <bog...@apache.org> wrote: > > > > > > > > > > > > > > > > > > Hi, Yunze > > > > > > > > > > What I am concerned about is that if the old clients with > > > other > > > > > > > > > > schemas (i.e. schema is neither null nor AUTO_CONSUME) > > > subscribe to > > > > > > > > > > the topic with AUTO_CONSUME schema, what will happen? > > > > > > > > > > > > > > > > > > AUTO_CONSUME schema will not store in > > > `SchemaRegistryServiceImpl`, it > > > > > > > > > only represents one consumer with AUTO_CONSUME schema to > > > subscribe to > > > > > > > > > a topic. If old clients with other schemas subscribe to this > > > topic, > > > > > > > > > Its behavior will not be changed by this PIP. > > > > > > > > > > > > > > > > > > > What's the schema compatibility check rule on a topic with > > > > > > > > AUTO_CONSUME schema? > > > > > > > > > > > > > > > > > > it's only the consumer schema compatibility check, not on > > > topic. if a > > > > > > > > > consume with AUTO_CONSUME schema will do any compatibility > > > check > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > Bo > > > > > > > > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> 于2023年1月3日周二 10:16写道: > > > > > > > > > > > > > > > > > > > > What I am concerned about is that if the old clients with > > > other > > > > > > > > > > schemas (i.e. schema is neither null nor AUTO_CONSUME) > > > subscribe to > > > > > > > > > > the topic with AUTO_CONSUME schema, what will happen? What's > > > the > > > > > > > > > > schema compatibility check rule on a topic with AUTO_CONSUME > > > schema? > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > Yunze > > > > > > > > > > > > > > > > > > > > On Mon, Jan 2, 2023 at 12:38 AM SiNan Liu < > > > liusinan1...@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > 1.Schema.Type and > > > org.apache.pulsar.common.schema.SchemaType value > > > > > > > > should > > > > > > > > > > > be the same. > > > > > > > > > > > 2.These changes do not affect produce and are only affect > > > consumer > > > > > > > > > > > subscribe behavior. > > > > > > > > > > > 3.backward compatibility: > > > > > > > > > > > (1)In > > > org.apache.pulsar.broker.service.ServerCnx#handleSubscribe. > > > > > > > > > > > if (schema != null && schema.getType() != > > > SchemaType.AUTO_CONSUME) { > > > > > > > > > > > return topic.addSchemaIfIdleOrCheckCompatible(schema) > > > > > > > > > > > .thenCompose(v -> topic.subscribe(option)); > > > > > > > > > > > } else { > > > > > > > > > > > return topic.subscribe(option); > > > > > > > > > > > } > > > > > > > > > > > For the older pulsar client, the schema is null if > > > AUTO_CONSUME > > > > > > > > consumer > > > > > > > > > > > subscribe to the Topic. > > > > > > > > > > > For the new pulsar client, if AUTO_CONSUME consumer > > > subscribe the > > > > > > > > Topic, > > > > > > > > > > > then schema is not null and schema.getType() = > > > > > > > > SchemaType.AUTO_CONSUME. > > > > > > > > > > > Both new and old pulsar clients consume the topic, will > > > return topic. > > > > > > > > > > > subscribe(option). > > > > > > > > > > > > > > > > > > > > > > (2)In > > > org.apache.pulsar.broker.service.persistent.PersistentTopic > > > > > > > > > > > #addSchemaIfIdleOrCheckCompatible. > > > > > > > > > > > @Override > > > > > > > > > > > public CompletableFuture<Void> > > > > > > > > addSchemaIfIdleOrCheckCompatible(SchemaData > > > > > > > > > > > schema) { > > > > > > > > > > > return hasSchema().thenCompose((hasSchema) -> { > > > > > > > > > > > int numActiveConsumersWithoutAutoSchema = > > > > > > > > subscriptions.values().stream() > > > > > > > > > > > .mapToInt(subscription -> > > > subscription.getConsumers().stream() > > > > > > > > > > > .filter(consumer -> consumer.getSchemaType() != > > > > > > > > SchemaType.AUTO_CONSUME) > > > > > > > > > > > .toList().size()) > > > > > > > > > > > .sum(); > > > > > > > > > > > if (hasSchema > > > > > > > > > > > || (!producers.isEmpty()) > > > > > > > > > > > || (numActiveConsumersWithoutAutoSchema != 0) > > > > > > > > > > > || (ledger.getTotalSize() != 0)) { > > > > > > > > > > > return checkSchemaCompatibleForConsumer(schema); > > > > > > > > > > > } else { > > > > > > > > > > > return addSchema(schema).thenCompose(schemaVersion -> > > > > > > > > > > > CompletableFuture.completedFuture(null)); > > > > > > > > > > > } > > > > > > > > > > > }); > > > > > > > > > > > } > > > > > > > > > > > Only in one case will there be a bug. > > > > > > > > > > > First, the old pulsar client consume the empty topic, the > > > consumer > > > > > > > > schema > > > > > > > > > > > is AUTO_CONSUME, and then whether the new or old pulsar > > > client > > > > > > > > consume(i.e. > > > > > > > > > > > schema is AVRO) the topic. > > > > > > > > > > > The broker will return the error message as > > > > > > > > IncompatibleSchemaException (" > > > > > > > > > > > Topic does not have a schema to check "). The bug at > > > issue17354 is > > > > > > > > not > > > > > > > > > > > fixed in this case. > > > > > > > > > > > All the other cases will be normal. > > > > > > > > > > > > > > > > > > > > > > Yunze Xu <y...@streamnative.io.invalid> 于2022年12月31日周六 > > > 20:23写道: > > > > > > > > > > > > > > > > > > > > > > > Defining `AutoConsume` as -3 is somehow strange. Could > > > you clarify > > > > > > > > if > > > > > > > > > > > > backward compatibility is guaranteed? i.e. if the new > > > Pulsar client > > > > > > > > > > > > uploaded the AUTO_CONSUME schema to the broker, can the > > > old Pulsar > > > > > > > > > > > > clients produce or consume the same topic anymore? > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > Yunze > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Dec 30, 2022 at 11:32 PM 思楠刘 < > > > liusinan1...@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > > > > > > > > > > > > > > I made a PIP to discuss: > > > > > > > > https://github.com/apache/pulsar/issues/19113. > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > Sinan > > > > > > > > > > > > > > > > > > > > > > >