Hi Pulsar community,

I have opened a PIP to discuss "Introduce two-phase deletion protocol based on system topic".

Proposal link: https://github.com/apache/pulsar/issues/16569
---

## Motivation

Original issue: #13238

Currently, ledger deletion is split into two separate steps, which happen in ManagedLedger and ManagedCursor:

1. Remove all the ledgers waiting to be deleted from the ledger list and update the new ledger list in the meta store.
2. In the meta store update callback, delete those ledgers from the storage system, such as BookKeeper or tiered storage.

Because the steps are separate, we can't make ledger deletion transactional. If the first step succeeds and the second step fails, the ledgers can never be deleted from the storage system. The second step may fail because the broker restarts or the storage system deletion fails. In our customers' environments we have found many orphan ledgers caused by this.

## Design

Based on the above, we introduce a LedgerDeletionService to support two-phase deletion. We hope it provides a general solution for two-phase deletion, covering the problems we have already found in managed-ledger, managed-cursor and schema-storage.

In this design, we use system topics to store the pending delete ledgers:

* pulsar/system/persistent/__ledger_deletion : stores the pending delete ledgers
* pulsar/system/persistent/__ledger_deletion_archive : the DLQ for the topic above

### The first phase

```
client.newProducer(Schema.AVRO(PendingDeleteLedgerInfo.class))
        .topic("pulsar/system/persistent/__ledger_deletion")
        .enableBatching(false)
        .createAsync();
```

When the LedgerDeletionService starts, it creates a producer to send pending delete ledgers. When deleting a ledger, it sends a PendingDeleteLedgerInfo message with a 1 min delay (the delay is for the consumer side: if the message were delivered immediately, the metadata might not have changed yet by the time the consumer receives it). Only after the send succeeds do we update the metadata; if the send fails, we treat the deletion as failed and do not touch the metadata. A sketch of this flow is shown after the second phase below.

**PendingDeleteLedgerInfo**

```
public class PendingDeleteLedgerInfo {
    /**
     * Partitioned topic name without domain, e.g. public/default/test-topic-partition-1 or
     * public/default/test-topic.
     */
    private String topicName;

    /**
     * The ledger component: managed-ledger, managed-cursor or schema-storage.
     */
    private LedgerComponent ledgerComponent;

    /**
     * The ledger type: ledger or offload-ledger.
     */
    private LedgerType ledgerType;

    /**
     * Ledger id.
     */
    private Long ledgerId;

    /**
     * Context that holds offload info. For a BookKeeper ledger the context is null.
     */
    private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;

    /**
     * When a consumer receives a pending delete ledger, the ledger may still be in use, so we need to check
     * whether it is. In some cases this check is unnecessary.
     */
    private boolean checkLedgerStillInUse;

    /**
     * Extra properties.
     */
    private Map<String, String> properties = new HashMap<>();
}
```

### The second phase

```
client.newConsumer(Schema.AVRO(PendingDeleteLedgerInfo.class))
        .topic("pulsar/system/persistent/__ledger_deletion")
        .subscriptionName("ledger-deletion-worker")
        .subscriptionType(SubscriptionType.Shared)
        .deadLetterPolicy(DeadLetterPolicy.builder()
                .deadLetterTopic(SystemTopicNames.LEDGER_DELETION_ARCHIVE_TOPIC.getPartitionedTopicName())
                .maxRedeliverCount(10).build())
        .subscribeAsync()
```

When the LedgerDeletionService starts, it also starts a consumer to consume the pending delete ledgers.
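For illustration, here is a minimal sketch of how the first phase could chain the delayed send and the metadata update. This is not part of the proposal's code: `firstPhase` and `asyncRemoveLedgerFromMetadata` are hypothetical names, and the sketch assumes the producer created above.

```
// Sketch: send the pending-delete marker with a delay, and only touch the
// metadata after the send has succeeded.
CompletableFuture<Void> firstPhase(Producer<PendingDeleteLedgerInfo> producer,
                                   PendingDeleteLedgerInfo pending) {
    return producer.newMessage()
            .value(pending)
            .deliverAfter(60, TimeUnit.SECONDS)   // 1 min delay for the consumer side
            .sendAsync()
            // only update the metadata after the send succeeds; if the send fails,
            // the whole deletion is treated as failed and the metadata is untouched
            .thenCompose(msgId -> asyncRemoveLedgerFromMetadata(pending));
}
```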
### Check if the ledger is still in use

When the consumer receives a pending delete ledger, we should check whether that ledger still exists in the metadata.

If it still exists, we give up deleting the ledger this time: the consumer does not ack the message and calls reconsumeLater with a 10 min delay. If it does not exist, we try to delete the ledger or offload-ledger according to the ledger type. If the deletion succeeds, we ack the message; if it fails, we reconsumeLater for 10 min. Once a PendingDeleteLedgerInfo message has been redelivered 10 times, it is transferred to the DLQ pulsar/system/persistent/__ledger_deletion_archive.

_Tips: With maxRedeliverCount = 10 and a reconsumeLater delay of 10 min, if the storage system shuts down, the pending delete ledger will be retried 10 times over 100 min. So we don't need to worry that a storage system outage leaves the ledger undeletable._

### How to get the existing ledgers when the consumer receives a pending delete ledger

The admin API `topics getInternalStats {topic-name}` already returns the three pieces of information we need:

* managed-ledger ledgerIds
* managed-cursor ledgerIds
* schema-storage ledgerIds

So when receiving a pending delete ledger, we fetch the topic internal stats. To avoid fetching them too frequently, we keep them in a ledgersCache:

```
private final LoadingCache<String, TreeSet<Long>> ledgersCache = Caffeine.newBuilder()
        .expireAfterWrite(10, TimeUnit.MINUTES)
        .build(new CacheLoader<>() {
            @Override
            public @Nullable TreeSet<Long> load(@NonNull String key) throws Exception {
                return fetchInUseLedgerIds(key);
            }
        });

private TreeSet<Long> fetchInUseLedgerIds(String topicName) throws PulsarAdminException {
    TreeSet<Long> ledgerIds = new TreeSet<>();
    PersistentTopicInternalStats internalStats = pulsarAdmin.topics().getInternalStats(topicName);
    ledgerIds.addAll(internalStats.ledgers.stream().map(ele -> ele.ledgerId).collect(Collectors.toSet()));
    ledgerIds.addAll(
            internalStats.cursors.values().stream().map(ele -> ele.cursorLedger).collect(Collectors.toSet()));
    ledgerIds.addAll(internalStats.schemaLedgers.stream().map(ele -> ele.ledgerId).collect(Collectors.toSet()));
    return ledgerIds;
}
```

The key is the **topicName** and the value is the **ledgerId set**. Entries expire after 10 min, which:

* reduces memory usage, because topics that no longer delete ledgers don't keep an entry, and
* decreases the frequency of metadata fetches.

When receiving a pending delete ledger, we get the ledgerId set from the ledgersCache and first check whether the ledgerId is greater than the max ledgerId in the set. If it is, the cache is out of date and can't be trusted, so we reconsume the message 10 min later; by then the cache entry has expired and a fresh ledgerId set will be fetched. If the ledgerId is not greater than the max, we check whether the set contains it: if it does, we reconsume the message 10 min later; if not, we delete the ledger. A sketch of this consumer-side check is shown below.
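For illustration, a minimal sketch of the consumer-side check, assuming the ledgersCache above, getters on PendingDeleteLedgerInfo, and a hypothetical `deleteLedger()` helper. Note that `reconsumeLater` generally requires the consumer to be built with `enableRetry(true)`.

```
// Sketch: check whether the ledger is still in use before deleting it.
void handlePendingDeleteLedger(Consumer<PendingDeleteLedgerInfo> consumer,
                               Message<PendingDeleteLedgerInfo> msg) throws PulsarClientException {
    PendingDeleteLedgerInfo pending = msg.getValue();

    if (pending.isCheckLedgerStillInUse()) {
        TreeSet<Long> inUse = ledgersCache.get(pending.getTopicName());
        if (pending.getLedgerId() > inUse.last()             // cache may be out of date
                || inUse.contains(pending.getLedgerId())) {  // ledger is still referenced
            consumer.reconsumeLater(msg, 10, TimeUnit.MINUTES);
            return;
        }
    }

    try {
        deleteLedger(pending);        // delete ledger or offload-ledger according to the ledger type
        consumer.acknowledge(msg);
    } catch (Exception e) {
        // redelivered up to maxRedeliverCount (10) times, then moved to the DLQ
        consumer.reconsumeLater(msg, 10, TimeUnit.MINUTES);
    }
}
```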
### Possible intermediate states

* Case 1: Sending the pending delete ledger succeeds, but the broker goes down before the metadata is updated. The consumer receives the pending delete ledger message, finds that the ledger id still exists in the metadata, and does not delete the ledger.
* Case 2: Sending the pending delete ledger succeeds, but the broker goes down before the metadata is updated. After a while the pending delete ledger is sent again and the metadata update succeeds. The consumer receives two pending delete ledger messages with the same ledgerId; since ledger deletion is idempotent, both deletions succeed and both messages are acked.
* Case 3: The consumer receives the pending delete ledger message and deletes the ledger, but the broker shuts down before the message is acked. The message is redelivered and a second consumer deletes the ledger again; because ledger deletion is idempotent, the message is eventually acked.

### How to create the system topic

When the LedgerDeletionService starts, it uses pulsarAdmin to create the partitioned system topic pulsar/system/persistent/__ledger_deletion-partition-x. We must ensure the system topic is partitioned; otherwise, if a user changes the metadata, the old non-partitioned system topic pulsar/system/persistent/__ledger_deletion could be lost.

## Process flow

### The first deletion phase

### The second deletion phase

## API Changes

Introduce a LedgerDeletionService interface:

```
public interface LedgerDeletionService {

    void start() throws PulsarClientException, PulsarAdminException;

    CompletableFuture<?> appendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
                                                   LedgerComponent component, LedgerType type,
                                                   boolean checkLedgerStillInUse);

    void close() throws Exception;

    CompletableFuture<?> asyncClose();
}
```

A sketch of how a ledger component could use this interface is shown at the end of this proposal.

## Configuration Changes

Add properties to ServiceConfiguration:

```
public class ServiceConfiguration {

    @FieldContext(
            dynamic = true,
            category = CATEGORY_SERVER,
            doc = "Use two-phase deletion when deleting ledgers. If true, "
                    + "LedgerDeletionService will take over ledger deletion. (Default false)"
    )
    private boolean twoPhaseDeletionEnabled;

    @FieldContext(
            category = CATEGORY_SERVER,
            doc = "Ledger deletion parallelism. The partitioned system topic is created with this number of "
                    + "partitions when twoPhaseDeletionEnabled is true."
    )
    private int ledgerDeletionParallelismOfTwoPhaseDeletion = 4;

    @FieldContext(
            category = CATEGORY_SERVER,
            doc = "When deleting a ledger with two-phase deletion, the PendingDeleteLedgerInfo is sent to the "
                    + "system topic with this delivery delay. (Default 60s)"
    )
    private int sendDelaySecondsOfTwoPhaseDeletion = 60;

    @FieldContext(
            category = CATEGORY_SERVER,
            doc = "When two-phase deletion is enabled, a consumer subscribes to the system topic; if processing "
                    + "a PendingDeleteLedgerInfo fails, it will be reconsumed after this delay. (Default 600s)"
    )
    private int reconsumeLaterSecondsOfTwoPhaseDeletion = 600;
}
```

## Documentation Changes

We should add documentation for this new feature.

## Compatibility

If a user upgrades and enables two-phase deletion, the ledger deletion messages are stored in the system topic. If the user then rolls back to an old version before all system topic messages have been consumed, some ledgers may never be deleted.
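For reference, a minimal sketch (not part of the proposal's code) of how a managed-ledger component might hand a ledger over to the service when twoPhaseDeletionEnabled is true. The names `ledgerDeletionService`, `config`, `asyncRemoveLedgerFromMetadata` and `asyncDeleteLedgerDirectly` are hypothetical, and the exact enum constant names are assumed.

```
// Sketch of a call site inside a ledger component: when two-phase deletion is
// enabled, the ledger is first appended to the system topic and the metadata is
// only updated after the append succeeds; otherwise the old direct path is used.
CompletableFuture<?> deleteLedger(String topicName, long ledgerId, LedgerInfo context) {
    if (config.isTwoPhaseDeletionEnabled()) {
        return ledgerDeletionService.appendPendingDeleteLedger(topicName, ledgerId, context,
                        LedgerComponent.MANAGED_LEDGER, LedgerType.LEDGER, true)
                // first phase done; now it is safe to remove the ledger from the ledger list
                .thenCompose(ignore -> asyncRemoveLedgerFromMetadata(topicName, ledgerId));
    } else {
        // legacy behaviour: update metadata and delete from storage in two unrelated steps
        return asyncDeleteLedgerDirectly(topicName, ledgerId, context);
    }
}
```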