Hello everyone,

I want to start a discussion about PIP-139: Support Broker send command to real close producer/consumer.
This is the PIP document: https://github.com/apache/pulsar/issues/13989 <https://github.com/apache/pulsar/issues/13979>

Please check it out and feel free to share your thoughts.

Best,
Mattison

--------

Pasted below for quoting convenience.

Related pull request: #13337
Authors: @Technoboy- @mattisonchao

## Motivation

Before we discuss this PIP, I'd like to add some context for contributors who don't want to read the original pull request.

> When there are no user-created topics under a namespace, Namespace should be
> deleted. But currently, the system topic existed and the reader/producer
> could auto-create the system which may cause the namespace deletion to fail.

For this reason, we need to close the system topic reader/producer first, then remove the system topic, and finally remove the namespace.

Following this approach, we first tried to use ``terminate`` to solve the problem, but we found that producers disconnect while consumers stay alive. So another PR, #13960, adds closing logic for consumers.

After #13960 everything looked good, but another problem appeared: consumers only get ``reachedEndOfTopic`` after they have consumed all messages, so terminating the topic can take long enough for the operation to time out. The relevant code is here:

https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106

In the #13337 case, we need to force-close consumers immediately, so we wrote this PIP to discuss another way to resolve the problem.

## Goal

Add a new field (`allow_reconnect`) to the ``CommandCloseProducer`` and ``CommandCloseConsumer`` commands so that the broker can close producers/consumers immediately. When the broker sends `allow_reconnect = false`, the client closes the producer/consumer instead of reconnecting.
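To make the intended semantics concrete, here is a minimal, self-contained sketch of how a client could interpret the new flag. It is not Pulsar code: `CloseCommandSketch`, `CloseCommand`, `Action`, and `decide` are hypothetical names used only for illustration, and a `null` value stands in for an optional protobuf field that the sender did not set.

```java
public class CloseCommandSketch {

    /** What the client does with its producer/consumer after a broker-initiated close. */
    enum Action { RECONNECT, CLOSE }

    /**
     * Hypothetical stand-in for CommandCloseProducer / CommandCloseConsumer.
     * allowReconnect is null when the sender did not set the optional field.
     */
    static final class CloseCommand {
        final long id;
        final Boolean allowReconnect;

        CloseCommand(long id, Boolean allowReconnect) {
            this.id = id;
            this.allowReconnect = allowReconnect;
        }

        /** Mirrors the proposed `[default = true]`: an unset field reads as true. */
        boolean isAllowReconnect() {
            return allowReconnect == null || allowReconnect;
        }
    }

    /** Reconnect unless the broker explicitly disallowed it. */
    static Action decide(CloseCommand cmd) {
        return cmd.isAllowReconnect() ? Action.RECONNECT : Action.CLOSE;
    }

    public static void main(String[] args) {
        System.out.println(decide(new CloseCommand(1L, null)));  // field unset: prints RECONNECT
        System.out.println(decide(new CloseCommand(2L, true)));  // prints RECONNECT
        System.out.println(decide(new CloseCommand(3L, false))); // prints CLOSE
    }
}
```

The point of the sketch is the default: because an unset field behaves like `true`, a command that omits the field keeps today's reconnect behaviour, and only an explicit `false` triggers a real close.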
## API Changes

- Add ``allow_reconnect`` to ``CommandCloseProducer``:

```protobuf
// Before
message CommandCloseProducer {
    required uint64 producer_id = 1;
    required uint64 request_id = 2;
}

// After
message CommandCloseProducer {
    required uint64 producer_id = 1;
    required uint64 request_id = 2;
    optional bool allow_reconnect = 3 [default = true];
}
```

- Add ``allow_reconnect`` to ``CommandCloseConsumer``:

```protobuf
// Before
message CommandCloseConsumer {
    required uint64 consumer_id = 1;
    required uint64 request_id = 2;
}

// After
message CommandCloseConsumer {
    required uint64 consumer_id = 1;
    required uint64 request_id = 2;
    optional bool allow_reconnect = 3 [default = true];
}
```

## Implementation

### ClientCnx - Producer:

**Before**

```java
@Override
protected void handleCloseProducer(CommandCloseProducer closeProducer) {
    log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
    final long producerId = closeProducer.getProducerId();
    ProducerImpl<?> producer = producers.get(producerId);
    if (producer != null) {
        producer.connectionClosed(this);
    } else {
        log.warn("Producer with id {} not found while closing producer ", producerId);
    }
}
```

**After**

```java
@Override
protected void handleCloseProducer(CommandCloseProducer closeProducer) {
    log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId());
    final long producerId = closeProducer.getProducerId();
    ProducerImpl<?> producer = producers.get(producerId);
    if (producer != null) {
        if (closeProducer.isAllowReconnect()) {
            // Existing behaviour: treat it as a dropped connection so the producer reconnects.
            producer.connectionClosed(this);
        } else {
            // The broker asked for a real close: shut the producer down without reconnecting.
            producer.closeAsync();
        }
    } else {
        log.warn("Producer with id {} not found while closing producer ", producerId);
    }
}
```

### ClientCnx - Consumer:

**Before**

```java
@Override
protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
    log.info("[{}] Broker notification of Closed consumer: {}", remoteAddress, closeConsumer.getConsumerId());
    final long consumerId = closeConsumer.getConsumerId();
    ConsumerImpl<?> consumer = consumers.get(consumerId);
    if (consumer != null) {
        consumer.connectionClosed(this);
    } else {
        log.warn("Consumer with id {} not found while closing consumer ", consumerId);
    }
}
```

**After**

```java
@Override
protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
    log.info("[{}] Broker notification of Closed consumer: {}", remoteAddress, closeConsumer.getConsumerId());
    final long consumerId = closeConsumer.getConsumerId();
    ConsumerImpl<?> consumer = consumers.get(consumerId);
    if (consumer != null) {
        if (closeConsumer.isAllowReconnect()) {
            // Existing behaviour: treat it as a dropped connection so the consumer reconnects.
            consumer.connectionClosed(this);
        } else {
            // The broker asked for a real close: shut the consumer down without reconnecting.
            consumer.closeAsync();
        }
    } else {
        log.warn("Consumer with id {} not found while closing consumer ", consumerId);
    }
}
```

## Rejected Alternatives

None.