There are some cases to trigger it. 1. A cursor be removed. 2. Close the current ledger and create a new ledger. 3. Consumer ack the message, the slowest cursor move forward. 4. User trigger truncateTopic by admin.
I see that this pip is for the internal ledger deletion cases(1-3 above), and it appears that such internal deletion operations do not require pre-validation or status checks(and no additional iops on the metadata store). I agree that we need a separate pip for async admin operations. On Fri, Feb 3, 2023 at 9:16 AM Yan Zhao <horizo...@apache.org> wrote: > > If we don't want to add more pressure on the metadata store from this > > feature as a requirement, > > I think we can use a system topic as well. Just to confirm, is this the > > direction we are agreeing on? > The design is too generic, I'm afraid not it is not suitable for ledger > deletion. > > > > > Using a system topic, > > 1. A broker receives an async operation request on a target resource > from a > > client. > > 2. If the target resource is available, the broker publishes an operation > > request message to the system topic. > > 2.1 If there is no target resource, > > error out(ResourceDoesNotExistException). > It's more generic, I'm afraid it not suitable for ledger deletion. > > In ManagedLedgerImpl, the method trimConsumedLedgersInBackground will > trigger the delete operation. It will find the slowest cursor read position > in the topic, and find the ledger which is before the slowest cursor read > position. Then check the ledgers if `isLedgerRetentionOverSizeQuota` or > `hasLedgerRetentionExpired`. If so, we think the ledger should be delete, > append it to deleteLedgerList, we also check if the ledger > `isOffloadedNeedsDelete`, if so, append it to deleteOffloadLedgerList. > Then iterate `deleteLedgerList` and `deleteOffloadLedgerList`, build > `PendingDeleteLedgerInfo` , and send it to systemtopic. If send succeed, I > will remove the ledger from the ledgerList, then persist the ledgerList. If > send failed, didn't do anything. > Example: there are ledger [1,2,3,4,5], and we want to delete 1,2,3. And we > send 1,3 to the system topic succeed, send 2 failed. We remove 1,3. And > persist [2,4,5] to the zk metadata store. > > There are some cases to trigger it. > 1. A cursor be removed. > 2. Close the current ledger and create a new ledger. > 3. Consumer ack the message, the slowest cursor move forward. > 4. User trigger truncateTopic by admin. > > Case 4 is related to it, if the topic id not exists, throw > ResourceDoesNotExistException. > 2.2 If the target resource's state is pending or running, error > > out(InvalidResourceStateException). > If we use the system topic to store the pending deleted ledger, we can't > know it. Unless we read all the messages to check it, it's not realistic > 3. The broker returns success(messageId) to the client. > ledger deletion may trigger by the broker self. > Same above reason. > > > 4. A system topic consumer(Shared Subscription Mode?) consumes the > > operation request message. > Yes. > > 5. The consumer runs the operation workflow with an x-min timeout. > > 5.1 The consumer gets the target resource data from the metadata. > > 5.1.1 If there is no targeted resource, complete the operation. > Go > > to 5.4. > > 5.2 The consumer updates the target resource state to running. > (state: > > running, startedAt: now) > > 5.3 Run the workflow step 1 ... n. > > 5.4 Acknowledge the message. > The consumer received message, the message contains the delete ledger > info. Didn't need to get the target resource from metadata. It send delete > command to the corresponding broker like produce message. The broker > received delete command, it check if the ledger is exists in the > ledgerList, if exists, do nothing. If not exists, delete the bk data or > offload data. > If the topic already be delete, when send delete command, it will throw > TopicNotFoundException, then consumer will delete the ledger locally. > > > > > > > * I guess we will enable deduplication for the system topic for possibly > > duplicated operation requests. > That's not essential. The deletion is idempotent. > > > > > * Here, Target Resource means the last metadata to update in the metadata > > store. > > For example, a workflow updates multiple metadata places like the > > following. In this case, the target resource is the 3rd one. > > 1. sub-resource-1 > > 2. sub-resource-2 > > 3. target-resource. e.g. ledger, topic metadata. > > > > * For Step 5.3, all steps should be idempotent. Also, the consumer > > periodically updates the operation state in the target resource's > metadata > > every x secs. Any retry on a long-running job should resume the operation > > closer to where it failed instead of running it from the beginning. > > > * We could have describe* apis to check any inflight operations state by > > querying the target resource's operation state, but this will show some > > eventual consistency because of the delay between step 3 and step 5. > > > > > > Thanks, > > Heesung > > Thanks for your good idea, but it may not suitable the ledger deletion. > And in this pip, I didn't want to operate the ledger metadata too much. > But your idea may be a good solution for other cases. Maybe we can use it > to draft a new pip. >