> This is supposed to mean that the namespace should be able to be
> deleted, correct?

Yes. The background is that the user has no active topics, so they want to delete the namespace.

> However, I think
> we might still have a race condition that could make tenant or
> namespace deletion fail. Specifically, if a new producer or consumer
> creates a topic after the namespace deletion has started but
> before it is complete. Do you agree that the underlying race still exists?

Yes, this race still exists. I don't think it is a big problem, because the user no longer wants to use this namespace. If this scenario occurs, they will get an error and need to delete the namespace again.

> What if we expand our usage of the "terminated" feature to apply to
> namespaces (and tenants)? Then, a terminated namespace can have
> bundles and topics can be deleted but not created (just as a terminated
> topic cannot have any new messages published to it). This would take
> care of all topic creation race conditions. We'd probably need to add
> new protobuf exceptions for this feature.

To solve this problem we would need to add synchronization resources such as a lock or a state flag. I think that would hurt us more than it helps, so we don't need to do that.

Thanks for your suggestions, let me know what you think.

Best,
Mattison

> On Feb 1, 2022, at 2:26 PM, Michael Marshall <mmarsh...@apache.org> wrote:
>
> This proposal identifies an important issue that we should definitely
> solve. I have some questions.
>
>> When there are no user-created topics under a namespace,
>> Namespace should be deleted.
>
> This is supposed to mean that the namespace should be able to be
> deleted, correct?
>
>> For this reason, we need to close the system topic reader/producer
>> first, then remove the system topic. finally, remove the namespace.
>
> I agree that expanding the protobuf CloseProducer and CloseConsumer
> commands could be valuable here. The expansion would ensure that
> producers and consumers don't attempt to reconnect. However, I think
> we might still have a race condition that could make tenant or
> namespace deletion fail. Specifically, if a new producer or consumer
> creates a topic after the namespace deletion has started but
> before it is complete. Do you agree that the underlying race still exists?
>
> In my view, the fundamental problem here is that deleting certain Pulsar
> resources takes time and, in a distributed system, that means race
> conditions.
>
> What if we expand our usage of the "terminated" feature to apply to
> namespaces (and tenants)? Then, a terminated namespace can have
> bundles and topics can be deleted but not created (just as a terminated
> topic cannot have any new messages published to it). This would take
> care of all topic creation race conditions. We'd probably need to add
> new protobuf exceptions for this feature.
>
> Thanks,
> Michael
>
>
> On Sat, Jan 29, 2022 at 7:25 PM Zike Yang
> <zky...@streamnative.io.invalid> wrote:
>>
>> +1
>>
>>
>> Thanks,
>> Zike
>>
>> On Sat, Jan 29, 2022 at 12:30 PM guo jiwei <techno...@apache.org> wrote:
>>>
>>> Hi
>>> The PIP link : https://github.com/apache/pulsar/issues/13989
>>>
>>> Regards
>>> Jiwei Guo (Tboy)
>>>
>>>
>>> On Sat, Jan 29, 2022 at 11:46 AM mattison chao <mattisonc...@gmail.com>
>>> wrote:
>>>
>>>> Hello everyone,
>>>>
>>>> I want to start a discussion about PIP-139 : Support Broker send command
>>>> to real close producer/consumer.
>>>>
>>>> This is the PIP document
>>>>
>>>> https://github.com/apache/pulsar/issues/13989 <
>>>> https://github.com/apache/pulsar/issues/13979>
>>>>
>>>> Please check it out and feel free to share your thoughts.
>>>>
>>>> Best,
>>>> Mattison
>>>>
>>>>
>>>> -------- Pasted below for quoting convenience.
>>>>
>>>>
>>>> Related pull request: #13337
>>>> Authors: @Technoboy- @mattisonchao
>>>>
>>>> ## Motivation
>>>>
>>>> Before we discuss this PIP, I'd like to add some context to help
>>>> contributors who don't want to read the original pull request.
>>>>
>>>>> When there are no user-created topics under a namespace, Namespace
>>>>> should be deleted. But currently, the system topic existed and the
>>>>> reader/producer could auto-create the system which may cause the namespace
>>>>> deletion to fail.
>>>>
>>>> For this reason, we need to close the system topic reader/producer first,
>>>> then remove the system topic, and finally remove the namespace.
>>>>
>>>> Following this approach, we first tried to use ``terminate`` to solve this
>>>> problem. We then found that producers disconnect, but consumers stay
>>>> alive. So another PR, #13960, adds closing logic for consumers.
>>>>
>>>> After #13960 everything looks good, but another problem appears: we need
>>>> to wait until consumers have consumed all messages before they get
>>>> ``reachedEndOfTopic`` (this can make terminating the topic take so long
>>>> that the operation times out). The relevant code is here:
>>>>
>>>>
>>>> https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106
>>>>
>>>> In the #13337 case, we need to force-close consumers immediately. So we
>>>> wrote this PIP to discuss another way to resolve this problem.
>>>>
>>>> ## Goal
>>>>
>>>> We can add a new field (``allow_reconnect``) to the
>>>> ``CommandCloseProducer`` and ``CommandCloseConsumer`` commands to close
>>>> producers/consumers immediately. When the broker sets it to ``false``, the
>>>> client closes the producer/consumer instead of reconnecting.
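
The intended client-side effect of ``allow_reconnect`` can be illustrated with a minimal, self-contained sketch. It is not the Pulsar client code: `AllowReconnectSketch`, `FakeProducer`, `CloseProducer` and the `State` enum are hypothetical stand-ins invented for this illustration, and the only behavior assumed from the proposal is that a close command with the flag set to ``false`` closes the producer for good, while ``true`` (the default) keeps the usual reconnect path.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the client-side dispatch; every type here is a hypothetical stand-in. */
class AllowReconnectSketch {

    enum State { CONNECTED, RECONNECTING, CLOSED }

    /** Stand-in for a client producer that only records which path was taken. */
    static final class FakeProducer {
        State state = State.CONNECTED;

        // Existing path: the connection is gone and the client will try to reconnect.
        void connectionClosed() { state = State.RECONNECTING; }

        // Proposed path: the producer is closed and must not reconnect.
        void closeAsync() { state = State.CLOSED; }
    }

    /** Stand-in for the close command carrying the proposed field. */
    static final class CloseProducer {
        final long producerId;
        final boolean allowReconnect;

        CloseProducer(long producerId, boolean allowReconnect) {
            this.producerId = producerId;
            this.allowReconnect = allowReconnect;
        }
    }

    final Map<Long, FakeProducer> producers = new HashMap<>();

    /** Returns false when no producer is registered under the given id. */
    boolean handleCloseProducer(CloseProducer cmd) {
        FakeProducer producer = producers.get(cmd.producerId);
        if (producer == null) {
            return false;
        }
        if (cmd.allowReconnect) {
            producer.connectionClosed();
        } else {
            producer.closeAsync();
        }
        return true;
    }

    public static void main(String[] args) {
        AllowReconnectSketch cnx = new AllowReconnectSketch();
        cnx.producers.put(1L, new FakeProducer());
        cnx.producers.put(2L, new FakeProducer());

        cnx.handleCloseProducer(new CloseProducer(1L, true));
        cnx.handleCloseProducer(new CloseProducer(2L, false));

        System.out.println("producer 1: " + cnx.producers.get(1L).state);
        System.out.println("producer 2: " + cnx.producers.get(2L).state);
    }
}
```

Running `main` prints `producer 1: RECONNECTING` and `producer 2: CLOSED`. The consumer side would follow the same shape.
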
>>>>
>>>> ## API Changes
>>>>
>>>> - Add ``allow_reconnect`` to ``CommandCloseProducer``
>>>>
>>>> ```protobuf
>>>> // Before
>>>> message CommandCloseProducer {
>>>>     required uint64 producer_id = 1;
>>>>     required uint64 request_id = 2;
>>>> }
>>>>
>>>> // After
>>>> message CommandCloseProducer {
>>>>     required uint64 producer_id = 1;
>>>>     required uint64 request_id = 2;
>>>>     optional bool allow_reconnect = 3 [default = true];
>>>> }
>>>> ```
>>>>
>>>> - Add ``allow_reconnect`` to ``CommandCloseConsumer``
>>>>
>>>> ```protobuf
>>>> // Before
>>>> message CommandCloseConsumer {
>>>>     required uint64 consumer_id = 1;
>>>>     required uint64 request_id = 2;
>>>> }
>>>>
>>>> // After
>>>> message CommandCloseConsumer {
>>>>     required uint64 consumer_id = 1;
>>>>     required uint64 request_id = 2;
>>>>     optional bool allow_reconnect = 3 [default = true];
>>>> }
>>>> ```
>>>>
>>>> ## Implementation
>>>>
>>>> ### ClientCnx - Producer:
>>>>
>>>> **Before**
>>>> ```java
>>>> @Override
>>>> protected void handleCloseProducer(CommandCloseProducer closeProducer) {
>>>>     log.info("[{}] Broker notification of Closed producer: {}",
>>>>             remoteAddress, closeProducer.getProducerId());
>>>>     final long producerId = closeProducer.getProducerId();
>>>>     ProducerImpl<?> producer = producers.get(producerId);
>>>>     if (producer != null) {
>>>>         producer.connectionClosed(this);
>>>>     } else {
>>>>         log.warn("Producer with id {} not found while closing producer", producerId);
>>>>     }
>>>> }
>>>> ```
>>>> **After**
>>>> ```java
>>>> @Override
>>>> protected void handleCloseProducer(CommandCloseProducer closeProducer) {
>>>>     log.info("[{}] Broker notification of Closed producer: {}",
>>>>             remoteAddress, closeProducer.getProducerId());
>>>>     final long producerId = closeProducer.getProducerId();
>>>>     ProducerImpl<?> producer = producers.get(producerId);
>>>>     if (producer != null) {
>>>>         if (closeProducer.isAllowReconnect()) {
>>>>             // Default behavior: drop the connection and let the producer reconnect.
>>>>             producer.connectionClosed(this);
>>>>         } else {
>>>>             // Broker asked the client not to reconnect: close the producer for good.
>>>>             producer.closeAsync();
>>>>         }
>>>>     } else {
>>>>         log.warn("Producer with id {} not found while closing producer", producerId);
>>>>     }
>>>> }
>>>> ```
>>>> ### ClientCnx - Consumer:
>>>>
>>>> **Before**
>>>> ```java
>>>> @Override
>>>> protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
>>>>     log.info("[{}] Broker notification of Closed consumer: {}",
>>>>             remoteAddress, closeConsumer.getConsumerId());
>>>>     final long consumerId = closeConsumer.getConsumerId();
>>>>     ConsumerImpl<?> consumer = consumers.get(consumerId);
>>>>     if (consumer != null) {
>>>>         consumer.connectionClosed(this);
>>>>     } else {
>>>>         log.warn("Consumer with id {} not found while closing consumer", consumerId);
>>>>     }
>>>> }
>>>> ```
>>>> **After**
>>>> ```java
>>>> @Override
>>>> protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
>>>>     log.info("[{}] Broker notification of Closed consumer: {}",
>>>>             remoteAddress, closeConsumer.getConsumerId());
>>>>     final long consumerId = closeConsumer.getConsumerId();
>>>>     ConsumerImpl<?> consumer = consumers.get(consumerId);
>>>>     if (consumer != null) {
>>>>         if (closeConsumer.isAllowReconnect()) {
>>>>             // Default behavior: drop the connection and let the consumer reconnect.
>>>>             consumer.connectionClosed(this);
>>>>         } else {
>>>>             // Broker asked the client not to reconnect: close the consumer for good.
>>>>             consumer.closeAsync();
>>>>         }
>>>>     } else {
>>>>         log.warn("Consumer with id {} not found while closing consumer", consumerId);
>>>>     }
>>>> }
>>>> ```
>>>>
>>>>
>>>> ## Rejected Alternatives
>>>>
>>>> None.
>>
>>
>> --
>> Zike Yang