I have added the geo-replicator topic case and updated the PIP.

Regards
Jiwei Guo (Tboy)


On Sun, Feb 27, 2022 at 1:00 PM PengHui Li <peng...@apache.org> wrote:

> > To me, the main question is whether we create a custom error or expand
> > the `CommandCloseProducer` command. I lean towards adding an error
> > because it will automatically be backward compatible based on the way
> > the client determines which errors are retriable. Although, I don't
> > have a strong opinion.
>
> I have the same opinion as you, but it looks like adding an error is not
> able to fix the geo-replication topic deletion.
>
> For both replicated and non-replicated topics, we should provide the
> same topic deletion behavior: a topic can be deleted if it has no active
> user producers/consumers, and deletion should not be affected by Pulsar's
> internal producers/consumers.
>
> But for now, users are not able to delete a replicated topic because of
> the internal active producer. Adding a force delete option can delete
> the topic, but it will be created again if users have enabled topic
> auto-creation.
>
> I think in this case we don't have a chance to give the producer an
> exception, because the reconnection will connect to a new topic that
> merely has the same topic name as before.
>
> Thanks,
> Penghui
>
> On Sat, Feb 26, 2022 at 2:24 PM Michael Marshall <mmarsh...@apache.org>
> wrote:
>
> > > We need to determine the overall plan, not the implementation
> > > at this moment.
> >
> > Great point, I agree.
> >
> > > Looks like we need an option in the
> > > `CommandCloseProducer` to avoid the replication producer
> > > reconnecting again.
> >
> > This is an important point. One of the primary requirements for
> > geo-replication must also be that a replicated namespace or topic can
> > be deleted and recreated without any problems or leaks in the broker.
> > That requires that the producer is cleaned up properly when the
> > namespace is deleted. I think we could achieve that with a new server
> > error or an expansion of the `CommandCloseProducer` command.
> >
> > To me, the main question is whether we create a custom error or expand
> > the `CommandCloseProducer` command. I lean towards adding an error
> > because it will automatically be backward compatible based on the way
> > the client determines which errors are retriable. Although, I don't
> > have a strong opinion.
> >
> > The key code paths that we need to consider are topic lookup and topic
> > creation. Both the http admin endpoint and the binary protocol
> > endpoint interact with these code paths, and we should fail requests
> > for namespaces which have the "deleted" field set to true.
> >
> > In looking at the code, we could modify the logic in
> > `PulsarWebResource#checkLocalOrGetPeerReplicationCluster`. That method
> > is always called for topic lookups via HTTP or via the Pulsar
> > binary protocol. If a namespace doesn't exist, that method currently
> > returns a NOT_FOUND exception. In the binary protocol case, that
> > exception is converted to a `ServerError.MetadataError`. We could
> > modify the method like I do here [0], or we could add another case and
> > return a custom error message.
> >
> > We'd also need to prevent topic creation via the admin endpoint and
> > the auto topic creation via the binary protocol. I believe this will
> > be just as easy as the lookup case, but I haven't confirmed.
> >
> > Thanks,
> > Michael
> >
> > [0] https://github.com/michaeljmarshall/pulsar/commit/93611a741af163587f79b88bc1c9f1acc7953512
> >
> >
> >
> >
> > On Fri, Feb 25, 2022 at 3:45 AM PengHui Li <peng...@apache.org> wrote:
> > >
> > > The replicated topic deletion also has a similar problem.
> > > If the topic auto-creation is enabled, we are not able to delete
> > > a replicated topic, because the producer used for replication
> > > will try to reconnect. Looks like we need an option in the
> > > `CommandCloseProducer` to avoid the replication producer
> > > reconnecting again.
> > >
> > > Penghui
> > >
> > > On Fri, Feb 25, 2022 at 5:26 PM PengHui Li <peng...@apache.org> wrote:
> > >
> > > > > > Penghui, are you suggesting that we implement the namespace/tenant
> > > > > > terminated logic after completing this PIP?
> > > >
> > > > I'm ok with both being implemented together or separated.
> > > > We need to determine the overall plan, not the implementation
> > > > at this moment.
> > > >
> > > > > > For the sake of discussion, if we implement the namespace
> > > > > > terminated logic first, we could fulfill the underlying
> > > > > > requirements for this PIP by returning a new non-retriable error
> > > > > > response when a client tries to connect a producer or a consumer
> > > > > > to a topic in a namespace that is "terminated". If we do add the
> > > > > > "namespace terminated" feature, we'll need to add a non-retriable
> > > > > > exception for this case, anyway. The main advantage here is that
> > > > > > we'd only need one expansion of the protobuf instead of two. The
> > > > > > downside is that the protocol for connected clients has a couple
> > > > > > more roundtrips. The broker would disconnect connected clients and
> > > > > > then fail their reconnection attempt with a non-retriable error.
> > > >
> > > > > > Regarding the namespace "terminated" concept, I just noticed that we
> > > > > > already have a "deleted" field in a namespace's policies [0]. There
> > > > > > is even a comment that says:
> > > >
> > > > > Yes, if we can terminate the namespace first, we should return
> > > > > “namespace not found” if the producers and consumers try to
> > > > > reconnect, and the client should stop reconnecting after getting
> > > > > this error. It looks like, following this approach, we don't need
> > > > > to introduce any protocol changes.
> > > >
> > > > > > As I think about it more, I no longer think "terminated" is the
> > > > > > right term for what I proposed above. Our goal is to briefly
> > > > > > prevent any topic creation to ensure we can delete all sub
> > > > > > resources for a namespace. On the other hand, a terminated topic
> > > > > > isn't necessarily short lived. If we want to apply the
> > > > > > "terminated" term unequivocally to both topics and namespaces, I
> > > > > > think a terminated namespace would need to be a namespace where
> > > > > > all topics are in terminated state and no additional topics could
> > > > > > be created. That's not the feature we're discussing here, though.
> > > > > > Deleted seems like the right term to me, especially since we're
> > > > > > already using it to prevent a race condition
> > > >
> > > > > Yes, topic termination has a different meaning; deletion should be
> > > > > the right term here.
> > > >
> > > > Thanks,
> > > > Penghui
> > > >
> > > >
> > > > On Fri, Feb 25, 2022 at 12:19 PM Michael Marshall <
> > mmarsh...@apache.org>
> > > > wrote:
> > > >
> > > > >> Regarding the namespace "terminated" concept, I just noticed that we
> > > > >> already have a "deleted" field in a namespace's policies [0]. There
> > > > >> is even a comment that says:
> > > > >>
> > > > >> > // set the policies to deleted so that somebody else cannot
> > > > >> > acquire this namespace
> > > >>
> > > >> I am not familiar with this feature, but it seems like this policy
> > > >> field could be checked before creating a topic in a namespace. That
> > > >> would remove certain races described above.
> > > >>
> > > > >> As I think about it more, I no longer think "terminated" is the
> > > > >> right term for what I proposed above. Our goal is to briefly prevent
> > > > >> any topic creation to ensure we can delete all sub resources for a
> > > > >> namespace. On the other hand, a terminated topic isn't necessarily
> > > > >> short lived. If we want to apply the "terminated" term unequivocally
> > > > >> to both topics and namespaces, I think a terminated namespace would
> > > > >> need to be a namespace where all topics are in terminated state and
> > > > >> no additional topics could be created. That's not the feature we're
> > > > >> discussing here, though. Deleted seems like the right term to me,
> > > > >> especially since we're already using it to prevent a race condition
> > > > >> [0].
> > > >>
> > > >> Thanks,
> > > >> Michael
> > > >>
> > > >> [0]
> > > >>
> >
> https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java#L252-L262
> > > >>
> > > >> On Thu, Feb 24, 2022 at 9:58 PM Michael Marshall <
> > mmarsh...@apache.org>
> > > >> wrote:
> > > >> >
> > > >> > Hi Dave,
> > > >> >
> > > >> > > automatically delete tenants and namespaces for not containing
> > topics
> > > >> >
> > > >> > I don't think that is what we are discussing. I agree that the
> > initial
> > > >> > email says just that, though, which is why I asked above:
> > > >> >
> > > >> > >> When there are no user-created topics under a namespace,
> > > >> > >> Namespace should be deleted.
> > > >> > > This is supposed to mean that the namespace should be able to be
> > > >> > > deleted, correct?
> > > >> >
> > > >> > Perhaps Mattison can clarify, too.
> > > >> >
> > > >> > My current understanding of the context for the PIP is that a user
> > > >> > call to delete a namespace without force can fail when a producer
> > > >> > reconnects to a deleted topic. The goal is to remove the race
> > > >> > condition to ensure namespace deletion can succeed.
> > > >> >
> > > >> > Thanks,
> > > >> > Michael
> > > >> >
> > > >> > On Thu, Feb 24, 2022 at 5:21 PM Dave Fisher <w...@apache.org>
> > wrote:
> > > >> > >
> > > >> > > Hi -
> > > >> > >
> > > >> > > I hope I’m understanding what’s being discussed.
> > > >> > >
> > > >> > > If we are going to automatically delete tenants and namespaces
> for
> > > >> not containing topics then we need to make both of these automatic
> > actions
> > > >> configurable with a default to NOT do so. Otherwise we break
> existing
> > use
> > > >> cases.
> > > >> > >
> > > >> > > Automatic deletion of namespaces should be configurable at both
> > the
> > > >> cluster and tenant level.
> > > >> > >
> > > >> > > Regards,
> > > >> > > Dave
> > > >> > >
> > > >> > > > On Feb 24, 2022, at 2:25 PM, Michael Marshall <
> > mmarsh...@apache.org>
> > > >> wrote:
> > > >> > > >
> > > >> > > >> The old producer/consumer should be closed after applying the
> > > >> changes from
> > > >> > > >> this proposal.
> > > >> > > >
> > > >> > > > Penghui, are you suggesting that we implement the
> > namespace/tenant
> > > >> > > > terminated logic after completing this PIP?
> > > >> > > >
> > > >> > > > For the sake of discussion, if we implement the namespace
> > terminated
> > > >> > > > logic first, we could fulfill the underlying requirements for
> > this
> > > >> PIP
> > > >> > > > by returning a new non-retriable error response when a client
> > tries
> > > >> to
> > > >> > > > connect a producer or a consumer to a topic in a namespace
> that
> > is
> > > >> > > > "terminated". If we do add the "namespace terminated" feature,
> > we'll
> > > >> > > > need to add a non-retriable exception for this case, anyway.
> The
> > > >> main
> > > >> > > > advantage here is that we'd only need one expansion of the
> > protobuf
> > > >> > > > instead of two. The downside is that the protocol for
> connected
> > > >> > > > clients has a couple more roundtrips. The broker would
> > disconnect
> > > >> > > > connected clients and then fail their reconnection attempt
> with
> > a
> > > >> > > > non-retriable error.
> > > >> > > >
> > > >> > > > Thanks,
> > > >> > > > Michael
> > > >> > > >
> > > >> > > > On Thu, Feb 24, 2022 at 7:11 AM PengHui Li <
> peng...@apache.org>
> > > >> wrote:
> > > >> > > >>
> > > >> > > >>> If we want to solve this problem, we need to add some sync
> > > >> resources like
> > > >> > > >> lock/state, I think it’s a harm for us, we don’t need to do
> > that.
> > > >> > > >>
> > > >> > > >> I think we can make the namespace/tenants to the inactive
> state
> > > >> first so
> > > >> > > >> that we can avoid any new
> > > >> > > >> producer/consumer connect to the topic under the
> > namespace/tenant.
> > > >> > > >>
> > > >> > > >> The old producer/consumer should be closed after applying the
> > > >> changes from
> > > >> > > >> this proposal.
> > > >> > > >>
> > > >> > > >> Thanks,
> > > >> > > >> Penghui
> > > >> > > >>
> > > >> > > >> On Tue, Feb 8, 2022 at 5:47 PM mattison chao <
> > > >> mattisonc...@gmail.com> wrote:
> > > >> > > >>
> > > >> > > >>>> This is supposed to mean that the namespace should be able
> > to be
> > > >> > > >>>> deleted, correct?
> > > >> > > >>>
> > > >> > > >>> Yes, the main background is the user doesn’t have an active
> > > >> topic. so,
> > > >> > > >>> they want to delete the namespace.
> > > >> > > >>>
> > > >> > > >>>> However, I think
> > > >> > > >>>> we might still have a race condition that could make tenant
> > or
> > > >> > > >>>> namespace deletion fail. Specifically, if a new producer or
> > > >> consumer
> > > >> > > >>>> creates a topic after the namespace deletion has started
> but
> > > >> > > >>>> before it is complete. Do you agree that the underlying
> race
> > > >> still
> > > >> > > >>> exists?
> > > >> > > >>>
> > > >> > > >>> Yes, this condition exists. I think it’s not a big problem
> > > >> because the
> > > >> > > >>> user doesn’t want to use this namespace anymore.
> > > >> > > >>> If this scenario appears, they will get an error and need to
> > > >> delete it
> > > >> > > >>> again.
> > > >> > > >>>
> > > >> > > >>>> What if we expand our usage of the "terminated" feature to
> > apply
> > > >> to
> > > >> > > >>>> namespaces (and tenants)? Then, a terminated namespace can
> > have
> > > >> > > >>>> bundles and topics can be deleted but not created (just as
> a
> > > >> terminated
> > > >> > > >>>> topic cannot have any new messages published to it). This
> > would
> > > >> take
> > > >> > > >>>> care of all topic creation race conditions. We'd probably
> > need
> > > >> to add
> > > >> > > >>>> new protobuf exceptions for this feature.
> > > >> > > >>>
> > > >> > > >>>
> > > >> > > >>> If we want to solve this problem, we need to add some sync
> > > >> resources like
> > > >> > > >>> lock/state, I think it’s a harm for us, we don’t need to do
> > that.
> > > >> > > >>>
> > > >> > > >>> Thanks for your suggestions, let me know what you think.
> > > >> > > >>>
> > > >> > > >>> Best,
> > > >> > > >>> Mattison
> > > >> > > >>>
> > > >> > > >>>> On Feb 1, 2022, at 2:26 PM, Michael Marshall <
> > > >> mmarsh...@apache.org>
> > > >> > > >>> wrote:
> > > >> > > >>>>
> > > >> > > >>>> This proposal identifies an important issue that we should
> > > >> definitely
> > > >> > > >>>> solve. I have some questions.
> > > >> > > >>>>
> > > >> > > >>>>> When there are no user-created topics under a namespace,
> > > >> > > >>>>> Namespace should be deleted.
> > > >> > > >>>>
> > > >> > > >>>> This is supposed to mean that the namespace should be able
> > to be
> > > >> > > >>>> deleted, correct?
> > > >> > > >>>>
> > > >> > > >>>>> For this reason, we need to close the system topic
> > > >> reader/producer
> > > >> > > >>>>> first, then remove the system topic. finally, remove the
> > > >> namespace.
> > > >> > > >>>>
> > > >> > > >>>> I agree that expanding the protobuf CloseProducer and
> > > >> CloseConsumer
> > > >> > > >>>> commands could be valuable here. The expansion would ensure
> > that
> > > >> > > >>>> producers and consumers don't attempt to reconnect.
> However,
> > I
> > > >> think
> > > >> > > >>>> we might still have a race condition that could make tenant
> > or
> > > >> > > >>>> namespace deletion fail. Specifically, if a new producer or
> > > >> consumer
> > > >> > > >>>> creates a topic after the namespace deletion has started
> but
> > > >> > > >>>> before it is complete. Do you agree that the underlying
> race
> > > >> still
> > > >> > > >>> exists?
> > > >> > > >>>>
> > > >> > > >>>> In my view, the fundamental problem here is that deleting
> > > >> certain Pulsar
> > > >> > > >>>> resources takes time and, in a distributed system, that
> means
> > > >> race
> > > >> > > >>>> conditions.
> > > >> > > >>>>
> > > >> > > >>>> What if we expand our usage of the "terminated" feature to
> > apply
> > > >> to
> > > >> > > >>>> namespaces (and tenants)? Then, a terminated namespace can
> > have
> > > >> > > >>>> bundles and topics can be deleted but not created (just as
> a
> > > >> terminated
> > > >> > > >>>> topic cannot have any new messages published to it). This
> > would
> > > >> take
> > > >> > > >>>> care of all topic creation race conditions. We'd probably
> > need
> > > >> to add
> > > >> > > >>>> new protobuf exceptions for this feature.
> > > >> > > >>>>
> > > >> > > >>>> Thanks,
> > > >> > > >>>> Michael
> > > >> > > >>>>
> > > >> > > >>>>
> > > >> > > >>>> On Sat, Jan 29, 2022 at 7:25 PM Zike Yang
> > > >> > > >>>> <zky...@streamnative.io.invalid> wrote:
> > > >> > > >>>>>
> > > >> > > >>>>> +1
> > > >> > > >>>>>
> > > >> > > >>>>>
> > > >> > > >>>>> Thanks,
> > > >> > > >>>>> Zike
> > > >> > > >>>>>
> > > >> > > >>>>> On Sat, Jan 29, 2022 at 12:30 PM guo jiwei <
> > > >> techno...@apache.org>
> > > >> > > >>> wrote:
> > > >> > > >>>>>>
> > > >> > > >>>>>> Hi
> > > >> > > >>>>>> The PIP link :
> > https://github.com/apache/pulsar/issues/13989
> > > >> > > >>>>>>
> > > >> > > >>>>>> Regards
> > > >> > > >>>>>> Jiwei Guo (Tboy)
> > > >> > > >>>>>>
> > > >> > > >>>>>>
> > > >> > > >>>>>> On Sat, Jan 29, 2022 at 11:46 AM mattison chao <
> > > >> mattisonc...@gmail.com
> > > >> > > >>>>
> > > >> > > >>>>>> wrote:
> > > >> > > >>>>>>
> > > >> > > >>>>>>> Hello everyone,
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> I want to start a discussion about PIP-139 : Support
> > Broker
> > > >> send
> > > >> > > >>> command
> > > >> > > >>>>>>> to real close producer/consumer.
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> This is the PIP document
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> https://github.com/apache/pulsar/issues/13989 <
> > > >> > > >>>>>>> https://github.com/apache/pulsar/issues/13979>
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> Please check it out and feel free to share your
> thoughts.
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> Best,
> > > >> > > >>>>>>> Mattison
> > > >> > > >>>>>>>
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> ———————— Pasted below for quoting convenience.
> > > >> > > >>>>>>>
> > > >> > > >>>>>>>
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> Relation pull request:  #13337
> > > >> > > >>>>>>> Authors: @Technoboy-  @mattisonchao
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> ## Motivation
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> Before we discuss this pip, I'd like to supplement some
> > > >> context to
> > > >> > > >>> help
> > > >> > > >>>>>>> contributors who don't want to read the original pull
> > request.
> > > >> > > >>>>>>>
> > > >> > > >>>>>>>> When there are no user-created topics under a namespace,
> > > >> > > >>>>>>>> Namespace should be deleted. But currently, the system topic
> > > >> > > >>>>>>>> exists and the reader/producer could auto-create the system
> > > >> > > >>>>>>>> topic, which may cause the namespace deletion to fail.
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> For this reason, we need to close the system topic
> > > >> reader/producer
> > > >> > > >>> first,
> > > >> > > >>>>>>> then remove the system topic. finally, remove the
> > namespace.
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> Following this approach, we first wanted to use ``terminate``
> > > >> > > >>>>>>> to solve this problem. Then we found that producers can
> > > >> > > >>>>>>> disconnect, but consumers are still alive. So another PR,
> > > >> > > >>>>>>> #13960, wants to add closing logic for consumers.
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> After #13960, everything looks good, but another problem
> > > >> > > >>>>>>> appears: we need to wait until consumers have consumed all
> > > >> > > >>>>>>> messages (which may make terminating the topic take so long
> > > >> > > >>>>>>> that the operation times out) before they get
> > > >> > > >>>>>>> ``reachedEndOfTopic``. The relevant code is here:
> > > >> > > >>>>>>>
> > > >> > > >>>>>>>
> > > >> > > >>>>>>>
> > > >> > > >>>
> > > >>
> >
> https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> In the #13337 case, we need to force close consumers
> > > >> immediately. So
> > > >> > > >>> we
> > > >> > > >>>>>>> write this PIP to discuss another way to resolve this
> > problem.
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> ## Goal
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> We can add a new field(`allow_reconnect`) in command
> > > >> > > >>>>>>> ``CommandCloseProducer``/ ``CommandCloseConsumer`` to
> > close
> > > >> > > >>>>>>> producer/consumers immediately.
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> ## API Changes
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> - Add ``allow_reconnect`` to ``CommandCloseProducer``;
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> **Before**
> > > >> > > >>>>>>> ```protobuf
> > > >> > > >>>>>>> message CommandCloseProducer {
> > > >> > > >>>>>>>   required uint64 producer_id = 1;
> > > >> > > >>>>>>>   required uint64 request_id = 2;
> > > >> > > >>>>>>> }
> > > >> > > >>>>>>> ```
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> **After**
> > > >> > > >>>>>>> ```protobuf
> > > >> > > >>>>>>> message CommandCloseProducer {
> > > >> > > >>>>>>>   required uint64 producer_id = 1;
> > > >> > > >>>>>>>   required uint64 request_id = 2;
> > > >> > > >>>>>>>   optional bool allow_reconnect = 3 [default = true];
> > > >> > > >>>>>>> }
> > > >> > > >>>>>>> ```
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> - Add ``allow_reconnect`` to ``CommandCloseConsumer``
> > > >> > > >>>>>>> **Before**
> > > >> > > >>>>>>> ```protobuf
> > > >> > > >>>>>>> message CommandCloseConsumer {
> > > >> > > >>>>>>>   required uint64 consumer_id = 1;
> > > >> > > >>>>>>>   required uint64 request_id = 2;
> > > >> > > >>>>>>> }
> > > >> > > >>>>>>> ```
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> **After**
> > > >> > > >>>>>>> ```protobuf
> > > >> > > >>>>>>> message CommandCloseConsumer {
> > > >> > > >>>>>>>   required uint64 consumer_id = 1;
> > > >> > > >>>>>>>   required uint64 request_id = 2;
> > > >> > > >>>>>>>   optional bool allow_reconnect = 3 [default = true];
> > > >> > > >>>>>>> }
> > > >> > > >>>>>>> ```
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> ## Implementation
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> ### ClientCnx - Producer:
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> **Before**
> > > >> > > >>>>>>> ```java
> > > >> > > >>>>>>>   @Override
> > > >> > > >>>>>>>   protected void handleCloseProducer(CommandCloseProducer closeProducer) {
> > > >> > > >>>>>>>       log.info("[{}] Broker notification of Closed producer: {}",
> > > >> > > >>>>>>>               remoteAddress, closeProducer.getProducerId());
> > > >> > > >>>>>>>       final long producerId = closeProducer.getProducerId();
> > > >> > > >>>>>>>       ProducerImpl<?> producer = producers.get(producerId);
> > > >> > > >>>>>>>       if (producer != null) {
> > > >> > > >>>>>>>           producer.connectionClosed(this);
> > > >> > > >>>>>>>       } else {
> > > >> > > >>>>>>>           log.warn("Producer with id {} not found while closing producer ", producerId);
> > > >> > > >>>>>>>       }
> > > >> > > >>>>>>>   }
> > > >> > > >>>>>>> ```
> > > >> > > >>>>>>> **After**
> > > >> > > >>>>>>> ```java
> > > >> > > >>>>>>>   @Override
> > > >> > > >>>>>>>   protected void handleCloseProducer(CommandCloseProducer closeProducer) {
> > > >> > > >>>>>>>       log.info("[{}] Broker notification of Closed producer: {}",
> > > >> > > >>>>>>>               remoteAddress, closeProducer.getProducerId());
> > > >> > > >>>>>>>       final long producerId = closeProducer.getProducerId();
> > > >> > > >>>>>>>       ProducerImpl<?> producer = producers.get(producerId);
> > > >> > > >>>>>>>       if (producer != null) {
> > > >> > > >>>>>>>           if (closeProducer.isAllowReconnect()) {
> > > >> > > >>>>>>>               // Existing behavior: the producer may reconnect.
> > > >> > > >>>>>>>               producer.connectionClosed(this);
> > > >> > > >>>>>>>           } else {
> > > >> > > >>>>>>>               // Broker requested a real close: do not reconnect.
> > > >> > > >>>>>>>               producer.closeAsync();
> > > >> > > >>>>>>>           }
> > > >> > > >>>>>>>       } else {
> > > >> > > >>>>>>>           log.warn("Producer with id {} not found while closing producer ", producerId);
> > > >> > > >>>>>>>       }
> > > >> > > >>>>>>>   }
> > > >> > > >>>>>>> ```
> > > >> > > >>>>>>> ### ClientCnx - Consumer:
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> **Before**
> > > >> > > >>>>>>> ```java
> > > >> > > >>>>>>>   @Override
> > > >> > > >>>>>>>   protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
> > > >> > > >>>>>>>       log.info("[{}] Broker notification of Closed consumer: {}",
> > > >> > > >>>>>>>               remoteAddress, closeConsumer.getConsumerId());
> > > >> > > >>>>>>>       final long consumerId = closeConsumer.getConsumerId();
> > > >> > > >>>>>>>       ConsumerImpl<?> consumer = consumers.get(consumerId);
> > > >> > > >>>>>>>       if (consumer != null) {
> > > >> > > >>>>>>>           consumer.connectionClosed(this);
> > > >> > > >>>>>>>       } else {
> > > >> > > >>>>>>>           log.warn("Consumer with id {} not found while closing consumer ", consumerId);
> > > >> > > >>>>>>>       }
> > > >> > > >>>>>>>   }
> > > >> > > >>>>>>> ```
> > > >> > > >>>>>>> **After**
> > > >> > > >>>>>>> ```java
> > > >> > > >>>>>>>   @Override
> > > >> > > >>>>>>>   protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
> > > >> > > >>>>>>>       log.info("[{}] Broker notification of Closed consumer: {}",
> > > >> > > >>>>>>>               remoteAddress, closeConsumer.getConsumerId());
> > > >> > > >>>>>>>       final long consumerId = closeConsumer.getConsumerId();
> > > >> > > >>>>>>>       ConsumerImpl<?> consumer = consumers.get(consumerId);
> > > >> > > >>>>>>>       if (consumer != null) {
> > > >> > > >>>>>>>           if (closeConsumer.isAllowReconnect()) {
> > > >> > > >>>>>>>               // Existing behavior: the consumer may reconnect.
> > > >> > > >>>>>>>               consumer.connectionClosed(this);
> > > >> > > >>>>>>>           } else {
> > > >> > > >>>>>>>               // Broker requested a real close: do not reconnect.
> > > >> > > >>>>>>>               consumer.closeAsync();
> > > >> > > >>>>>>>           }
> > > >> > > >>>>>>>       } else {
> > > >> > > >>>>>>>           log.warn("Consumer with id {} not found while closing consumer ", consumerId);
> > > >> > > >>>>>>>       }
> > > >> > > >>>>>>>   }
> > > >> > > >>>>>>> ```
> > > >> > > >>>>>>>
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> ## Reject Alternatives
> > > >> > > >>>>>>>
> > > >> > > >>>>>>> none.
> > > >> > > >>>>>
> > > >> > > >>>>>
> > > >> > > >>>>>
> > > >> > > >>>>> --
> > > >> > > >>>>> Zike Yang
> > > >> > > >>>
> > > >> > > >>>
> > > >> > >
> > > >>
> > > >
> >
>

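For reference, here is a minimal, self-contained sketch of the client-side decision that the `allow_reconnect` proposal quoted above implies. It is not Pulsar code: `CloseHandlingSketch`, `CloseCommand`, `Action`, and `onClose` are hypothetical names used only for illustration. A `null` flag models a broker that predates the field, in which case the proposed protobuf default of `true` would apply and the client keeps its current reconnect behavior.

```java
class CloseHandlingSketch {

    /** What the client does after the broker closes a producer or consumer. */
    enum Action { RECONNECT, CLOSE_PERMANENTLY }

    /**
     * Hypothetical stand-in for CommandCloseProducer / CommandCloseConsumer.
     * allowReconnect == null models a command sent without the new field.
     */
    static final class CloseCommand {
        final long id;
        final Boolean allowReconnect;

        CloseCommand(long id, Boolean allowReconnect) {
            this.id = id;
            this.allowReconnect = allowReconnect;
        }

        /** Mirrors `optional bool allow_reconnect = 3 [default = true]`. */
        boolean isAllowReconnect() {
            return allowReconnect == null || allowReconnect;
        }
    }

    /** Reconnect unless the broker explicitly disallowed it. */
    static Action onClose(CloseCommand cmd) {
        return cmd.isAllowReconnect() ? Action.RECONNECT : Action.CLOSE_PERMANENTLY;
    }

    public static void main(String[] args) {
        System.out.println(onClose(new CloseCommand(1L, null)));   // prints RECONNECT
        System.out.println(onClose(new CloseCommand(2L, true)));   // prints RECONNECT
        System.out.println(onClose(new CloseCommand(3L, false)));  // prints CLOSE_PERMANENTLY
    }
}
```

Only an explicit `false` stops the reconnection, which is why the flag would stay compatible with older brokers. As discussed in the thread, this alone would not prevent a different client from recreating the topic while auto-creation is enabled.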