On Fri, Feb 3, 2023 at 1:48 PM Yan Zhao <horizo...@apache.org> wrote:

> > If you persisted the message successfully by the producer and the broker
> > was terminated before being able to delete the ledger from the metadata
> > service?
> If the broker is terminated, the consumer won't ack the message, the
> message will be re-consume later.


Let me quote the text I was replying to:

> When pulsar wants to delete a ledger, ManagedLedger uses
> ledgerDeletionService to send a message, the message content contains the
> waiting delete ledger info. After sending success, delete the ledger id
> from the metadata store.


If I understand correctly, the following steps will happen:
1. Producer writes a message to system topic, containing the ledger
information to delete.
2. Upon success of (1), the ledger ID is deleted from the metadata store.

You reply was about the consumer, my question was about the producer, so
I'm writing my reply here again (full reply):

If you persisted the message successfully by the producer and the broker
was terminated before being able to delete the ledger (ID) from the
metadata service?


>
> > I recommend having the logic to delete the ledger be done in the message
> > consumer side:
> > - if the ledger exists in the MD store, delete it.
> > - send delete command to BK
> > Both as I understand are idempotent. If for some reason one action was
> done
> > but the other wasn't - ZK down, BK down, broker hosting the consumer
> > terminated - either the message will not be acknowledged, or you
> negatively
> > acknowledge.
> We send a delete command to the broker, it will connect to the
> corresponding broker which loads the topic. The corresponding broker
> received the command, then passes the command to ManagedLedger, the
> ManagedLedger does the actual delete operation.
> If the consumer does the delete operation, it's a little unreasonable. The
> ledger manager should be `ManagedLedger`, let it do the delete will be
> better.
>

You say the consumer will send a delete command to the broker: By what
means? Sending a message on a topic, or running an RPC request by adding a
new command to the PulsarApi.proto file?
If the latter - shouldn't this be documented?

Let me assume the consumer which receives the message, will run an Pulsar
API command to instruct the broker assigned to this topic to delete the
ledger - Why the implementation of this command in the assigned broker
can't have both the deletion of the ledger ID from the ZK, AND deletion
from BK? Why do we need to have the ZK deletion done on the producer side,
after we produced the message?


