Hello everyone,

I want to start a discussion about PIP-139 : Support Broker send command to 
real close producer/consumer.

This is the PIP document

https://github.com/apache/pulsar/issues/13989 
<https://github.com/apache/pulsar/issues/13979>

Please check it out and feel free to share your thoughts.

Best,
Mattison


———————— Pasted below for quoting convenience.



Related pull request:  #13337
Authors: @Technoboy-  @mattisonchao 

## Motivation

Before we discuss this PIP, I'd like to add some context for contributors who 
haven't read the original pull request.

> When there are no user-created topics under a namespace, the namespace should 
> be deleted. But currently, the system topic exists and the reader/producer 
> could auto-create the system topic, which may cause the namespace deletion to 
> fail.

For this reason, we need to close the system topic reader/producer first, then 
remove the system topic, and finally remove the namespace.

Following this approach, we first tried to use ``terminate`` to solve the 
problem. We then found that producers disconnect, but consumers stay alive. So 
another PR, #13960, adds closing logic for consumers.

After #13960, everything looks good, but another problem appears: we need to 
wait until consumers have consumed all messages before they get 
``reachedEndOfTopic`` (this can make terminating the topic take so long that 
the operation times out). The relevant code is here:

https://github.com/apache/pulsar/blob/07ef9231db8b844586b9217ee2d59237eb9c54b7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java#L102-L106

In the #13337 case, we need to force close consumers immediately. So we write 
this PIP to discuss another way to resolve this problem.

## Goal

Add a new field (``allow_reconnect``) to the ``CommandCloseProducer`` and 
``CommandCloseConsumer`` commands. When the broker sets it to false, the client 
closes the producer/consumer immediately instead of reconnecting.

## API Changes

- Add ``allow_reconnect`` to ``CommandCloseProducer``

**Before**
```protobuf
message CommandCloseProducer {
    required uint64 producer_id = 1;
    required uint64 request_id = 2;
}
```
**After**
```protobuf
message CommandCloseProducer {
    required uint64 producer_id = 1;
    required uint64 request_id = 2;
    optional bool allow_reconnect = 3 [default = true];
}
```

- Add ``allow_reconnect`` to ``CommandCloseConsumer``

**Before**
```protobuf
message CommandCloseConsumer {
    required uint64 consumer_id = 1;
    required uint64 request_id = 2;
}
```
**After**
```protobuf
message CommandCloseConsumer {
    required uint64 consumer_id = 1;
    required uint64 request_id = 2;
    optional bool allow_reconnect = 3 [default = true];
}
```
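Because the new field is optional and defaults to true, a close command that does not carry it should behave exactly as it does today. The following is a minimal sketch of that decision, not Pulsar's actual code: `CloseHandlingSketch`, `CloseCommand`, `Action`, and `decide` are hypothetical names used only for illustration, and a `null` value stands in for a field that was never set on the wire.

```java
/** Toy model of the proposed close handling; these are not Pulsar classes. */
final class CloseHandlingSketch {

    /** What the client does after a broker-initiated close. */
    enum Action { RECONNECT, CLOSE }

    /** Hypothetical stand-in for CommandCloseProducer / CommandCloseConsumer. */
    static final class CloseCommand {
        // null models a command in which allow_reconnect was not set,
        // e.g. one sent by a broker that predates this proposal.
        private final Boolean allowReconnect;

        CloseCommand(Boolean allowReconnect) {
            this.allowReconnect = allowReconnect;
        }

        /** Mirrors `[default = true]`: an unset field reads as true. */
        boolean isAllowReconnect() {
            return allowReconnect == null || allowReconnect;
        }
    }

    /** Reconnect when allowed (the existing behavior), otherwise close for good. */
    static Action decide(CloseCommand cmd) {
        return cmd.isAllowReconnect() ? Action.RECONNECT : Action.CLOSE;
    }

    public static void main(String[] args) {
        System.out.println(decide(new CloseCommand(null)));  // field unset
        System.out.println(decide(new CloseCommand(true)));  // explicit true
        System.out.println(decide(new CloseCommand(false))); // explicit false
    }
}
```

Only an explicit `false` changes the outcome, which is why the default keeps the change compatible with brokers that never set the field.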

## Implementation

### ClientCnx - Producer:

**Before**
```java
    @Override
    protected void handleCloseProducer(CommandCloseProducer closeProducer) {
        log.info("[{}] Broker notification of Closed producer: {}",
                remoteAddress, closeProducer.getProducerId());
        final long producerId = closeProducer.getProducerId();
        ProducerImpl<?> producer = producers.get(producerId);
        if (producer != null) {
            producer.connectionClosed(this);
        } else {
            log.warn("Producer with id {} not found while closing producer ",
                    producerId);
        }
    }
```
**After**
```java
    @Override
    protected void handleCloseProducer(CommandCloseProducer closeProducer) {
        log.info("[{}] Broker notification of Closed producer: {}",
                remoteAddress, closeProducer.getProducerId());
        final long producerId = closeProducer.getProducerId();
        ProducerImpl<?> producer = producers.get(producerId);
        if (producer != null) {
            if (closeProducer.isAllowReconnect()) {
                // Existing behavior: drop the connection and let the producer reconnect.
                producer.connectionClosed(this);
            } else {
                // New behavior: close the producer without reconnecting.
                producer.closeAsync();
            }
        } else {
            log.warn("Producer with id {} not found while closing producer ",
                    producerId);
        }
    }
```
### ClientCnx - Consumer:

**Before**
```java
    @Override
    protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
        log.info("[{}] Broker notification of Closed consumer: {}",
                remoteAddress, closeConsumer.getConsumerId());
        final long consumerId = closeConsumer.getConsumerId();
        ConsumerImpl<?> consumer = consumers.get(consumerId);
        if (consumer != null) {
            consumer.connectionClosed(this);
        } else {
            log.warn("Consumer with id {} not found while closing consumer ",
                    consumerId);
        }
    }
```
**After**
```java
    @Override
    protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
        log.info("[{}] Broker notification of Closed consumer: {}",
                remoteAddress, closeConsumer.getConsumerId());
        final long consumerId = closeConsumer.getConsumerId();
        ConsumerImpl<?> consumer = consumers.get(consumerId);
        if (consumer != null) {
            if (closeConsumer.isAllowReconnect()) {
                // Existing behavior: drop the connection and let the consumer reconnect.
                consumer.connectionClosed(this);
            } else {
                // New behavior: close the consumer without reconnecting.
                consumer.closeAsync();
            }
        } else {
            log.warn("Consumer with id {} not found while closing consumer ",
                    consumerId);
        }
    }
```


## Rejected Alternatives

None.
