You mean add an optional field in the subscribe request, like (optional
bool is_autoconsume =6 [default = false];)?

Enrico Olivelli <eolive...@gmail.com> 于 2023年1月4日周三 上午12:12写道:

> Il Mar 3 Gen 2023, 14:37 Yunze Xu <y...@streamnative.io.invalid> ha
> scritto:
>
> > Hi Bo,
> >
> > I got it now. The PIP title sounds ambiguous. Using the term "Upload
> > xxx SchemaType" sounds like uploading the schema into the registry.
> > Instead, it should be "carrying schema in the request when subscribing
> > with AUTO_CONSUME schema".
> >
>
>
> I agree that we should change the naming and we should probably not use a
> new Schema type but add an optional field in the subscribe request (and do
> not send it if the broker is an old version)
>
>
> Enrico
>
>
>
> > Thanks,
> > Yunze
> >
> > On Tue, Jan 3, 2023 at 4:56 PM 丛搏 <bog...@apache.org> wrote:
> > >
> > > Hi, Yunze
> > > > What I am concerned about is that if the old clients with other
> > > > schemas (i.e. schema is neither null nor AUTO_CONSUME) subscribe to
> > > > the topic with AUTO_CONSUME schema, what will happen?
> > >
> > > AUTO_CONSUME schema will not store in `SchemaRegistryServiceImpl`, it
> > > only represents one consumer with AUTO_CONSUME schema to subscribe to
> > > a topic. If old clients with other schemas subscribe to this topic,
> > > Its behavior will not be changed by this PIP.
> > >
> > > > What's the schema compatibility check rule on a topic with
> > AUTO_CONSUME schema?
> > >
> > > it's only the consumer schema compatibility check, not on topic. if a
> > > consume with AUTO_CONSUME schema will do any compatibility check
> > >
> > > Thanks,
> > > Bo
> > >
> > > Yunze Xu <y...@streamnative.io.invalid> 于2023年1月3日周二 10:16写道:
> > > >
> > > > What I am concerned about is that if the old clients with other
> > > > schemas (i.e. schema is neither null nor AUTO_CONSUME) subscribe to
> > > > the topic with AUTO_CONSUME schema, what will happen? What's the
> > > > schema compatibility check rule on a topic with AUTO_CONSUME schema?
> > > >
> > > > Thanks,
> > > > Yunze
> > > >
> > > > On Mon, Jan 2, 2023 at 12:38 AM SiNan Liu <liusinan1...@gmail.com>
> > wrote:
> > > > >
> > > > > 1.Schema.Type and org.apache.pulsar.common.schema.SchemaType value
> > should
> > > > > be the same.
> > > > > 2.These changes do not affect produce and are only affect consumer
> > > > > subscribe behavior.
> > > > > 3.backward compatibility:
> > > > > (1)In org.apache.pulsar.broker.service.ServerCnx#handleSubscribe.
> > > > > if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME)
> {
> > > > > return topic.addSchemaIfIdleOrCheckCompatible(schema)
> > > > > .thenCompose(v -> topic.subscribe(option));
> > > > > } else {
> > > > > return topic.subscribe(option);
> > > > > }
> > > > > For the older pulsar client, the schema is null if AUTO_CONSUME
> > consumer
> > > > > subscribe to the Topic.
> > > > > For the new pulsar client, if AUTO_CONSUME consumer subscribe the
> > Topic,
> > > > > then schema is not null and schema.getType() =
> > SchemaType.AUTO_CONSUME.
> > > > > Both new and old pulsar clients consume the topic, will return
> topic.
> > > > > subscribe(option).
> > > > >
> > > > > (2)In org.apache.pulsar.broker.service.persistent.PersistentTopic
> > > > > #addSchemaIfIdleOrCheckCompatible.
> > > > > @Override
> > > > > public CompletableFuture<Void>
> > addSchemaIfIdleOrCheckCompatible(SchemaData
> > > > > schema) {
> > > > > return hasSchema().thenCompose((hasSchema) -> {
> > > > > int numActiveConsumersWithoutAutoSchema =
> > subscriptions.values().stream()
> > > > > .mapToInt(subscription -> subscription.getConsumers().stream()
> > > > > .filter(consumer -> consumer.getSchemaType() !=
> > SchemaType.AUTO_CONSUME)
> > > > > .toList().size())
> > > > > .sum();
> > > > > if (hasSchema
> > > > > || (!producers.isEmpty())
> > > > > || (numActiveConsumersWithoutAutoSchema != 0)
> > > > > || (ledger.getTotalSize() != 0)) {
> > > > > return checkSchemaCompatibleForConsumer(schema);
> > > > > } else {
> > > > > return addSchema(schema).thenCompose(schemaVersion ->
> > > > > CompletableFuture.completedFuture(null));
> > > > > }
> > > > > });
> > > > > }
> > > > > Only in one case will there be a bug.
> > > > > First, the old pulsar client consume the empty topic, the consumer
> > schema
> > > > > is AUTO_CONSUME, and then whether the new or old pulsar client
> > consume(i.e.
> > > > > schema is AVRO) the topic.
> > > > > The broker will return the error message as
> > IncompatibleSchemaException ("
> > > > > Topic does not have a schema to check "). The bug at issue17354 is
> > not
> > > > > fixed in this case.
> > > > > All the other cases will be normal.
> > > > >
> > > > > Yunze Xu <y...@streamnative.io.invalid> 于2022年12月31日周六 20:23写道:
> > > > >
> > > > > > Defining `AutoConsume` as -3 is somehow strange. Could you
> clarify
> > if
> > > > > > backward compatibility is guaranteed? i.e. if the new Pulsar
> client
> > > > > > uploaded the AUTO_CONSUME schema to the broker, can the old
> Pulsar
> > > > > > clients produce or consume the same topic anymore?
> > > > > >
> > > > > > Thanks,
> > > > > > Yunze
> > > > > >
> > > > > > On Fri, Dec 30, 2022 at 11:32 PM 思楠刘 <liusinan1...@gmail.com>
> > wrote:
> > > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I made a PIP to discuss:
> > https://github.com/apache/pulsar/issues/19113.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Sinan
> > > > > >
> >
>

Reply via email to