> If you persisted the message successfully by the producer and the broker > was terminated before being able to delete the ledger from the metadata > service? If the broker is terminated, the consumer won't ack the message, the message will be re-consume later.
> I recommend having the logic to delete the ledger be done in the message > consumer side: > - if the ledger exists in the MD store, delete it. > - send delete command to BK > Both as I understand are idempotent. If for some reason one action was done > but the other wasn't - ZK down, BK down, broker hosting the consumer > terminated - either the message will not be acknowledged, or you negatively > acknowledge. We send a delete command to the broker, it will connect to the corresponding broker which loads the topic. The corresponding broker received the command, then passes the command to ManagedLedger, the ManagedLedger does the actual delete operation. If the consumer does the delete operation, it's a little unreasonable. The ledger manager should be `ManagedLedger`, let it do the delete will be better. > General question: When a ledger is persisted to ZK, where is the ledger > metadata persisted in ZK (more specifically it's metadata, which includes > the component? > Is it also used when building out the key (path) in ZK? https://github.com/apache/bookkeeper/blob/901f76ce4c4f9f771363424dbb60da4d590ad122/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java#L74 It's the content in the zk node. when creating a ledger by bookkeeper, it will create a path like `/ledgers/00/0000/L0000`, the path value is an instance of LedgerMetadataImpl. The bookkeeper LedgerMetadataImpl, the customMetadata stores the user's metadata. If the ledger is for Ledger, the customMetadata store: `key:application, value:pulsar` `key:component, value:managed-ledger` `key:pulsar/managed-ledger, value: ledgerName` If the ledger is for Cursor, the customMetadata store: `key:application, value:pulsar` `key:component, value:managed-ledger` `key:pulsar/managed-ledger, value: ledgerName` `key:pulsar/cursor, value: curSorName` If the ledger is for schema, the customMetadata store: `key:application, value:pulsar` `key:component, value:schema` `key:pulsar/schemaId, value: schemaId` So when we get the ledger metadata from bookkeeper, we can get the ledger source. > Isn't the type saved together with the ledger in ZK? We need to differ it, the same ledger may store both on the bk side and the offload side. If a ledger want to delete the bk data and the offload data, it should publishes two message to the system topic. The broker needs it to determine whether to delete offload or bk. > It's for the offloaded ledger, when we want to delete the offload ledger, > > we need offloadedContextUuid, here we can simplify it to offloadContextUuid. > Sounds much better. Maybe offloadedLedgerUUID? (why context?) Agree. > > Are you encoding all that extra info besides the ledger ID and its source > to avoid reading it again from ZK when deleting it? No, only encoding the data which is useful for deletion. > > It's for extended. > > > Can't really understand from that short sentence what you mean. Can you > please elaborate? Maybe we can delete it, I just want to didn't change the class when we want to add new property. Put the new property as key-value to extend. > > > In https://github.com/apache/pulsar/issues/16569. The first step section > > and second step section process flow picture are detailed. > > > I'm sorry but you didn't answer all the questions I wrote. I'll paste them > here: > Can you explain the starting point? How does deletion work in general? > > When? What happens? ... I understand there are time based triggers, and > > sometimes used based triggers. They are somehow marked in metadata. > In ManagedLedgerImpl, the method trimConsumedLedgersInBackground will trigger the delete operation. It will find the slowest cursor read position in the topic, and find the ledger which is before the slowest cursor read position. Then check the ledgers if `isLedgerRetentionOverSizeQuota` or `hasLedgerRetentionExpired`. If so, we think the ledger should be delete, append it to deleteLedgerList, we also check if the ledger `isOffloadedNeedsDelete`, if so, append it to deleteOffloadLedgerList. Then iterate `deleteLedgerList` and `deleteOffloadLedgerList`, build `PendingDeleteLedgerInfo` , and send it to systemtopic. If send succeed, I will remove the ledger from the ledgerList, then persist the ledgerList. If send failed, didn't do anything. Example: there are ledger [1,2,3,4,5], and we want to delete 1,2,3. And we send 1,3 to the system topic succeed, send 2 failed. We remove 1,3. And persist [2,4,5] to the zk metadata store. There are some cases to trigger it. 1. A cursor be removed. 2. Close the current ledger and create a new ledger. 3. Consumer ack the message, the slowest cursor move forward. 4. User trigger truncateTopic by admin. > If delete fails, that means the storage system occur some problems. I guess > > the storage system will recovery in 10 mins. > > > > In https://github.com/apache/pulsar/issues/16569, we define > > reconsumeLaterOfTopicTwoPhaseDeletionInSeconds in the ServiceConfiguration, > > it's configurable. > > private int reconsumeLaterOfTopicTwoPhaseDeletionInSeconds = 600; > > > We need some experienced people here to contribute their opinion. Default > 10min might be too much. I recommend you ask Penghui. Fine. > > > > > You mentioned you are working with a client which has retries configured. > > > Retry is client side based, ack one message while producing another, > > > transaction free. Are you prepared to handle a case where you acked but > > > failed to produce the message, hence you lost it completely? > > > > > The pulsarClient only sends a new message that succeeds, then ack the > > origin message, so didn't care in this case. > > > Ok, then you will have concurrent consumption of a message which tries to > delete the ledger from ZK and then tries to delete it from BK? Isn't that a > concurrency issue? if i understand correctly, I guess it won't happen, we only delete the ledger from Zk at the first step(producer side) and delete the ledger from bk at the second step.(consumer side). > > > > > > If we want to delete a topic, we should send the delete ledger msg to > > > > system topic and remove ledger id from metadata one by one, after all > > the > > > > ledger has been deleted, then delete the topic. If any ledger operate > > > > failed, we think this delete topic operation failed and return the left > > > > ledger id in the response. > > > > > > I couldn't understand. Can you please clarify this section. How exactly > > > topic deletion is modified to use the features described in this pip? > > > > > We need to ensure that all ledgers are deleted before the topic is > > deleted, otherwise, there will be orphan ledgers. > > > Your PIP is about introducing a workflow for deleting a ledger, right? > When you delete a topic you iterate its ledger list and delete each ledger. > Your PIP changes the way each ledger is deleted and makes it async. So I > guess what I want to understand is: What are the changes you are making to > topic deletion due to your PIP? You said "we need to make sure" - can you > please clarify how you will make sure? Before delete the topic, we will iterate the ledger list like delete ledger. But we need to ensure all the ledger be send to the system topic and remove all the ledger from zk. then delete the topic. If there are some ledgers sent to system topic failed, we will throw an exception to avoid to delete the topic. > > > 10. > > > Backward compatibility - I would make sure to document in the docs > > exactly > > > what to do in case we need to rollback (cook book). > > Well. > > > You added > > > If user upgrade and enable two phase deletion, the ledger deletion msg > > will store in system topic. If the user rolls back to the old version and > > the system topic msg hasn't consumed all, some ledger may not delete. > > > A cookbook is giving instructions like "Before downgrading, wait for > metrics xxx to be 0 which indicates the in-flight ledgers delete commands > have all been processed". Here you just say, some ledgers may note delete - > give them some action - what commands can they run to delete those ledgers > themselves? Help them be successful as you have all the > implementation knowledge - they have nothing. Nicely suggestion, the user can consume the system topic to get the ledger deletion info, and delete it from bk or offload system. Maybe we can support a tool to help it, but it's not official. > > > > > > > > 11. > > > General comment - You're basically implementing a bespoke workflow using > > a > > > topic to save the state of where you are in the topic. > > > Is this the only place in Pulsar (delete ledger) that an action is > > composed > > > of several steps ? > > > If the answer is no to this, wouldn't it be better to have a small > > utility > > > which is in charge of moving through the workflow steps? It can even be a > > > simple state enum, where you move your state from a to b to c to d and it > > > is persisted. > > We need to persist in the middle steps, and we didn't want to operate the > > metadata store continually, so used pulsar to persist it. > > > > I didn't ask whether we should persist the workflow state to Pulsar instead > of ZK. > Can you please re-read my question? Sorry for the speed read, Asaf. At now, I only found the ledger deletion has the several steps. I agree with you to abstract a small utility to handle the multi steps works, but we may pay many efforts for it. So we just introduce a system topic to handle it. After this pip, if there are some similar multi steps works, we can refer this pip, use system topic to handle it. a -> b -> c -> d (syetmTopicA -> systemTopicB -> systemTopicC -> systemTopicD), the thought is generic. > > > > 12. Monitoring > > > Some actions here can take a long time. We're basically relying on logs > > to > > > monitor where we are in the flow? > > yes, we didn't trace the ledger deletion steps. we only use stats to > > record whether the delete operation succeeds or not. > > > That's not enough. > A user needs to be able to operate the cluster, so we need to give them > eyes into what's happening inside the system. > Please add metrics to help them figure that out: > * How many in-flight ledger deletion commands do we have? We can count the all unacked msg to describe it. > * How many ZK deletions failed/succeeded? I think we didn't need to count zk deletions. In fact, delete ledger from metadata is a zk node update operation. Before delete, there are [1,2,3,4,5], delete 1,2. just update the zk node content with the less ledgerList [3,4,5]. > * How many BK deletions failed/succeeded? Yes, we can count it. But it is transient, after the broker restart, it will re-count again. Thanks for your ideas. Asaf.