+1
Thanks,
Zike

On Sat, Jan 29, 2022 at 12:30 PM guo jiwei <techno...@apache.org> wrote:
>
> Hi
> The PIP link : https://github.com/apache/pulsar/issues/13989
>
> Regards
> Jiwei Guo (Tboy)
>
>
> On Sat, Jan 29, 2022 at 11:46 AM mattison chao <mattisonc...@gmail.com>
> wrote:
>
> > Hello everyone,
> >
> > I want to start a discussion about PIP-139 : Support Broker send command
> > to real close producer/consumer.
> >
> > This is the PIP document
> >
> > https://github.com/apache/pulsar/issues/13989 <
> > https://github.com/apache/pulsar/issues/13979>
> >
> > Please check it out and feel free to share your thoughts.
> >
> > Best,
> > Mattison
> >
> >
> > -------- Pasted below for quoting convenience.
> >
> >
> >
> > Related pull request: #13337
> > Authors: @Technoboy- @mattisonchao
> >
> > ## Motivation
> >
> > Before we discuss this PIP, I'd like to add some context for
> > contributors who don't want to read the original pull request.
> >
> > > When there are no user-created topics under a namespace, the namespace
> > > should be deleted. But currently, the system topic exists and the
> > > reader/producer can auto-create the system topic, which may cause the
> > > namespace deletion to fail.
> >
> > For this reason, we need to close the system topic reader/producer first,
> > then remove the system topic, and finally remove the namespace.
> >
> > Following this approach, we first wanted to use ``terminate`` to solve
> > this problem. Then we found that producers disconnect, but consumers are
> > still alive. So another PR, #13960, adds the consumers' closing logic.
> >
> > After #13960, everything looks good, but another problem appears: we
> > need to wait until consumers have consumed all messages (this may make
> > terminating the topic take so long that the operation times out) and
> > then get ``reachedEndOfTopic``.
> > The relevant code is here:
> >
> > https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106
> >
> > In the #13337 case, we need to force close consumers immediately. So we
> > wrote this PIP to discuss another way to resolve this problem.
> >
> > ## Goal
> >
> > We can add a new field (`allow_reconnect`) to the commands
> > ``CommandCloseProducer`` / ``CommandCloseConsumer`` to close
> > producers/consumers immediately. When the field is false, the client
> > closes the producer/consumer instead of reconnecting.
> >
> > ## API Changes
> >
> > - Add ``allow_reconnect`` to ``CommandCloseProducer``
> >
> > ```protobuf
> > // Before
> > message CommandCloseProducer {
> >     required uint64 producer_id = 1;
> >     required uint64 request_id = 2;
> > }
> >
> > // After
> > message CommandCloseProducer {
> >     required uint64 producer_id = 1;
> >     required uint64 request_id = 2;
> >     optional bool allow_reconnect = 3 [default = true];
> > }
> > ```
> >
> > - Add ``allow_reconnect`` to ``CommandCloseConsumer``
> >
> > ```protobuf
> > // Before
> > message CommandCloseConsumer {
> >     required uint64 consumer_id = 1;
> >     required uint64 request_id = 2;
> > }
> >
> > // After
> > message CommandCloseConsumer {
> >     required uint64 consumer_id = 1;
> >     required uint64 request_id = 2;
> >     optional bool allow_reconnect = 3 [default = true];
> > }
> > ```
> >
> > ## Implementation
> >
> > ### ClientCnx - Producer:
> >
> > **Before**
> > ```java
> > @Override
> > protected void handleCloseProducer(CommandCloseProducer closeProducer) {
> >     log.info("[{}] Broker notification of Closed producer: {}",
> >             remoteAddress, closeProducer.getProducerId());
> >     final long producerId = closeProducer.getProducerId();
> >     ProducerImpl<?> producer = producers.get(producerId);
> >     if (producer != null) {
> >         producer.connectionClosed(this);
> >     } else {
> >         log.warn("Producer with id {} not found while closing producer ",
> >                 producerId);
> >     }
> > }
> > ```
> > **After**
> > ```java
> > @Override
> > protected void handleCloseProducer(CommandCloseProducer closeProducer) {
> >     log.info("[{}] Broker notification of Closed producer: {}",
> >             remoteAddress, closeProducer.getProducerId());
> >     final long producerId = closeProducer.getProducerId();
> >     ProducerImpl<?> producer = producers.get(producerId);
> >     if (producer != null) {
> >         if (closeProducer.isAllowReconnect()) {
> >             // Default behavior: treat as a dropped connection and reconnect.
> >             producer.connectionClosed(this);
> >         } else {
> >             // Broker asked for a real close: do not reconnect.
> >             producer.closeAsync();
> >         }
> >     } else {
> >         log.warn("Producer with id {} not found while closing producer ",
> >                 producerId);
> >     }
> > }
> > ```
> > ### ClientCnx - Consumer:
> >
> > **Before**
> > ```java
> > @Override
> > protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
> >     log.info("[{}] Broker notification of Closed consumer: {}",
> >             remoteAddress, closeConsumer.getConsumerId());
> >     final long consumerId = closeConsumer.getConsumerId();
> >     ConsumerImpl<?> consumer = consumers.get(consumerId);
> >     if (consumer != null) {
> >         consumer.connectionClosed(this);
> >     } else {
> >         log.warn("Consumer with id {} not found while closing consumer ",
> >                 consumerId);
> >     }
> > }
> > ```
> > **After**
> > ```java
> > @Override
> > protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
> >     log.info("[{}] Broker notification of Closed consumer: {}",
> >             remoteAddress, closeConsumer.getConsumerId());
> >     final long consumerId = closeConsumer.getConsumerId();
> >     ConsumerImpl<?> consumer = consumers.get(consumerId);
> >     if (consumer != null) {
> >         if (closeConsumer.isAllowReconnect()) {
> >             // Default behavior: treat as a dropped connection and reconnect.
> >             consumer.connectionClosed(this);
> >         } else {
> >             // Broker asked for a real close: do not reconnect.
> >             consumer.closeAsync();
> >         }
> >     } else {
> >         log.warn("Consumer with id {} not found while closing consumer ",
> >                 consumerId);
> >     }
> > }
> > ```
> >
> >
> > ## Rejected Alternatives
> >
> > none.

--
Zike Yang
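
For anyone who wants to try the proposed dispatch without a broker, here is a minimal, self-contained sketch of the reconnect-versus-close decision described in the PIP. It is not Pulsar code: `CloseCommand`, `Handler`, and `handleClose` are hypothetical stand-ins for `CommandCloseProducer`/`CommandCloseConsumer`, `ProducerImpl`/`ConsumerImpl`, and the `ClientCnx` handlers, and the sketch assumes that an absent `allow_reconnect` field behaves as `true`, matching the `[default = true]` in the proposed proto change.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CloseCommandSketch {

    /** Hypothetical stand-in for a decoded close command sent by the broker. */
    static final class CloseCommand {
        final long id;
        final boolean allowReconnect;

        CloseCommand(long id, boolean allowReconnect) {
            this.id = id;
            this.allowReconnect = allowReconnect;
        }

        /** Mirrors the proposed proto default: allow_reconnect = true when unset. */
        static CloseCommand withDefault(long id) {
            return new CloseCommand(id, true);
        }
    }

    /** Hypothetical stand-in for a producer or consumer; it only records calls. */
    static final class Handler {
        final List<String> events = new ArrayList<>();

        void connectionClosed() {
            events.add("reconnect"); // a real client would schedule a reconnect here
        }

        void closeAsync() {
            events.add("closed"); // a real client would release resources and stay closed
        }
    }

    /** Same branching as the proposed handlers: reconnect unless the broker forbids it. */
    static String handleClose(CloseCommand cmd, Map<Long, Handler> handlers) {
        Handler handler = handlers.get(cmd.id);
        if (handler == null) {
            return "not-found";
        }
        if (cmd.allowReconnect) {
            handler.connectionClosed();
            return "reconnect";
        }
        handler.closeAsync();
        return "closed";
    }

    public static void main(String[] args) {
        Map<Long, Handler> handlers = new HashMap<>();
        handlers.put(1L, new Handler());

        System.out.println(handleClose(CloseCommand.withDefault(1L), handlers));
        System.out.println(handleClose(new CloseCommand(1L, false), handlers));
        System.out.println(handleClose(new CloseCommand(2L, false), handlers));
    }
}
```

Because the field defaults to `true`, a broker that never sets it keeps today's reconnect behavior, which is why the change is backward compatible on the wire.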