Hi, Yunze
> What I am concerned about is that if the old clients with other
> schemas (i.e. schema is neither null nor AUTO_CONSUME) subscribe to
> the topic with AUTO_CONSUME schema, what will happen?

The AUTO_CONSUME schema is not stored in `SchemaRegistryServiceImpl`; it
only indicates that a consumer subscribed to the topic with the
AUTO_CONSUME schema. If old clients with other schemas subscribe to this
topic, their behavior is not changed by this PIP.

> What's the schema compatibility check rule on a topic with AUTO_CONSUME 
> schema?

The compatibility check applies only to the consumer's schema; it is not
a rule on the topic. A consumer with the AUTO_CONSUME schema does not do
any compatibility check.
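
As a rough illustration of the point above, here is a minimal Java sketch
of the two broker-side decisions quoted further down in this thread (the
handleSubscribe branch and the condition in
addSchemaIfIdleOrCheckCompatible). It is a toy model, not Pulsar code: the
class AutoConsumeSketch, the enums SchemaKind and Action, and the methods
needsSchemaStep and decide are hypothetical names. It also assumes that an
old client subscribing with AUTO_CONSUME appears to the broker as a
consumer that sent no schema (modeled here as null).

```java
import java.util.Arrays;
import java.util.List;

/** Toy model only; every name in this class is hypothetical, not a Pulsar API. */
class AutoConsumeSketch {

    /** Stand-in for the schema type a consumer sends; null means "no schema sent". */
    enum SchemaKind { AUTO_CONSUME, AVRO }

    /** What the broker does for an incoming subscribe request in this model. */
    enum Action { SUBSCRIBE_ONLY, CHECK_COMPATIBILITY, ADD_SCHEMA }

    /** Models the handleSubscribe branch: null or AUTO_CONSUME skips the schema step. */
    static boolean needsSchemaStep(SchemaKind schema) {
        return schema != null && schema != SchemaKind.AUTO_CONSUME;
    }

    /** Models the condition quoted from addSchemaIfIdleOrCheckCompatible. */
    static Action decide(SchemaKind incoming, boolean topicHasSchema,
                         boolean hasProducers, long ledgerSize,
                         List<SchemaKind> activeConsumers) {
        if (!needsSchemaStep(incoming)) {
            return Action.SUBSCRIBE_ONLY;
        }
        // Consumers known to be AUTO_CONSUME do not count as "active" here.
        long consumersWithoutAutoConsume = activeConsumers.stream()
                .filter(kind -> kind != SchemaKind.AUTO_CONSUME)
                .count();
        if (topicHasSchema || hasProducers
                || consumersWithoutAutoConsume != 0 || ledgerSize != 0) {
            return Action.CHECK_COMPATIBILITY;
        }
        return Action.ADD_SCHEMA;
    }

    public static void main(String[] args) {
        // New client: the broker sees AUTO_CONSUME explicitly.
        List<SchemaKind> newAuto = Arrays.asList(SchemaKind.AUTO_CONSUME);
        // Old client (assumption): the broker sees no schema at all.
        List<SchemaKind> oldAuto = Arrays.asList((SchemaKind) null);
        System.out.println(decide(SchemaKind.AVRO, false, false, 0, newAuto));
        System.out.println(decide(SchemaKind.AVRO, false, false, 0, oldAuto));
        System.out.println(decide(SchemaKind.AUTO_CONSUME, false, false, 0, newAuto));
    }
}
```

In this model, an AVRO consumer arriving on an empty topic after a new
AUTO_CONSUME consumer gets ADD_SCHEMA, while the same consumer arriving
after an old AUTO_CONSUME consumer gets CHECK_COMPATIBILITY against a topic
that has no schema, which matches the one remaining failure case described
in the quoted mail. An AUTO_CONSUME consumer itself always gets
SUBSCRIBE_ONLY.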

Thanks,
Bo

Yunze Xu <y...@streamnative.io.invalid> wrote on Tue, Jan 3, 2023 at 10:16:
>
> What I am concerned about is that if the old clients with other
> schemas (i.e. schema is neither null nor AUTO_CONSUME) subscribe to
> the topic with AUTO_CONSUME schema, what will happen? What's the
> schema compatibility check rule on a topic with AUTO_CONSUME schema?
>
> Thanks,
> Yunze
>
> On Mon, Jan 2, 2023 at 12:38 AM SiNan Liu <liusinan1...@gmail.com> wrote:
> >
> > 1. Schema.Type and org.apache.pulsar.common.schema.SchemaType values
> > should be the same.
> > 2. These changes do not affect producers; they only affect consumer
> > subscribe behavior.
> > 3. Backward compatibility:
> > (1) In org.apache.pulsar.broker.service.ServerCnx#handleSubscribe:
> >     if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
> >         return topic.addSchemaIfIdleOrCheckCompatible(schema)
> >                 .thenCompose(v -> topic.subscribe(option));
> >     } else {
> >         return topic.subscribe(option);
> >     }
> > For the older Pulsar client, the schema is null if an AUTO_CONSUME
> > consumer subscribes to the topic.
> > For the new Pulsar client, if an AUTO_CONSUME consumer subscribes to the
> > topic, the schema is not null and schema.getType() ==
> > SchemaType.AUTO_CONSUME.
> > In both cases, the broker returns topic.subscribe(option).
> >
> > (2) In org.apache.pulsar.broker.service.persistent.PersistentTopic#addSchemaIfIdleOrCheckCompatible:
> >     @Override
> >     public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
> >         return hasSchema().thenCompose((hasSchema) -> {
> >             int numActiveConsumersWithoutAutoSchema = subscriptions.values().stream()
> >                     .mapToInt(subscription -> subscription.getConsumers().stream()
> >                             .filter(consumer -> consumer.getSchemaType() != SchemaType.AUTO_CONSUME)
> >                             .toList().size())
> >                     .sum();
> >             if (hasSchema
> >                     || (!producers.isEmpty())
> >                     || (numActiveConsumersWithoutAutoSchema != 0)
> >                     || (ledger.getTotalSize() != 0)) {
> >                 return checkSchemaCompatibleForConsumer(schema);
> >             } else {
> >                 return addSchema(schema).thenCompose(schemaVersion ->
> >                         CompletableFuture.completedFuture(null));
> >             }
> >         });
> >     }
> > There is only one case in which a bug remains.
> > First, an old Pulsar client consumes from the empty topic with the
> > AUTO_CONSUME schema; then a new or old Pulsar client consumes from the
> > topic with another schema (i.e. the schema is AVRO).
> > The broker returns an IncompatibleSchemaException ("Topic does not have
> > a schema to check"). The bug in issue 17354 is not fixed in this case.
> > All other cases behave normally.
> >
> > Yunze Xu <y...@streamnative.io.invalid> wrote on Sat, Dec 31, 2022 at 20:23:
> >
> > > Defining `AutoConsume` as -3 is somewhat strange. Could you clarify if
> > > backward compatibility is guaranteed? i.e. if the new Pulsar client
> > > uploaded the AUTO_CONSUME schema to the broker, can the old Pulsar
> > > clients produce or consume the same topic anymore?
> > >
> > > Thanks,
> > > Yunze
> > >
> > > On Fri, Dec 30, 2022 at 11:32 PM 思楠刘 <liusinan1...@gmail.com> wrote:
> > > >
> > > > Hi all,
> > > >
> > > > I made a PIP to discuss: https://github.com/apache/pulsar/issues/19113.
> > > >
> > > > Thanks,
> > > > Sinan
> > >
