This is very interesting. I have only one concern. I think that we should at least use a per-tenant system topic, or, better, per-namespace. There is no need to create the deletion topic if there is nothing to delete.
I am used to dealing with Pulsar clusters in which tenants are strictly isolated. Introducing another component that is not tenant-aware is kind of a problem (we already have such a problem with the Transaction Coordinator).

Enrico

On Wed, Jul 13, 2022 at 10:54, horizonzy <horizo...@apache.org> wrote:
>
> Hi Pulsar community:
>
> I opened a PIP to discuss "Introduce two phase deletion protocol based on system topic".
>
> Proposal link: https://github.com/apache/pulsar/issues/16569
>
> ---
>
> ## Motivation
> Original issue: #13238
>
> Currently, ledger deletion is divided into two separate steps, which happen in ManagedLedger and ManagedCursor:
> 1. Remove all the ledgers waiting to be deleted from the ledger list and write the updated ledger list to the metadata store.
> 2. In the metadata store update callback, delete those ledgers from the storage system, such as BookKeeper or tiered storage.
>
> Because the steps are separate, ledger deletion is not transactional. If the first step succeeds and the second step fails, the ledgers can never be deleted from the storage system. The second step may fail because of a broker restart or a failed deletion in the storage system.
>
> In our customer's environment, we have found many orphan ledgers caused by the above.
> ## Design
> Based on the above, we introduce LedgerDeletionService to support two-phase deletion. We hope it provides a general solution for two-phase deletion. It will cover the problems we have already found in managed-ledger, managed-cursor and schema-storage.
>
> In this design, we use system topics to store the pending delete ledgers.
> * pulsar/system/persistent/__ledger_deletion : stores the pending delete ledgers
> * pulsar/system/persistent/__ledger_deletion_archive : the DLQ for the above
>
>
> ### The first phase:
> ```
> client.newProducer(Schema.AVRO(PendingDeleteLedgerInfo.class))
>         .topic("pulsar/system/persistent/__ledger_deletion")
>         .enableBatching(false)
>         .createAsync();
> ```
> When LedgerDeletionService starts, it creates a producer to send pending delete ledgers.
> When deleting a ledger, it sends a PendingDeleteLedgerInfo message with a 1 min delay (the delay is for the consumer side: if the message were delivered immediately, the metadata might not have changed yet when the consumer receives it). After the send succeeds, it updates the metadata. If the send fails, we treat the deletion as failed and do not update the metadata.
>
> **PendingDeleteLedgerInfo**
> ```
> public class PendingDeleteLedgerInfo {
>     /**
>      * Partitioned topic name without domain, like public/default/test-topic-partition-1
>      * or public/default/test-topic.
>      */
>     private String topicName;
>
>     /**
>      * The ledger component: managed-ledger, managed-cursor or schema-storage.
>      */
>     private LedgerComponent ledgerComponent;
>
>     /**
>      * The ledger type: ledger or offload-ledger.
>      */
>     private LedgerType ledgerType;
>
>     /**
>      * LedgerId.
>      */
>     private Long ledgerId;
>
>     /**
>      * Context, holds offload info. For a BookKeeper ledger, the context is null.
>      */
>     private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;
>
>     /**
>      * When a consumer receives a pending delete ledger, the ledger may still be in use,
>      * so we need to check whether it is. In some cases this check is not needed.
>      */
>     private boolean checkLedgerStillInUse;
>
>     /**
>      * Extended properties.
>      */
>     private Map<String, String> properties = new HashMap<>();
> }
> ```
> ### The second phase
> ```
> client.newConsumer(Schema.AVRO(PendingDeleteLedgerInfo.class))
>         .topic("pulsar/system/persistent/__ledger_deletion")
>         .subscriptionName("ledger-deletion-worker")
>         .subscriptionType(SubscriptionType.Shared)
>         .deadLetterPolicy(DeadLetterPolicy.builder()
>                 .deadLetterTopic(SystemTopicNames.LEDGER_DELETION_ARCHIVE_TOPIC.getPartitionedTopicName())
>                 .maxRedeliverCount(10).build())
>         .subscribeAsync();
> ```
> When LedgerDeletionService starts, it starts a consumer to consume pending delete ledgers.
>
> ### Check if the ledger is still in use
> When a pending delete ledger is received, we check whether it still exists in the metadata. If it exists, we give up deleting it this time: the consumer does not ack the message and calls reconsumeLater with a 10 min delay. If it does not exist, the consumer tries to delete the ledger or offload-ledger according to the ledger type. If the deletion succeeds, it acks the message; if the deletion fails, it calls reconsumeLater with a 10 min delay. Once a pending delete ledger message has been reconsumed 10 times, it is moved to the DLQ pulsar/system/persistent/__ledger_deletion_archive.
>
> _Tips: We set the DLQ maxRedeliverCount to 10 and the reconsumeLater delay to 10 min. If the storage system shuts down, deletion of the pending delete ledger is attempted 10 times over 100 min, so a temporary storage system shutdown does not leave the ledger undeleted._
>
> ### How to get existing ledgers when the consumer receives a pending delete ledger
> The admin API topics getInternalStats {topic-name} already exists, and its response contains the three pieces of information we need:
> * managed-ledger ledgerIds
> * managed-cursor ledgerIds
> * schema-storage ledgerIds
>
> So when receiving a pending delete ledger, we fetch the topic internal stats. To avoid fetching them too frequently, we define ledgersCache for them.
> ```
> private final LoadingCache<String, TreeSet<Long>> ledgersCache = Caffeine.newBuilder()
>         .expireAfterWrite(10, TimeUnit.MINUTES)
>         .build(new CacheLoader<>() {
>             @Override
>             public @Nullable TreeSet<Long> load(@NonNull String key) throws Exception {
>                 return fetchInUseLedgerIds(key);
>             }
>         });
>
> private TreeSet<Long> fetchInUseLedgerIds(String topicName) throws PulsarAdminException {
>     TreeSet<Long> ledgerIds = new TreeSet<>();
>
>     PersistentTopicInternalStats internalStats = pulsarAdmin.topics().getInternalStats(topicName);
>
>     ledgerIds.addAll(internalStats.ledgers.stream().map(ele1 -> ele1.ledgerId).collect(Collectors.toSet()));
>     ledgerIds.addAll(
>             internalStats.cursors.values().stream().map(ele -> ele.cursorLedger).collect(Collectors.toSet()));
>     ledgerIds.addAll(internalStats.schemaLedgers.stream().map(ele -> ele.ledgerId).collect(Collectors.toSet()));
>
>     return ledgerIds;
> }
> ```
>
> The key is **topicName**, the value is the **ledgerId set**.
> Entries expire after 10 min. This:
> * Reduces memory: if a topic no longer deletes ledgers, it does not occupy memory.
> * Decreases the frequency of metadata fetches.
>
> When receiving a pending delete ledger, we get the ledgerId set from ledgersCache and check whether the ledgerId is greater than the max ledgerId in the set. If it is, the cache is out of date and cannot be trusted, so we reconsume the message 10 min later. By then the cache entry has expired and new ledgerIds are fetched. If the ledgerId is not greater than the max ledgerId, we check whether the set contains it: if it does, we reconsume the message 10 min later; if not, we delete the ledger.
>
> ### Possible intermediate states
> * Case1: Sending the pending delete ledger succeeds, but the broker goes down before the metadata is updated.
> The consumer receives the pending delete ledger message, but the ledger ID still exists in the metadata, so the consumer does not delete the ledger.
> * Case2: Sending the pending delete ledger succeeds, but the broker goes down before the metadata is updated. After a while, the pending delete ledger is sent again and the metadata update succeeds.
> The consumer receives two pending delete ledger messages with the same ledgerId. Ledger deletion is idempotent, so both the first and the second deletion succeed and both messages are acked.
> * Case3: The consumer receives the pending delete ledger message and deletes the ledger successfully, but the broker shuts down before the message is acked.
> The pending delete ledger message is redelivered and a second consumer deletes the ledger again. Because ledger deletion is idempotent, the message is eventually acked.
>
> ### How to create the system topic
> When LedgerDeletionService starts, it uses pulsarAdmin to create the partitioned system topic pulsar/system/persistent/__ledger_deletion-partition-x. We should ensure the system topic is partitioned; otherwise, if a user later changes the metadata, the old system topic pulsar/system/persistent/__ledger_deletion will be lost.
>
> ## Process flow
> ### The first deletion phase:
> 
>
>
> ### The second deletion phase:
> 
>
> ## API Changes
> Introduce the LedgerDeletionService interface
> ```
> public interface LedgerDeletionService {
>
>     void start() throws PulsarClientException, PulsarAdminException;
>
>     CompletableFuture<?> appendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
>                                                    LedgerComponent component, LedgerType type,
>                                                    boolean checkLedgerStillInUse);
>
>     void close() throws Exception;
>
>     CompletableFuture<?> asyncClose();
> }
> ```
>
> ## Configuration Changes
> Add properties to ServiceConfiguration
> ```
> public class ServiceConfiguration {
>     @FieldContext(
>             dynamic = true,
>             category = CATEGORY_SERVER,
>             doc = "Use two phase deletion when deleting ledgers. If true, "
>                     + "LedgerDeletionService will take over ledger deletion. (Default false)"
>     )
>     private boolean twoPhaseDeletionEnabled;
>
>     @FieldContext(
>             category = CATEGORY_SERVER,
>             doc = "Ledger deletion parallelism. The partitioned system topic is created with this value when "
>                     + "twoPhaseDeletionEnabled is true."
>     )
>     private int ledgerDeletionParallelismOfTwoPhaseDeletion = 4;
>
>     @FieldContext(
>             category = CATEGORY_SERVER,
>             doc = "When deleting a ledger with two phase deletion, PendingDeleteLedgerInfo is sent to the"
>                     + " system topic with a delay of this many seconds. (Default 60s)"
>     )
>     private int sendDelaySecondsOfTwoPhaseDeletion = 60;
>
>     @FieldContext(
>             category = CATEGORY_SERVER,
>             doc = "When deleting a ledger with two phase deletion, a consumer subscribes to the system topic."
>                     + " If consuming a PendingDeleteLedgerInfo fails, it is reconsumed after this many seconds."
>                     + " (Default 600s)"
>     )
>     private int reconsumeLaterSecondsOfTwoPhaseDeletion = 600;
> }
> ```
>
> ## Documentation Changes
> We should add some documentation for this new feature.
>
> ## Compatibility
> If users upgrade and enable two phase deletion, the ledger deletion messages are stored in the system topic. If a user then rolls back to the old version before all system topic messages have been consumed, some ledgers may not be deleted.
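
For reference, the in-use check that the quoted proposal describes for the second phase (compare the pending ledgerId with the cached ledgerId set, then either delete or reconsume later) can be sketched in plain Java as below. This is only a minimal illustration, not the PIP's implementation: the class `InUseCheckSketch`, the enum `Action` and the method `decide` are hypothetical names, and treating an empty cached set as "cannot be trusted" is an assumption, since the proposal does not cover that case.

```java
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch of the second-phase decision; names are not from the PIP.
public class InUseCheckSketch {

    public enum Action { DELETE, RECONSUME_LATER }

    public static Action decide(TreeSet<Long> cachedInUseLedgerIds, long pendingLedgerId) {
        // Assumption: an empty cached set gives no max ledgerId to compare against,
        // so it is treated conservatively, like a stale cache.
        if (cachedInUseLedgerIds.isEmpty()) {
            return Action.RECONSUME_LATER;
        }
        // A ledgerId above the cached max means the cache is out of date:
        // retry later, once the cache entry has expired and been reloaded.
        if (pendingLedgerId > cachedInUseLedgerIds.last()) {
            return Action.RECONSUME_LATER;
        }
        // The ledger is still referenced by the metadata: do not delete it yet.
        if (cachedInUseLedgerIds.contains(pendingLedgerId)) {
            return Action.RECONSUME_LATER;
        }
        // Not referenced any more: safe to delete from storage.
        return Action.DELETE;
    }

    public static void main(String[] args) {
        TreeSet<Long> inUse = new TreeSet<>(List.of(3L, 5L, 9L));
        System.out.println(decide(inUse, 12L)); // RECONSUME_LATER (cache may be stale)
        System.out.println(decide(inUse, 5L));  // RECONSUME_LATER (still in use)
        System.out.println(decide(inUse, 4L));  // DELETE
    }
}
```

In a real consumer, `RECONSUME_LATER` would map to the reconsumeLater call with the configured delay and `DELETE` to the storage deletion followed by an ack.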