The replicated topic deletion has a similar problem. If topic
auto-creation is enabled, we are not able to delete a replicated topic,
because the producer used for replication will try to reconnect. It
looks like we need an option in `CommandCloseProducer` to prevent the
replication producer from reconnecting.
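A minimal, self-contained Java model of the replication problem above, for illustration only. It assumes topic auto-creation is enabled and that a reconnecting producer recreates the topic it connects to. `ToyBroker`, `ToyReplicator`, and the `allowReconnect` parameter are hypothetical names, not Pulsar classes.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical broker model: with auto-creation enabled, connecting a
// producer creates the topic if it does not exist.
class ToyBroker {
    final Set<String> topics = new HashSet<>();

    void connectProducer(String topic) {
        topics.add(topic);
    }

    // Deleting a topic closes its producer; the producer then decides
    // whether to reconnect based on the (hypothetical) allowReconnect flag.
    void deleteTopic(String topic, ToyReplicator replicator, boolean allowReconnect) {
        topics.remove(topic);
        replicator.onBrokerClose(this, allowReconnect);
    }
}

// Hypothetical replication producer that reconnects unless told not to.
class ToyReplicator {
    final String topic;
    boolean closed = false;

    ToyReplicator(String topic) {
        this.topic = topic;
    }

    void onBrokerClose(ToyBroker broker, boolean allowReconnect) {
        if (allowReconnect) {
            broker.connectProducer(topic); // reconnect auto-creates the deleted topic again
        } else {
            closed = true; // stay closed, so the deletion sticks
        }
    }
}
```

Deleting with `allowReconnect = true` ends with the topic recreated by the reconnect; deleting with `false` leaves it gone, which is the behavior the proposed option would enable.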

Penghui

On Fri, Feb 25, 2022 at 5:26 PM PengHui Li <peng...@apache.org> wrote:

> > Penghui, are you suggesting that we implement the namespace/tenant
> > terminated logic after completing this PIP?
>
> I'm OK with implementing them together or separately. At this moment,
> we need to determine the overall plan, not the implementation.
>
> > For the sake of discussion, if we implement the namespace terminated
> > logic first, we could fulfill the underlying requirements for this PIP
> > by returning a new non-retriable error response when a client tries to
> > connect a producer or a consumer to a topic in a namespace that is
> > "terminated". If we do add the "namespace terminated" feature, we'll
> > need to add a non-retriable exception for this case, anyway. The main
> > advantage here is that we'd only need one expansion of the protobuf
> > instead of two. The downside is that the protocol for connected
> > clients has a couple more roundtrips. The broker would disconnect
> > connected clients and then fail their reconnection attempt with a
> > non-retriable error.
>
> > Regarding the namespace "terminated" concept, I just noticed that we
> > already have a "deleted" field in a namespace's policies [0]. There is
> > even a comment that says:
>
> Yes, if we can terminate the namespace first, we should return
> “namespace not found” when producers and consumers try to reconnect,
> and the client should stop reconnecting after getting this error. If we
> follow this approach, it looks like we don't need to introduce any
> protocol changes.
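A minimal sketch of the client-side rule described above: retryable errors lead to another attempt, while a terminal error such as "namespace not found" ends the reconnect loop. `ConnectResult` and `ReconnectPolicy` are hypothetical names, not Pulsar's actual client classes or error codes.

```java
import java.util.List;

// Hypothetical responses a broker could return to a (re)connect attempt.
enum ConnectResult { OK, SERVICE_NOT_READY, NAMESPACE_NOT_FOUND }

class ReconnectPolicy {
    // In this sketch only SERVICE_NOT_READY is worth retrying.
    static boolean isRetryable(ConnectResult r) {
        return r == ConnectResult.SERVICE_NOT_READY;
    }

    // Replays the broker's responses to successive connect attempts and
    // returns how many attempts the client makes before it stops.
    static int attemptsUntilStop(List<ConnectResult> responses, int maxAttempts) {
        int attempts = 0;
        for (ConnectResult r : responses) {
            if (attempts == maxAttempts) {
                break;
            }
            attempts++;
            if (!isRetryable(r)) {
                return attempts; // connected (OK) or terminal error: stop either way
            }
            // A real client would back off here before the next attempt.
        }
        return attempts;
    }
}
```

For example, with broker responses `SERVICE_NOT_READY, NAMESPACE_NOT_FOUND, OK`, `attemptsUntilStop` returns 2: the client stops on the terminal error and never makes the third attempt.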
>
> > As I think about it more, I no longer think "terminated" is the right
> > term for what I proposed above. Our goal is to briefly prevent any
> > topic creation to ensure we can delete all sub resources for a
> > namespace. On the other hand, a terminated topic isn't necessarily
> > short lived. If we want to apply the "terminated" term unequivocally
> > to both topics and namespaces, I think a terminated namespace would
> > need to be a namespace where all topics are in terminated state and no
> > additional topics could be created. That's not the feature we're
> > discussing here, though. Deleted seems like the right term to me,
> > especially since we're already using it to prevent a race condition.
> Yes, topic termination means something different; deletion is the right
> term here.
>
> Thanks,
> Penghui
>
>
> On Fri, Feb 25, 2022 at 12:19 PM Michael Marshall <mmarsh...@apache.org> wrote:
>
>> Regarding the namespace "terminated" concept, I just noticed that we
>> already have a "deleted" field in a namespace's policies [0]. There is
>> even a comment that says:
>>
>> > // set the policies to deleted so that somebody else cannot acquire this namespace
>>
>> I am not familiar with this feature, but it seems like this policy
>> field could be checked before creating a topic in a namespace. That
>> would remove certain races described above.
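To illustrate the ordering suggested above (mark the namespace deleted first, then refuse topic creation while its topics are removed), here is a minimal single-process sketch. `NsPolicies` and `TopicRegistry` are hypothetical; `synchronized` stands in for whatever consistency the metadata store provides in a real cluster, so this shows the ordering only, not a distributed implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a namespace's policies; only `deleted` is modeled.
class NsPolicies {
    boolean deleted = false;
}

// Hypothetical registry; `synchronized` stands in for the ordering
// guarantees a real metadata store would have to provide.
class TopicRegistry {
    final Map<String, NsPolicies> policies = new HashMap<>();
    final Set<String> topics = new HashSet<>();

    // Auto-creation path: check the flag before creating anything.
    synchronized boolean tryCreateTopic(String namespace, String topic) {
        NsPolicies p = policies.get(namespace);
        if (p == null || p.deleted) {
            return false; // a broker would answer with a non-retryable error here
        }
        topics.add(namespace + "/" + topic);
        return true;
    }

    // Deletion path: set the flag first so no new topic can appear, then delete.
    synchronized void deleteNamespace(String namespace) {
        NsPolicies p = policies.get(namespace);
        if (p == null) {
            return;
        }
        p.deleted = true;
        String prefix = namespace + "/";
        topics.removeIf(t -> t.startsWith(prefix));
    }
}
```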
>>
>> As I think about it more, I no longer think "terminated" is the right
>> term for what I proposed above. Our goal is to briefly prevent any
>> topic creation to ensure we can delete all sub resources for a
>> namespace. On the other hand, a terminated topic isn't necessarily
>> short lived. If we want to apply the "terminated" term unequivocally
>> to both topics and namespaces, I think a terminated namespace would
>> need to be a namespace where all topics are in terminated state and no
>> additional topics could be created. That's not the feature we're
>> discussing here, though. Deleted seems like the right term to me,
>> especially since we're already using it to prevent a race condition
>> [0].
>>
>> Thanks,
>> Michael
>>
>> [0]
>> https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java#L252-L262
>>
>> On Thu, Feb 24, 2022 at 9:58 PM Michael Marshall <mmarsh...@apache.org> wrote:
>> >
>> > Hi Dave,
>> >
>> > > automatically delete tenants and namespaces for not containing topics
>> >
>> > I don't think that is what we are discussing. I agree that the initial
>> > email says just that, though, which is why I asked above:
>> >
>> > >> When there are no user-created topics under a namespace,
>> > >> Namespace should be deleted.
>> > > This is supposed to mean that the namespace should be able to be
>> > > deleted, correct?
>> >
>> > Perhaps Mattison can clarify, too.
>> >
>> > My current understanding of the context for the PIP is that a user
>> > call to delete a namespace without force can fail when a producer
>> > reconnects to a deleted topic. The goal is to remove the race
>> > condition to ensure namespace deletion can succeed.
>> >
>> > Thanks,
>> > Michael
>> >
>> > On Thu, Feb 24, 2022 at 5:21 PM Dave Fisher <w...@apache.org> wrote:
>> > >
>> > > Hi -
>> > >
>> > > I hope I’m understanding what’s being discussed.
>> > >
>> > > If we are going to automatically delete tenants and namespaces because
>> > > they contain no topics, then we need to make both of these automatic
>> > > actions configurable, with a default of NOT doing so. Otherwise we
>> > > break existing use cases.
>> > >
>> > > Automatic deletion of namespaces should be configurable at both the
>> > > cluster and tenant level.
>> > >
>> > > Regards,
>> > > Dave
>> > >
>> > > > On Feb 24, 2022, at 2:25 PM, Michael Marshall <mmarsh...@apache.org> wrote:
>> > > >
>> > > >> The old producer/consumer should be closed after applying the
>> > > >> changes from this proposal.
>> > > >
>> > > > Penghui, are you suggesting that we implement the namespace/tenant
>> > > > terminated logic after completing this PIP?
>> > > >
>> > > > For the sake of discussion, if we implement the namespace terminated
>> > > > logic first, we could fulfill the underlying requirements for this
>> > > > PIP by returning a new non-retriable error response when a client
>> > > > tries to connect a producer or a consumer to a topic in a namespace
>> > > > that is "terminated". If we do add the "namespace terminated"
>> > > > feature, we'll need to add a non-retriable exception for this case,
>> > > > anyway. The main advantage here is that we'd only need one expansion
>> > > > of the protobuf instead of two. The downside is that the protocol
>> > > > for connected clients has a couple more roundtrips. The broker would
>> > > > disconnect connected clients and then fail their reconnection
>> > > > attempt with a non-retriable error.
>> > > >
>> > > > Thanks,
>> > > > Michael
>> > > >
>> > > > On Thu, Feb 24, 2022 at 7:11 AM PengHui Li <peng...@apache.org> wrote:
>> > > >>
>> > > >>> If we want to solve this problem, we need to add some
>> > > >>> synchronization resources, such as a lock or state. I think that
>> > > >>> would be harmful, and we don't need to do it.
>> > > >>
>> > > >> I think we can move the namespace/tenant to an inactive state first,
>> > > >> so that no new producer/consumer can connect to a topic under the
>> > > >> namespace/tenant.
>> > > >>
>> > > >> The old producer/consumer should be closed after applying the
>> > > >> changes from this proposal.
>> > > >>
>> > > >> Thanks,
>> > > >> Penghui
>> > > >>
>> > > >> On Tue, Feb 8, 2022 at 5:47 PM mattison chao <mattisonc...@gmail.com> wrote:
>> > > >>
>> > > >>>> This is supposed to mean that the namespace should be able to be
>> > > >>>> deleted, correct?
>> > > >>>
>> > > >>> Yes, the main background is that the user doesn't have any active
>> > > >>> topics, so they want to delete the namespace.
>> > > >>>
>> > > >>>> However, I think we might still have a race condition that could
>> > > >>>> make tenant or namespace deletion fail. Specifically, if a new
>> > > >>>> producer or consumer creates a topic after the namespace deletion
>> > > >>>> has started but before it is complete. Do you agree that the
>> > > >>>> underlying race still exists?
>> > > >>>
>> > > >>> Yes, this condition exists. I don't think it's a big problem,
>> > > >>> because the user doesn't want to use this namespace anymore. If
>> > > >>> this scenario occurs, they will get an error and need to delete
>> > > >>> the namespace again.
>> > > >>>
>> > > >>>> What if we expand our usage of the "terminated" feature to apply
>> > > >>>> to namespaces (and tenants)? Then, in a terminated namespace,
>> > > >>>> bundles and topics can be deleted but not created (just as a
>> > > >>>> terminated topic cannot have any new messages published to it).
>> > > >>>> This would take care of all topic creation race conditions. We'd
>> > > >>>> probably need to add new protobuf exceptions for this feature.
>> > > >>>
>> > > >>>
>> > > >>> If we want to solve this problem, we need to add some
>> > > >>> synchronization resources, such as a lock or state. I think that
>> > > >>> would be harmful, and we don't need to do it.
>> > > >>>
>> > > >>> Thanks for your suggestions, let me know what you think.
>> > > >>>
>> > > >>> Best,
>> > > >>> Mattison
>> > > >>>
>> > > >>>> On Feb 1, 2022, at 2:26 PM, Michael Marshall <mmarsh...@apache.org> wrote:
>> > > >>>>
>> > > >>>> This proposal identifies an important issue that we should
>> > > >>>> definitely solve. I have some questions.
>> > > >>>>
>> > > >>>>> When there are no user-created topics under a namespace,
>> > > >>>>> Namespace should be deleted.
>> > > >>>>
>> > > >>>> This is supposed to mean that the namespace should be able to be
>> > > >>>> deleted, correct?
>> > > >>>>
>> > > >>>>> For this reason, we need to close the system topic
>> > > >>>>> reader/producer first, then remove the system topic. finally,
>> > > >>>>> remove the namespace.
>> > > >>>>
>> > > >>>> I agree that expanding the protobuf CloseProducer and
>> > > >>>> CloseConsumer commands could be valuable here. The expansion would
>> > > >>>> ensure that producers and consumers don't attempt to reconnect.
>> > > >>>> However, I think we might still have a race condition that could
>> > > >>>> make tenant or namespace deletion fail. Specifically, if a new
>> > > >>>> producer or consumer creates a topic after the namespace deletion
>> > > >>>> has started but before it is complete. Do you agree that the
>> > > >>>> underlying race still exists?
>> > > >>>>
>> > > >>>> In my view, the fundamental problem here is that deleting certain
>> > > >>>> Pulsar resources takes time and, in a distributed system, that
>> > > >>>> means race conditions.
>> > > >>>>
>> > > >>>> What if we expand our usage of the "terminated" feature to apply
>> > > >>>> to namespaces (and tenants)? Then, in a terminated namespace,
>> > > >>>> bundles and topics can be deleted but not created (just as a
>> > > >>>> terminated topic cannot have any new messages published to it).
>> > > >>>> This would take care of all topic creation race conditions. We'd
>> > > >>>> probably need to add new protobuf exceptions for this feature.
>> > > >>>>
>> > > >>>> Thanks,
>> > > >>>> Michael
>> > > >>>>
>> > > >>>>
>> > > >>>> On Sat, Jan 29, 2022 at 7:25 PM Zike Yang
>> > > >>>> <zky...@streamnative.io.invalid> wrote:
>> > > >>>>>
>> > > >>>>> +1
>> > > >>>>>
>> > > >>>>>
>> > > >>>>> Thanks,
>> > > >>>>> Zike
>> > > >>>>>
>> > > >>>>> On Sat, Jan 29, 2022 at 12:30 PM guo jiwei <techno...@apache.org> wrote:
>> > > >>>>>>
>> > > >>>>>> Hi
>> > > >>>>>> The PIP link: https://github.com/apache/pulsar/issues/13989
>> > > >>>>>>
>> > > >>>>>> Regards
>> > > >>>>>> Jiwei Guo (Tboy)
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>> On Sat, Jan 29, 2022 at 11:46 AM mattison chao <mattisonc...@gmail.com> wrote:
>> > > >>>>>>
>> > > >>>>>>> Hello everyone,
>> > > >>>>>>>
>> > > >>>>>>> I want to start a discussion about PIP-139: Support Broker send
>> > > >>>>>>> command to real close producer/consumer.
>> > > >>>>>>>
>> > > >>>>>>> This is the PIP document:
>> > > >>>>>>>
>> > > >>>>>>> https://github.com/apache/pulsar/issues/13989
>> > > >>>>>>>
>> > > >>>>>>> Please check it out and feel free to share your thoughts.
>> > > >>>>>>>
>> > > >>>>>>> Best,
>> > > >>>>>>> Mattison
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>> ———————— Pasted below for quoting convenience.
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>> Related pull request: #13337
>> > > >>>>>>> Authors: @Technoboy- @mattisonchao
>> > > >>>>>>>
>> > > >>>>>>> ## Motivation
>> > > >>>>>>>
>> > > >>>>>>> Before we discuss this PIP, I'd like to provide some context for
>> > > >>>>>>> contributors who don't want to read the original pull request.
>> > > >>>>>>>
>> > > >>>>>>>> When there are no user-created topics under a namespace, the
>> > > >>>>>>>> namespace should be deletable. But currently, the system topic
>> > > >>>>>>>> exists, and its reader/producer can auto-create the system topic,
>> > > >>>>>>>> which may cause the namespace deletion to fail.
>> > > >>>>>>>
>> > > >>>>>>> For this reason, we need to close the system topic reader/producer
>> > > >>>>>>> first, then remove the system topic, and finally remove the
>> > > >>>>>>> namespace.
>> > > >>>>>>>
>> > > >>>>>>> Following this approach, we first wanted to use ``terminate`` to
>> > > >>>>>>> solve this problem. Then we found that producers disconnect, but
>> > > >>>>>>> consumers stay alive. So another PR, #13960, aims to add the
>> > > >>>>>>> consumer-closing logic.
>> > > >>>>>>>
>> > > >>>>>>> After #13960, everything looks good, but another problem appears:
>> > > >>>>>>> we need to wait until consumers have consumed all messages before
>> > > >>>>>>> they get ``reachedEndOfTopic`` (this may make topic termination
>> > > >>>>>>> take so long that the operation times out). The relevant code is
>> > > >>>>>>> here:
>> > > >>>>>>>
>> > > >>>>>>> https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106
>> > > >>>>>>>
>> > > >>>>>>> In the #13337 case, we need to force-close consumers immediately,
>> > > >>>>>>> so we wrote this PIP to discuss another way to resolve this
>> > > >>>>>>> problem.
>> > > >>>>>>>
>> > > >>>>>>> ## Goal
>> > > >>>>>>>
>> > > >>>>>>> We can add a new field (`allow_reconnect`) to the
>> > > >>>>>>> ``CommandCloseProducer``/``CommandCloseConsumer`` commands so that
>> > > >>>>>>> the broker can close producers/consumers immediately, without the
>> > > >>>>>>> client reconnecting.
>> > > >>>>>>>
>> > > >>>>>>> ## API Changes
>> > > >>>>>>>
>> > > >>>>>>> - Add ``allow_reconnect`` to ``CommandCloseProducer``:
>> > > >>>>>>>
>> > > >>>>>>> ```protobuf
>> > > >>>>>>> // Before
>> > > >>>>>>> message CommandCloseProducer {
>> > > >>>>>>>   required uint64 producer_id = 1;
>> > > >>>>>>>   required uint64 request_id = 2;
>> > > >>>>>>> }
>> > > >>>>>>>
>> > > >>>>>>> // After
>> > > >>>>>>> message CommandCloseProducer {
>> > > >>>>>>>   required uint64 producer_id = 1;
>> > > >>>>>>>   required uint64 request_id = 2;
>> > > >>>>>>>   optional bool allow_reconnect = 3 [default = true];
>> > > >>>>>>> }
>> > > >>>>>>> ```
>> > > >>>>>>>
>> > > >>>>>>> - Add ``allow_reconnect`` to ``CommandCloseConsumer``:
>> > > >>>>>>>
>> > > >>>>>>> ```protobuf
>> > > >>>>>>> // Before
>> > > >>>>>>> message CommandCloseConsumer {
>> > > >>>>>>>   required uint64 consumer_id = 1;
>> > > >>>>>>>   required uint64 request_id = 2;
>> > > >>>>>>> }
>> > > >>>>>>>
>> > > >>>>>>> // After
>> > > >>>>>>> message CommandCloseConsumer {
>> > > >>>>>>>   required uint64 consumer_id = 1;
>> > > >>>>>>>   required uint64 request_id = 2;
>> > > >>>>>>>   optional bool allow_reconnect = 3 [default = true];
>> > > >>>>>>> }
>> > > >>>>>>> ```
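A minimal plain-Java stand-in for the proto2 semantics of `optional bool allow_reconnect = 3 [default = true]`. `CloseCommand` is hypothetical, not the generated class; its `has…`/`is…` accessor names only assume the style of generated code. It shows why the default keeps compatibility: a command from a broker that never sets the field reads as `true`, so the client keeps reconnecting as it does today.

```java
// Hypothetical stand-in for a generated close command with an optional
// bool field whose declared default is true.
class CloseCommand {
    private Boolean allowReconnect; // null means the field was not on the wire

    CloseCommand setAllowReconnect(boolean value) {
        this.allowReconnect = value;
        return this;
    }

    boolean hasAllowReconnect() {
        return allowReconnect != null;
    }

    // An unset optional field reads as its declared default (true).
    boolean isAllowReconnect() {
        return allowReconnect == null || allowReconnect;
    }
}
```

The reverse direction matters too: protobuf parsers skip unknown fields, so a client built before this change would presumably ignore field 3 and still reconnect. The "no reconnect" behavior would therefore only apply to upgraded clients.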
>> > > >>>>>>>
>> > > >>>>>>> ## Implementation
>> > > >>>>>>>
>> > > >>>>>>> ### ClientCnx - Producer:
>> > > >>>>>>>
>> > > >>>>>>> **Before**
>> > > >>>>>>> ```java
>> > > >>>>>>>   @Override
>> > > >>>>>>>   protected void handleCloseProducer(CommandCloseProducer closeProducer) {
>> > > >>>>>>>       log.info("[{}] Broker notification of Closed producer: {}",
>> > > >>>>>>>               remoteAddress, closeProducer.getProducerId());
>> > > >>>>>>>       final long producerId = closeProducer.getProducerId();
>> > > >>>>>>>       ProducerImpl<?> producer = producers.get(producerId);
>> > > >>>>>>>       if (producer != null) {
>> > > >>>>>>>           producer.connectionClosed(this);
>> > > >>>>>>>       } else {
>> > > >>>>>>>           log.warn("Producer with id {} not found while closing producer", producerId);
>> > > >>>>>>>       }
>> > > >>>>>>>   }
>> > > >>>>>>> ```
>> > > >>>>>>> **After**
>> > > >>>>>>> ```java
>> > > >>>>>>>   @Override
>> > > >>>>>>>   protected void handleCloseProducer(CommandCloseProducer closeProducer) {
>> > > >>>>>>>       log.info("[{}] Broker notification of Closed producer: {}",
>> > > >>>>>>>               remoteAddress, closeProducer.getProducerId());
>> > > >>>>>>>       final long producerId = closeProducer.getProducerId();
>> > > >>>>>>>       ProducerImpl<?> producer = producers.get(producerId);
>> > > >>>>>>>       if (producer != null) {
>> > > >>>>>>>           if (closeProducer.isAllowReconnect()) {
>> > > >>>>>>>               // Field absent or true: keep the existing reconnect behavior.
>> > > >>>>>>>               producer.connectionClosed(this);
>> > > >>>>>>>           } else {
>> > > >>>>>>>               // Broker requested a final close: do not reconnect.
>> > > >>>>>>>               producer.closeAsync();
>> > > >>>>>>>           }
>> > > >>>>>>>       } else {
>> > > >>>>>>>           log.warn("Producer with id {} not found while closing producer", producerId);
>> > > >>>>>>>       }
>> > > >>>>>>>   }
>> > > >>>>>>> ```
>> > > >>>>>>> ### ClientCnx - Consumer:
>> > > >>>>>>>
>> > > >>>>>>> **Before**
>> > > >>>>>>> ```java
>> > > >>>>>>>   @Override
>> > > >>>>>>>   protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
>> > > >>>>>>>       log.info("[{}] Broker notification of Closed consumer: {}",
>> > > >>>>>>>               remoteAddress, closeConsumer.getConsumerId());
>> > > >>>>>>>       final long consumerId = closeConsumer.getConsumerId();
>> > > >>>>>>>       ConsumerImpl<?> consumer = consumers.get(consumerId);
>> > > >>>>>>>       if (consumer != null) {
>> > > >>>>>>>           consumer.connectionClosed(this);
>> > > >>>>>>>       } else {
>> > > >>>>>>>           log.warn("Consumer with id {} not found while closing consumer", consumerId);
>> > > >>>>>>>       }
>> > > >>>>>>>   }
>> > > >>>>>>> ```
>> > > >>>>>>> **After**
>> > > >>>>>>> ```java
>> > > >>>>>>>   @Override
>> > > >>>>>>>   protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
>> > > >>>>>>>       log.info("[{}] Broker notification of Closed consumer: {}",
>> > > >>>>>>>               remoteAddress, closeConsumer.getConsumerId());
>> > > >>>>>>>       final long consumerId = closeConsumer.getConsumerId();
>> > > >>>>>>>       ConsumerImpl<?> consumer = consumers.get(consumerId);
>> > > >>>>>>>       if (consumer != null) {
>> > > >>>>>>>           if (closeConsumer.isAllowReconnect()) {
>> > > >>>>>>>               // Field absent or true: keep the existing reconnect behavior.
>> > > >>>>>>>               consumer.connectionClosed(this);
>> > > >>>>>>>           } else {
>> > > >>>>>>>               // Broker requested a final close: do not reconnect.
>> > > >>>>>>>               consumer.closeAsync();
>> > > >>>>>>>           }
>> > > >>>>>>>       } else {
>> > > >>>>>>>           log.warn("Consumer with id {} not found while closing consumer", consumerId);
>> > > >>>>>>>       }
>> > > >>>>>>>   }
>> > > >>>>>>> ```
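The branching in the proposed `handleCloseProducer`/`handleCloseConsumer` can be exercised in isolation with a stub. This is a sketch only: `CloseTarget`, `ToyClientCnx`, and `RecordingTarget` are hypothetical stand-ins for `ProducerImpl`/`ConsumerImpl` and `ClientCnx`, reduced to the two reactions that matter.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ProducerImpl/ConsumerImpl.
interface CloseTarget {
    void connectionClosed(); // schedule a reconnect (current behavior)
    void closeAsync();       // close for good (proposed behavior)
}

// Hypothetical stand-in for ClientCnx, keyed by producer/consumer id.
class ToyClientCnx {
    final Map<Long, CloseTarget> handlers = new HashMap<>();

    // Mirrors the proposed handler: pick the reaction from allowReconnect.
    void handleClose(long id, boolean allowReconnect) {
        CloseTarget target = handlers.get(id);
        if (target == null) {
            return; // the real handler logs a warning here
        }
        if (allowReconnect) {
            target.connectionClosed();
        } else {
            target.closeAsync();
        }
    }
}

// Records which reaction was triggered, for testing.
class RecordingTarget implements CloseTarget {
    String last = "none";

    public void connectionClosed() { last = "reconnect"; }

    public void closeAsync() { last = "closed"; }
}
```

Keeping the decision in one place makes the default path (reconnect) unchanged for existing brokers, while the new path only runs when the broker explicitly sends `allow_reconnect = false`.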
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>> ## Rejected Alternatives
>> > > >>>>>>>
>> > > >>>>>>> None.
>> > > >>>>>
>> > > >>>>>
>> > > >>>>>
>> > > >>>>> --
>> > > >>>>> Zike Yang
>> > > >>>
>> > > >>>
>> > >
>>
>
