This proposal identifies an important issue that we should definitely
solve. I have some questions.

> When there are no user-created topics under a namespace,
> Namespace should be deleted.

I take this to mean that it should be possible to delete the
namespace, correct?

> For this reason, we need to close the system topic reader/producer
> first, then remove the system topic. finally, remove the namespace.

I agree that expanding the protobuf CloseProducer and CloseConsumer
commands could be valuable here. The expansion would ensure that
producers and consumers don't attempt to reconnect. However, I think
we might still have a race condition that could make tenant or
namespace deletion fail. Specifically, a new producer or consumer
could create a topic after the namespace deletion has started but
before it is complete. Do you agree that the underlying race still exists?
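
To make the ordering concrete, here is a minimal sketch in Java. It is
a single-process model with hypothetical class, method, and topic names
(none of them are Pulsar APIs); it only replays the interleaving I am
worried about.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical single-process model of a namespace; not Pulsar broker code.
class NamespaceDeletionRace {
    private final Set<String> topics = new HashSet<>();

    void createTopic(String topic) {
        topics.add(topic);
    }

    void deleteAllTopics() {
        topics.clear();
    }

    // Last step of deletion: succeeds only if no topics remain.
    boolean removeNamespace() {
        return topics.isEmpty();
    }

    // Replays the problematic ordering step by step.
    static boolean deleteWithLateClient() {
        NamespaceDeletionRace ns = new NamespaceDeletionRace();
        ns.createTopic("system-topic"); // existing system topic
        ns.deleteAllTopics();           // deletion starts: existing clients closed, topics removed
        ns.createTopic("late-topic");   // a brand-new client connects before deletion completes
        return ns.removeNamespace();    // the namespace is no longer empty
    }

    public static void main(String[] args) {
        System.out.println("namespace removed: " + deleteWithLateClient());
    }
}
```

In this model the final step fails even though every client that
existed when deletion started was closed, because the late client never
received a close command in the first place.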

In my view, the fundamental problem here is that deleting certain Pulsar
resources takes time and, in a distributed system, that means race
conditions.

What if we expand our usage of the "terminated" feature to apply to
namespaces (and tenants)? Then, in a terminated namespace, bundles
and topics could be deleted but not created (just as a terminated
topic cannot have any new messages published to it). This would take
care of all topic creation race conditions. We'd probably need to add
new protobuf exceptions for this feature.
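
As a rough sketch of the semantics I have in mind (Java, in-memory
only; the class and method names are hypothetical, and a real
implementation would have to coordinate the flag through the metadata
store rather than a local lock):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a "terminated" namespace; not an existing Pulsar API.
class TerminableNamespace {
    private final Set<String> topics = new HashSet<>();
    private boolean terminated = false;

    // Topic creation is rejected once the namespace is terminated.
    synchronized boolean createTopic(String topic) {
        if (terminated) {
            return false; // a broker would return a new "namespace terminated" error here
        }
        return topics.add(topic);
    }

    // Deleting topics stays allowed after termination.
    synchronized boolean deleteTopic(String topic) {
        return topics.remove(topic);
    }

    synchronized void terminate() {
        terminated = true;
    }

    // The namespace can be removed once it is terminated and empty.
    synchronized boolean isDeletable() {
        return terminated && topics.isEmpty();
    }

    public static void main(String[] args) {
        TerminableNamespace ns = new TerminableNamespace();
        ns.createTopic("system-topic");
        ns.terminate();
        System.out.println("late create accepted: " + ns.createTopic("late-topic"));
        ns.deleteTopic("system-topic");
        System.out.println("deletable: " + ns.isDeletable());
    }
}
```

Because the terminated check and the topic creation happen under the
same lock in this model, no topic can appear between termination and
the final removal.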

Thanks,
Michael


On Sat, Jan 29, 2022 at 7:25 PM Zike Yang
<zky...@streamnative.io.invalid> wrote:
>
> +1
>
>
> Thanks,
> Zike
>
> On Sat, Jan 29, 2022 at 12:30 PM guo jiwei <techno...@apache.org> wrote:
> >
> > Hi
> >  The PIP link : https://github.com/apache/pulsar/issues/13989
> >
> > Regards
> > Jiwei Guo (Tboy)
> >
> >
> > On Sat, Jan 29, 2022 at 11:46 AM mattison chao <mattisonc...@gmail.com>
> > wrote:
> >
> > > Hello everyone,
> > >
> > > I want to start a discussion about PIP-139 : Support Broker send command
> > > to real close producer/consumer.
> > >
> > > This is the PIP document
> > >
> > > https://github.com/apache/pulsar/issues/13989 <
> > > https://github.com/apache/pulsar/issues/13979>
> > >
> > > Please check it out and feel free to share your thoughts.
> > >
> > > Best,
> > > Mattison
> > >
> > >
> > > ———————— Pasted below for quoting convenience.
> > >
> > >
> > >
> > > Relation pull request:  #13337
> > > Authors: @Technoboy-  @mattisonchao
> > >
> > > ## Motivation
> > >
> > > Before we discuss this pip, I'd like to supplement some context to help
> > > contributors who don't want to read the original pull request.
> > >
> > > > > When there are no user-created topics under a namespace, Namespace
> > > > should be deleted. But currently, the system topic still exists and the
> > > > reader/producer could auto-create the system topic, which may cause the
> > > > namespace deletion to fail.
> > >
> > > For this reason, we need to close the system topic reader/producer first,
> > > then remove the system topic. finally, remove the namespace.
> > >
> > > > Following this approach, we first wanted to use ``terminate`` to solve
> > > > this problem. Then we found that producers can disconnect, but consumers
> > > > are still alive. So another PR, #13960, adds closing logic for consumers.
> > > >
> > > > After #13960, everything looked good, but another problem appeared: we
> > > > need to wait until consumers have consumed all messages (which may make
> > > > terminating the topic take so long that the operation times out) before
> > > > they get ``reachedEndOfTopic``. The relevant code is here:
> > >
> > >
> > > https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106
> > >
> > > In the #13337 case, we need to force close consumers immediately. So we
> > > write this PIP to discuss another way to resolve this problem.
> > >
> > > ## Goal
> > >
> > > > We can add a new field (`allow_reconnect`) to the
> > > > ``CommandCloseProducer``/``CommandCloseConsumer`` commands so that the
> > > > broker can close producers/consumers immediately.
> > >
> > > ## API Changes
> > >
> > > - Add ``allow_reconnect`` to ``CommandCloseProducer``;
> > >
> > > ```java
> > > **Before**
> > >
> > > message CommandCloseProducer {
> > >     required uint64 producer_id = 1;
> > >     required uint64 request_id = 2;
> > > }
> > >
> > > **After**
> > > message CommandCloseProducer {
> > >     required uint64 producer_id = 1;
> > >     required uint64 request_id = 2;
> > >     optional bool allow_reconnect = 3 [default = true];
> > > }
> > > ```
> > >
> > > - Add ``allow_reconnect`` to ``CommandCloseConsumer``
> > > ```java
> > > **Before**
> > >
> > > message CommandCloseConsumer {
> > >     required uint64 consumer_id = 1;
> > >     required uint64 request_id = 2;
> > > }
> > >
> > > **After**
> > > message CommandCloseConsumer {
> > >     required uint64 consumer_id = 1;
> > >     required uint64 request_id = 2;
> > >     optional bool allow_reconnect = 3 [default = true];
> > > }
> > > ```
> > >
> > > ## Implementation
> > >
> > > ### ClientCnx - Producer:
> > >
> > > **Before**
> > > ```java
> > >     @Override
> > > >     protected void handleCloseProducer(CommandCloseProducer closeProducer) {
> > > >         log.info("[{}] Broker notification of Closed producer: {}",
> > > >                 remoteAddress, closeProducer.getProducerId());
> > > >         final long producerId = closeProducer.getProducerId();
> > > >         ProducerImpl<?> producer = producers.get(producerId);
> > > >         if (producer != null) {
> > > >             producer.connectionClosed(this);
> > > >         } else {
> > > >             log.warn("Producer with id {} not found while closing producer ",
> > > >                     producerId);
> > > >         }
> > > >     }
> > > ```
> > > > **After**
> > > ```java
> > > >     @Override
> > > >     protected void handleCloseProducer(CommandCloseProducer closeProducer) {
> > > >         log.info("[{}] Broker notification of Closed producer: {}",
> > > >                 remoteAddress, closeProducer.getProducerId());
> > > >         final long producerId = closeProducer.getProducerId();
> > > >         ProducerImpl<?> producer = producers.get(producerId);
> > > >         if (producer != null) {
> > > >             if (closeProducer.isAllowReconnect()) {
> > > >                 // Existing behavior: the producer may reconnect later.
> > > >                 producer.connectionClosed(this);
> > > >             } else {
> > > >                 // New behavior: close the producer for good.
> > > >                 producer.closeAsync();
> > > >             }
> > > >         } else {
> > > >             log.warn("Producer with id {} not found while closing producer ",
> > > >                     producerId);
> > > >         }
> > > >     }
> > > ```
> > > ### ClientCnx - Consumer:
> > >
> > > **Before**
> > > ```java
> > >     @Override
> > > >     protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
> > > >         log.info("[{}] Broker notification of Closed consumer: {}",
> > > >                 remoteAddress, closeConsumer.getConsumerId());
> > > >         final long consumerId = closeConsumer.getConsumerId();
> > > >         ConsumerImpl<?> consumer = consumers.get(consumerId);
> > > >         if (consumer != null) {
> > > >             consumer.connectionClosed(this);
> > > >         } else {
> > > >             log.warn("Consumer with id {} not found while closing consumer ",
> > > >                     consumerId);
> > > >         }
> > > >     }
> > > ```
> > > **After**
> > > ```java
> > >     @Override
> > > >     protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
> > > >         log.info("[{}] Broker notification of Closed consumer: {}",
> > > >                 remoteAddress, closeConsumer.getConsumerId());
> > > >         final long consumerId = closeConsumer.getConsumerId();
> > > >         ConsumerImpl<?> consumer = consumers.get(consumerId);
> > > >         if (consumer != null) {
> > > >             if (closeConsumer.isAllowReconnect()) {
> > > >                 // Existing behavior: the consumer may reconnect later.
> > > >                 consumer.connectionClosed(this);
> > > >             } else {
> > > >                 // New behavior: close the consumer for good.
> > > >                 consumer.closeAsync();
> > > >             }
> > > >         } else {
> > > >             log.warn("Consumer with id {} not found while closing consumer ",
> > > >                     consumerId);
> > > >         }
> > > >     }
> > > ```
> > >
> > >
> > > > ## Rejected Alternatives
> > >
> > > none.
>
>
>
> --
> Zike Yang
