> Either replicated topic or non-replicated topic, we should provide the
> same topic deletion behavior, the topic can be deleted if no active
> user's producers/consumers, should not be affected by the Pulsar
> internal producers/consumers.

I agree with you that it'd be nice to provide the same deletion
behavior. However, because geo-replication's configuration is
decentralized, I think namespace or topic deletion is more complicated
than unreplicated deletion. Note that users cannot currently delete
namespaces that are configured with remote replication clusters.

One challenge I see is the configuration in the remote cluster. If
cluster A is replicated to cluster B for namespace C, deleting a topic
in cluster B does not affect the configuration in cluster A. In the same
way, deleting namespace C in cluster B doesn't delete it from cluster A.
If we make it easy to delete the topic in cluster B, we need to decide
how to handle failed replication in cluster A. For example, when should
cluster A stop attempting to create producers to cluster B for the
deleted namespace/topic, and how do we inform operators that the
replication is no longer working as configured? We avoid several
problems by failing the initial user request.

Note that while a user cannot explicitly delete a replicated topic, they
can remove the replication configuration in cluster A and cluster B, and
then they are left with unreplicated namespaces and topics, which can be
deleted.

> But for now, for a replicated topic, users are not able to delete it because
> of the internal active producer, adding force delete option can delete
> the topic, but it will create again if users enabled topic auto-creation.

In this case, the specific problem stems from a remote cluster targeting
the local cluster. If the user only deletes the local topic, without
removing the replication configuration, I don't think it should be
surprising when the topic gets recreated (assuming auto topic creation
is enabled).

In my view, it's okay to make it harder to delete geo-replicated
namespaces or topics than local namespaces or topics. Failing requests
with a good, informative error message protects users.
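
To make the "fail the request with an informative error" idea concrete, here is a rough sketch. Every name in it (`ReplicatedDeletionGuard`, `checkDeletionAllowed`, `DeletionRejectedException`) is hypothetical and is not an existing Pulsar API. It also assumes the broker already knows the local cluster name and the set of replication clusters configured for the namespace.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: none of these names are existing Pulsar APIs.
final class ReplicatedDeletionGuard {

    /** Hypothetical error type that carries an operator-facing explanation. */
    static final class DeletionRejectedException extends RuntimeException {
        DeletionRejectedException(String message) {
            super(message);
        }
    }

    /**
     * Rejects a deletion while remote replication clusters are still configured,
     * unless the caller explicitly forces the deletion.
     */
    static void checkDeletionAllowed(String resource, String localCluster,
                                     Set<String> replicationClusters, boolean force) {
        // Sorted copy so the clusters are listed in a stable order in the message.
        Set<String> remoteClusters = new TreeSet<>(replicationClusters);
        remoteClusters.remove(localCluster);
        if (remoteClusters.isEmpty() || force) {
            return; // Nothing replicates remotely, or the user accepted the risks.
        }
        throw new DeletionRejectedException("Cannot delete " + resource
                + ": replication is still configured with remote clusters " + remoteClusters
                + ". Remove the replication configuration in every cluster first,"
                + " or retry with force.");
    }

    public static void main(String[] args) {
        Set<String> clusters = new HashSet<>(Arrays.asList("cluster-a", "cluster-b"));
        try {
            checkDeletionAllowed("tenant/namespace-c", "cluster-b", clusters, false);
        } catch (DeletionRejectedException e) {
            // The message tells the operator why the request failed and what to do next.
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is only that the check happens before any deletion work starts, so the user sees one clear failure instead of a topic that silently reappears.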
Additionally, if users are able to delete a namespace or topic by force,
that gives them a workaround, although we should document the
limitations of this workaround.

I am not experienced with geo-replication, so please let me know if any
of my analysis doesn't align with the actual design.

Thanks,
Michael

On Sun, Feb 27, 2022 at 8:42 PM guo jiwei <techno...@apache.org> wrote:
>
> I have added the geo-replicator topic case and updated the PIP.
>
> Regards
> Jiwei Guo (Tboy)
>
> On Sun, Feb 27, 2022 at 1:00 PM PengHui Li <peng...@apache.org> wrote:
> >
> > > To me, the main question is whether we create a custom error or expand the `CommandCloseProducer` command. I lean towards adding an error because it will automatically be backward compatible based on the way the client determines which errors are retriable. Although, I don't have a strong opinion.
> >
> > I have the same opinion as you. But it looks like not able to fix the geo-replication topic deletion.
> >
> > Either replicated topic or non-replicated topic, we should provide the same topic deletion behavior, the topic can be deleted if no active user's producers/consumers, should not be affected by the Pulsar internal producers/consumers.
> >
> > But for now, for a replicated topic, users are not able to delete it because of the internal active producer, adding force delete option can delete the topic, but it will create again if users enabled topic auto-creation.
> >
> > I think in this case, it seems we don't have a chance to give the producer an exception, because the reconnection will connect to a new topic only with the same topic name as before.
> >
> > Thanks,
> > Penghui
> >
> > On Sat, Feb 26, 2022 at 2:24 PM Michael Marshall <mmarsh...@apache.org> wrote:
> > >
> > > > We need to determine the overall plan, not the implementation at this moment.
> > >
> > > Great point, I agree.
> > >
> > > > Looks like we need an option in the `CommandCloseProducer` to avoid the replication producer reconnecting again.
> > >
> > > This is an important point. One of the primary requirements for geo-replication must also be that a replicated namespace or topic can be deleted and recreated without any problems or leaks in the broker. That requires that the producer is cleaned up properly when the namespace is deleted. I think we could achieve that with a new server error or an expansion of the `CommandCloseProducer` command.
> > >
> > > To me, the main question is whether we create a custom error or expand the `CommandCloseProducer` command. I lean towards adding an error because it will automatically be backward compatible based on the way the client determines which errors are retriable. Although, I don't have a strong opinion.
> > >
> > > The key code paths that we need to consider are topic lookup and topic creation. Both the http admin endpoint and the binary protocol endpoint interact with these code paths, and we should fail requests for namespaces which have the "deleted" field set to true.
> > >
> > > In looking at the code, we could modify the logic in `PulsarWebResource#checkLocalOrGetPeerReplicationCluster`. That method is always called for topic lookups via HTTP or via the Pulsar binary protocol. If a namespace doesn't exist, that method currently returns a NOT_FOUND exception. In the binary protocol case, that exception is converted to a `ServerError.MetadataError`. We could modify the method like I do here [0], or we could add another case and return a custom error message.
> > >
> > > We'd also need to prevent topic creation via the admin endpoint and the auto topic creation via the binary protocol. I believe this will be just as easy as the lookup case, but I haven't confirmed.
> > >
> > > Thanks,
> > > Michael
> > >
> > > [0] https://github.com/michaeljmarshall/pulsar/commit/93611a741af163587f79b88bc1c9f1acc7953512
> > >
> > > On Fri, Feb 25, 2022 at 3:45 AM PengHui Li <peng...@apache.org> wrote:
> > > >
> > > > The replicated topic deletion also has a similar problem. If the topic auto-creation is enabled, we are not able to delete a replicated topic, because the producer used for replication will try to reconnect. Looks like we need an option in the `CommandCloseProducer` to avoid the replication producer reconnecting again.
> > > >
> > > > Penghui
> > > >
> > > > On Fri, Feb 25, 2022 at 5:26 PM PengHui Li <peng...@apache.org> wrote:
> > > > >
> > > > > > Penghui, are you suggesting that we implement the namespace/tenant terminated logic after completing this PIP?
> > > > >
> > > > > I'm ok with both being implemented together or separated. We need to determine the overall plan, not the implementation at this moment.
> > > > >
> > > > > > For the sake of discussion, if we implement the namespace terminated logic first, we could fulfill the underlying requirements for this PIP by returning a new non-retriable error response when a client tries to connect a producer or a consumer to a topic in a namespace that is "terminated". If we do add the "namespace terminated" feature, we'll need to add a non-retriable exception for this case, anyway. The main advantage here is that we'd only need one expansion of the protobuf instead of two. The downside is that the protocol for connected clients has a couple more roundtrips. The broker would disconnect connected clients and then fail their reconnection attempt with a non-retriable error.
> > > > >
> > > > > > Regarding the namespace "terminated" concept, I just noticed that we already have a "deleted" field in a namespace's policies [0]. There is even a comment that says:
> > > > >
> > > > > Yes, if we can terminate the namespace first, we should return “namespace not found” if the producers and consumers try to reconnect, the client should stop reconnecting after getting this error. Looks like following this way, we don't need to introduce any protocol changes anymore.
> > > > >
> > > > > > As I think about it more, I no longer think "terminated" is the right term for what I proposed above. Our goal is to briefly prevent any topic creation to ensure we can delete all sub resources for a namespace. On the other hand, a terminated topic isn't necessarily short lived. If we want to apply the "terminated" term unequivocally to both topics and namespaces, I think a terminated namespace would need to be a namespace where all topics are in terminated state and no additional topics could be created. That's not the feature we're discussing here, though. Deleted seems like the right term to me, especially since we're already using it to prevent a race condition
> > > > >
> > > > > Yes, topic termination has different meanings, deletion should be the right term here.
> > > > >
> > > > > Thanks,
> > > > > Penghui
> > > > >
> > > > > On Fri, Feb 25, 2022 at 12:19 PM Michael Marshall <mmarsh...@apache.org> wrote:
> > > > >
> > > > >> Regarding the namespace "terminated" concept, I just noticed that we already have a "deleted" field in a namespace's policies [0]. There is even a comment that says:
> > > > >>
> > > > >> > // set the policies to deleted so that somebody else cannot acquire this namespace
> > > > >>
> > > > >> I am not familiar with this feature, but it seems like this policy field could be checked before creating a topic in a namespace. That would remove certain races described above.
> > > > >>
> > > > >> As I think about it more, I no longer think "terminated" is the right term for what I proposed above. Our goal is to briefly prevent any topic creation to ensure we can delete all sub resources for a namespace. On the other hand, a terminated topic isn't necessarily short lived. If we want to apply the "terminated" term unequivocally to both topics and namespaces, I think a terminated namespace would need to be a namespace where all topics are in terminated state and no additional topics could be created. That's not the feature we're discussing here, though. Deleted seems like the right term to me, especially since we're already using it to prevent a race condition [0].
> > > > >>
> > > > >> Thanks,
> > > > >> Michael
> > > > >>
> > > > >> [0] https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java#L252-L262
> > > > >>
> > > > >> On Thu, Feb 24, 2022 at 9:58 PM Michael Marshall <mmarsh...@apache.org> wrote:
> > > > >> >
> > > > >> > Hi Dave,
> > > > >> >
> > > > >> > > automatically delete tenants and namespaces for not containing topics
> > > > >> >
> > > > >> > I don't think that is what we are discussing. I agree that the initial email says just that, though, which is why I asked above:
> > > > >> >
> > > > >> > >> When there are no user-created topics under a namespace,
> > > > >> > >> Namespace should be deleted.
> > > > >> > > This is supposed to mean that the namespace should be able to be deleted, correct?
> > > > >> >
> > > > >> > Perhaps Mattison can clarify, too.
> > > > >> >
> > > > >> > My current understanding of the context for the PIP is that a user call to delete a namespace without force can fail when a producer reconnects to a deleted topic. The goal is to remove the race condition to ensure namespace deletion can succeed.
> > > > >> >
> > > > >> > Thanks,
> > > > >> > Michael
> > > > >> >
> > > > >> > On Thu, Feb 24, 2022 at 5:21 PM Dave Fisher <w...@apache.org> wrote:
> > > > >> > >
> > > > >> > > Hi -
> > > > >> > >
> > > > >> > > I hope I’m understanding what’s being discussed.
> > > > >> > >
> > > > >> > > If we are going to automatically delete tenants and namespaces for not containing topics then we need to make both of these automatic actions configurable with a default to NOT do so. Otherwise we break existing use cases.
> > > > >> > >
> > > > >> > > Automatic deletion of namespaces should be configurable at both the cluster and tenant level.
> > > > >> > >
> > > > >> > > Regards,
> > > > >> > > Dave
> > > > >> > >
> > > > >> > > > On Feb 24, 2022, at 2:25 PM, Michael Marshall <mmarsh...@apache.org> wrote:
> > > > >> > > >
> > > > >> > > >> The old producer/consumer should be closed after applying the changes from this proposal.
> > > > >> > > >
> > > > >> > > > Penghui, are you suggesting that we implement the namespace/tenant terminated logic after completing this PIP?
> > > > >> > > >
> > > > >> > > > For the sake of discussion, if we implement the namespace terminated logic first, we could fulfill the underlying requirements for this PIP by returning a new non-retriable error response when a client tries to connect a producer or a consumer to a topic in a namespace that is "terminated". If we do add the "namespace terminated" feature, we'll need to add a non-retriable exception for this case, anyway. The main advantage here is that we'd only need one expansion of the protobuf instead of two. The downside is that the protocol for connected clients has a couple more roundtrips. The broker would disconnect connected clients and then fail their reconnection attempt with a non-retriable error.
> > > > >> > > >
> > > > >> > > > Thanks,
> > > > >> > > > Michael
> > > > >> > > >
> > > > >> > > > On Thu, Feb 24, 2022 at 7:11 AM PengHui Li <peng...@apache.org> wrote:
> > > > >> > > >>
> > > > >> > > >>> If we want to solve this problem, we need to add some sync resources like lock/state, I think it’s a harm for us, we don’t need to do that.
> > > > >> > > >>
> > > > >> > > >> I think we can make the namespace/tenants to the inactive state first so that we can avoid any new producer/consumer connect to the topic under the namespace/tenant.
> > > > >> > > >>
> > > > >> > > >> The old producer/consumer should be closed after applying the changes from this proposal.
> > > > >> > > >>
> > > > >> > > >> Thanks,
> > > > >> > > >> Penghui
> > > > >> > > >>
> > > > >> > > >> On Tue, Feb 8, 2022 at 5:47 PM mattison chao <mattisonc...@gmail.com> wrote:
> > > > >> > > >>
> > > > >> > > >>>> This is supposed to mean that the namespace should be able to be deleted, correct?
> > > > >> > > >>>
> > > > >> > > >>> Yes, the main background is the user doesn’t have an active topic. so, they want to delete the namespace.
> > > > >> > > >>>
> > > > >> > > >>>> However, I think we might still have a race condition that could make tenant or namespace deletion fail. Specifically, if a new producer or consumer creates a topic after the namespace deletion has started but before it is complete. Do you agree that the underlying race still exists?
> > > > >> > > >>>
> > > > >> > > >>> Yes, this condition exists. I think it’s not a big problem because the user doesn’t want to use this namespace anymore. If this scenario appears, they will get an error and need to delete it again.
> > > > >> > > >>>
> > > > >> > > >>>> What if we expand our usage of the "terminated" feature to apply to namespaces (and tenants)? Then, a terminated namespace can have bundles and topics can be deleted but not created (just as a terminated topic cannot have any new messages published to it). This would take care of all topic creation race conditions. We'd probably need to add new protobuf exceptions for this feature.
> > > > >> > > >>>
> > > > >> > > >>> If we want to solve this problem, we need to add some sync resources like lock/state, I think it’s a harm for us, we don’t need to do that.
> > > > >> > > >>>
> > > > >> > > >>> Thanks for your suggestions, let me know what you think.
> > > > >> > > >>>
> > > > >> > > >>> Best,
> > > > >> > > >>> Mattison
> > > > >> > > >>>
> > > > >> > > >>>> On Feb 1, 2022, at 2:26 PM, Michael Marshall <mmarsh...@apache.org> wrote:
> > > > >> > > >>>>
> > > > >> > > >>>> This proposal identifies an important issue that we should definitely solve. I have some questions.
> > > > >> > > >>>>
> > > > >> > > >>>>> When there are no user-created topics under a namespace,
> > > > >> > > >>>>> Namespace should be deleted.
> > > > >> > > >>>>
> > > > >> > > >>>> This is supposed to mean that the namespace should be able to be deleted, correct?
> > > > >> > > >>>>
> > > > >> > > >>>>> For this reason, we need to close the system topic reader/producer first, then remove the system topic. finally, remove the namespace.
> > > > >> > > >>>>
> > > > >> > > >>>> I agree that expanding the protobuf CloseProducer and CloseConsumer commands could be valuable here. The expansion would ensure that producers and consumers don't attempt to reconnect. However, I think we might still have a race condition that could make tenant or namespace deletion fail. Specifically, if a new producer or consumer creates a topic after the namespace deletion has started but before it is complete. Do you agree that the underlying race still exists?
> > > > >> > > >>>>
> > > > >> > > >>>> In my view, the fundamental problem here is that deleting certain Pulsar resources takes time and, in a distributed system, that means race conditions.
> > > > >> > > >>>>
> > > > >> > > >>>> What if we expand our usage of the "terminated" feature to apply to namespaces (and tenants)? Then, a terminated namespace can have bundles and topics can be deleted but not created (just as a terminated topic cannot have any new messages published to it). This would take care of all topic creation race conditions. We'd probably need to add new protobuf exceptions for this feature.
> > > > >> > > >>>>
> > > > >> > > >>>> Thanks,
> > > > >> > > >>>> Michael
> > > > >> > > >>>>
> > > > >> > > >>>> On Sat, Jan 29, 2022 at 7:25 PM Zike Yang <zky...@streamnative.io.invalid> wrote:
> > > > >> > > >>>>>
> > > > >> > > >>>>> +1
> > > > >> > > >>>>>
> > > > >> > > >>>>> Thanks,
> > > > >> > > >>>>> Zike
> > > > >> > > >>>>>
> > > > >> > > >>>>> On Sat, Jan 29, 2022 at 12:30 PM guo jiwei <techno...@apache.org> wrote:
> > > > >> > > >>>>>>
> > > > >> > > >>>>>> Hi
> > > > >> > > >>>>>> The PIP link : https://github.com/apache/pulsar/issues/13989
> > > > >> > > >>>>>>
> > > > >> > > >>>>>> Regards
> > > > >> > > >>>>>> Jiwei Guo (Tboy)
> > > > >> > > >>>>>>
> > > > >> > > >>>>>> On Sat, Jan 29, 2022 at 11:46 AM mattison chao <mattisonc...@gmail.com> wrote:
> > > > >> > > >>>>>>
> > > > >> > > >>>>>>> Hello everyone,
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> I want to start a discussion about PIP-139 : Support Broker send command to real close producer/consumer.
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> This is the PIP document
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> https://github.com/apache/pulsar/issues/13989 <https://github.com/apache/pulsar/issues/13979>
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> Please check it out and feel free to share your thoughts.
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> Best,
> > > > >> > > >>>>>>> Mattison
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> ———————— Pasted below for quoting convenience.
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> Relation pull request: #13337
> > > > >> > > >>>>>>> Authors: @Technoboy- @mattisonchao
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> ## Motivation
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> Before we discuss this pip, I'd like to supplement some context to help contributors who don't want to read the original pull request.
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>>> When there are no user-created topics under a namespace, Namespace should be deleted. But currently, the system topic existed and the reader/producer could auto-create the system which may cause the namespace deletion to fail.
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> For this reason, we need to close the system topic reader/producer first, then remove the system topic. finally, remove the namespace.
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> Following this way, we first want to use ``terminate`` to solve this problem.
> > > > >> > > >>>>>>> then we found producers can disconnect, but consumers are still alive. So, another PR #13960 wants to add consumers' closing logic.
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> After #13960, all things look good, but another problem appears. that is we need to wait until consumers completely consume all messages (this may make terminate topic so long and the operation timeout) then get ``reachedEndOfTopic``. the relative code here :
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> In the #13337 case, we need to force close consumers immediately. So we write this PIP to discuss another way to resolve this problem.
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> ## Goal
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> We can add a new field(`allow_reconnect`) in command ``CommandCloseProducer``/ ``CommandCloseConsumer`` to close producer/consumers immediately.
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> ## API Changes
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> - Add ``allow_reconnect`` to ``CommandCloseProducer``;
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> **Before**
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> ```protobuf
> > > > >> > > >>>>>>> message CommandCloseProducer {
> > > > >> > > >>>>>>>     required uint64 producer_id = 1;
> > > > >> > > >>>>>>>     required uint64 request_id = 2;
> > > > >> > > >>>>>>> }
> > > > >> > > >>>>>>> ```
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> **After**
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> ```protobuf
> > > > >> > > >>>>>>> message CommandCloseProducer {
> > > > >> > > >>>>>>>     required uint64 producer_id = 1;
> > > > >> > > >>>>>>>     required uint64 request_id = 2;
> > > > >> > > >>>>>>>     optional bool allow_reconnect = 3 [default = true];
> > > > >> > > >>>>>>> }
> > > > >> > > >>>>>>> ```
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> - Add ``allow_reconnect`` to ``CommandCloseConsumer``
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> **Before**
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> ```protobuf
> > > > >> > > >>>>>>> message CommandCloseConsumer {
> > > > >> > > >>>>>>>     required uint64 consumer_id = 1;
> > > > >> > > >>>>>>>     required uint64 request_id = 2;
> > > > >> > > >>>>>>> }
> > > > >> > > >>>>>>> ```
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> **After**
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> ```protobuf
> > > > >> > > >>>>>>> message CommandCloseConsumer {
> > > > >> > > >>>>>>>     required uint64 consumer_id = 1;
> > > > >> > > >>>>>>>     required uint64 request_id = 2;
> > > > >> > > >>>>>>>     optional bool allow_reconnect = 3 [default = true];
> > > > >> > > >>>>>>> }
> > > > >> > > >>>>>>> ```
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> ## Implementation
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> ### ClientCnx - Producer:
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> **Before**
> > > > >> > > >>>>>>> ```java
> > > > >> > > >>>>>>> @Override
> > > > >> > > >>>>>>> protected void handleCloseProducer(CommandCloseProducer closeProducer) {
> > > > >> > > >>>>>>>     log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
> > > > >> > > >>>>>>>     final long producerId = closeProducer.getProducerId();
> > > > >> > > >>>>>>>     ProducerImpl<?> producer = producers.get(producerId);
> > > > >> > > >>>>>>>     if (producer != null) {
> > > > >> > > >>>>>>>         producer.connectionClosed(this);
> > > > >> > > >>>>>>>     } else {
> > > > >> > > >>>>>>>         log.warn("Producer with id {} not found while closing producer ", producerId);
> > > > >> > > >>>>>>>     }
> > > > >> > > >>>>>>> }
> > > > >> > > >>>>>>> ```
> > > > >> > > >>>>>>> **After**
> > > > >> > > >>>>>>> ```java
> > > > >> > > >>>>>>> @Override
> > > > >> > > >>>>>>> protected void handleCloseProducer(CommandCloseProducer closeProducer) {
> > > > >> > > >>>>>>>     log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
> > > > >> > > >>>>>>>     final long producerId = closeProducer.getProducerId();
> > > > >> > > >>>>>>>     ProducerImpl<?> producer = producers.get(producerId);
> > > > >> > > >>>>>>>     if (producer != null) {
> > > > >> > > >>>>>>>         if (closeProducer.isAllowReconnect()) {
> > > > >> > > >>>>>>>             // Reconnect allowed: keep the existing behavior.
> > > > >> > > >>>>>>>             producer.connectionClosed(this);
> > > > >> > > >>>>>>>         } else {
> > > > >> > > >>>>>>>             // Reconnect not allowed: close the producer for good.
> > > > >> > > >>>>>>>             producer.closeAsync();
> > > > >> > > >>>>>>>         }
> > > > >> > > >>>>>>>     } else {
> > > > >> > > >>>>>>>         log.warn("Producer with id {} not found while closing producer ", producerId);
> > > > >> > > >>>>>>>     }
> > > > >> > > >>>>>>> }
> > > > >> > > >>>>>>> ```
> > > > >> > > >>>>>>> ### ClientCnx - Consumer:
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> **Before**
> > > > >> > > >>>>>>> ```java
> > > > >> > > >>>>>>> @Override
> > > > >> > > >>>>>>> protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
> > > > >> > > >>>>>>>     log.info("[{}] Broker notification of Closed consumer: {}", remoteAddress, closeConsumer.getConsumerId());
> > > > >> > > >>>>>>>     final long consumerId = closeConsumer.getConsumerId();
> > > > >> > > >>>>>>>     ConsumerImpl<?> consumer = consumers.get(consumerId);
> > > > >> > > >>>>>>>     if (consumer != null) {
> > > > >> > > >>>>>>>         consumer.connectionClosed(this);
> > > > >> > > >>>>>>>     } else {
> > > > >> > > >>>>>>>         log.warn("Consumer with id {} not found while closing consumer ", consumerId);
> > > > >> > > >>>>>>>     }
> > > > >> > > >>>>>>> }
> > > > >> > > >>>>>>> ```
> > > > >> > > >>>>>>> **After**
> > > > >> > > >>>>>>> ```java
> > > > >> > > >>>>>>> @Override
> > > > >> > > >>>>>>> protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
> > > > >> > > >>>>>>>     log.info("[{}] Broker notification of Closed consumer: {}", remoteAddress, closeConsumer.getConsumerId());
> > > > >> > > >>>>>>>     final long consumerId = closeConsumer.getConsumerId();
> > > > >> > > >>>>>>>     ConsumerImpl<?> consumer = consumers.get(consumerId);
> > > > >> > > >>>>>>>     if (consumer != null) {
> > > > >> > > >>>>>>>         if (closeConsumer.isAllowReconnect()) {
> > > > >> > > >>>>>>>             // Reconnect allowed: keep the existing behavior.
> > > > >> > > >>>>>>>             consumer.connectionClosed(this);
> > > > >> > > >>>>>>>         } else {
> > > > >> > > >>>>>>>             // Reconnect not allowed: close the consumer for good.
> > > > >> > > >>>>>>>             consumer.closeAsync();
> > > > >> > > >>>>>>>         }
> > > > >> > > >>>>>>>     } else {
> > > > >> > > >>>>>>>         log.warn("Consumer with id {} not found while closing consumer ", consumerId);
> > > > >> > > >>>>>>>     }
> > > > >> > > >>>>>>> }
> > > > >> > > >>>>>>> ```
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> ## Reject Alternatives
> > > > >> > > >>>>>>>
> > > > >> > > >>>>>>> none.
> > > > >> > > >>>>>
> > > > >> > > >>>>> --
> > > > >> > > >>>>> Zike Yang