Hi Pulsar community,

I have opened a PIP to discuss "Introduce two phase deletion protocol
based on system topic".

Proposal link: https://github.com/apache/pulsar/issues/16569

---

## Motivation
Original issue: #13238

Currently, ledger deletion is divided into two separate steps. It happens
in ManagedLedger and ManagedCursor:
1. Remove all the ledgers waiting to be deleted from the ledger list and
write the updated ledger list to the meta store.
2. In the meta store update callback, delete those ledgers from the storage
system, such as BookKeeper or tiered storage.

Because the steps are separate, ledger deletion is not atomic. If the
first step succeeds and the second step fails, the ledgers are removed
from the metadata but can never be deleted from the storage system. The
second step may fail because the broker restarts or because the deletion
in the storage system fails.

In our customer’s environment, we have found many orphan ledgers caused by
the above.
## Design
Based on the above, we introduce LedgerDeletionService to support two phase
deletion. We hope it provides a general solution for two phase deletion
that covers the problems we have already found in managed-ledger,
managed-cursor and schema-storage.

In this design, we use system topics to store the pending delete ledgers.
* pulsar/system/persistent/__ledger_deletion : stores the pending delete
ledgers
* pulsar/system/persistent/__ledger_deletion_archive : the DLQ for the
above topic


### The first phase:
```
client.newProducer(Schema.AVRO(PendingDeleteLedgerInfo.class))
 .topic("pulsar/system/persistent/__ledger_deletion")
 .enableBatching(false)
 .createAsync();
```
When LedgerDeletionService starts, it creates a producer to send pending
delete ledgers.
When deleting a ledger, the service first sends a PendingDeleteLedgerInfo
msg with a 1 min delay. The delay is for the consumer side: if the msg were
delivered immediately, the metadata might not have changed yet when the
consumer receives it. Only after the send succeeds do we update the
metadata. If sending the msg fails, we treat the deletion as failed and do
not update the metadata.
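
The ordering described above (send first, touch metadata only on success)
could be sketched roughly as follows. This is a minimal illustration, not
code from the proposal: FirstPhaseSketch, deleteLedger and the two supplier
parameters are hypothetical names, and the suppliers stand in for the real
producer send and the real metadata update.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper illustrating the phase-one ordering; not part of the proposed API. */
final class FirstPhaseSketch {
    private FirstPhaseSketch() {
    }

    /**
     * Runs the metadata update only after the pending-delete record has been sent.
     * If the send fails, the returned future fails and the metadata update is never started.
     *
     * @param sendPendingDelete stand-in for sending PendingDeleteLedgerInfo to the system topic
     * @param updateMetadata    stand-in for removing the ledger from the ledger list
     */
    static CompletableFuture<Void> deleteLedger(
            Supplier<CompletableFuture<Void>> sendPendingDelete,
            Supplier<CompletableFuture<Void>> updateMetadata) {
        return sendPendingDelete.get().thenCompose(ignored -> updateMetadata.get());
    }
}
```

In the broker, the first supplier would presumably wrap the producer's
delayed send (the Java client offers deliverAfter on the message builder for
this), but that wiring is an assumption and is not shown in the proposal.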

**PendingDeleteLedgerInfo**
```
public class PendingDeleteLedgerInfo {
    /**
     * Partitioned topic name without domain, like
     * public/default/test-topic-partition-1 or
     * public/default/test-topic.
     */
    private String topicName;

    /**
     * The ledger component: managed-ledger, managed-cursor or
     * schema-storage.
     */
    private LedgerComponent ledgerComponent;

    /**
     * The ledger type. ledger or offload-ledger.
     */
    private LedgerType ledgerType;

    /**
     * LedgerId.
     */
    private Long ledgerId;

    /**
     * Context, holds offload info. If bk ledger, the context is null.
     */
    private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;

    /**
     * When a consumer receives a pending delete ledger, the ledger may
     * still be in use, so we need to check whether it is in use.
     * In some cases, we needn't check whether the ledger is still in use.
     */
    private boolean checkLedgerStillInUse;

    /**
     * Extended properties.
     */
    private Map<String, String> properties = new HashMap<>();
}
```
### The second phase
```
client.newConsumer(Schema.AVRO(PendingDeleteLedgerInfo.class))
 .topic("pulsar/system/persistent/__ledger_deletion")
 .subscriptionName("ledger-deletion-worker")
 .subscriptionType(SubscriptionType.Shared)
 .deadLetterPolicy(DeadLetterPolicy.builder()
         .deadLetterTopic(SystemTopicNames.LEDGER_DELETION_ARCHIVE_TOPIC.getPartitionedTopicName())
         .maxRedeliverCount(10).build())
 .subscribeAsync();
```
When LedgerDeletionService starts, it also starts a consumer to consume
pending delete ledgers.

### Check if the ledger is still in use
When a pending delete ledger is received, we check whether the ledger still
exists in the metadata. If it exists, we give up deleting it this time: the
consumer does not ack the message and calls reconsumeLater with a 10 min
delay. If it does not exist, the consumer tries to delete the ledger or
offload-ledger according to the ledger type. If the deletion succeeds, it
acks the message; if the deletion fails, it calls reconsumeLater with a
10 min delay. Once a PendingDeleteLedgerInfo msg has been reconsumed 10
times, it is transferred to the DLQ
pulsar/system/persistent/__ledger_deletion_archive.

_Tips: We set the DLQ maxRedeliverCount to 10 and the reconsumeLater delay
to 10 min. If the storage system shuts down, deletion of the pending delete
ledger is retried 10 times over 100 min, so a storage outage shorter than
that does not prevent the ledger from being deleted._
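
As a rough model of the handling rules and the retry budget above, the
per-message outcome can be written as a small pure function. This is only a
sketch under assumptions: SecondPhaseSketch, Action and decide are
hypothetical names, the exact redelivery count at which a msg moves to the
DLQ is assumed, and in the real client the dead letter policy performs that
routing rather than application code.

```java
/** Hypothetical model of the second-phase handling rules; not part of the proposed API. */
final class SecondPhaseSketch {
    enum Action { ACK, RECONSUME_LATER, DEAD_LETTER }

    static final int MAX_REDELIVER_COUNT = 10;     // maxRedeliverCount from the consumer config
    static final int RECONSUME_DELAY_MINUTES = 10; // reconsumeLater delay from the proposal

    private SecondPhaseSketch() {
    }

    /**
     * @param stillInUse      the ledger still exists in the topic metadata
     * @param deleteSucceeded result of the storage deletion (ignored when stillInUse is true,
     *                        because no deletion is attempted in that case)
     * @param reconsumeCount  how many times this msg has already been reconsumed
     */
    static Action decide(boolean stillInUse, boolean deleteSucceeded, int reconsumeCount) {
        if (!stillInUse && deleteSucceeded) {
            return Action.ACK;
        }
        // Assumption: the msg goes to the DLQ once it has already been retried the maximum number of times.
        return reconsumeCount >= MAX_REDELIVER_COUNT ? Action.DEAD_LETTER : Action.RECONSUME_LATER;
    }

    /** Total time covered by the retries: 10 retries x 10 min = 100 min. */
    static int retryWindowMinutes() {
        return MAX_REDELIVER_COUNT * RECONSUME_DELAY_MINUTES;
    }
}
```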

### How to get the existing ledgers when the consumer receives a pending delete ledger
The admin API topics getInternalStats {topic-name} already exists, and its
response contains the three pieces of info we want:
* managed-ledger ledgerIds
* managed-cursor ledgerIds
* schema-storage ledgerIds

So when receiving a pending delete ledger, we fetch the topic internal
stats. To avoid fetching them too frequently, we keep the ledgerIds in
ledgersCache.
```
    private final LoadingCache<String, TreeSet<Long>> ledgersCache = Caffeine.newBuilder()
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build(new CacheLoader<>() {
                @Override
                public @Nullable TreeSet<Long> load(@NonNull String key) throws Exception {
                    return fetchInUseLedgerIds(key);
                }
            });

    private TreeSet<Long> fetchInUseLedgerIds(String topicName) throws PulsarAdminException {
        TreeSet<Long> ledgerIds = new TreeSet<>();

        PersistentTopicInternalStats internalStats = pulsarAdmin.topics().getInternalStats(topicName);

        ledgerIds.addAll(internalStats.ledgers.stream().map(ele1 -> ele1.ledgerId).collect(Collectors.toSet()));
        ledgerIds.addAll(
                internalStats.cursors.values().stream().map(ele -> ele.cursorLedger).collect(Collectors.toSet()));
        ledgerIds.addAll(internalStats.schemaLedgers.stream().map(ele -> ele.ledgerId).collect(Collectors.toSet()));

        return ledgerIds;
    }
```

The key is **topicName**, the value is the **ledgerId set**.
Entries expire after 10 min, which:
 * Reduces memory usage: if a topic no longer deletes ledgers, its entry
does not occupy memory.
 * Decreases the frequency of metadata fetches.

When receiving a pending delete ledger, we get the ledgerId set from
ledgersCache and check whether the ledgerId is greater than the max
ledgerId in the set. If it is greater, the cache is out of date and can't
be trusted, so we reconsume this msg 10 min later; by then the cache entry
has expired and fresh ledgerIds will be fetched. If it is not greater, we
check whether the set contains this ledgerId. If it does, the ledger is
still in use and we reconsume this msg 10 min later. If it does not, we
delete the ledger.
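
The check described above can be sketched as a pure function over the
cached TreeSet. This is a minimal illustration: LedgerInUseCheck, Decision
and decide are hypothetical names, and treating an empty set as stale is an
assumption, since the proposal does not say what happens in that case.

```java
import java.util.TreeSet;

/** Hypothetical sketch of the cache check; not part of the proposed API. */
final class LedgerInUseCheck {
    enum Decision { CACHE_STALE, STILL_IN_USE, SAFE_TO_DELETE }

    private LedgerInUseCheck() {
    }

    static Decision decide(TreeSet<Long> cachedLedgerIds, long ledgerId) {
        // Assumption: an empty snapshot gives no upper bound, so it is treated as stale.
        if (cachedLedgerIds.isEmpty() || ledgerId > cachedLedgerIds.last()) {
            // The ledger is newer than anything the cache knows about: reconsume later.
            return Decision.CACHE_STALE;
        }
        if (cachedLedgerIds.contains(ledgerId)) {
            // The metadata still references the ledger: reconsume later.
            return Decision.STILL_IN_USE;
        }
        // The cache covers this id range and the ledger is absent: delete it.
        return Decision.SAFE_TO_DELETE;
    }
}
```

Both CACHE_STALE and STILL_IN_USE lead to the same action (reconsume 10 min
later); they are kept apart here only to make the two reasons visible.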

### Possible intermediate states
* Case1: Sending the pending delete ledger succeeds, but the broker goes
down before the metadata is updated.
The consumer receives the pending delete ledger msg, but the ledger id
still exists in the metadata, so the consumer does not delete this ledger.
* Case2: Sending the pending delete ledger succeeds, but the broker goes
down before the metadata is updated. After a while, the pending delete
ledger is sent again and the metadata update succeeds.
The consumer receives two pending delete ledger msgs with the same
ledgerId. Ledger deletion is idempotent, so both the first and the second
deletion succeed and both msgs are acked.
* Case3: The consumer receives a pending delete ledger msg and deletes the
ledger successfully, but the broker shuts down before the msg is acked.
The pending delete ledger msg is redelivered and another consumer deletes
the ledger again. Because ledger deletion is idempotent, the msg is
eventually acked.

### How to create the system topic
When LedgerDeletionService starts, it uses pulsarAdmin to create the
partitioned system topic
pulsar/system/persistent/__ledger_deletion-partition-x. We should ensure
the system topic is partitioned; otherwise, if a user changes the topic
metadata, the old system topic pulsar/system/persistent/__ledger_deletion
will be lost.

## Process flow
### The first deletion phase:
![lwdBzs4upOufb9DAWkVHMpUq](https://user-images.githubusercontent.com/22524871/178689500-281e9ff3-a213-468e-a1ff-d999e6a8be20.png)


### The second deletion phase:
![Q1_-vAy4yLtv5PSBo5b_YTxI](https://user-images.githubusercontent.com/22524871/178689513-fce476de-7dd3-4f05-9528-dc3c427fffe4.png)

## API Changes
Introduce LedgerDeletionService interface
```
public interface LedgerDeletionService {

    void start() throws PulsarClientException, PulsarAdminException;

    CompletableFuture<?> appendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
                                                   LedgerComponent component, LedgerType type,
                                                   boolean checkLedgerStillInUse);

    void close() throws Exception;

    CompletableFuture<?> asyncClose();
}
```

## Configuration Changes
Add properties to ServiceConfiguration
```
public class ServiceConfiguration {
    @FieldContext(
            dynamic = true,
            category = CATEGORY_SERVER,
            doc = "Use two phase deletion when deleting ledgers. If true, "
                    + "LedgerDeletionService will take over ledger deletion. (Default false)"
    )
    private boolean twoPhaseDeletionEnabled;

    @FieldContext(
            category = CATEGORY_SERVER,
            doc = "Ledger deletion parallelism. Create partitioned system topic with this value when "
                    + "twoPhaseDeletionEnabled is true."
    )
    private int ledgerDeletionParallelismOfTwoPhaseDeletion = 4;

    @FieldContext(
            category = CATEGORY_SERVER,
            doc = "When deleting a ledger with two phase deletion, PendingDeleteLedgerInfo is sent to the "
                    + "system topic with a delay according to this value. (Default 60s)"
    )
    private int sendDelaySecondsOfTwoPhaseDeletion = 60;

    @FieldContext(
            category = CATEGORY_SERVER,
            doc = "When deleting a ledger with two phase deletion, a consumer subscribes to the system topic. "
                    + "When consuming PendingDeleteLedgerInfo fails, it will be reconsumed later according "
                    + "to this value. (Default 600s)"
    )
    private int reconsumeLaterSecondsOfTwoPhaseDeletion = 600;
}
```

## Documentation Changes
We should add documentation for this new feature.

## Compatibility
If users upgrade and enable two phase deletion, the ledger deletion msgs
will be stored in the system topic. If a user then rolls back to the old
version before all msgs in the system topic have been consumed, some
ledgers may not be deleted.
