I have added the geo-replicator topic case and updated the PIP.
Regards Jiwei Guo (Tboy) On Sun, Feb 27, 2022 at 1:00 PM PengHui Li <peng...@apache.org> wrote: > > To me, the main question is whether we create a custom error or expand > the `CommandCloseProducer` command. I lean towards adding an error > because it will automatically be backward compatible based on the way > the client determines which errors are retriable. Although, I don't > have a strong opinion. > > I have the same opinion as you. But it looks like not able to fix the > geo-replication topic deletion. > > Either replicated topic or non-replicated topic, we should provide the > same topic deletion behavior, the topic can be deleted if no active > user's producers/consumers, should not be affected by the Pulsar > internal producers/consumers. > > But for now, for a replicated topic, users are not able to delete it > because > of the internal active producer, adding force delete option can delete > the topic, but it will create again if users enabled topic auto-creation. > > I think in this case, it seems we don't have a chance to give the producer > an exception, because the reconnection will connect to a new topic only > with > the same topic name as before. > > Thanks, > Penghui > > On Sat, Feb 26, 2022 at 2:24 PM Michael Marshall <mmarsh...@apache.org> > wrote: > > > > We need to determine the overall plan, not the implementation > > > at this moment. > > > > Great point, I agree. > > > > > Looks like we need an option in the > > > `CommandCloseProducer` to avoid the replication producer > > > reconnecting again. > > > > This is an important point. One of the primary requirements for > > geo-replication must also be that a replicated namespace or topic can > > be deleted and recreated without any problems or leaks in the broker. > > That requires that the producer is cleaned up properly when the > > namespace is deleted. I think we could achieve that with a new server > > error or an expansion of the `CommandCloseProducer` command. > > > > To me, the main question is whether we create a custom error or expand > > the `CommandCloseProducer` command. I lean towards adding an error > > because it will automatically be backward compatible based on the way > > the client determines which errors are retriable. Although, I don't > > have a strong opinion. > > > > The key code paths that we need to consider are topic lookup and topic > > creation. Both the http admin endpoint and the binary protocol > > endpoint interact with these code paths, and we should fail requests > > for namespaces which have the "deleted" field set to true. > > > > In looking at the code, we could modify the logic in > > `PulsarWebResource#checkLocalOrGetPeerReplicationCluster`. That method > > is always called for topic lookups via HTTP or via the Pulsar > > binary protocol. If a namespace doesn't exist, that method currently > > returns a NOT_FOUND exception. In the binary protocol case, that > > exception is converted to a `ServerError.MetadataError`. We could > > modify the method like I do here [0], or we could add another case and > > return a custom error message. > > > > We'd also need to prevent topic creation via the admin endpoint and > > the auto topic creation via the binary protocol. I believe this will > > be just as easy as the lookup case, but I haven't confirmed. > > > > Thanks, > > Michael > > > > [0] > > > https://github.com/michaeljmarshall/pulsar/commit/93611a741af163587f79b88bc1c9f1acc7953512 > > > > > > > > > > On Fri, Feb 25, 2022 at 3:45 AM PengHui Li <peng...@apache.org> wrote: > > > > > > The replicated topic deletion also has a similar problem. > > > If the topic auto-creation is enabled, we are not able to delete > > > a replicated topic, because the producer used for replication > > > will try to reconnect. Looks like we need an option in the > > > `CommandCloseProducer` to avoid the replication producer > > > reconnecting again. > > > > > > Penghui > > > > > > On Fri, Feb 25, 2022 at 5:26 PM PengHui Li <peng...@apache.org> wrote: > > > > > > > > Penghui, are you suggesting that we implement the namespace/tenant > > > > terminated logic after completing this PIP? > > > > > > > > I'm ok with both being implemented together or separated. > > > > We need to determine the overall plan, not the implementation > > > > at this moment. > > > > > > > > > For the sake of discussion, if we implement the namespace > terminated > > > > logic first, we could fulfill the underlying requirements for this > PIP > > > > by returning a new non-retriable error response when a client tries > to > > > > connect a producer or a consumer to a topic in a namespace that is > > > > "terminated". If we do add the "namespace terminated" feature, we'll > > > > need to add a non-retriable exception for this case, anyway. The main > > > > advantage here is that we'd only need one expansion of the protobuf > > > > instead of two. The downside is that the protocol for connected > > > > clients has a couple more roundtrips. The broker would disconnect > > > > connected clients and then fail their reconnection attempt with a > > > > non-retriable error. > > > > > > > > Regarding the namespace "terminated" concept, I just noticed that we > > > > already have a "deleted" field in a namespace's policies [0]. There > is > > > > even a comment that says: > > > > > > > > Yes, if we can terminate the namespace first, we should return > > “namespace > > > > not found” > > > > if the producers and consumers try to reconnect, the client should > stop > > > > reconnecting > > > > after getting this error. Looks like following this way, we don't > need > > to > > > > introduce any protocol changes anymore. > > > > > > > > > As I think about it more, I no longer think "terminated" is the > right > > > > term for what I proposed above. Our goal is to briefly prevent any > > > > topic creation to ensure we can delete all sub resources for a > > > > namespace. On the other hand, a terminated topic isn't necessarily > > > > short lived. If we want to apply the "terminated" term unequivocally > > > > to both topics and namespaces, I think a terminated namespace would > > > > need to be a namespace where all topics are in terminated state and > no > > > > additional topics could be created. That's not the feature we're > > > > discussing here, though. Deleted seems like the right term to me, > > > > especially since we're already using it to prevent a race condition > > > > > > > > Yes, topic termination has different meanings, deletion should be the > > right > > > > term here. > > > > > > > > Thanks, > > > > Penghui > > > > > > > > > > > > On Fri, Feb 25, 2022 at 12:19 PM Michael Marshall < > > mmarsh...@apache.org> > > > > wrote: > > > > > > > >> Regarding the namespace "terminated" concept, I just noticed that we > > > >> already have a "deleted" field in a namespace's policies [0]. There > is > > > >> even a comment that says: > > > >> > > > >> > // set the policies to deleted so that somebody else cannot > acquire > > > >> this namespace > > > >> > > > >> I am not familiar with this feature, but it seems like this policy > > > >> field could be checked before creating a topic in a namespace. That > > > >> would remove certain races described above. > > > >> > > > >> As I think about it more, I no longer think "terminated" is the > right > > > >> term for what I proposed above. Our goal is to briefly prevent any > > > >> topic creation to ensure we can delete all sub resources for a > > > >> namespace. On the other hand, a terminated topic isn't necessarily > > > >> short lived. If we want to apply the "terminated" term unequivocally > > > >> to both topics and namespaces, I think a terminated namespace would > > > >> need to be a namespace where all topics are in terminated state and > no > > > >> additional topics could be created. That's not the feature we're > > > >> discussing here, though. Deleted seems like the right term to me, > > > >> especially since we're already using it to prevent a race condition > > > >> [0]. > > > >> > > > >> Thanks, > > > >> Michael > > > >> > > > >> [0] > > > >> > > > https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java#L252-L262 > > > >> > > > >> On Thu, Feb 24, 2022 at 9:58 PM Michael Marshall < > > mmarsh...@apache.org> > > > >> wrote: > > > >> > > > > >> > Hi Dave, > > > >> > > > > >> > > automatically delete tenants and namespaces for not containing > > topics > > > >> > > > > >> > I don't think that is what we are discussing. I agree that the > > initial > > > >> > email says just that, though, which is why I asked above: > > > >> > > > > >> > >> When there are no user-created topics under a namespace, > > > >> > >> Namespace should be deleted. > > > >> > > This is supposed to mean that the namespace should be able to be > > > >> > > deleted, correct? > > > >> > > > > >> > Perhaps Mattison can clarify, too. > > > >> > > > > >> > My current understanding of the context for the PIP is that a user > > > >> > call to delete a namespace without force can fail when a producer > > > >> > reconnects to a deleted topic. The goal is to remove the race > > > >> > condition to ensure namespace deletion can succeed. > > > >> > > > > >> > Thanks, > > > >> > Michael > > > >> > > > > >> > On Thu, Feb 24, 2022 at 5:21 PM Dave Fisher <w...@apache.org> > > wrote: > > > >> > > > > > >> > > Hi - > > > >> > > > > > >> > > I hope I’m understanding what’s being discussed. > > > >> > > > > > >> > > If we are going to automatically delete tenants and namespaces > for > > > >> not containing topics then we need to make both of these automatic > > actions > > > >> configurable with a default to NOT do so. Otherwise we break > existing > > use > > > >> cases. > > > >> > > > > > >> > > Automatic deletion of namespaces should be configurable at both > > the > > > >> cluster and tenant level. > > > >> > > > > > >> > > Regards, > > > >> > > Dave > > > >> > > > > > >> > > > On Feb 24, 2022, at 2:25 PM, Michael Marshall < > > mmarsh...@apache.org> > > > >> wrote: > > > >> > > > > > > >> > > >> The old producer/consumer should be closed after applying the > > > >> changes from > > > >> > > >> this proposal. > > > >> > > > > > > >> > > > Penghui, are you suggesting that we implement the > > namespace/tenant > > > >> > > > terminated logic after completing this PIP? > > > >> > > > > > > >> > > > For the sake of discussion, if we implement the namespace > > terminated > > > >> > > > logic first, we could fulfill the underlying requirements for > > this > > > >> PIP > > > >> > > > by returning a new non-retriable error response when a client > > tries > > > >> to > > > >> > > > connect a producer or a consumer to a topic in a namespace > that > > is > > > >> > > > "terminated". If we do add the "namespace terminated" feature, > > we'll > > > >> > > > need to add a non-retriable exception for this case, anyway. > The > > > >> main > > > >> > > > advantage here is that we'd only need one expansion of the > > protobuf > > > >> > > > instead of two. The downside is that the protocol for > connected > > > >> > > > clients has a couple more roundtrips. The broker would > > disconnect > > > >> > > > connected clients and then fail their reconnection attempt > with > > a > > > >> > > > non-retriable error. > > > >> > > > > > > >> > > > Thanks, > > > >> > > > Michael > > > >> > > > > > > >> > > > On Thu, Feb 24, 2022 at 7:11 AM PengHui Li < > peng...@apache.org> > > > >> wrote: > > > >> > > >> > > > >> > > >>> If we want to solve this problem, we need to add some sync > > > >> resources like > > > >> > > >> lock/state, I think it’s a harm for us, we don’t need to do > > that. > > > >> > > >> > > > >> > > >> I think we can make the namespace/tenants to the inactive > state > > > >> first so > > > >> > > >> that we can avoid any new > > > >> > > >> producer/consumer connect to the topic under the > > namespace/tenant. > > > >> > > >> > > > >> > > >> The old producer/consumer should be closed after applying the > > > >> changes from > > > >> > > >> this proposal. > > > >> > > >> > > > >> > > >> Thanks, > > > >> > > >> Penghui > > > >> > > >> > > > >> > > >> On Tue, Feb 8, 2022 at 5:47 PM mattison chao < > > > >> mattisonc...@gmail.com> wrote: > > > >> > > >> > > > >> > > >>>> This is supposed to mean that the namespace should be able > > to be > > > >> > > >>>> deleted, correct? > > > >> > > >>> > > > >> > > >>> Yes, the main background is the user doesn’t have an active > > > >> topic. so, > > > >> > > >>> they want to delete the namespace. > > > >> > > >>> > > > >> > > >>>> However, I think > > > >> > > >>>> we might still have a race condition that could make tenant > > or > > > >> > > >>>> namespace deletion fail. Specifically, if a new producer or > > > >> consumer > > > >> > > >>>> creates a topic after the namespace deletion has started > but > > > >> > > >>>> before it is complete. Do you agree that the underlying > race > > > >> still > > > >> > > >>> exists? > > > >> > > >>> > > > >> > > >>> Yes, this condition exists. I think it’s not a big problem > > > >> because the > > > >> > > >>> user doesn’t want to use this namespace anymore. > > > >> > > >>> If this scenario appears, they will get an error and need to > > > >> delete it > > > >> > > >>> again. > > > >> > > >>> > > > >> > > >>>> What if we expand our usage of the "terminated" feature to > > apply > > > >> to > > > >> > > >>>> namespaces (and tenants)? Then, a terminated namespace can > > have > > > >> > > >>>> bundles and topics can be deleted but not created (just as > a > > > >> terminated > > > >> > > >>>> topic cannot have any new messages published to it). This > > would > > > >> take > > > >> > > >>>> care of all topic creation race conditions. We'd probably > > need > > > >> to add > > > >> > > >>>> new protobuf exceptions for this feature. > > > >> > > >>> > > > >> > > >>> > > > >> > > >>> If we want to solve this problem, we need to add some sync > > > >> resources like > > > >> > > >>> lock/state, I think it’s a harm for us, we don’t need to do > > that. > > > >> > > >>> > > > >> > > >>> Thanks for your suggestions, let me know what you think. > > > >> > > >>> > > > >> > > >>> Best, > > > >> > > >>> Mattison > > > >> > > >>> > > > >> > > >>>> On Feb 1, 2022, at 2:26 PM, Michael Marshall < > > > >> mmarsh...@apache.org> > > > >> > > >>> wrote: > > > >> > > >>>> > > > >> > > >>>> This proposal identifies an important issue that we should > > > >> definitely > > > >> > > >>>> solve. I have some questions. > > > >> > > >>>> > > > >> > > >>>>> When there are no user-created topics under a namespace, > > > >> > > >>>>> Namespace should be deleted. > > > >> > > >>>> > > > >> > > >>>> This is supposed to mean that the namespace should be able > > to be > > > >> > > >>>> deleted, correct? > > > >> > > >>>> > > > >> > > >>>>> For this reason, we need to close the system topic > > > >> reader/producer > > > >> > > >>>>> first, then remove the system topic. finally, remove the > > > >> namespace. > > > >> > > >>>> > > > >> > > >>>> I agree that expanding the protobuf CloseProducer and > > > >> CloseConsumer > > > >> > > >>>> commands could be valuable here. The expansion would ensure > > that > > > >> > > >>>> producers and consumers don't attempt to reconnect. > However, > > I > > > >> think > > > >> > > >>>> we might still have a race condition that could make tenant > > or > > > >> > > >>>> namespace deletion fail. Specifically, if a new producer or > > > >> consumer > > > >> > > >>>> creates a topic after the namespace deletion has started > but > > > >> > > >>>> before it is complete. Do you agree that the underlying > race > > > >> still > > > >> > > >>> exists? > > > >> > > >>>> > > > >> > > >>>> In my view, the fundamental problem here is that deleting > > > >> certain Pulsar > > > >> > > >>>> resources takes time and, in a distributed system, that > means > > > >> race > > > >> > > >>>> conditions. > > > >> > > >>>> > > > >> > > >>>> What if we expand our usage of the "terminated" feature to > > apply > > > >> to > > > >> > > >>>> namespaces (and tenants)? Then, a terminated namespace can > > have > > > >> > > >>>> bundles and topics can be deleted but not created (just as > a > > > >> terminated > > > >> > > >>>> topic cannot have any new messages published to it). This > > would > > > >> take > > > >> > > >>>> care of all topic creation race conditions. We'd probably > > need > > > >> to add > > > >> > > >>>> new protobuf exceptions for this feature. > > > >> > > >>>> > > > >> > > >>>> Thanks, > > > >> > > >>>> Michael > > > >> > > >>>> > > > >> > > >>>> > > > >> > > >>>> On Sat, Jan 29, 2022 at 7:25 PM Zike Yang > > > >> > > >>>> <zky...@streamnative.io.invalid> wrote: > > > >> > > >>>>> > > > >> > > >>>>> +1 > > > >> > > >>>>> > > > >> > > >>>>> > > > >> > > >>>>> Thanks, > > > >> > > >>>>> Zike > > > >> > > >>>>> > > > >> > > >>>>> On Sat, Jan 29, 2022 at 12:30 PM guo jiwei < > > > >> techno...@apache.org> > > > >> > > >>> wrote: > > > >> > > >>>>>> > > > >> > > >>>>>> Hi > > > >> > > >>>>>> The PIP link : > > https://github.com/apache/pulsar/issues/13989 > > > >> > > >>>>>> > > > >> > > >>>>>> Regards > > > >> > > >>>>>> Jiwei Guo (Tboy) > > > >> > > >>>>>> > > > >> > > >>>>>> > > > >> > > >>>>>> On Sat, Jan 29, 2022 at 11:46 AM mattison chao < > > > >> mattisonc...@gmail.com > > > >> > > >>>> > > > >> > > >>>>>> wrote: > > > >> > > >>>>>> > > > >> > > >>>>>>> Hello everyone, > > > >> > > >>>>>>> > > > >> > > >>>>>>> I want to start a discussion about PIP-139 : Support > > Broker > > > >> send > > > >> > > >>> command > > > >> > > >>>>>>> to real close producer/consumer. > > > >> > > >>>>>>> > > > >> > > >>>>>>> This is the PIP document > > > >> > > >>>>>>> > > > >> > > >>>>>>> https://github.com/apache/pulsar/issues/13989 < > > > >> > > >>>>>>> https://github.com/apache/pulsar/issues/13979> > > > >> > > >>>>>>> > > > >> > > >>>>>>> Please check it out and feel free to share your > thoughts. > > > >> > > >>>>>>> > > > >> > > >>>>>>> Best, > > > >> > > >>>>>>> Mattison > > > >> > > >>>>>>> > > > >> > > >>>>>>> > > > >> > > >>>>>>> ———————— Pasted below for quoting convenience. > > > >> > > >>>>>>> > > > >> > > >>>>>>> > > > >> > > >>>>>>> > > > >> > > >>>>>>> Relation pull request: #13337 > > > >> > > >>>>>>> Authors: @Technoboy- @mattisonchao > > > >> > > >>>>>>> > > > >> > > >>>>>>> ## Motivation > > > >> > > >>>>>>> > > > >> > > >>>>>>> Before we discuss this pip, I'd like to supplement some > > > >> context to > > > >> > > >>> help > > > >> > > >>>>>>> contributors who don't want to read the original pull > > request. > > > >> > > >>>>>>> > > > >> > > >>>>>>>> When there are no user-created topics under a > namespace, > > > >> Namespace > > > >> > > >>>>>>> should be deleted. But currently, the system topic > existed > > > >> and the > > > >> > > >>>>>>> reader/producer could auto-create the system which may > > cause > > > >> the > > > >> > > >>> namespace > > > >> > > >>>>>>> deletion to fail. > > > >> > > >>>>>>> > > > >> > > >>>>>>> For this reason, we need to close the system topic > > > >> reader/producer > > > >> > > >>> first, > > > >> > > >>>>>>> then remove the system topic. finally, remove the > > namespace. > > > >> > > >>>>>>> > > > >> > > >>>>>>> Following this way, we first want to use ``terminate`` > to > > > >> solve this > > > >> > > >>>>>>> problem. then we found producers can disconnect, but > > > >> consumers are > > > >> > > >>> still > > > >> > > >>>>>>> alive. So, another PR #13960 wants to add consumers' > > closing > > > >> logic. > > > >> > > >>>>>>> > > > >> > > >>>>>>> After #13960, all things look good, but another problem > > > >> appears. that > > > >> > > >>> is > > > >> > > >>>>>>> we need to wait until consumers completely consume all > > > >> messages (this > > > >> > > >>> may > > > >> > > >>>>>>> make terminate topic so long and the operation > > timeout)then > > > >> get > > > >> > > >>>>>>> ``reachedEndOfTopic``. the relative code here : > > > >> > > >>>>>>> > > > >> > > >>>>>>> > > > >> > > >>>>>>> > > > >> > > >>> > > > >> > > > https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106 > > > >> > > >>>>>>> > > > >> > > >>>>>>> In the #13337 case, we need to force close consumers > > > >> immediately. So > > > >> > > >>> we > > > >> > > >>>>>>> write this PIP to discuss another way to resolve this > > problem. > > > >> > > >>>>>>> > > > >> > > >>>>>>> ## Goal > > > >> > > >>>>>>> > > > >> > > >>>>>>> We can add a new field(`allow_reconnect`) in command > > > >> > > >>>>>>> ``CommandCloseProducer``/ ``CommandCloseConsumer`` to > > close > > > >> > > >>>>>>> producer/consumers immediately. > > > >> > > >>>>>>> > > > >> > > >>>>>>> ## API Changes > > > >> > > >>>>>>> > > > >> > > >>>>>>> - Add ``allow_reconnect`` to ``CommandCloseProducer``; > > > >> > > >>>>>>> > > > >> > > >>>>>>> ```java > > > >> > > >>>>>>> **Before** > > > >> > > >>>>>>> > > > >> > > >>>>>>> message CommandCloseProducer { > > > >> > > >>>>>>> required uint64 producer_id = 1; > > > >> > > >>>>>>> required uint64 request_id = 2; > > > >> > > >>>>>>> } > > > >> > > >>>>>>> > > > >> > > >>>>>>> **After** > > > >> > > >>>>>>> message CommandCloseProducer { > > > >> > > >>>>>>> required uint64 producer_id = 1; > > > >> > > >>>>>>> required uint64 request_id = 2; > > > >> > > >>>>>>> optional bool allow_reconnect = 3 [default = true]; > > > >> > > >>>>>>> } > > > >> > > >>>>>>> ``` > > > >> > > >>>>>>> > > > >> > > >>>>>>> - Add ``allow_reconnect`` to ``CommandCloseConsumer`` > > > >> > > >>>>>>> ```java > > > >> > > >>>>>>> **Before** > > > >> > > >>>>>>> > > > >> > > >>>>>>> message CommandCloseConsumer { > > > >> > > >>>>>>> required uint64 consumer_id = 1; > > > >> > > >>>>>>> required uint64 request_id = 2; > > > >> > > >>>>>>> } > > > >> > > >>>>>>> > > > >> > > >>>>>>> **After** > > > >> > > >>>>>>> message CommandCloseConsumer { > > > >> > > >>>>>>> required uint64 consumer_id = 1; > > > >> > > >>>>>>> required uint64 request_id = 2; > > > >> > > >>>>>>> optional bool allow_reconnect = 3 [default = true]; > > > >> > > >>>>>>> } > > > >> > > >>>>>>> ``` > > > >> > > >>>>>>> > > > >> > > >>>>>>> ## Implementation > > > >> > > >>>>>>> > > > >> > > >>>>>>> ### ClientCnx - Producer: > > > >> > > >>>>>>> > > > >> > > >>>>>>> **Before** > > > >> > > >>>>>>> ```java > > > >> > > >>>>>>> @Override > > > >> > > >>>>>>> protected void > handleCloseProducer(CommandCloseProducer > > > >> > > >>> closeProducer) > > > >> > > >>>>>>> { > > > >> > > >>>>>>> log.info("[{}] Broker notification of Closed > > producer: > > > >> {}", > > > >> > > >>>>>>> remoteAddress, closeProducer.getProducerId()); > > > >> > > >>>>>>> final long producerId = > > closeProducer.getProducerId(); > > > >> > > >>>>>>> ProducerImpl<?> producer = > > producers.get(producerId); > > > >> > > >>>>>>> if (producer != null) { > > > >> > > >>>>>>> producer.connectionClosed(this); > > > >> > > >>>>>>> } else { > > > >> > > >>>>>>> log.warn("Producer with id {} not found while > > > >> closing > > > >> > > >>> producer > > > >> > > >>>>>>> ", producerId); > > > >> > > >>>>>>> } > > > >> > > >>>>>>> } > > > >> > > >>>>>>> ``` > > > >> > > >>>>>>> After: > > > >> > > >>>>>>> ```java > > > >> > > >>>>>>> @Override > > > >> > > >>>>>>> protected void > handleCloseProducer(CommandCloseProducer > > > >> > > >>> closeProducer) > > > >> > > >>>>>>> { > > > >> > > >>>>>>> log.info("[{}] Broker notification of Closed > > producer: > > > >> {}", > > > >> > > >>>>>>> remoteAddress, closeProducer.getProducerId()); > > > >> > > >>>>>>> final long producerId = > > closeProducer.getProducerId(); > > > >> > > >>>>>>> ProducerImpl<?> producer = > > producers.get(producerId); > > > >> > > >>>>>>> if (producer != null) { > > > >> > > >>>>>>> if (closeProducer.isAllowReconnect) { > > > >> > > >>>>>>> producer.connectionClosed(this); > > > >> > > >>>>>>> } else { > > > >> > > >>>>>>> producer.closeAsync(); > > > >> > > >>>>>>> } > > > >> > > >>>>>>> } else { > > > >> > > >>>>>>> log.warn("Producer with id {} not found while > > > >> closing > > > >> > > >>> producer > > > >> > > >>>>>>> ", producerId); > > > >> > > >>>>>>> } > > > >> > > >>>>>>> } > > > >> > > >>>>>>> ``` > > > >> > > >>>>>>> ### ClientCnx - Consumer: > > > >> > > >>>>>>> > > > >> > > >>>>>>> **Before** > > > >> > > >>>>>>> ```java > > > >> > > >>>>>>> @Override > > > >> > > >>>>>>> protected void > handleCloseConsumer(CommandCloseConsumer > > > >> > > >>> closeConsumer) > > > >> > > >>>>>>> { > > > >> > > >>>>>>> log.info("[{}] Broker notification of Closed > > consumer: > > > >> {}", > > > >> > > >>>>>>> remoteAddress, closeConsumer.getConsumerId()); > > > >> > > >>>>>>> final long consumerId = > > closeConsumer.getConsumerId(); > > > >> > > >>>>>>> ConsumerImpl<?> consumer = > > consumers.get(consumerId); > > > >> > > >>>>>>> if (consumer != null) { > > > >> > > >>>>>>> consumer.connectionClosed(this); > > > >> > > >>>>>>> } else { > > > >> > > >>>>>>> log.warn("Consumer with id {} not found while > > > >> closing > > > >> > > >>> consumer > > > >> > > >>>>>>> ", consumerId); > > > >> > > >>>>>>> } > > > >> > > >>>>>>> } > > > >> > > >>>>>>> ``` > > > >> > > >>>>>>> **After** > > > >> > > >>>>>>> ```java > > > >> > > >>>>>>> @Override > > > >> > > >>>>>>> protected void > handleCloseConsumer(CommandCloseConsumer > > > >> > > >>> closeConsumer) > > > >> > > >>>>>>> { > > > >> > > >>>>>>> log.info("[{}] Broker notification of Closed > > consumer: > > > >> {}", > > > >> > > >>>>>>> remoteAddress, closeConsumer.getConsumerId()); > > > >> > > >>>>>>> final long consumerId = > > closeConsumer.getConsumerId(); > > > >> > > >>>>>>> ConsumerImpl<?> consumer = > > consumers.get(consumerId); > > > >> > > >>>>>>> if (consumer != null) { > > > >> > > >>>>>>> if (closeConsumer.isAllowReconnect) { > > > >> > > >>>>>>> consumer.connectionClosed(this); > > > >> > > >>>>>>> } else { > > > >> > > >>>>>>> consumer.closeAsync(); > > > >> > > >>>>>>> } > > > >> > > >>>>>>> } else { > > > >> > > >>>>>>> log.warn("Consumer with id {} not found while > > > >> closing > > > >> > > >>> consumer > > > >> > > >>>>>>> ", consumerId); > > > >> > > >>>>>>> } > > > >> > > >>>>>>> } > > > >> > > >>>>>>> ``` > > > >> > > >>>>>>> > > > >> > > >>>>>>> > > > >> > > >>>>>>> ## Reject Alternatives > > > >> > > >>>>>>> > > > >> > > >>>>>>> none. > > > >> > > >>>>> > > > >> > > >>>>> > > > >> > > >>>>> > > > >> > > >>>>> -- > > > >> > > >>>>> Zike Yang > > > >> > > >>> > > > >> > > >>> > > > >> > > > > > >> > > > > > > >