This is very interesting.

I have only one concern.
I think we should at least use a per-tenant system topic, or, better, a
per-namespace one.
There is no need to create the deletion topic if there is nothing to delete.

I am used to dealing with Pulsar clusters in which tenants are
strictly isolated.
Introducing another component that is not tenant-aware is kind of a
problem (we already have this problem with the Transaction
Coordinator).

Enrico

On Wed, Jul 13, 2022 at 10:54 horizonzy
<horizo...@apache.org> wrote:
>
> Hi Pulsar community, I have opened a PIP to discuss "Introduce two-phase
> deletion protocol based on system topic". Proposal link:
> https://github.com/apache/pulsar/issues/16569
>
> ---
>
> ## Motivation
> Original issue: #13238
> Currently, ledger deletion in ManagedLedger and ManagedCursor is split into
> two separate steps:
> 1. Remove all ledgers waiting to be deleted from the ledger list and write
> the updated ledger list to the metadata store.
> 2. In the metadata store update callback, delete those ledgers from the
> storage system, such as BookKeeper or tiered storage.
>
> Because these are separate steps, ledger deletion is not atomic. If the
> first step succeeds and the second step fails, the ledgers can never be
> deleted from the storage system. The second step may fail because the
> broker restarts or because the deletion in the storage system fails.
>
> In our customer's environment, we have found many orphan ledgers caused by
> the problem above.
>
> ## Design
> Based on the above, we introduce LedgerDeletionService to support two-phase
> deletion. It is intended as a general solution for two-phase deletion and
> covers the problems we have already found in managed-ledger, managed-cursor
> and schema-storage.
>
> In this design, we use system topics to store pending-delete ledgers:
> * pulsar/system/persistent/__ledger_deletion: stores the pending-delete
> ledgers
> * pulsar/system/persistent/__ledger_deletion_archive: the dead letter queue
> (DLQ) for the topic above
>
>
> ### The first phase:
> ```
> client.newProducer(Schema.AVRO(PendingDeleteLedgerInfo.class))
>  .topic("pulsar/system/persistent/__ledger_deletion")
>  .enableBatching(false)
>  .createAsync();
> ```
> When LedgerDeletionService starts, it creates a producer to send
> pending-delete ledgers.
> When a ledger is deleted, a PendingDeleteLedgerInfo message is sent with a
> 1-minute delay. The delay is for the consumer side: if the message were
> delivered immediately, the metadata might not have changed yet when the
> consumer receives it. Only after the send succeeds is the metadata updated.
> If the send fails, the deletion is considered failed and the metadata is
> not updated.
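A minimal sketch of the phase-one ordering, assuming two hypothetical interfaces (`PendingDeleteQueue`, `LedgerMetadata`) that stand in for the system-topic producer and the metadata update; in a broker, `sendDelayed` would presumably wrap Pulsar's `TypedMessageBuilder.deliverAfter(...)`. It only illustrates that the metadata is touched after, and only after, a successful send.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the system-topic producer (not a Pulsar API). */
interface PendingDeleteQueue {
    CompletableFuture<Void> sendDelayed(long ledgerId, Duration delay);
}

/** Hypothetical stand-in for removing the ledger from the metadata store. */
interface LedgerMetadata {
    CompletableFuture<Void> removeLedger(long ledgerId);
}

class FirstPhase {
    // Mirrors sendDelaySecondsOfTwoPhaseDeletion = 60.
    static final Duration SEND_DELAY = Duration.ofSeconds(60);

    /** Phase one: record the pending deletion first, then update metadata. */
    static CompletableFuture<Void> deleteLedger(long ledgerId,
                                                PendingDeleteQueue queue,
                                                LedgerMetadata metadata) {
        // If the send fails, thenCompose never runs, so the metadata stays
        // unchanged and the returned future completes exceptionally.
        return queue.sendDelayed(ledgerId, SEND_DELAY)
                .thenCompose(ignored -> metadata.removeLedger(ledgerId));
    }
}
```

Composing the futures this way means a failed send surfaces to the caller as a failed deletion, matching the rule that the metadata is not updated in that case.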
>
> **PendingDeleteLedgerInfo**
> ```
> import java.util.HashMap;
> import java.util.Map;
>
> public class PendingDeleteLedgerInfo {
>     /**
>      * Topic name without the domain, e.g. public/default/test-topic-partition-1
>      * or public/default/test-topic.
>      */
>     private String topicName;
>
>     /**
>      * The ledger component: managed-ledger, managed-cursor or schema-storage.
>      */
>     private LedgerComponent ledgerComponent;
>
>     /**
>      * The ledger type: ledger or offload-ledger.
>      */
>     private LedgerType ledgerType;
>
>     /**
>      * The ledger ID.
>      */
>     private Long ledgerId;
>
>     /**
>      * Context holding the offload info; null for a BookKeeper ledger.
>      */
>     private MLDataFormats.ManagedLedgerInfo.LedgerInfo context;
>
>     /**
>      * When a consumer receives a pending-delete ledger, the ledger may still
>      * be in use, so we need to check whether it is. In some cases this check
>      * is unnecessary.
>      */
>     private boolean checkLedgerStillInUse;
>
>     /**
>      * Extended properties.
>      */
>     private Map<String, String> properties = new HashMap<>();
> }
> ```
> ### The second phase
> ```
> client.newConsumer(Schema.AVRO(PendingDeleteLedgerInfo.class))
>         .topic("pulsar/system/persistent/__ledger_deletion")
>         .subscriptionName("ledger-deletion-worker")
>         .subscriptionType(SubscriptionType.Shared)
>         // reconsumeLater() requires retry to be enabled on the consumer
>         .enableRetry(true)
>         .deadLetterPolicy(DeadLetterPolicy.builder()
>                 .deadLetterTopic(SystemTopicNames.LEDGER_DELETION_ARCHIVE_TOPIC.getPartitionedTopicName())
>                 .maxRedeliverCount(10)
>                 .build())
>         .subscribeAsync();
> ```
> When LedgerDeletionService starts, it starts a consumer to consume
> pending-delete ledgers.
>
> ### Check if the ledger is still in use
> When a pending-delete ledger is received, we check whether it still exists
> in the metadata. If it does, we do not delete it this time: the consumer
> does not ack the message and calls reconsumeLater with a 10-minute delay.
> If it does not, the consumer deletes the ledger or offload-ledger according
> to its ledgerType. If the deletion succeeds, the message is acked; if it
> fails, the message is reconsumed 10 minutes later. Once a
> PendingDeleteLedger message has been reconsumed 10 times, it is moved to
> the DLQ pulsar/system/persistent/__ledger_deletion_archive.
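The decision logic above can be sketched as a pure function. This is an illustration under assumptions: `Outcome`, `SecondPhase.handle` and the `deleter` callback are hypothetical names, and the DLQ transfer, which the proposal leaves to the consumer's dead letter policy, is modeled here as an explicit count check.

```java
import java.util.Set;
import java.util.function.LongPredicate;

/** What the consumer does with one PendingDeleteLedgerInfo message (hypothetical). */
enum Outcome { ACK, RECONSUME_LATER, DEAD_LETTER }

class SecondPhase {
    // Mirrors maxRedeliverCount(10) in the consumer's DeadLetterPolicy.
    static final int MAX_RECONSUME_TIMES = 10;

    /**
     * @param ledgerId       ledger named in the message
     * @param inUseLedgerIds ledger IDs still referenced by the topic's metadata
     * @param reconsumeTimes how many times this message has already been reconsumed
     * @param deleter        deletes the ledger or offload-ledger; true on success
     */
    static Outcome handle(long ledgerId, Set<Long> inUseLedgerIds,
                          int reconsumeTimes, LongPredicate deleter) {
        if (reconsumeTimes >= MAX_RECONSUME_TIMES) {
            return Outcome.DEAD_LETTER;      // retries exhausted: move to the archive topic
        }
        if (inUseLedgerIds.contains(ledgerId)) {
            return Outcome.RECONSUME_LATER;  // still in metadata: do not delete this time
        }
        return deleter.test(ledgerId)
                ? Outcome.ACK                // deleted: ack the message
                : Outcome.RECONSUME_LATER;   // storage failure: try again later
    }
}
```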
>
> _Tip: The DLQ maxRedeliverCount is 10 and reconsumeLater uses 10 minutes,
> so if the storage system is down, deletion of a pending-delete ledger is
> retried 10 times over 100 minutes. We therefore don't need to worry that a
> ledger can never be deleted just because the storage system was down._
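The arithmetic in the tip, as a small helper. `RetryWindow` is a hypothetical name; it simply multiplies the two settings and ignores the initial send delay.

```java
import java.time.Duration;

class RetryWindow {
    /** Time over which deletion of one ledger is retried before it goes to the DLQ. */
    static Duration of(int maxRedeliverCount, int reconsumeLaterSeconds) {
        // Widen to long before multiplying to avoid int overflow on large settings.
        return Duration.ofSeconds((long) maxRedeliverCount * reconsumeLaterSeconds);
    }
}
```

With the proposed defaults, `RetryWindow.of(10, 600).toMinutes()` is 100, matching the 100 minutes stated in the tip.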
>
> ### How to get existing ledgers when the consumer receives a pending-delete ledger
> The existing admin API `topics getInternalStats {topic-name}` returns the
> three pieces of information we need:
> * managed-ledger ledgerIds
> * managed-cursor ledgerIds
> * schema-storage ledgerIds
>
> So when a pending-delete ledger is received, we fetch the topic's internal
> stats. To avoid fetching them too frequently, we define a ledgersCache for
> them:
> ```
>     // topicName -> IDs of all ledgers still referenced by that topic
>     private final LoadingCache<String, TreeSet<Long>> ledgersCache = Caffeine.newBuilder()
>             .expireAfterWrite(10, TimeUnit.MINUTES)
>             .build(new CacheLoader<>() {
>                 @Override
>                 public @Nullable TreeSet<Long> load(@NonNull String key) throws Exception {
>                     return fetchInUseLedgerIds(key);
>                 }
>             });
>
>     private TreeSet<Long> fetchInUseLedgerIds(String topicName) throws PulsarAdminException {
>         TreeSet<Long> ledgerIds = new TreeSet<>();
>
>         PersistentTopicInternalStats internalStats =
>                 pulsarAdmin.topics().getInternalStats(topicName);
>
>         // managed-ledger ledgers
>         internalStats.ledgers.forEach(ledger -> ledgerIds.add(ledger.ledgerId));
>         // managed-cursor ledgers
>         internalStats.cursors.values().forEach(cursor -> ledgerIds.add(cursor.cursorLedger));
>         // schema-storage ledgers
>         internalStats.schemaLedgers.forEach(ledger -> ledgerIds.add(ledger.ledgerId));
>
>         return ledgerIds;
>     }
> ```
>
> The key is **topicName** and the value is the **ledgerId set**.
> Entries expire 10 minutes after they are written, which:
>  * reduces memory usage: topics that no longer delete ledgers stop
> occupying the cache;
>  * reduces how often metadata is fetched.
>
> When a pending-delete ledger is received, we get the ledgerId set from
> ledgersCache and compare the ledgerId with the maximum ledgerId in the set:
>  * If the ledgerId is greater than the maximum, the cache is out of date
> and cannot be trusted, so the message is reconsumed 10 minutes later. By
> then the cache entry has expired, and fresh ledgerIds will be fetched.
>  * Otherwise, if the set contains the ledgerId, the ledger is still in use,
> and the message is reconsumed 10 minutes later.
>  * If the set does not contain the ledgerId, the ledger is deleted.
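A sketch of the staleness check, assuming the cached value is the `TreeSet<Long>` from `ledgersCache`. `CacheVerdict` and `LedgerCacheCheck` are hypothetical names, and treating an empty set as out of date is an assumption the proposal does not address.

```java
import java.util.TreeSet;

/** Result of checking one ledgerId against the cached set (hypothetical). */
enum CacheVerdict { STALE_RETRY_LATER, IN_USE_RETRY_LATER, SAFE_TO_DELETE }

class LedgerCacheCheck {
    static CacheVerdict check(long ledgerId, TreeSet<Long> cachedLedgerIds) {
        // Ledger IDs grow monotonically, so an ID above the cached maximum means
        // the cache predates that ledger. Assumption: an empty set has no
        // maximum and is treated as out of date too.
        if (cachedLedgerIds.isEmpty() || ledgerId > cachedLedgerIds.last()) {
            return CacheVerdict.STALE_RETRY_LATER;
        }
        return cachedLedgerIds.contains(ledgerId)
                ? CacheVerdict.IN_USE_RETRY_LATER
                : CacheVerdict.SAFE_TO_DELETE;
    }
}
```

A `TreeSet` makes the maximum available through `last()` without scanning, which is presumably why the cache value is a sorted set.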
>
> ### Possible intermediate states
> * Case 1: The pending-delete ledger message is sent successfully, but the
> broker goes down before the metadata is updated.
> The consumer receives the pending-delete ledger message, but the ledgerId
> still exists in the metadata, so the consumer does not delete this ledger.
> * Case 2: The pending-delete ledger message is sent successfully, but the
> broker goes down before the metadata is updated. After a while, the same
> pending-delete ledger is sent again and the metadata update succeeds.
> The consumer receives two pending-delete ledger messages with the same
> ledgerId. Because ledger deletion is idempotent, both the first and the
> second deletion succeed, and both messages are acked.
> * Case 3: The consumer receives a pending-delete ledger message and deletes
> the ledger successfully, but the broker shuts down before acking the message.
> The message is redelivered and another consumer deletes the ledger again.
> Because ledger deletion is idempotent, the message is eventually acked.
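Cases 2 and 3 rely on deletion being idempotent. One way to get that, sketched under assumptions: the storage signals an already-deleted ledger with a dedicated error, and the consumer treats that error as success. `LedgerStorage`, `NoSuchLedgerException`, `IdempotentDeletion` and `InMemoryLedgerStorage` are hypothetical; they are not BookKeeper or offloader APIs.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical signal that the ledger no longer exists in storage. */
class NoSuchLedgerException extends Exception {
    NoSuchLedgerException(long ledgerId) { super("no such ledger: " + ledgerId); }
}

/** Hypothetical storage abstraction for ledgers or offloaded ledgers. */
interface LedgerStorage {
    void delete(long ledgerId) throws NoSuchLedgerException, IOException;
}

class IdempotentDeletion {
    /** Returns true if the message can be acked, false if it should be reconsumed later. */
    static boolean deleteForMessage(LedgerStorage storage, long ledgerId) {
        try {
            storage.delete(ledgerId);
            return true;
        } catch (NoSuchLedgerException alreadyDeleted) {
            // Duplicate or redelivered message: an earlier attempt already deleted it.
            return true;
        } catch (IOException transientFailure) {
            // Storage unavailable: leave the message unacked and reconsume later.
            return false;
        }
    }
}

/** In-memory storage used only to illustrate duplicate delivery. */
class InMemoryLedgerStorage implements LedgerStorage {
    final Set<Long> ledgers = new HashSet<>();
    boolean available = true;

    @Override
    public void delete(long ledgerId) throws NoSuchLedgerException, IOException {
        if (!available) throw new IOException("storage down");
        if (!ledgers.remove(ledgerId)) throw new NoSuchLedgerException(ledgerId);
    }
}
```

Separating "already gone" from "storage unavailable" matters: only the first is safe to ack, while the second must keep the message for a later retry.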
>
> ### How to create the system topic
> When LedgerDeletionService starts, it uses pulsarAdmin to create the
> partitioned system topic, whose partitions are named
> pulsar/system/persistent/__ledger_deletion-partition-x. The system topic
> must be partitioned; otherwise, if the metadata is changed by the user, the
> old system topic pulsar/system/persistent/__ledger_deletion will be lost.
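A sketch of the partition names that result from `ledgerDeletionParallelismOfTwoPhaseDeletion` (default 4 in the configuration below). Pulsar names partitions `<topic>-partition-<index>`; the actual creation would presumably go through `pulsarAdmin.topics().createPartitionedTopic(topic, numPartitions)`, which this helper does not call. `DeletionTopicPartitions` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

class DeletionTopicPartitions {
    static final String BASE_TOPIC = "pulsar/system/persistent/__ledger_deletion";

    /** Names of the partitions created for the given deletion parallelism. */
    static List<String> partitionNames(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        List<String> names = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            names.add(BASE_TOPIC + "-partition-" + i);
        }
        return names;
    }
}
```

Because the consumer uses a Shared subscription, the partition count bounds how the pending deletions are spread across brokers rather than how many consumers may attach.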
>
> ## Process flow
> ### The first deletion phase:
> ![First deletion phase](https://user-images.githubusercontent.com/22524871/178689500-281e9ff3-a213-468e-a1ff-d999e6a8be20.png)
>
> ### The second deletion phase:
> ![Second deletion phase](https://user-images.githubusercontent.com/22524871/178689513-fce476de-7dd3-4f05-9528-dc3c427fffe4.png)
>
> ## API Changes
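> Introduce the LedgerDeletionService interface:
> ```
> import java.util.concurrent.CompletableFuture;
> import org.apache.pulsar.client.admin.PulsarAdminException;
> import org.apache.pulsar.client.api.PulsarClientException;
>
> public interface LedgerDeletionService {
>
>     /** Creates the system topic if needed and starts the producer and consumer. */
>     void start() throws PulsarClientException, PulsarAdminException;
>
>     /** Phase one: sends a PendingDeleteLedgerInfo to the system topic. */
>     CompletableFuture<?> appendPendingDeleteLedger(String topicName, long ledgerId, LedgerInfo context,
>                                                    LedgerComponent component, LedgerType type,
>                                                    boolean checkLedgerStillInUse);
>
>     void close() throws Exception;
>
>     CompletableFuture<?> asyncClose();
> }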
> ```
>
> ## Configuration Changes
> Add the following properties to ServiceConfiguration:
> ```
> public class ServiceConfiguration {
>     @FieldContext(
>             dynamic = true,
>             category = CATEGORY_SERVER,
>             doc = "Use two-phase deletion when deleting ledgers. If true, "
>                     + "LedgerDeletionService takes over ledger deletion. (Default: false)"
>     )
>     private boolean twoPhaseDeletionEnabled;
>
>     @FieldContext(
>             category = CATEGORY_SERVER,
>             doc = "Ledger deletion parallelism. When twoPhaseDeletionEnabled is true, "
>                     + "the partitioned system topic is created with this number of partitions."
>     )
>     private int ledgerDeletionParallelismOfTwoPhaseDeletion = 4;
>
>     @FieldContext(
>             category = CATEGORY_SERVER,
>             doc = "In two-phase deletion, PendingDeleteLedgerInfo is sent to the system topic "
>                     + "with a delivery delay of this many seconds. (Default: 60s)"
>     )
>     private int sendDelaySecondsOfTwoPhaseDeletion = 60;
>
>     @FieldContext(
>             category = CATEGORY_SERVER,
>             doc = "In two-phase deletion, a consumer subscribes to the system topic; "
>                     + "when processing a PendingDeleteLedgerInfo fails, it is reconsumed "
>                     + "after this many seconds. (Default: 600s)"
>     )
>     private int reconsumeLaterSecondsOfTwoPhaseDeletion = 600;
> }
> ```
>
> ## Documentation Changes
> We should add documentation for this new feature.
>
> ## Compatibility
> If a user upgrades and enables two-phase deletion, ledger deletion messages
> are stored in the system topic. If the user then rolls back to the old
> version before all of these messages have been consumed, some ledgers may
> not be deleted.
