On 2023/02/05 14:40:10 Asaf Mesika wrote:
> On Fri, Feb 3, 2023 at 1:48 PM Yan Zhao <horizo...@apache.org> wrote:
> 
> > > If you persisted the message successfully by the producer and the broker
> > > was terminated before being able to delete the ledger from the metadata
> > > service?
> > If the broker is terminated, the consumer won't ack the message, the
> > message will be re-consume later.
> 
> 
> Let me quote the text I was replying to:
> 
> > When pulsar wants to delete a ledger, ManagedLedger uses
> > ledgerDeletionService to send a message, the message content contains the
> > waiting delete ledger info. After sending success, delete the ledger id
> > from the metadata store.
> 
> 
> If I understand correctly, the following steps will happen:
> 1. Producer writes a message to system topic, containing the ledger
> information to delete.
> 2. Upon success of (1), the ledger ID is deleted from the metadata store.
> 
> You reply was about the consumer, my question was about the producer, so
> I'm writing my reply here again (full reply):
> 
> If you persisted the message successfully by the producer and the broker
> was terminated before being able to delete the ledger (ID) from the
> metadata service?

After the broker restart, the broker still thinks the ledger is available, and 
the broker can still read data from this ledger. 
The consumer received the message, it triggers the corresponding broker to 
delete the ledger.
The ledger still exists in the metadata, ignore deletion, and the consumer ack 
the message.

At the next trimConsumedLedgersInBackground, the ledger will be deleted again.

> 
> >
> > > I recommend having the logic to delete the ledger be done in the message
> > > consumer side:
> > > - if the ledger exists in the MD store, delete it.
> > > - send delete command to BK
> > > Both as I understand are idempotent. If for some reason one action was
> > done
> > > but the other wasn't - ZK down, BK down, broker hosting the consumer
> > > terminated - either the message will not be acknowledged, or you
> > negatively
> > > acknowledge.
> > We send a delete command to the broker, it will connect to the
> > corresponding broker which loads the topic. The corresponding broker
> > received the command, then passes the command to ManagedLedger, the
> > ManagedLedger does the actual delete operation.
> > If the consumer does the delete operation, it's a little unreasonable. The
> > ledger manager should be `ManagedLedger`, let it do the delete will be
> > better.
> >
> 
> You say the consumer will send a delete command to the broker: By what
> means? Sending a message on a topic, or running an RPC request by adding a
> new command to the PulsarApi.proto file?
> If the latter - shouldn't this be documented?
Yes, by sending an RPC request. 
I just add sections:
`Introduce admin api in Topics`.
`Introduce admin api in Schemas.`

I will describe it detailed.

> 
> Let me assume the consumer which receives the message, will run an Pulsar
> API command to instruct the broker assigned to this topic to delete the
> ledger - Why the implementation of this command in the assigned broker
> can't have both the deletion of the ledger ID from the ZK, AND deletion
> from BK? Why do we need to have the ZK deletion done on the producer side,
> after we produced the message?

In pulsar, as long as we delete the ledger id from metadata store, we think the 
ledger has already been deleted. The Pulsar didn't refer to the ledger anymore, 
although the ledger still exists in the BK. The ledger deletion in BK can be 
done at any time, as long as we ensure the ledger deletion in BK will do 
finally.

If we delete both ledger ID from zk and ledger from bk when the broker received 
the delete RPC command.
There may be a problem in this case. 
The broker wants to delete ledger 1, it send the message to the system topic 
and didn't remove the ledger id 1 from the zk. So the ManagedLedger still 
thinks ledger 1 is readable, it may read ledger 1 from BK. But at the same 
time, the consumer received msg, it send the delete RPC command to the broker, 
the broker delete ledger id from zk and delete ledger from BK. The 
ManagedLedger is reading from the BK, it will throw exception.

So we remove the ledger id from zk at the producer stage, the broker thinks the 
ledger has already deleted, and didn't do any operation for the ledger.

