Deleting a replicated topic has a similar problem. If topic auto-creation is enabled, we cannot delete a replicated topic, because the producer used for replication will try to reconnect and thereby re-create the topic. It looks like we need an option in `CommandCloseProducer` to prevent the replication producer from reconnecting.
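The following toy model illustrates the replicated-topic case described above. It assumes topic auto-creation is enabled and uses made-up classes (`Broker`, `ReplicatorProducer`, `deleteTopic`), not Pulsar's real replicator or client code. The `allowReconnect` flag stands in for the proposed `allow_reconnect` field. The model only shows why a reconnecting replication producer defeats the deletion.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: none of these classes exist in Pulsar.
public class ReplicatorReconnectSketch {

    // A broker that auto-creates a topic when a producer connects to a missing one.
    static final class Broker {
        final Set<String> topics = new HashSet<>();
        final boolean autoCreateTopics;

        Broker(boolean autoCreateTopics) {
            this.autoCreateTopics = autoCreateTopics;
        }

        boolean connectProducer(String topic) {
            if (!topics.contains(topic)) {
                if (!autoCreateTopics) {
                    return false; // topic is gone and will not be re-created
                }
                topics.add(topic); // auto-creation brings the deleted topic back
            }
            return true;
        }
    }

    // Stand-in for the producer the replicator uses to write to the topic.
    static final class ReplicatorProducer {
        final Broker broker;
        final String topic;
        boolean closed;

        ReplicatorProducer(Broker broker, String topic) {
            this.broker = broker;
            this.topic = topic;
        }

        // Reacts to a close command; allowReconnect models the proposed allow_reconnect field.
        void onBrokerClose(boolean allowReconnect) {
            if (allowReconnect) {
                broker.connectProducer(topic); // current behavior: reconnect
            } else {
                closed = true; // proposed behavior: stay closed
            }
        }
    }

    // Deletes the topic, closes the replication producer, and reports whether the topic stayed deleted.
    static boolean deleteTopic(Broker broker, ReplicatorProducer producer, boolean allowReconnect) {
        broker.topics.remove(producer.topic);
        producer.onBrokerClose(allowReconnect);
        return !broker.topics.contains(producer.topic);
    }

    public static void main(String[] args) {
        String topic = "persistent://my-tenant/my-ns/replicated-topic"; // hypothetical name
        Broker broker = new Broker(true);
        broker.topics.add(topic);
        System.out.println(deleteTopic(broker, new ReplicatorProducer(broker, topic), true));  // false

        broker.topics.add(topic);
        System.out.println(deleteTopic(broker, new ReplicatorProducer(broker, topic), false)); // true
    }
}
```

With `allowReconnect` left at its default, the reconnect silently re-creates the topic. Only the non-reconnecting close lets the deletion stick.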
Penghui

On Fri, Feb 25, 2022 at 5:26 PM PengHui Li <peng...@apache.org> wrote:
>
> > Penghui, are you suggesting that we implement the namespace/tenant terminated logic after completing this PIP?
>
> I'm OK with implementing them either together or separately. At this moment we need to settle the overall plan, not the implementation.
>
> > For the sake of discussion, if we implement the namespace terminated logic first, we could fulfill the underlying requirements for this PIP by returning a new non-retriable error response when a client tries to connect a producer or a consumer to a topic in a namespace that is "terminated". If we do add the "namespace terminated" feature, we'll need to add a non-retriable exception for this case, anyway. The main advantage here is that we'd only need one expansion of the protobuf instead of two. The downside is that the protocol for connected clients has a couple more roundtrips. The broker would disconnect connected clients and then fail their reconnection attempt with a non-retriable error.
> >
> > Regarding the namespace "terminated" concept, I just noticed that we already have a "deleted" field in a namespace's policies [0]. There is even a comment that says:
>
> Yes, if we can terminate the namespace first, we should return “namespace not found” when producers and consumers try to reconnect, and the client should stop reconnecting after getting this error. With this approach, it looks like we don't need to introduce any protocol changes at all.
>
> > As I think about it more, I no longer think "terminated" is the right term for what I proposed above. Our goal is to briefly prevent any topic creation to ensure we can delete all sub resources for a namespace. On the other hand, a terminated topic isn't necessarily short lived. If we want to apply the "terminated" term unequivocally to both topics and namespaces, I think a terminated namespace would need to be a namespace where all topics are in terminated state and no additional topics could be created. That's not the feature we're discussing here, though. Deleted seems like the right term to me, especially since we're already using it to prevent a race condition.
>
> Yes, topic termination has a different meaning; deletion is the right term here.
>
> Thanks,
> Penghui
>
>
> On Fri, Feb 25, 2022 at 12:19 PM Michael Marshall <mmarsh...@apache.org> wrote:
>
>> Regarding the namespace "terminated" concept, I just noticed that we already have a "deleted" field in a namespace's policies [0]. There is even a comment that says:
>>
>> > // set the policies to deleted so that somebody else cannot acquire this namespace
>>
>> I am not familiar with this feature, but it seems like this policy field could be checked before creating a topic in a namespace. That would remove certain races described above.
>>
>> As I think about it more, I no longer think "terminated" is the right term for what I proposed above. Our goal is to briefly prevent any topic creation to ensure we can delete all sub resources for a namespace. On the other hand, a terminated topic isn't necessarily short lived. If we want to apply the "terminated" term unequivocally to both topics and namespaces, I think a terminated namespace would need to be a namespace where all topics are in terminated state and no additional topics could be created. That's not the feature we're discussing here, though. Deleted seems like the right term to me, especially since we're already using it to prevent a race condition [0].
>>
>> Thanks,
>> Michael
>>
>> [0] https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java#L252-L262
>>
>> On Thu, Feb 24, 2022 at 9:58 PM Michael Marshall <mmarsh...@apache.org> wrote:
>> >
>> > Hi Dave,
>> >
>> > > automatically delete tenants and namespaces for not containing topics
>> >
>> > I don't think that is what we are discussing. I agree that the initial email says just that, though, which is why I asked above:
>> >
>> > >> When there are no user-created topics under a namespace,
>> > >> Namespace should be deleted.
>> > > This is supposed to mean that the namespace should be able to be deleted, correct?
>> >
>> > Perhaps Mattison can clarify, too.
>> >
>> > My current understanding of the context for the PIP is that a user call to delete a namespace without force can fail when a producer reconnects to a deleted topic. The goal is to remove the race condition to ensure namespace deletion can succeed.
>> >
>> > Thanks,
>> > Michael
>> >
>> > On Thu, Feb 24, 2022 at 5:21 PM Dave Fisher <w...@apache.org> wrote:
>> > >
>> > > Hi -
>> > >
>> > > I hope I’m understanding what’s being discussed.
>> > >
>> > > If we are going to automatically delete tenants and namespaces for not containing topics then we need to make both of these automatic actions configurable with a default to NOT do so. Otherwise we break existing use cases.
>> > >
>> > > Automatic deletion of namespaces should be configurable at both the cluster and tenant level.
>> > >
>> > > Regards,
>> > > Dave
>> > >
>> > > > On Feb 24, 2022, at 2:25 PM, Michael Marshall <mmarsh...@apache.org> wrote:
>> > > >
>> > > >> The old producer/consumer should be closed after applying the changes from this proposal.
>> > > >
>> > > > Penghui, are you suggesting that we implement the namespace/tenant terminated logic after completing this PIP?
>> > > >
>> > > > For the sake of discussion, if we implement the namespace terminated logic first, we could fulfill the underlying requirements for this PIP by returning a new non-retriable error response when a client tries to connect a producer or a consumer to a topic in a namespace that is "terminated". If we do add the "namespace terminated" feature, we'll need to add a non-retriable exception for this case, anyway. The main advantage here is that we'd only need one expansion of the protobuf instead of two. The downside is that the protocol for connected clients has a couple more roundtrips. The broker would disconnect connected clients and then fail their reconnection attempt with a non-retriable error.
>> > > >
>> > > > Thanks,
>> > > > Michael
>> > > >
>> > > > On Thu, Feb 24, 2022 at 7:11 AM PengHui Li <peng...@apache.org> wrote:
>> > > >>
>> > > >>> If we want to solve this problem, we need to add some sync resources like lock/state, I think it’s a harm for us, we don’t need to do that.
>> > > >>
>> > > >> I think we can put the namespace/tenant into an inactive state first, so that no new producer/consumer can connect to topics under the namespace/tenant.
>> > > >>
>> > > >> The old producer/consumer should be closed after applying the changes from this proposal.
>> > > >>
>> > > >> Thanks,
>> > > >> Penghui
>> > > >>
>> > > >> On Tue, Feb 8, 2022 at 5:47 PM mattison chao <mattisonc...@gmail.com> wrote:
>> > > >>
>> > > >>>> This is supposed to mean that the namespace should be able to be deleted, correct?
>> > > >>>
>> > > >>> Yes. The main background is that the user doesn’t have any active topics, so they want to delete the namespace.
>> > > >>>
>> > > >>>> However, I think we might still have a race condition that could make tenant or namespace deletion fail. Specifically, if a new producer or consumer creates a topic after the namespace deletion has started but before it is complete. Do you agree that the underlying race still exists?
>> > > >>>
>> > > >>> Yes, this condition exists. I don’t think it’s a big problem, because the user doesn’t want to use this namespace anymore. If this happens, they will get an error and need to delete the namespace again.
>> > > >>>
>> > > >>>> What if we expand our usage of the "terminated" feature to apply to namespaces (and tenants)? Then, a terminated namespace can have bundles and topics can be deleted but not created (just as a terminated topic cannot have any new messages published to it). This would take care of all topic creation race conditions. We'd probably need to add new protobuf exceptions for this feature.
>> > > >>>
>> > > >>> If we want to solve this problem, we need to add some sync resources like lock/state, I think it’s a harm for us, we don’t need to do that.
>> > > >>>
>> > > >>> Thanks for your suggestions, let me know what you think.
>> > > >>>
>> > > >>> Best,
>> > > >>> Mattison
>> > > >>>
>> > > >>>> On Feb 1, 2022, at 2:26 PM, Michael Marshall <mmarsh...@apache.org> wrote:
>> > > >>>>
>> > > >>>> This proposal identifies an important issue that we should definitely solve. I have some questions.
>> > > >>>>
>> > > >>>>> When there are no user-created topics under a namespace,
>> > > >>>>> Namespace should be deleted.
>> > > >>>>
>> > > >>>> This is supposed to mean that the namespace should be able to be deleted, correct?
>> > > >>>>
>> > > >>>>> For this reason, we need to close the system topic reader/producer first, then remove the system topic. finally, remove the namespace.
>> > > >>>>
>> > > >>>> I agree that expanding the protobuf CloseProducer and CloseConsumer commands could be valuable here. The expansion would ensure that producers and consumers don't attempt to reconnect. However, I think we might still have a race condition that could make tenant or namespace deletion fail. Specifically, if a new producer or consumer creates a topic after the namespace deletion has started but before it is complete. Do you agree that the underlying race still exists?
>> > > >>>>
>> > > >>>> In my view, the fundamental problem here is that deleting certain Pulsar resources takes time and, in a distributed system, that means race conditions.
>> > > >>>>
>> > > >>>> What if we expand our usage of the "terminated" feature to apply to namespaces (and tenants)? Then, a terminated namespace can have bundles and topics can be deleted but not created (just as a terminated topic cannot have any new messages published to it). This would take care of all topic creation race conditions. We'd probably need to add new protobuf exceptions for this feature.
>> > > >>>>
>> > > >>>> Thanks,
>> > > >>>> Michael
>> > > >>>>
>> > > >>>>
>> > > >>>> On Sat, Jan 29, 2022 at 7:25 PM Zike Yang <zky...@streamnative.io.invalid> wrote:
>> > > >>>>>
>> > > >>>>> +1
>> > > >>>>>
>> > > >>>>>
>> > > >>>>> Thanks,
>> > > >>>>> Zike
>> > > >>>>>
>> > > >>>>> On Sat, Jan 29, 2022 at 12:30 PM guo jiwei <techno...@apache.org> wrote:
>> > > >>>>>>
>> > > >>>>>> Hi
>> > > >>>>>> The PIP link: https://github.com/apache/pulsar/issues/13989
>> > > >>>>>>
>> > > >>>>>> Regards
>> > > >>>>>> Jiwei Guo (Tboy)
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>> On Sat, Jan 29, 2022 at 11:46 AM mattison chao <mattisonc...@gmail.com> wrote:
>> > > >>>>>>
>> > > >>>>>>> Hello everyone,
>> > > >>>>>>>
>> > > >>>>>>> I want to start a discussion about PIP-139: Support Broker send command to real close producer/consumer.
>> > > >>>>>>>
>> > > >>>>>>> This is the PIP document:
>> > > >>>>>>>
>> > > >>>>>>> https://github.com/apache/pulsar/issues/13989
>> > > >>>>>>>
>> > > >>>>>>> Please check it out and feel free to share your thoughts.
>> > > >>>>>>>
>> > > >>>>>>> Best,
>> > > >>>>>>> Mattison
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>> ———————— Pasted below for quoting convenience.
>> > > >>>>>>>
>> > > >>>>>>> Related pull request: #13337
>> > > >>>>>>> Authors: @Technoboy- @mattisonchao
>> > > >>>>>>>
>> > > >>>>>>> ## Motivation
>> > > >>>>>>>
>> > > >>>>>>> Before we discuss this PIP, I'd like to provide some context for contributors who don't want to read the original pull request.
>> > > >>>>>>>
>> > > >>>>>>>> When there are no user-created topics under a namespace, Namespace should be deleted. But currently, the system topic exists, and its reader/producer can auto-create the system topic, which may cause the namespace deletion to fail.
>> > > >>>>>>>
>> > > >>>>>>> For this reason, we need to close the system topic reader/producer first, then remove the system topic. finally, remove the namespace.
>> > > >>>>>>>
>> > > >>>>>>> Following this approach, we first tried to use ``terminate`` to solve this problem. Then we found that producers disconnect, but consumers stay alive. So another PR, #13960, proposes adding logic to close consumers.
>> > > >>>>>>>
>> > > >>>>>>> After #13960, everything looks good, but another problem appears: we need to wait until consumers have consumed all messages before they get ``reachedEndOfTopic`` (this may make terminating the topic take so long that the operation times out). The relevant code is here:
>> > > >>>>>>>
>> > > >>>>>>> https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106
>> > > >>>>>>>
>> > > >>>>>>> In the #13337 case, we need to force-close consumers immediately, so we wrote this PIP to discuss another way to resolve this problem.
>> > > >>>>>>>
>> > > >>>>>>> ## Goal
>> > > >>>>>>>
>> > > >>>>>>> We can add a new field (`allow_reconnect`) to the ``CommandCloseProducer``/``CommandCloseConsumer`` commands so that the broker can close producers/consumers immediately, without them reconnecting.
>> > > >>>>>>>
>> > > >>>>>>> ## API Changes
>> > > >>>>>>>
>> > > >>>>>>> - Add ``allow_reconnect`` to ``CommandCloseProducer``:
>> > > >>>>>>>
>> > > >>>>>>> **Before**
>> > > >>>>>>> ```protobuf
>> > > >>>>>>> message CommandCloseProducer {
>> > > >>>>>>>     required uint64 producer_id = 1;
>> > > >>>>>>>     required uint64 request_id = 2;
>> > > >>>>>>> }
>> > > >>>>>>> ```
>> > > >>>>>>>
>> > > >>>>>>> **After**
>> > > >>>>>>> ```protobuf
>> > > >>>>>>> message CommandCloseProducer {
>> > > >>>>>>>     required uint64 producer_id = 1;
>> > > >>>>>>>     required uint64 request_id = 2;
>> > > >>>>>>>     optional bool allow_reconnect = 3 [default = true];
>> > > >>>>>>> }
>> > > >>>>>>> ```
>> > > >>>>>>>
>> > > >>>>>>> - Add ``allow_reconnect`` to ``CommandCloseConsumer``:
>> > > >>>>>>>
>> > > >>>>>>> **Before**
>> > > >>>>>>> ```protobuf
>> > > >>>>>>> message CommandCloseConsumer {
>> > > >>>>>>>     required uint64 consumer_id = 1;
>> > > >>>>>>>     required uint64 request_id = 2;
>> > > >>>>>>> }
>> > > >>>>>>> ```
>> > > >>>>>>>
>> > > >>>>>>> **After**
>> > > >>>>>>> ```protobuf
>> > > >>>>>>> message CommandCloseConsumer {
>> > > >>>>>>>     required uint64 consumer_id = 1;
>> > > >>>>>>>     required uint64 request_id = 2;
>> > > >>>>>>>     optional bool allow_reconnect = 3 [default = true];
>> > > >>>>>>> }
>> > > >>>>>>> ```
>> > > >>>>>>>
>> > > >>>>>>> ## Implementation
>> > > >>>>>>>
>> > > >>>>>>> ### ClientCnx - Producer:
>> > > >>>>>>>
>> > > >>>>>>> **Before**
>> > > >>>>>>> ```java
>> > > >>>>>>> @Override
>> > > >>>>>>> protected void handleCloseProducer(CommandCloseProducer closeProducer) {
>> > > >>>>>>>     log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
>> > > >>>>>>>     final long producerId = closeProducer.getProducerId();
>> > > >>>>>>>     ProducerImpl<?> producer = producers.get(producerId);
>> > > >>>>>>>     if (producer != null) {
>> > > >>>>>>>         producer.connectionClosed(this);
>> > > >>>>>>>     } else {
>> > > >>>>>>>         log.warn("Producer with id {} not found while closing producer", producerId);
>> > > >>>>>>>     }
>> > > >>>>>>> }
>> > > >>>>>>> ```
>> > > >>>>>>>
>> > > >>>>>>> **After**
>> > > >>>>>>> ```java
>> > > >>>>>>> @Override
>> > > >>>>>>> protected void handleCloseProducer(CommandCloseProducer closeProducer) {
>> > > >>>>>>>     log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
>> > > >>>>>>>     final long producerId = closeProducer.getProducerId();
>> > > >>>>>>>     ProducerImpl<?> producer = producers.get(producerId);
>> > > >>>>>>>     if (producer != null) {
>> > > >>>>>>>         if (closeProducer.isAllowReconnect()) {
>> > > >>>>>>>             // Default (field unset or true): treat it as a dropped connection and reconnect.
>> > > >>>>>>>             producer.connectionClosed(this);
>> > > >>>>>>>         } else {
>> > > >>>>>>>             // The broker asked for a permanent close: close without reconnecting.
>> > > >>>>>>>             producer.closeAsync();
>> > > >>>>>>>         }
>> > > >>>>>>>     } else {
>> > > >>>>>>>         log.warn("Producer with id {} not found while closing producer", producerId);
>> > > >>>>>>>     }
>> > > >>>>>>> }
>> > > >>>>>>> ```
>> > > >>>>>>>
>> > > >>>>>>> ### ClientCnx - Consumer:
>> > > >>>>>>>
>> > > >>>>>>> **Before**
>> > > >>>>>>> ```java
>> > > >>>>>>> @Override
>> > > >>>>>>> protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
>> > > >>>>>>>     log.info("[{}] Broker notification of Closed consumer: {}", remoteAddress, closeConsumer.getConsumerId());
>> > > >>>>>>>     final long consumerId = closeConsumer.getConsumerId();
>> > > >>>>>>>     ConsumerImpl<?> consumer = consumers.get(consumerId);
>> > > >>>>>>>     if (consumer != null) {
>> > > >>>>>>>         consumer.connectionClosed(this);
>> > > >>>>>>>     } else {
>> > > >>>>>>>         log.warn("Consumer with id {} not found while closing consumer", consumerId);
>> > > >>>>>>>     }
>> > > >>>>>>> }
>> > > >>>>>>> ```
>> > > >>>>>>>
>> > > >>>>>>> **After**
>> > > >>>>>>> ```java
>> > > >>>>>>> @Override
>> > > >>>>>>> protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
>> > > >>>>>>>     log.info("[{}] Broker notification of Closed consumer: {}", remoteAddress, closeConsumer.getConsumerId());
>> > > >>>>>>>     final long consumerId = closeConsumer.getConsumerId();
>> > > >>>>>>>     ConsumerImpl<?> consumer = consumers.get(consumerId);
>> > > >>>>>>>     if (consumer != null) {
>> > > >>>>>>>         if (closeConsumer.isAllowReconnect()) {
>> > > >>>>>>>             // Default (field unset or true): treat it as a dropped connection and reconnect.
>> > > >>>>>>>             consumer.connectionClosed(this);
>> > > >>>>>>>         } else {
>> > > >>>>>>>             // The broker asked for a permanent close: close without reconnecting.
>> > > >>>>>>>             consumer.closeAsync();
>> > > >>>>>>>         }
>> > > >>>>>>>     } else {
>> > > >>>>>>>         log.warn("Consumer with id {} not found while closing consumer", consumerId);
>> > > >>>>>>>     }
>> > > >>>>>>> }
>> > > >>>>>>> ```
>> > > >>>>>>>
>> > > >>>>>>>
>> > > >>>>>>> ## Rejected Alternatives
>> > > >>>>>>>
>> > > >>>>>>> None.
>> > > >>>>>
>> > > >>>>>
>> > > >>>>>
>> > > >>>>> --
>> > > >>>>> Zike Yang
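The ordering argument discussed in this thread can be sketched as a toy in-memory model. The idea is to mark the namespace as deleted before removing its topics, and to have the broker answer reconnects with a non-retryable “namespace not found” error that the client treats as final. Everything below is hypothetical and does not mirror Pulsar's broker or client code: `Broker`, `Client`, `ConnectResult`, and the `deletedNamespaces` set, which stands in for the "deleted" policy field. The concurrent reconnect is simulated by a callback that runs in the middle of the deletion.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: Broker, Client and ConnectResult are hypothetical, not Pulsar classes.
public class DeletedNamespaceSketch {

    enum ConnectResult { OK, NAMESPACE_NOT_FOUND }

    static final class Broker {
        final Set<String> namespaces = new HashSet<>();
        final Set<String> deletedNamespaces = new HashSet<>(); // stands in for a "deleted" policy flag
        final Set<String> topics = new HashSet<>();            // "namespace/topic" names

        // Auto-creates the topic unless the namespace is missing or marked deleted.
        ConnectResult connect(String namespace, String topic) {
            if (!namespaces.contains(namespace) || deletedNamespaces.contains(namespace)) {
                return ConnectResult.NAMESPACE_NOT_FOUND;
            }
            topics.add(namespace + "/" + topic);
            return ConnectResult.OK;
        }

        // Deletes every topic in the namespace, then the namespace itself.
        // concurrentReconnect simulates a client reconnecting while deletion is in progress.
        boolean deleteNamespace(String namespace, boolean markDeletedFirst, Runnable concurrentReconnect) {
            if (markDeletedFirst) {
                deletedNamespaces.add(namespace);
            }
            String prefix = namespace + "/";
            topics.removeIf(t -> t.startsWith(prefix));
            concurrentReconnect.run();
            if (topics.stream().anyMatch(t -> t.startsWith(prefix))) {
                return false; // a topic was re-created mid-deletion, so the deletion fails
            }
            namespaces.remove(namespace);
            deletedNamespaces.remove(namespace);
            return true;
        }
    }

    // A client that stops reconnecting after a non-retryable error.
    static final class Client {
        boolean closed;

        void reconnect(Broker broker, String namespace, String topic) {
            if (closed) {
                return;
            }
            if (broker.connect(namespace, topic) == ConnectResult.NAMESPACE_NOT_FOUND) {
                closed = true; // treated as non-retryable: give up instead of retrying
            }
        }
    }

    public static void main(String[] args) {
        for (boolean markDeletedFirst : new boolean[] {false, true}) {
            Broker broker = new Broker();
            broker.namespaces.add("my-tenant/my-ns");
            broker.connect("my-tenant/my-ns", "my-topic");
            Client client = new Client();
            boolean deleted = broker.deleteNamespace("my-tenant/my-ns", markDeletedFirst,
                    () -> client.reconnect(broker, "my-tenant/my-ns", "my-topic"));
            System.out.println("markDeletedFirst=" + markDeletedFirst
                    + " deleted=" + deleted + " clientClosed=" + client.closed);
        }
    }
}
```

Without the flag, the mid-deletion reconnect re-creates a topic and the deletion fails. With the flag set first, the reconnect is rejected, the client stops, and the namespace is removed. The model is single-threaded on purpose: it captures the ordering argument, not real concurrency or metadata-store semantics.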