> If we want to solve this problem, we need to add some sync resources like
> lock/state, I think it’s a harm for us, we don’t need to do that.

I think we can first move the namespace/tenant to an inactive state, so that
no new producer/consumer can connect to topics under that namespace/tenant.

The old producer/consumer should be closed after applying the changes from
this proposal.
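
A rough sketch of that ordering, purely for illustration: mark the namespace
inactive, reject new connections from that point on, and collect the existing
clients that then need to be closed. NamespaceGate, tryConnect and deactivate
are hypothetical names, not existing Pulsar APIs, and a single in-process
monitor stands in for whatever shared state a real cluster would need.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: none of these names exist in Pulsar.
final class NamespaceGate {
    // Namespaces marked inactive ahead of deletion.
    private final Set<String> inactive = new HashSet<>();
    // Connected client ids (producers and consumers) per namespace.
    private final Map<String, Set<String>> clients = new HashMap<>();

    // Admits the client unless the namespace is inactive.
    // synchronized makes the check and the registration one atomic step.
    synchronized boolean tryConnect(String namespace, String clientId) {
        if (inactive.contains(namespace)) {
            return false;
        }
        clients.computeIfAbsent(namespace, k -> new HashSet<>()).add(clientId);
        return true;
    }

    // Marks the namespace inactive and returns the clients that were
    // connected at that moment, so the caller can close them without
    // allowing a reconnect.
    synchronized Set<String> deactivate(String namespace) {
        inactive.add(namespace);
        Set<String> existing = clients.remove(namespace);
        return existing == null ? Collections.<String>emptySet() : existing;
    }

    public static void main(String[] args) {
        NamespaceGate gate = new NamespaceGate();
        System.out.println(gate.tryConnect("tenant/ns", "producer-1"));
        System.out.println(gate.deactivate("tenant/ns"));
        System.out.println(gate.tryConnect("tenant/ns", "consumer-2"));
    }
}
```

Once deactivate has returned, no later tryConnect for that namespace can
succeed, so the set it returns is the complete list of clients left to close.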

Thanks,
Penghui

On Tue, Feb 8, 2022 at 5:47 PM mattison chao <mattisonc...@gmail.com> wrote:

> > This is supposed to mean that the namespace should be able to be
> > deleted, correct?
>
> Yes, the background is that the user no longer has any active topics, so
> they want to delete the namespace.
>
> > However, I think
> > we might still have a race condition that could make tenant or
> > namespace deletion fail. Specifically, if a new producer or consumer
> > creates a topic after the namespace deletion has started but
> > before it is complete. Do you agree that the underlying race still
> > exists?
>
> Yes, this condition exists. I think it’s not a big problem because the
> user doesn’t want to use this namespace anymore.
> If this scenario appears, they will get an error and need to delete it
> again.
>
> > What if we expand our usage of the "terminated" feature to apply to
> > namespaces (and tenants)? Then, a terminated namespace can have
> > bundles and topics can be deleted but not created (just as a terminated
> > topic cannot have any new messages published to it). This would take
> > care of all topic creation race conditions. We'd probably need to add
> > new protobuf exceptions for this feature.
>
>
> If we want to solve this problem, we need to add some sync resources like
> lock/state, I think it’s a harm for us, we don’t need to do that.
>
> Thanks for your suggestions, let me know what you think.
>
> Best,
> Mattison
>
> > On Feb 1, 2022, at 2:26 PM, Michael Marshall <mmarsh...@apache.org>
> > wrote:
> >
> > This proposal identifies an important issue that we should definitely
> > solve. I have some questions.
> >
> >> When there are no user-created topics under a namespace,
> >> Namespace should be deleted.
> >
> > This is supposed to mean that the namespace should be able to be
> > deleted, correct?
> >
> >> For this reason, we need to close the system topic reader/producer
> >> first, then remove the system topic. finally, remove the namespace.
> >
> > I agree that expanding the protobuf CloseProducer and CloseConsumer
> > commands could be valuable here. The expansion would ensure that
> > producers and consumers don't attempt to reconnect. However, I think
> > we might still have a race condition that could make tenant or
> > namespace deletion fail. Specifically, if a new producer or consumer
> > creates a topic after the namespace deletion has started but
> > > before it is complete. Do you agree that the underlying race still
> > > exists?
> >
> > In my view, the fundamental problem here is that deleting certain Pulsar
> > resources takes time and, in a distributed system, that means race
> > conditions.
> >
> > What if we expand our usage of the "terminated" feature to apply to
> > namespaces (and tenants)? Then, a terminated namespace can have
> > bundles and topics can be deleted but not created (just as a terminated
> > topic cannot have any new messages published to it). This would take
> > care of all topic creation race conditions. We'd probably need to add
> > new protobuf exceptions for this feature.
> >
> > Thanks,
> > Michael
> >
> >
> > On Sat, Jan 29, 2022 at 7:25 PM Zike Yang
> > <zky...@streamnative.io.invalid> wrote:
> >>
> >> +1
> >>
> >>
> >> Thanks,
> >> Zike
> >>
> >> On Sat, Jan 29, 2022 at 12:30 PM guo jiwei <techno...@apache.org>
> >> wrote:
> >>>
> >>> Hi
> >>> The PIP link : https://github.com/apache/pulsar/issues/13989
> >>>
> >>> Regards
> >>> Jiwei Guo (Tboy)
> >>>
> >>>
> >>> On Sat, Jan 29, 2022 at 11:46 AM mattison chao <mattisonc...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hello everyone,
> >>>>
> >>>> I want to start a discussion about PIP-139: Support Broker send command
> >>>> to real close producer/consumer.
> >>>>
> >>>> This is the PIP document
> >>>>
> >>>> https://github.com/apache/pulsar/issues/13989 <
> >>>> https://github.com/apache/pulsar/issues/13979>
> >>>>
> >>>> Please check it out and feel free to share your thoughts.
> >>>>
> >>>> Best,
> >>>> Mattison
> >>>>
> >>>>
> >>>> ———————— Pasted below for quoting convenience.
> >>>>
> >>>>
> >>>>
> >>>> Related pull request: #13337
> >>>> Authors: @Technoboy-  @mattisonchao
> >>>>
> >>>> ## Motivation
> >>>>
> >>>> Before we discuss this PIP, I'd like to add some context to help
> >>>> contributors who don't want to read the original pull request.
> >>>>
> >>>>> When there are no user-created topics under a namespace, Namespace
> >>>>> should be deleted. But currently, the system topic exists and the
> >>>>> reader/producer could auto-create the system topic, which may cause
> >>>>> the namespace deletion to fail.
> >>>>
> >>>> For this reason, we need to close the system topic reader/producer
> >>>> first, then remove the system topic. finally, remove the namespace.
> >>>>
> >>>> Following this approach, we first wanted to use ``terminate`` to solve
> >>>> this problem. Then we found that producers are disconnected, but
> >>>> consumers stay alive. So another PR, #13960, adds the consumers'
> >>>> closing logic.
> >>>>
> >>>> After #13960, everything looks good, but another problem appears: we
> >>>> need to wait until consumers have consumed all messages before they
> >>>> get ``reachedEndOfTopic`` (this may make terminating the topic take so
> >>>> long that the operation times out). The relevant code is here:
> >>>>
> >>>> https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106
> >>>>
> >>>> In the #13337 case, we need to force close consumers immediately. So
> >>>> we wrote this PIP to discuss another way to resolve this problem.
> >>>>
> >>>> ## Goal
> >>>>
> >>>> We can add a new field (``allow_reconnect``) to the
> >>>> ``CommandCloseProducer`` / ``CommandCloseConsumer`` commands so that the
> >>>> broker can close producers/consumers immediately, without the client
> >>>> reconnecting.
> >>>>
> >>>> ## API Changes
> >>>>
> >>>> - Add ``allow_reconnect`` to ``CommandCloseProducer``;
> >>>>
> >>>> ```protobuf
> >>>> // Before
> >>>> message CommandCloseProducer {
> >>>>    required uint64 producer_id = 1;
> >>>>    required uint64 request_id = 2;
> >>>> }
> >>>>
> >>>> // After
> >>>> message CommandCloseProducer {
> >>>>    required uint64 producer_id = 1;
> >>>>    required uint64 request_id = 2;
> >>>>    optional bool allow_reconnect = 3 [default = true];
> >>>> }
> >>>> ```
> >>>>
> >>>> - Add ``allow_reconnect`` to ``CommandCloseConsumer``
> >>>> ```protobuf
> >>>> // Before
> >>>> message CommandCloseConsumer {
> >>>>    required uint64 consumer_id = 1;
> >>>>    required uint64 request_id = 2;
> >>>> }
> >>>>
> >>>> // After
> >>>> message CommandCloseConsumer {
> >>>>    required uint64 consumer_id = 1;
> >>>>    required uint64 request_id = 2;
> >>>>    optional bool allow_reconnect = 3 [default = true];
> >>>> }
> >>>> ```
> >>>>
> >>>> ## Implementation
> >>>>
> >>>> ### ClientCnx - Producer:
> >>>>
> >>>> **Before**
> >>>> ```java
> >>>>    @Override
> >>>>    protected void handleCloseProducer(CommandCloseProducer closeProducer) {
> >>>>        log.info("[{}] Broker notification of Closed producer: {}",
> >>>>                remoteAddress, closeProducer.getProducerId());
> >>>>        final long producerId = closeProducer.getProducerId();
> >>>>        ProducerImpl<?> producer = producers.get(producerId);
> >>>>        if (producer != null) {
> >>>>            producer.connectionClosed(this);
> >>>>        } else {
> >>>>            log.warn("Producer with id {} not found while closing producer ", producerId);
> >>>>        }
> >>>>    }
> >>>> ```
> >>>> **After**
> >>>> ```java
> >>>>    @Override
> >>>>    protected void handleCloseProducer(CommandCloseProducer closeProducer) {
> >>>>        log.info("[{}] Broker notification of Closed producer: {}",
> >>>>                remoteAddress, closeProducer.getProducerId());
> >>>>        final long producerId = closeProducer.getProducerId();
> >>>>        ProducerImpl<?> producer = producers.get(producerId);
> >>>>        if (producer != null) {
> >>>>            if (closeProducer.isAllowReconnect()) {
> >>>>                // Default behavior: drop the connection and let the producer reconnect.
> >>>>                producer.connectionClosed(this);
> >>>>            } else {
> >>>>                // Broker asked for a real close: do not reconnect.
> >>>>                producer.closeAsync();
> >>>>            }
> >>>>        } else {
> >>>>            log.warn("Producer with id {} not found while closing producer ", producerId);
> >>>>        }
> >>>>    }
> >>>> ```
> >>>> ### ClientCnx - Consumer:
> >>>>
> >>>> **Before**
> >>>> ```java
> >>>>    @Override
> >>>>    protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
> >>>>        log.info("[{}] Broker notification of Closed consumer: {}",
> >>>>                remoteAddress, closeConsumer.getConsumerId());
> >>>>        final long consumerId = closeConsumer.getConsumerId();
> >>>>        ConsumerImpl<?> consumer = consumers.get(consumerId);
> >>>>        if (consumer != null) {
> >>>>            consumer.connectionClosed(this);
> >>>>        } else {
> >>>>            log.warn("Consumer with id {} not found while closing consumer ", consumerId);
> >>>>        }
> >>>>    }
> >>>> ```
> >>>> **After**
> >>>> ```java
> >>>>    @Override
> >>>>    protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
> >>>>        log.info("[{}] Broker notification of Closed consumer: {}",
> >>>>                remoteAddress, closeConsumer.getConsumerId());
> >>>>        final long consumerId = closeConsumer.getConsumerId();
> >>>>        ConsumerImpl<?> consumer = consumers.get(consumerId);
> >>>>        if (consumer != null) {
> >>>>            if (closeConsumer.isAllowReconnect()) {
> >>>>                // Default behavior: drop the connection and let the consumer reconnect.
> >>>>                consumer.connectionClosed(this);
> >>>>            } else {
> >>>>                // Broker asked for a real close: do not reconnect.
> >>>>                consumer.closeAsync();
> >>>>            }
> >>>>        } else {
> >>>>            log.warn("Consumer with id {} not found while closing consumer ", consumerId);
> >>>>        }
> >>>>    }
> >>>> ```
> >>>>
> >>>>
> >>>> ## Rejected Alternatives
> >>>>
> >>>> None.
> >>
> >>
> >>
> >> --
> >> Zike Yang
>
>