> > General question: When a ledger is persisted to ZK, where is the ledger
> > metadata persisted in ZK (more specifically it's metadata, which includes
> > the component?
> > Is it also used when building out the key (path) in ZK?
>
>
> https://github.com/apache/bookkeeper/blob/901f76ce4c4f9f771363424dbb60da4d590ad122/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataImpl.java#L74
>
> It's the content in the zk node. when creating a ledger by bookkeeper, it
> will create a path like `/ledgers/00/0000/L0000`, the path value is an
> instance of LedgerMetadataImpl.
> The bookkeeper LedgerMetadataImpl, the customMetadata stores the user's
> metadata.
>
> If the ledger is for Ledger, the customMetadata store:
> `key:application, value:pulsar`
> `key:component, value:managed-ledger`
> `key:pulsar/managed-ledger, value: ledgerName`
>
> If the ledger is for Cursor, the customMetadata store:
> `key:application, value:pulsar`
> `key:component, value:managed-ledger`
> `key:pulsar/managed-ledger, value: ledgerName`
> `key:pulsar/cursor, value: curSorName`
>
> If the ledger is for schema, the customMetadata store:
> `key:application, value:pulsar`
> `key:component, value:schema`
> `key:pulsar/schemaId, value: schemaId`
>
> So when we get the ledger metadata from bookkeeper, we can get the ledger
> source.
>

Thanks for the info!


>
> > Isn't the type saved together with the ledger in ZK?
> We need to differ it, the same ledger may store both on the bk side and
> the offload side.
> If a ledger want to delete the bk data and the offload data, it should
> publishes two message to the system topic. The broker needs it to determine
> whether to delete offload or bk.

So you're saying that the information whether this ledger is either only in
BK, or only in offload storage or both is NOT encoded in the Ledger custom
metadata properties of the ledger?
The offloader saves that information for that ledger in another place in
ZK?

If this ledger is a Cursor ledger, how do you know where to delete it
inside ZK if you don't include the cursor name in the
PendingDeleteLedgerInfo message?


>
> > It's for the offloaded ledger, when we want to delete the offload ledger,
> > > we need offloadedContextUuid, here we can simplify it to
> offloadContextUuid.
>
> > Sounds much better. Maybe offloadedLedgerUUID? (why context?)
> Agree.
>
> >
> >  Are you encoding all that  extra info besides the ledger ID and its
> source
> > to avoid reading it again from ZK when deleting it?
> No, only encoding the data which is useful for deletion.
>

Can you add to the PIP *how* you will *use* every piece of information
saved in the message?


> >
> > It's for extended.
> > >
> > Can't really understand from that short sentence what you mean. Can you
> > please elaborate?
> Maybe we can delete it, I just want to didn't change the class when we
> want to add new property. Put the new property as key-value to extend.
>
Let's assume you deleted it.
6 months from now someone needs a new feature for this, and needs to add a
field to  PendingDeleteLedgerInfo.
If you control the serialization and deserialization, and you make any
change backward compatible, if should be ok, no?
When you add fields as untyped, you lose the whole notion of type, no?


> >
> > > In https://github.com/apache/pulsar/issues/16569. The first step
> section
> > > and second step section process flow picture are detailed.
> > >
> >  I'm sorry but you didn't answer all the questions I wrote. I'll paste
> them
> > here:
> > Can you explain the starting point? How does deletion work in general?
> > > When? What happens? ... I understand there are time based triggers, and
> > > sometimes used based triggers. They are somehow marked in metadata.
> >
> In ManagedLedgerImpl, the method trimConsumedLedgersInBackground will
> trigger the delete operation. It will find the slowest cursor read position
> in the topic, and find the ledger which is before the slowest cursor read
> position. Then check the ledgers if `isLedgerRetentionOverSizeQuota` or
> `hasLedgerRetentionExpired`. If so, we think the ledger should be delete,
> append it to deleteLedgerList, we also check if the ledger
> `isOffloadedNeedsDelete`, if so, append it to deleteOffloadLedgerList.
> Then iterate `deleteLedgerList` and `deleteOffloadLedgerList`, build
> `PendingDeleteLedgerInfo` , and send it to systemtopic. If send succeed, I
> will remove the ledger from the ledgerList, then persist the ledgerList. If
> send failed, didn't do anything.
> Example: there are ledger [1,2,3,4,5], and we want to delete 1,2,3. And we
> send 1,3 to the system topic succeed, send 2 failed. We remove 1,3. And
> persist [2,4,5] to the zk metadata store.
>
> There are some cases to trigger it.
> 1. A cursor be removed.
> 2. Close the current ledger and create a new ledger.
> 3. Consumer ack the message, the slowest cursor move forward.
> 4. User trigger truncateTopic by admin.
>
>
>
Thanks.
So if I understand correctly, you're only focused on deleting ledgers
related to a manged ledger through `trimConsumedLedgersInBackground()`
*only* - you're not modifying any other ledger deletion mechanism in
Managed Ledger right?

If I'm correct, your PIP must be amended:
1. You need to say that explicitly in your PIP. If you search "trim" in
your PIP you wouldn't find it.
2. PIP title is incorrect. Should be: "system-topic based ManagedLedger's
ledgers trimming"



> > If delete fails, that means the storage system occur some problems. I
> guess
> > > the storage system will recovery in 10 mins.
> > >
> > > In https://github.com/apache/pulsar/issues/16569, we define
> > > reconsumeLaterOfTopicTwoPhaseDeletionInSeconds in the
> ServiceConfiguration,
> > > it's configurable.
> > > private int reconsumeLaterOfTopicTwoPhaseDeletionInSeconds = 600;
> >
> >
> > We need some experienced people here to contribute their opinion. Default
> > 10min might be too much. I recommend you ask Penghui.
> Fine.
>
> > >
> > > > You mentioned you are working with a client which has retries
> configured.
> > > > Retry is client side based, ack one message while producing another,
> > > > transaction free. Are you prepared to handle a case where you acked
> but
> > > > failed to produce the message, hence you lost it completely?
> > > >
> > > The pulsarClient only sends a new message that succeeds, then ack the
> > > origin message, so didn't care in this case.
> > >
> > Ok, then you will have concurrent consumption of a message which tries to
> > delete the ledger from ZK and then tries to delete it from BK? Isn't
> that a
> > concurrency issue?
> if i understand correctly, I guess it won't happen, we only delete the
> ledger from Zk at the first step(producer side) and delete the ledger from
> bk at the second step.(consumer side).
>

1. I suggested above to move ZK deletion to the consumer side.
2. Running delete ledger concurrently in BK is ok?

Maybe you can say: When I execute an RPC command to delete ledger in
Pulsar, it will get a lock so others can't run that concurrently since all
the consumer does is run an RPC command to the topic owner broker?


>
> >
> >
> > > > > If we want to delete a topic, we should send the delete ledger msg
> to
> > > > > system topic and remove ledger id from metadata one by one, after
> all
> > > the
> > > > > ledger has been deleted, then delete the topic. If any ledger
> operate
> > > > > failed, we think this delete topic operation failed and return the
> left
> > > > > ledger id in the response.
> > > >
> > > > I couldn't understand. Can you please clarify this section. How
> exactly
> > > > topic deletion is modified to use the features described in this pip?
> > > >
> > > We need to ensure that all ledgers are deleted before the topic is
> > > deleted, otherwise, there will be orphan ledgers.
> > >
> > Your PIP is about introducing a workflow for deleting a ledger, right?
> > When you delete a topic you iterate its ledger list and delete each
> ledger.
> > Your PIP changes the way each ledger is deleted and makes it async. So I
> > guess what I want to understand is: What are the changes you are making
> to
> > topic deletion due to your PIP? You said "we need to make sure" - can you
> > please clarify how you will make sure?
> Before delete the topic, we will iterate the ledger list like delete
> ledger. But we need to ensure all the ledger be send to the system topic
> and remove all the ledger from zk. then delete the topic. If there are some
> ledgers sent to system topic failed, we will throw an exception to avoid to
> delete the topic.
>

The biggest confusion for me here is that you didn't explain what you want
to do in a logical way.

Your PIP should say something like:

   - Current problem
      - Managed Ledger is composed among other things from a chain of BK
      Ledgers. ZK contains a list of ledger IDs for the specific Managed Ledger
      - Managed Ledger has a couple of operations, all of which contains
      the basic construct of deleting a specific ledger. The operations are
         - Delete
            - It deletes the entire managed ledger, including each ledger.
         - Trim
            - Keeps the managed ledger in tact but deletes its data: all of
            its ledgers
         - Background Trim
            - Deletes all ledgers which have been acknowledged by all
            active subscriptions and comply with the storage policy (TTL, ...)
         - Deleting ledger is composed of the following steps:
         - Delete its ID from the list of ledger IDs of the Managed Ledger
         - Deleting the ledger from BK if it exists there (some ledgers
         have been offloaded and deleted from BK)
         - Delete the ledger from LedgerOffloader (the Tiered storage which
         offloads the ledger to S3 like system) if it has been offloaded
      - Each of the steps above is a call to another system (process).
      - Today ledger deletion steps are executed one after another in the
      broker assigned to the managed ledger's topic. If a certain step fails or
      the broker terminates suddenly, the process will not resume from where it
      has stopped since the state of deletion is not persisted anywhere.
      - The consequence is that the system remains in a place where its
      metadata is not synchronized with the actual data. [EXPAND MORE HERE]
   - Solution
      - We would like to persist the state of the step in such a way that
      once the broker is restarted we can resume where we left off.
      - Each time we wish to delete a ledger (due to one of the operations
      described in the Current Problem section), we will send a message to a
      special designated system topic, containing all the information needed to
      delete that ledger.
      - We will consider the ledger successfully deleted upon
      successful writing of the message to the system topic.
      - The message consumer will be the one running all the steps outlined
      above. Since all steps are idempotent, we can retry many times
without any
      harm
      - Any broker will listen to this system topic for consumption of
      messages, but it will only run a new RPC command to the topic's assigned
      broker to do the actual ledger deletion.
      - In case RPC command fails, we'll use the retry mechanism, meaning a
      new message will be produced with increased retry count and the original
      message acked. In case of failure to ack, upon retry we might produce the
      same message again. Since only a single broker is actually
running the RPC
      command, we're protected by a lock there from concurrency issues.
      - Some operations written above, like delete topic, consist of
      multiple ledgers to be deleted. Each deletion is producing a
message to the
      system topic. One all messages produced successfully we'll continue to
      topic deletion if self. The ledgers will be actually deleted
asynchronously
      by the message consumer.
         - EXPAND on the dead letter queue...
      - Monitoring
   - ...


I strongly suggest you rewrite the document. IMO it's impossible to
understand as it is written currently.



>
> >
> > > 10.
> > > > Backward compatibility - I would make sure to document in the docs
> > > exactly
> > > > what to do in case we need to rollback (cook book).
> > > Well.
> >
> >
> > You added
> >
> > > If user upgrade and enable two phase deletion, the ledger deletion msg
> > > will store in system topic. If the user rolls back to the old version
> and
> > > the system topic msg hasn't consumed all, some ledger may not delete.
> >
> >
> > A cookbook is giving instructions like "Before downgrading, wait for
> > metrics xxx to be 0 which indicates the in-flight ledgers delete commands
> > have all been processed". Here you just say, some ledgers may note
> delete -
> > give them some action - what commands can they run to delete those
> ledgers
> > themselves? Help them be successful as you have all the
> > implementation knowledge - they have nothing.
> Nicely suggestion, the user can consume the system topic to get the ledger
> deletion info, and delete it from bk or offload system. Maybe we can
> support a tool to help it, but it's not official.
>

I'm not saying they should actively consume it. They can wait on some
metric indicating all in-flight ledger deletion has finished and only then
downgrade. Please read my suggestion again. I'm saying to document those
exact steps.
IMO it's imperative to have that. There are real users out there who don't
have all the needed information to understand what you're doing in this
feature. Please help them.



>
> >
> >
> > > >
> > > > 11.
> > > > General comment - You're basically implementing a bespoke workflow
> using
> > > a
> > > > topic to save the state of where you are in the topic.
> > > > Is this the only place in Pulsar (delete ledger) that an action is
> > > composed
> > > > of several steps ?
> > > > If the answer is no to this, wouldn't it be better to have a small
> > > utility
> > > > which is in charge of moving through the workflow steps? It can even
> be a
> > > > simple state enum, where you move your state from a to b to c to d
> and it
> > > > is persisted.
> > > We need to persist in the middle steps, and we didn't want to operate
> the
> > > metadata store continually, so used pulsar to persist it.
> > >
> >
> > I didn't ask whether we should persist the workflow state to Pulsar
> instead
> > of ZK.
> > Can you please re-read my question?
> Sorry for the speed read, Asaf. At now, I only found the ledger deletion
> has the several steps.
> I agree with you to abstract a small utility to handle the multi steps
> works, but we may pay many efforts for it. So we just introduce a system
> topic to handle it.
> After this pip, if there are some similar multi steps works, we can refer
> this pip, use system topic to handle it. a -> b -> c -> d (syetmTopicA ->
> systemTopicB -> systemTopicC -> systemTopicD), the thought is generic.
>

Ok.


>
> >
> > > > 12. Monitoring
> > > > Some actions here can take a long time. We're basically relying on
> logs
> > > to
> > > > monitor where we are in the flow?
> > > yes, we didn't trace the ledger deletion steps. we only use stats to
> > > record whether the delete operation succeeds or not.
> > >
> > That's not enough.
> > A user needs to be able to operate the cluster, so we need to give them
> > eyes into what's happening inside the system.
> > Please add metrics to help them figure that out:
> > * How many in-flight ledger deletion commands do we have?
> We can count the all unacked msg to describe it.
>

Great. Let's add it to the PIP.


>
> > * How many ZK deletions failed/succeeded?
> I think we didn't need to count zk deletions. In fact, delete ledger from
> metadata is a zk node update operation. Before delete, there are
> [1,2,3,4,5], delete 1,2. just update the zk node content with the less
> ledgerList [3,4,5].
>

Just now I figured something out. You need to make a single ZK update
command to all ledgers you decided to delete in that managed ledger.
So I suggest the following:


   - Trim ledgers
      - send DeleteLedgersFromZK message to system topic
   - DeleteLedgersFromZK consumer
   - Run RPC command to broker to update ZK (this will ensure lock)
      - For any ledger ID produce DeleteLedgerFromStorage msg
      - ack
   - DeleteLedgerFromStorage consumer
   - run RPC command to broker to delete ledger from storage
      - ack message




>
> > * How many BK deletions failed/succeeded?
> Yes, we can count it. But it is transient, after the broker restart, it
> will re-count again.
>
No worries, metrics are scraped to Prometheus which can handle restarts.


>
>
> Thanks for your ideas. Asaf.
>

Reply via email to