> If we want to solve this problem, we need to add some sync resources like lock/state. I think that would be harmful for us, so we don’t need to do that.
I think we can put the namespace/tenant into an inactive state first, so that
no new producer/consumer can connect to topics under that namespace/tenant.
The existing producers/consumers can then be closed with the changes from
this proposal.

Thanks,
Penghui

On Tue, Feb 8, 2022 at 5:47 PM mattison chao <mattisonc...@gmail.com> wrote:

> > This is supposed to mean that the namespace should be able to be
> > deleted, correct?
>
> Yes. The main background is that the user doesn’t have any active topics,
> so they want to delete the namespace.
>
> > However, I think
> > we might still have a race condition that could make tenant or
> > namespace deletion fail. Specifically, this could happen if a new
> > producer or consumer creates a topic after the namespace deletion has
> > started but before it is complete. Do you agree that the underlying
> > race still exists?
>
> Yes, this race exists. I don’t think it’s a big problem, because the
> user doesn’t want to use this namespace anymore.
> If this happens, they will get an error and need to delete it again.
>
> > What if we expand our usage of the "terminated" feature to apply to
> > namespaces (and tenants)? Then, a terminated namespace's bundles and
> > topics can be deleted but not created (just as a terminated topic
> > cannot have any new messages published to it). This would take care
> > of all topic creation race conditions. We'd probably need to add new
> > protobuf exceptions for this feature.
>
> If we want to solve this problem, we need to add some sync resources like
> lock/state. I think that would be harmful for us, so we don’t need to do
> that.
>
> Thanks for your suggestions, let me know what you think.
>
> Best,
> Mattison
>
> > On Feb 1, 2022, at 2:26 PM, Michael Marshall <mmarsh...@apache.org> wrote:
> >
> > This proposal identifies an important issue that we should definitely
> > solve. I have some questions.
> >
> >> When there are no user-created topics under a namespace,
> >> the namespace should be deleted.
> >
> > This is supposed to mean that the namespace should be able to be
> > deleted, correct?
> >
> >> For this reason, we need to close the system topic reader/producer
> >> first, then remove the system topic, and finally remove the namespace.
> >
> > I agree that expanding the protobuf CloseProducer and CloseConsumer
> > commands could be valuable here. The expansion would ensure that
> > producers and consumers don't attempt to reconnect. However, I think
> > we might still have a race condition that could make tenant or
> > namespace deletion fail. Specifically, this could happen if a new
> > producer or consumer creates a topic after the namespace deletion has
> > started but before it is complete. Do you agree that the underlying
> > race still exists?
> >
> > In my view, the fundamental problem here is that deleting certain Pulsar
> > resources takes time and, in a distributed system, that means race
> > conditions.
> >
> > What if we expand our usage of the "terminated" feature to apply to
> > namespaces (and tenants)? Then, a terminated namespace's bundles and
> > topics can be deleted but not created (just as a terminated topic
> > cannot have any new messages published to it). This would take care
> > of all topic creation race conditions. We'd probably need to add new
> > protobuf exceptions for this feature.
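The "terminated namespace" idea can be illustrated with a minimal, single-process sketch. Everything in it is hypothetical: `NamespaceRegistry`, `NamespaceTerminatedException`, and the in-memory map are illustration-only names, not Pulsar APIs, and a real broker would have to keep the terminated flag in shared metadata rather than a local map (which is the kind of extra sync state Mattison argues against above). What it shows is that topic creation and termination check the same flag under the same lock, so a creation request cannot slip in between "check" and "terminate", while topic deletion is still allowed.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical error type standing in for the new protobuf error the thread mentions.
class NamespaceTerminatedException extends RuntimeException {
    NamespaceTerminatedException(String namespace) {
        super("Namespace " + namespace + " is terminated; new topics cannot be created");
    }
}

// Hypothetical in-memory model of namespaces that can be terminated.
// A real broker would keep this state in the metadata store.
class NamespaceRegistry {
    private static final class Entry {
        boolean terminated;
        final Set<String> topics = new HashSet<>();
    }

    private final Map<String, Entry> namespaces = new ConcurrentHashMap<>();

    void createNamespace(String namespace) {
        namespaces.putIfAbsent(namespace, new Entry());
    }

    // Creation checks the flag under the same lock terminate() takes, so a
    // topic cannot be created between "check" and "terminate".
    void createTopic(String namespace, String topic) {
        Entry e = entry(namespace);
        synchronized (e) {
            if (e.terminated) {
                throw new NamespaceTerminatedException(namespace);
            }
            e.topics.add(topic);
        }
    }

    // Deleting topics is still allowed after termination.
    boolean deleteTopic(String namespace, String topic) {
        Entry e = entry(namespace);
        synchronized (e) {
            return e.topics.remove(topic);
        }
    }

    void terminate(String namespace) {
        Entry e = entry(namespace);
        synchronized (e) {
            e.terminated = true;
        }
    }

    // Deletion succeeds only for a terminated namespace with no topics left.
    boolean deleteNamespace(String namespace) {
        Entry e = entry(namespace);
        synchronized (e) {
            if (!e.terminated || !e.topics.isEmpty()) {
                return false;
            }
            namespaces.remove(namespace);
            return true;
        }
    }

    private Entry entry(String namespace) {
        Entry e = namespaces.get(namespace);
        if (e == null) {
            throw new IllegalArgumentException("Unknown namespace: " + namespace);
        }
        return e;
    }
}
```

Once the flag is set, the topic set can only shrink, so the namespace eventually becomes empty and deletable instead of racing with late topic creation.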
> >
> > Thanks,
> > Michael
> >
> >
> > On Sat, Jan 29, 2022 at 7:25 PM Zike Yang
> > <zky...@streamnative.io.invalid> wrote:
> >>
> >> +1
> >>
> >>
> >> Thanks,
> >> Zike
> >>
> >> On Sat, Jan 29, 2022 at 12:30 PM guo jiwei <techno...@apache.org> wrote:
> >>>
> >>> Hi,
> >>> The PIP link: https://github.com/apache/pulsar/issues/13989
> >>>
> >>> Regards
> >>> Jiwei Guo (Tboy)
> >>>
> >>>
> >>> On Sat, Jan 29, 2022 at 11:46 AM mattison chao <mattisonc...@gmail.com> wrote:
> >>>
> >>>> Hello everyone,
> >>>>
> >>>> I want to start a discussion about PIP-139: Support Broker send command
> >>>> to real close producer/consumer.
> >>>>
> >>>> This is the PIP document:
> >>>>
> >>>> https://github.com/apache/pulsar/issues/13989 <
> >>>> https://github.com/apache/pulsar/issues/13979>
> >>>>
> >>>> Please check it out and feel free to share your thoughts.
> >>>>
> >>>> Best,
> >>>> Mattison
> >>>>
> >>>>
> >>>> ———————— Pasted below for quoting convenience.
> >>>>
> >>>>
> >>>>
> >>>> Related pull request: #13337
> >>>> Authors: @Technoboy- @mattisonchao
> >>>>
> >>>> ## Motivation
> >>>>
> >>>> Before we discuss this PIP, I'd like to provide some context for
> >>>> contributors who don't want to read the original pull request.
> >>>>
> >>>>> When there are no user-created topics under a namespace, the namespace
> >>>>> should be deleted. But currently, the system topic exists, and its
> >>>>> reader/producer can auto-create the system topic, which may cause the
> >>>>> namespace deletion to fail.
> >>>>
> >>>> For this reason, we need to close the system topic reader/producer first,
> >>>> then remove the system topic, and finally remove the namespace.
> >>>>
> >>>> Following this approach, we first tried to use ``terminate`` to solve this
> >>>> problem. Then we found that producers disconnect, but consumers stay
> >>>> alive. So another PR, #13960, aims to add the consumer-closing logic.
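The deletion order the PIP describes (close the system topic reader/producer, remove the system topic, then remove the namespace) maps naturally onto a chain of asynchronous steps. This sketch uses hypothetical names (`NamespaceDeletionSteps`, `OrderedNamespaceDeleter`, `RecordingSteps`) rather than real Pulsar broker methods; it only shows sequencing and failure propagation. It does not by itself close the race Michael describes: a client could still recreate a topic between steps unless new connections are blocked first.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical broker-side steps; these names are illustrative, not Pulsar APIs.
interface NamespaceDeletionSteps {
    CompletableFuture<Void> closeSystemTopicClients(String namespace);
    CompletableFuture<Void> deleteSystemTopic(String namespace);
    CompletableFuture<Void> deleteNamespaceMetadata(String namespace);
}

class OrderedNamespaceDeleter {
    private final NamespaceDeletionSteps steps;

    OrderedNamespaceDeleter(NamespaceDeletionSteps steps) {
        this.steps = steps;
    }

    // Each step starts only after the previous one has completed; if a step
    // fails, the later steps are skipped and the failure reaches the caller.
    CompletableFuture<Void> delete(String namespace) {
        return steps.closeSystemTopicClients(namespace)
                .thenCompose(ignored -> steps.deleteSystemTopic(namespace))
                .thenCompose(ignored -> steps.deleteNamespaceMetadata(namespace));
    }
}

// Test double that records the order in which the steps run.
class RecordingSteps implements NamespaceDeletionSteps {
    final List<String> calls = Collections.synchronizedList(new ArrayList<>());
    boolean failSystemTopicDeletion;

    @Override
    public CompletableFuture<Void> closeSystemTopicClients(String namespace) {
        calls.add("close-clients");
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> deleteSystemTopic(String namespace) {
        calls.add("delete-system-topic");
        if (failSystemTopicDeletion) {
            return CompletableFuture.failedFuture(new IllegalStateException("system topic in use"));
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> deleteNamespaceMetadata(String namespace) {
        calls.add("delete-namespace");
        return CompletableFuture.completedFuture(null);
    }
}
```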
> >>>>
> >>>> After #13960, everything looks good, but another problem appears: we
> >>>> need to wait until consumers have consumed all messages before they get
> >>>> ``reachedEndOfTopic`` (this may make terminating the topic take so long
> >>>> that the operation times out). The relevant code is here:
> >>>>
> >>>> https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106
> >>>>
> >>>> In the #13337 case, we need to force-close consumers immediately, so we
> >>>> wrote this PIP to discuss another way to resolve this problem.
> >>>>
> >>>> ## Goal
> >>>>
> >>>> We can add a new field (`allow_reconnect`) to the ``CommandCloseProducer``/
> >>>> ``CommandCloseConsumer`` commands so that the broker can close
> >>>> producers/consumers immediately, without the client reconnecting.
> >>>>
> >>>> ## API Changes
> >>>>
> >>>> - Add ``allow_reconnect`` to ``CommandCloseProducer``:
> >>>>
> >>>> **Before**
> >>>> ```protobuf
> >>>> message CommandCloseProducer {
> >>>>     required uint64 producer_id = 1;
> >>>>     required uint64 request_id = 2;
> >>>> }
> >>>> ```
> >>>>
> >>>> **After**
> >>>> ```protobuf
> >>>> message CommandCloseProducer {
> >>>>     required uint64 producer_id = 1;
> >>>>     required uint64 request_id = 2;
> >>>>     optional bool allow_reconnect = 3 [default = true];
> >>>> }
> >>>> ```
> >>>>
> >>>> - Add ``allow_reconnect`` to ``CommandCloseConsumer``:
> >>>>
> >>>> **Before**
> >>>> ```protobuf
> >>>> message CommandCloseConsumer {
> >>>>     required uint64 consumer_id = 1;
> >>>>     required uint64 request_id = 2;
> >>>> }
> >>>> ```
> >>>>
> >>>> **After**
> >>>> ```protobuf
> >>>> message CommandCloseConsumer {
> >>>>     required uint64 consumer_id = 1;
> >>>>     required uint64 request_id = 2;
> >>>>     optional bool allow_reconnect = 3 [default = true];
> >>>> }
> >>>> ```
> >>>>
> >>>> ## Implementation
> >>>>
> >>>> ### ClientCnx - Producer:
> >>>>
> >>>> **Before**
> >>>> ```java
> >>>> @Override
> >>>> protected void handleCloseProducer(CommandCloseProducer closeProducer) {
> >>>>     log.info("[{}] Broker notification of Closed producer: {}",
> >>>>             remoteAddress, closeProducer.getProducerId());
> >>>>     final long producerId = closeProducer.getProducerId();
> >>>>     ProducerImpl<?> producer = producers.get(producerId);
> >>>>     if (producer != null) {
> >>>>         producer.connectionClosed(this);
> >>>>     } else {
> >>>>         log.warn("Producer with id {} not found while closing producer ", producerId);
> >>>>     }
> >>>> }
> >>>> ```
> >>>>
> >>>> **After**
> >>>> ```java
> >>>> @Override
> >>>> protected void handleCloseProducer(CommandCloseProducer closeProducer) {
> >>>>     log.info("[{}] Broker notification of Closed producer: {}",
> >>>>             remoteAddress, closeProducer.getProducerId());
> >>>>     final long producerId = closeProducer.getProducerId();
> >>>>     ProducerImpl<?> producer = producers.get(producerId);
> >>>>     if (producer != null) {
> >>>>         if (closeProducer.isAllowReconnect()) {
> >>>>             // allow_reconnect defaults to true: reconnect as before
> >>>>             producer.connectionClosed(this);
> >>>>         } else {
> >>>>             // the broker asked for a permanent close: do not reconnect
> >>>>             producer.closeAsync();
> >>>>         }
> >>>>     } else {
> >>>>         log.warn("Producer with id {} not found while closing producer ", producerId);
> >>>>     }
> >>>> }
> >>>> ```
> >>>>
> >>>> ### ClientCnx - Consumer:
> >>>>
> >>>> **Before**
> >>>> ```java
> >>>> @Override
> >>>> protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
> >>>>     log.info("[{}] Broker notification of Closed consumer: {}",
> >>>>             remoteAddress, closeConsumer.getConsumerId());
> >>>>     final long consumerId = closeConsumer.getConsumerId();
> >>>>     ConsumerImpl<?> consumer = consumers.get(consumerId);
> >>>>     if (consumer != null) {
> >>>>         consumer.connectionClosed(this);
> >>>>     } else {
> >>>>         log.warn("Consumer with id {} not found while closing consumer ", consumerId);
> >>>>     }
> >>>> }
> >>>> ```
> >>>>
> >>>> **After**
> >>>> ```java
> >>>> @Override
> >>>> protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
> >>>>     log.info("[{}] Broker notification of Closed consumer: {}",
> >>>>             remoteAddress, closeConsumer.getConsumerId());
> >>>>     final long consumerId = closeConsumer.getConsumerId();
> >>>>     ConsumerImpl<?> consumer = consumers.get(consumerId);
> >>>>     if (consumer != null) {
> >>>>         if (closeConsumer.isAllowReconnect()) {
> >>>>             // allow_reconnect defaults to true: reconnect as before
> >>>>             consumer.connectionClosed(this);
> >>>>         } else {
> >>>>             // the broker asked for a permanent close: do not reconnect
> >>>>             consumer.closeAsync();
> >>>>         }
> >>>>     } else {
> >>>>         log.warn("Consumer with id {} not found while closing consumer ", consumerId);
> >>>>     }
> >>>> }
> >>>> ```
> >>>>
> >>>>
> >>>> ## Rejected Alternatives
> >>>>
> >>>> None.
> >>
> >>
> >>
> >> --
> >> Zike Yang
> >
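The effect of `allow_reconnect` on the client can be modeled without the real `ClientCnx`. In this sketch, `CloseCommand`, `ClientHandle`, and `CloseDispatcher` are hypothetical stand-ins for the generated `CommandCloseProducer`/`CommandCloseConsumer` classes and for `ProducerImpl`/`ConsumerImpl`; they are not Pulsar classes. It shows why `[default = true]` keeps the change backward compatible: in proto2, reading an unset optional field returns its declared default, so when a broker does not set the field the client keeps its existing reconnect behavior.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the generated close command. A null value models
// "field not present on the wire"; proto2 then reports the declared default.
final class CloseCommand {
    private final long id;
    private final Boolean allowReconnect;

    CloseCommand(long id, Boolean allowReconnect) {
        this.id = id;
        this.allowReconnect = allowReconnect;
    }

    long getId() {
        return id;
    }

    boolean hasAllowReconnect() {
        return allowReconnect != null;
    }

    // Mirrors `optional bool allow_reconnect = 3 [default = true]`.
    boolean isAllowReconnect() {
        return allowReconnect == null || allowReconnect;
    }
}

// Hypothetical client-side producer/consumer handle.
final class ClientHandle {
    enum State { CONNECTED, RECONNECTING, CLOSED }

    volatile State state = State.CONNECTED;

    // Stands in for connectionClosed(cnx): the real client schedules a reconnect.
    void connectionClosed() {
        state = State.RECONNECTING;
    }

    // Stands in for closeAsync(): the handle is closed for good.
    void closeAsync() {
        state = State.CLOSED;
    }
}

final class CloseDispatcher {
    private final Map<Long, ClientHandle> handles = new ConcurrentHashMap<>();

    void register(long id, ClientHandle handle) {
        handles.put(id, handle);
    }

    // Same decision as the proposed handleCloseProducer/handleCloseConsumer.
    // Returns false when the id is unknown (the real client logs a warning).
    boolean handleClose(CloseCommand cmd) {
        ClientHandle handle = handles.get(cmd.getId());
        if (handle == null) {
            return false;
        }
        if (cmd.isAllowReconnect()) {
            handle.connectionClosed();
        } else {
            handle.closeAsync();
        }
        return true;
    }
}
```

Defaulting the field to `true` means a broker that never sets it keeps today's behavior; only a broker that explicitly sends `false` triggers the permanent close.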