> 
> 
> > > General question: When a ledger is persisted to ZK, where is the ledger
> > > metadata persisted in ZK (more specifically it's metadata, which includes
> > > the associate?
> > > Is it also used when building out the key (path) in ZK?
> >
> >
> > https://github.com/apache/bookkeeper/blob/901f76ce4c4f9f771363424dbb60da4d590ad122/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java#L74
> >
> > It's the content in the zk node. when creating a ledger by bookkeeper, it
> > will create a path like `/ledgers/00/0000/L0000`, the path value is an
> > instance of LedgerMetadataImpl.
> > The bookkeeper LedgerMetadataImpl, the customMetadata stores the user's
> > metadata.
> >
> > If the ledger is for Ledger, the customMetadata store:
> > `key:application, value:pulsar`
> > `key:component, value:managed-ledger`
> > `key:pulsar/managed-ledger, value: ledgerName`
> >
> > If the ledger is for Cursor, the customMetadata store:
> > `key:application, value:pulsar`
> > `key:component, value:managed-ledger`
> > `key:pulsar/managed-ledger, value: ledgerName`
> > `key:pulsar/cursor, value: curSorName`
> >
> > If the ledger is for schema, the customMetadata store:
> > `key:application, value:pulsar`
> > `key:component, value:schema`
> > `key:pulsar/schemaId, value: schemaId`
> >
> > So when we get the ledger metadata from bookkeeper, we can get the ledger
> > source.
> >
> 
> Thanks for the info!
> 
> 
> >
> > > Isn't the type saved together with the ledger in ZK?
> > We must to differ it, the same ledger may store both on the bk side and
> > the offload side.
> > If a ledger want to delete the bk data and the offload data, it should
> > publishes two message to the system topic. The broker needs it to determine
> > whether to delete offload or bk.
> 
> So you're saying that the information whether this ledger is either only in
> BK, or only in offload storage or both is NOT encoded in the Ledger custom
> metadata properties of the ledger?
> The offloader saves that information for that ledger in another place in
> ZK?

Yes, see the define of LedgerInfo.
```
    message LedgerInfo {
        required int64 ledgerId = 1;
        optional int64 entries = 2;
        optional int64 size = 3;
        optional int64 timestamp = 4;
        optional OffloadContext offloadContext = 5;
    }

    message OffloadContext {
        optional int64 uidMsb = 1;
        optional int64 uidLsb = 2;
        optional bool complete = 3;
        optional bool bookkeeperDeleted = 4;
        optional int64 timestamp = 5;
        optional OffloadDriverMetadata driverMetadata = 6;
        repeated OffloadSegment offloadSegment = 7;
    }

```
If a ledger didn't offload, the offloadContext is null. If a ledger is 
offloaded, the offloadContext will record the offload info. I
n ManagedLedger, it save the ledgers as a map, the key is ledgerId, the value 
is LedgerInfo. When persist the ledgers to the metadata store, using the 
map.values transfer to Set<LedgerInfo>, then persist the Set<LedgerInfo> to 
metadata store. It just record at pulsar side, it's not in bk side. 
If a ledger is offloaded, the data is exist in bk side and offload system side. 
And we may only delete the bk side(it may not reach the offload deletion 
policy). So here we need sign the LedgerType(LEDGER, not OFFLOAD_LEDGER) in the 
message, when the broker received the consumer's deleta command, it knows only 
delete the bk data. 

> 
> If this ledger is a Cursor ledger, how do you know where to delete it
> inside ZK if you don't include the cursor name in the
> PendingDeleteLedgerInfo message?
Yes, it miss it. We should also record the cusorName in the 
PendingDeleteLedgerInfo. 

> 
> 
> >
> > > It's for the offloaded ledger, when we want to delete the offload ledger,
> > > > we need offloadedContextUuid, here we can simplify it to
> > offloadContextUuid.
> >
> > > Sounds much better. Maybe offloadedLedgerUUID? (why context?)
> > Agree.
Sorry for the wrong reply,  here can't simplify to UUID, we need maintain the 
OffloadContext.
```
message OffloadContext {
    optional int64 uidMsb = 1;
    optional int64 uidLsb = 2;
    optional bool complete = 3;
    optional bool bookkeeperDeleted = 4;
    optional int64 timestamp = 5;
    optional OffloadDriverMetadata driverMetadata = 6;
    repeated OffloadSegment offloadSegment = 7;
}
```
When broker wants to delete the offload data, we need the OffloadDriverMetadata 
to find the place where the offload data is.

> >
> > >
> > >  Are you encoding all that  extra info besides the ledger ID and its
> > source
> > > to avoid reading it again from ZK when deleting it?
> > No, only encoding the data which is useful for deletion.
> >
> 
> Can you add to the PIP *how* you will *use* every piece of information
> saved in the message?
Yes, I will add every property usage in the pip. 

> 
> 
> > >
> > > It's for extended.
> > > >
> > > Can't really understand from that short sentence what you mean. Can you
> > > please elaborate?
> > Maybe we can delete it, I just want to didn't change the class when we
> > want to add new property. Put the new property as key-value to extend.
> >
> Let's assume you deleted it.
> 6 months from now someone needs a new feature for this, and needs to add a
> field to  PendingDeleteLedgerInfo.
> If you control the serialization and deserialization, and you make any
> change backward compatible, if should be ok, no?
Yes, we use the AVRO to define the schema, it's compatible for schema evolution.

> When you add fields as untyped, you lose the whole notion of type, no?
Yes. So use the AVRO to define the schema, we can add a new property with 
compatible.

> 
> 
> > >
> > > > In https://github.com/apache/pulsar/issues/16569. The first step
> > section
> > > > and second step section process flow picture are detailed.
> > > >
> > >  I'm sorry but you didn't answer all the questions I wrote. I'll paste
> > them
> > > here:
> > > Can you explain the starting point? How does deletion work in general?
> > > > When? What happens? ... I understand there are time based triggers, and
> > > > sometimes used based triggers. They are somehow marked in metadata.
> > >
> > In ManagedLedgerImpl, the method trimConsumedLedgersInBackground will
> > trigger the delete operation. It will find the slowest cursor read position
> > in the topic, and find the ledger which is before the slowest cursor read
> > position. Then check the ledgers if `isLedgerRetentionOverSizeQuota` or
> > `hasLedgerRetentionExpired`. If so, we think the ledger should be delete,
> > append it to deleteLedgerList, we also check if the ledger
> > `isOffloadedNeedsDelete`, if so, append it to deleteOffloadLedgerList.
> > Then iterate `deleteLedgerList` and `deleteOffloadLedgerList`, build
> > `PendingDeleteLedgerInfo` , and send it to systemtopic. If send succeed, I
> > will remove the ledger from the ledgerList, then persist the ledgerList. If
> > send failed, didn't do anything.
> > Example: there are ledger [1,2,3,4,5], and we want to delete 1,2,3. And we
> > send 1,3 to the system topic succeed, send 2 failed. We remove 1,3. And
> > persist [2,4,5] to the zk metadata store.
> >
> > There are some cases to trigger it.
> > 1. A cursor be removed.
> > 2. Close the current ledger and create a new ledger.
> > 3. Consumer ack the message, the slowest cursor move forward.
> > 4. User trigger truncateTopic by admin.
> >
> >
> >
> Thanks.
> So if I understand correctly, you're only focused on deleting ledgers
> related to a manged ledger through `trimConsumedLedgersInBackground()`
> *only* - you're not modifying any other ledger deletion mechanism in
> Managed Ledger right?
Yes.

> If I'm correct, your PIP must be amended:
> 1. You need to say that explicitly in your PIP. If you search "trim" in
> your PIP you wouldn't find it.
> 2. PIP title is incorrect. Should be: "system-topic based ManagedLedger's
> ledgers trimming"
Agree. The title is too genric.

> 
> 
> 
> > > If delete fails, that means the storage system occur some problems. I
> > guess
> > > > the storage system will recovery in 10 mins.
> > > >
> > > > In https://github.com/apache/pulsar/issues/16569, we define
> > > > reconsumeLaterOfTopicTwoPhaseDeletionInSeconds in the
> > ServiceConfiguration,
> > > > it's configurable.
> > > > private int reconsumeLaterOfTopicTwoPhaseDeletionInSeconds = 600;
> > >
> > >
> > > We need some experienced people here to contribute their opinion. Default
> > > 10min might be too much. I recommend you ask Penghui.
> > Fine.
> >
> > > >
> > > > > You mentioned you are working with a client which has retries
> > configured.
> > > > > Retry is client side based, ack one message while producing another,
> > > > > transaction free. Are you prepared to handle a case where you acked
> > but
> > > > > failed to produce the message, hence you lost it completely?
> > > > >
> > > > The pulsarClient only sends a new message that succeeds, then ack the
> > > > origin message, so didn't care in this case.
> > > >
> > > Ok, then you will have concurrent consumption of a message which tries to
> > > delete the ledger from ZK and then tries to delete it from BK? Isn't
> > that a
> > > concurrency issue?
> > if i understand correctly, I guess it won't happen, we only delete the
> > ledger from Zk at the first step(producer side) and delete the ledger from
> > bk at the second step.(consumer side).
> >
> 
> 1. I suggested above to move ZK deletion to the consumer side.
> 2. Running delete ledger concurrently in BK is ok?
>
> Maybe you can say: When I execute an RPC command to delete ledger in
> Pulsar, it will get a lock so others can't run that concurrently since all
> the consumer does is run an RPC command to the topic owner broker?
Same above reason why remove the zk deletion at the producer.

> 
> 
> >
> > >
> > >
> > > > > > If we want to delete a topic, we should send the delete ledger msg
> > to
> > > > > > system topic and remove ledger id from metadata one by one, after
> > all
> > > > the
> > > > > > ledger has been deleted, then delete the topic. If any ledger
> > operate
> > > > > > failed, we think this delete topic operation failed and return the
> > left
> > > > > > ledger id in the response.
> > > > >
> > > > > I couldn't understand. Can you please clarify this section. How
> > exactly
> > > > > topic deletion is modified to use the features described in this pip?
> > > > >
> > > > We need to ensure that all ledgers are deleted before the topic is
> > > > deleted, otherwise, there will be orphan ledgers.
> > > >
> > > Your PIP is about introducing a workflow for deleting a ledger, right?
> > > When you delete a topic you iterate its ledger list and delete each
> > ledger.
> > > Your PIP changes the way each ledger is deleted and makes it async. So I
> > > guess what I want to understand is: What are the changes you are making
> > to
> > > topic deletion due to your PIP? You said "we need to make sure" - can you
> > > please clarify how you will make sure?
> > Before delete the topic, we will iterate the ledger list like delete
> > ledger. But we need to ensure all the ledger be send to the system topic
> > and remove all the ledger from zk. then delete the topic. If there are some
> > ledgers sent to system topic failed, we will throw an exception to avoid to
> > delete the topic.
> >
> 
> The biggest confusion for me here is that you didn't explain what you want
> to do in a logical way.
> 
> Your PIP should say something like:
> 
>    - Current problem
>       - Managed Ledger is composed among other things from a chain of BK
>       Ledgers. ZK contains a list of ledger IDs for the specific Managed 
> Ledger
>       - Managed Ledger has a couple of operations, all of which contains
>       the basic construct of deleting a specific ledger. The operations are
>          - Delete
>             - It deletes the entire managed ledger, including each ledger.
>          - Trim
>             - Keeps the managed ledger in tact but deletes its data: all of
>             its ledgers
>          - Background Trim
>             - Deletes all ledgers which have been acknowledged by all
>             active subscriptions and comply with the storage policy (TTL, ...)
>          - Deleting ledger is composed of the following steps:
>          - Delete its ID from the list of ledger IDs of the Managed Ledger
>          - Deleting the ledger from BK if it exists there (some ledgers
>          have been offloaded and deleted from BK)
>          - Delete the ledger from LedgerOffloader (the Tiered storage which
>          offloads the ledger to S3 like system) if it has been offloaded
>       - Each of the steps above is a call to another system (process).
>       - Today ledger deletion steps are executed one after another in the
>       broker assigned to the managed ledger's topic. If a certain step fails 
> or
>       the broker terminates suddenly, the process will not resume from where 
> it
>       has stopped since the state of deletion is not persisted anywhere.
>       - The consequence is that the system remains in a place where its
>       metadata is not synchronized with the actual data. [EXPAND MORE HERE]
>    - Solution
>       - We would like to persist the state of the step in such a way that
>       once the broker is restarted we can resume where we left off.
>       - Each time we wish to delete a ledger (due to one of the operations
>       described in the Current Problem section), we will send a message to a
>       special designated system topic, containing all the information needed 
> to
>       delete that ledger.
>       - We will consider the ledger successfully deleted upon
>       successful writing of the message to the system topic.
>       - The message consumer will be the one running all the steps outlined
>       above. Since all steps are idempotent, we can retry many times
> without any
>       harm
>       - Any broker will listen to this system topic for consumption of
>       messages, but it will only run a new RPC command to the topic's assigned
>       broker to do the actual ledger deletion.
>       - In case RPC command fails, we'll use the retry mechanism, meaning a
>       new message will be produced with increased retry count and the original
>       message acked. In case of failure to ack, upon retry we might produce 
> the
>       same message again. Since only a single broker is actually
> running the RPC
>       command, we're protected by a lock there from concurrency issues.
>       - Some operations written above, like delete topic, consist of
>       multiple ledgers to be deleted. Each deletion is producing a
> message to the
>       system topic. One all messages produced successfully we'll continue to
>       topic deletion if self. The ledgers will be actually deleted
> asynchronously
>       by the message consumer.
>          - EXPAND on the dead letter queue...
>       - Monitoring
>    - ...
> 
> 
> I strongly suggest you rewrite the document. IMO it's impossible to
> understand as it is written currently.

Yes, the current pip need the user has some experience for the ledger deletion. 
I will tuning the pip more detail.

> 
> 
> >
> > >
> > > > 10.
> > > > > Backward compatibility - I would make sure to document in the docs
> > > > exactly
> > > > > what to do in case we need to rollback (cook book).
> > > > Well.
> > >
> > >
> > > You added
> > >
> > > > If user upgrade and enable two phase deletion, the ledger deletion msg
> > > > will store in system topic. If the user rolls back to the old version
> > and
> > > > the system topic msg hasn't consumed all, some ledger may not delete.
> > >
> > >
> > > A cookbook is giving instructions like "Before downgrading, wait for
> > > metrics xxx to be 0 which indicates the in-flight ledgers delete commands
> > > have all been processed". Here you just say, some ledgers may note
> > delete -
> > > give them some action - what commands can they run to delete those
> > ledgers
> > > themselves? Help them be successful as you have all the
> > > implementation knowledge - they have nothing.
> > Nicely suggestion, the user can consume the system topic to get the ledger
> > deletion info, and delete it from bk or offload system. Maybe we can
> > support a tool to help it, but it's not official.
> >
> 
> I'm not saying they should actively consume it. They can wait on some
> metric indicating all in-flight ledger deletion has finished and only then
> downgrade. Please read my suggestion again. I'm saying to document those
> exact steps.
> IMO it's imperative to have that. There are real users out there who don't
> have all the needed information to understand what you're doing in this
> feature. Please help them.
Ok, I will append the doc to guide the user how and when to downgrade. And the 
drawbacks if the user is forced to downgrade.
> 
> 
> >
> > >
> > >
> > > > >
> > > > > 11.
> > > > > General comment - You're basically implementing a bespoke workflow
> > using
> > > > a
> > > > > topic to save the state of where you are in the topic.
> > > > > Is this the only place in Pulsar (delete ledger) that an action is
> > > > composed
> > > > > of several steps ?
> > > > > If the answer is no to this, wouldn't it be better to have a small
> > > > utility
> > > > > which is in charge of moving through the workflow steps? It can even
> > be a
> > > > > simple state enum, where you move your state from a to b to c to d
> > and it
> > > > > is persisted.
> > > > We need to persist in the middle steps, and we didn't want to operate
> > the
> > > > metadata store continually, so used pulsar to persist it.
> > > >
> > >
> > > I didn't ask whether we should persist the workflow state to Pulsar
> > instead
> > > of ZK.
> > > Can you please re-read my question?
> > Sorry for the speed read, Asaf. At now, I only found the ledger deletion
> > has the several steps.
> > I agree with you to abstract a small utility to handle the multi steps
> > works, but we may pay many efforts for it. So we just introduce a system
> > topic to handle it.
> > After this pip, if there are some similar multi steps works, we can refer
> > this pip, use system topic to handle it. a -> b -> c -> d (syetmTopicA ->
> > systemTopicB -> systemTopicC -> systemTopicD), the thought is generic.
> >
> 
> Ok.
> 
> 
> >
> > >
> > > > > 12. Monitoring
> > > > > Some actions here can take a long time. We're basically relying on
> > logs
> > > > to
> > > > > monitor where we are in the flow?
> > > > yes, we didn't trace the ledger deletion steps. we only use stats to
> > > > record whether the delete operation succeeds or not.
> > > >
> > > That's not enough.
> > > A user needs to be able to operate the cluster, so we need to give them
> > > eyes into what's happening inside the system.
> > > Please add metrics to help them figure that out:
> > > * How many in-flight ledger deletion commands do we have?
> > We can count the all unacked msg to describe it.
> >
> 
> Great. Let's add it to the PIP.
> 
> 
> >
> > > * How many ZK deletions failed/succeeded?
> > I think we didn't need to count zk deletions. In fact, delete ledger from
> > metadata is a zk node update operation. Before delete, there are
> > [1,2,3,4,5], delete 1,2. just update the zk node content with the less
> > ledgerList [3,4,5].
> >
> 
> Just now I figured something out. You need to make a single ZK update
> command to all ledgers you decided to delete in that managed ledger.
> So I suggest the following:
> 
> 
>    - Trim ledgers
>       - send DeleteLedgersFromZK message to system topic
>    - DeleteLedgersFromZK consumer
>    - Run RPC command to broker to update ZK (this will ensure lock)
>       - For any ledger ID produce DeleteLedgerFromStorage msg
>       - ack
>    - DeleteLedgerFromStorage consumer
>    - run RPC command to broker to delete ledger from storage
>       - ack message

You means we define some metrics for the every operation you mentions? 
If so, I think we just define metrics 
- `send DeleteLedgersFromZK message to system topic` -> record how many ledger 
id be deleted
-  `Consume received the msg` -> record how many ledger the consumer want to 
delete

- `Ack the message at consumer side` -> record the succeed deletion

- `Reconsume later the message at consumer side` -> record the failed deletion

> >
> > > * How many BK deletions failed/succeeded?
> > Yes, we can count it. But it is transient, after the broker restart, it
> > will re-count again.
> >
> No worries, metrics are scraped to Prometheus which can handle restarts.
> 
> 
> >
> >
> > Thanks for your ideas. Asaf.
> >
> 

Reply via email to