Hi,

> I would like to understand what the performance tradeoffs are for the
> changes that you (as an individual) and others are proposing.

This PIP aims to support custom topic compaction logic. (Currently, both
topic compaction and table-view consumers keep only the latest message
for each key.)

As Michael and I discussed in the emails above, in the worst case, when
there are many conflicting messages, this PIP can run the custom
compaction logic more often than the alternative, because every consumer
(the topic compactor and each table view) compacts the messages
individually. However, one advantage of this proposal is that pub/sub is
faster since it uses a single topic. For example, in PIP-192, if the
"bundle assignment" broadcast is fast enough, conflicting bundle
assignment requests can be reduced (the broadcast-filter effect).


Post-Compaction (what this PIP proposes)

   - Producers publish messages to a single topic.
   - All consumers individually run the custom compaction logic when
     consuming the topic (by table view).
   - The compactor needs to run the custom compaction logic during
     compaction.
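
To make the post-compaction flow concrete, here is a minimal in-memory
sketch. It is not the Pulsar API: `CompactionStrategySketch`,
`TableViewSketch`, and the "first claim wins" rule are hypothetical names
and assumptions for illustration, loosely modeled on the
`shouldKeepLeft(T prev, T cur)` signature discussed in this thread. Each
consumer applies the strategy itself while reading the single topic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical strategy interface (an assumption, not the actual Pulsar API).
interface CompactionStrategySketch<T> {
    // true: keep prev and drop cur; false: cur replaces prev.
    boolean shouldKeepLeft(T prev, T cur);
}

// Hypothetical stand-in for a table view's internal key-value map.
class TableViewSketch<T> {
    private final Map<String, T> latest = new HashMap<>();
    private final CompactionStrategySketch<T> strategy;

    TableViewSketch(CompactionStrategySketch<T> strategy) {
        this.strategy = strategy;
    }

    // Called for every message this consumer reads from the single topic.
    void onMessage(String key, T cur) {
        T prev = latest.get(key);
        if (prev != null && strategy.shouldKeepLeft(prev, cur)) {
            return; // conflicting message is filtered out on the consumer side
        }
        latest.put(key, cur);
    }

    T get(String key) {
        return latest.get(key);
    }
}

class PostCompactionSketch {
    // Replays three conflicting claims for the same bundle through one view.
    static TableViewSketch<String> replay() {
        // Assumed rule for illustration: the first claim for a key wins.
        CompactionStrategySketch<String> firstClaimWins = (prev, cur) -> true;
        TableViewSketch<String> view = new TableViewSketch<>(firstClaimWins);
        view.onMessage("bundle-x", "broker-A");
        view.onMessage("bundle-x", "broker-C");
        view.onMessage("bundle-x", "broker-B");
        return view;
    }

    public static void main(String[] args) {
        System.out.println("bundle-x -> " + replay().get("bundle-x"));
    }
}
```

Every broker that holds such a view repeats the same filtering, which is
the repeated-compaction cost mentioned above; the compactor would apply
the same strategy when it rewrites the topic.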



The alternative that Michael proposed is to compact messages at an
earlier stage, by a single writer, using two topics.

Pre-Compaction (the alternative that Michael proposes)

   - Producers publish messages to a non-compacted topic first.
   - Only the leader consumes this non-compacted topic and runs the
     custom compaction logic.
   - The leader then publishes the compacted messages to the compacted
     topic (the single writer resolves conflicts).
   - All consumers consume the compacted topic (no need to compact the
     messages separately on the consumer side).
   - The compactor does not need to run the custom compaction logic
     during compaction.
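
The pre-compaction steps can be sketched in memory as follows. This is
only an illustration under stated assumptions: `PreCompactionSketch`,
`Claim`, and `leaderCompact` are hypothetical names, plain lists stand in
for the two topics, and "first claim wins" is an assumed conflict rule,
not something this thread fixes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PreCompactionSketch {
    // Hypothetical message shape: a bundle key and the broker claiming it.
    static final class Claim {
        final String bundle;
        final String broker;

        Claim(String bundle, String broker) {
            this.bundle = bundle;
            this.broker = broker;
        }
    }

    // The leader reads the non-compacted topic in order and returns what it
    // would publish to the compacted topic. Conflicts are resolved here,
    // once, by the single writer.
    static List<Claim> leaderCompact(List<Claim> nonCompactedTopic) {
        Map<String, String> owners = new HashMap<>();
        List<Claim> compactedTopic = new ArrayList<>();
        for (Claim c : nonCompactedTopic) {
            // Assumed rule for illustration: the first claim for a bundle wins.
            if (owners.putIfAbsent(c.bundle, c.broker) == null) {
                compactedTopic.add(c);
            }
        }
        return compactedTopic;
    }

    public static void main(String[] args) {
        List<Claim> input = Arrays.asList(
                new Claim("bundle-x", "broker-A"),
                new Claim("bundle-x", "broker-C"),
                new Claim("bundle-x", "broker-B"));
        for (Claim c : leaderCompact(input)) {
            System.out.println(c.bundle + " -> " + c.broker);
        }
    }
}
```

Consumers of the compacted list need no custom logic; the cost is that
every claim, conflicting or not, first passes through the leader.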



> It really seems that you are proposing a change to the default behavior
> whether or not a user chooses to use the interface in PIP-192.

The PIP does not change the default behavior of compaction or table view.
I updated the goals in the PIP to clarify this.

Thanks,
Heesung


On Fri, Nov 4, 2022 at 11:11 AM Dave Fisher <w...@apache.org> wrote:

>
>
> > On Nov 4, 2022, at 10:28 AM, Heesung Sohn
> > <heesung.s...@streamnative.io.INVALID> wrote:
> >
> > Hi,
> >
> > I think `bool shouldKeepLeft(T prev, T cur)` is clearer. I updated the
> > PIP.
> >
> > Hopefully, we provided enough context about this PIP and the design
> > trade-off as well.
>
> I would like to understand what the performance tradeoffs are for the
> changes that you (as an individual) and others are proposing.
>
> > Goal
> >
> >       • Create another Topic compactor, StrategicTwoPhaseCompactor,
> >         where we can configure a compaction strategy,
> >         TopicCompactionStrategy
> >
> >       • Update the TableViewConfigurationData to load and consider the
> >         TopicCompactionStrategy when updating the internal key-value
> >         map in TableView.
> >
> >       • Add TopicCompactionStrategy in Topic-level Policy to run
> >         StrategicTwoPhaseCompactor instead of TwoPhaseCompactor when
> >         executing compaction.
>
> It really seems that you are proposing a change to the default behavior
> whether or not a user chooses to use the interface in PIP-192.
>
> >
> > I will send out a vote email soon.
> >
> > Thank you,
> > Heesung
> >
> >
> >
> >
> >
> >
> > On Thu, Nov 3, 2022 at 9:59 PM Michael Marshall <mmarsh...@apache.org>
> > wrote:
> >
> >> Thank you for your detailed responses, Heesung.
> >>
> >>> We are not planning to expose this feature to users
> >>> soon unless demanded and proven to be stable.
> >>
> >> In that case, I think we should move forward with this PIP. I have a
> >> different opinion about the trade offs for the two designs, but none
> >> of my concerns are problems that could not be solved later if we
> >> encounter problems.
> >>
> >> Just to say it succinctly, my concern is that broadcasting all
> >> attempts to acquire ownership of every unclaimed bundle to all brokers
> >> will generate a lot of unnecessary traffic.
> >>
> >>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
> >>
> >> Thank you for this reference. I missed it. That is great documentation!
> >>
> >>> In fact, with the `compact(T prev, T cur)` api only, it is not clear if
> >>> prev => cur is a valid transition or not (if invalid, we should filter
> >>> out the cur message instead of further compacting/merging). I think we
> >>> still need to keep the `isValid()` and `merge()` separated.
> >>
> >> I was thinking that the result of `compact` would be the result put in
> >> the table view or written to the compacted topic. The one issue might
> >> be about keeping the memory utilization down for use cases that are
> >> not updating the message's value but are only selecting "left" or
> >> "right". I thought we could infer when to keep the message id vs keep
> >> the message value, but that might be easy to implement.
> >>
> >> My final critique is that I think `isValid` could have a better name.
> >> In the event this does become a public API, I don't think all use
> >> cases will think about which event should be persisted in terms of
> >> validity.
> >>
> >> The main name that comes to my mind is `bool shouldKeepLeft(T prev, T
> >> cur)`. When true, prev wins. When false, cur wins. That nomenclature
> >> comes from Akka Streams. It's not perfect, but it is easy to infer
> >> what the result will do.
> >>
> >>> Regarding redundant deserialization, the input type `T` is the type of
> >>> message value, so the input values are already deserialized.
> >>
> >> Great, I should have realized that. That takes care of that concern.
> >>
> >> Thanks,
> >> Michael
> >>
> >> On Thu, Nov 3, 2022 at 7:37 PM Heesung Sohn
> >> <heesung.s...@streamnative.io.invalid> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I have a different thought about my previous comment.
> >>>
> >>> - Agreed with your point that we should merge CompactionStrategy APIs.
> >>> I updated the interface proposal in the PIP. I replaced `"isValid",
> >>> "isMergeEnabled", and "merge"` apis with "compact" api.
> >>>
> >>> boolean isValid(T prev, T cur)
> >>> boolean isMergeEnabled()
> >>> T merge(T prev, T cur)
> >>>
> >>> =>
> >>>
> >>> T compact(T prev, T cur)
> >>>
> >>> In fact, with the `compact(T prev, T cur)` api only, it is not clear if
> >>> prev => cur is a valid transition or not (if invalid, we should filter
> >>> out the cur message instead of further compacting/merging). I think we
> >>> still need to keep the `isValid()` and `merge()` separated.
> >>>
> >>> Regarding redundant deserialization, the input type `T` is the type of
> >>> message value, so the input values are already deserialized. We don't
> >>> want to expose the Message<T> interface in this CompactionStrategy to
> >>> avoid message serialization/deserialization dependencies in the
> >>> CompactionStrategy.
> >>>
> >>> The `merge()` functionality is suggested for more complex use cases
> >>> (merge values instead of just filtering), and to support this
> >>> `merge()`, we need to internally create a new msg with the compacted
> >>> value, metadata, and messageId copies. We could initially define
> >>> `isValid()` only in CompactionStrategy, and add `isMergeEnabled() and
> >>> merge()` later in the CompactionStrategy interface if requested.
> >>>
> >>> Regards,
> >>> Heesung
> >>>
> >>>
> >>> On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <
> >> heesung.s...@streamnative.io>
> >>> wrote:
> >>>
> >>>> Oops! Michael, I apologize for the typo in your name.
> >>>>
> >>>> On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <
> >> heesung.s...@streamnative.io>
> >>>> wrote:
> >>>>
> >>>>> Hi Machel,
> >>>>>
> >>>>> Here are my additional comments regarding your earlier email.
> >>>>>
> >>>>> - I updated the PIP title to show that this will impact table view as
> >>>>> well.
> >>>>>
> >>>>> - PIP-192 : https://github.com/apache/pulsar/issues/16691 shows the
> >>>>> general idea of the states and their actions, and I defined the
> actual
> >>>>> states here in the PR,
> >>>>>
> >>
> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
> .
> >> I
> >>>>> will further clarify the bundle state data validation logic when
> >>>>> introducing `BundleStateCompactionStrategy` class. This PIP is to
> >> support
> >>>>> CompactionStrategy in general.
> >>>>>
> >>>>> - Agreed with your point that we should merge CompactionStrategy
> >> APIs. I
> >>>>> updated the interface proposal in the PIP. I replaced `"isValid",
> >>>>> "isMergeEnabled", and "merge"` apis with "compact" api.
> >>>>>
> >>>>>
> >>>>> Thanks,
> >>>>> Heesung
> >>>>>
> >>>>>
> >>>>> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <
> >>>>> heesung.s...@streamnative.io> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>> Thank you for the great comments.
> >>>>>> Please find my comments inline too.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Heesung
> >>>>>>
> >>>>>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <
> >> mmarsh...@apache.org>
> >>>>>> wrote:
> >>>>>>
> >>>>>>>> I think we lose a single linearized view.
> >>>>>>>
> >>>>>>> Which linearized view are we losing, and what is the role of that
> >>>>>>> linearized view? I think I might be missing why it is important. I
> >>>>>>> agree that consumers won't know about each unsuccessful attempted
> >>>>>>> acquisition of a bundle, but that seems like unnecessary
> information
> >>>>>>> to broadcast to every broker in the cluster.
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>> PIP-192 proposes an assignment, transfer, and split
> >> protocol(multi-phase
> >>>>>> state changes), relying on early broadcast across brokers, and all
> >> brokers
> >>>>>> react to their clients according to the state change notifications
> --
> >>>>>> brokers could defer any client lookups for bundle x if an
> >>>>>> assignment/transfer/split is ongoing for x(broadcasted early in the
> >> topic).
> >>>>>> One early broadcast example is the one that I discussed above, `When
> >> the
> >>>>>> topic broadcast is faster than the concurrent assignment requests.`
> >> I think
> >>>>>> the prefilter could delay this early broadcast, as it needs to go
> >> through
> >>>>>> the additional single-leader compaction path.
> >>>>>>
> >>>>>> The bundle state recovery process is simpler by a single linearized
> >> view.
> >>>>>>
> >>>>>> The single linearized view can be easier to debug bundle states. We
> >> can
> >>>>>> more easily track where the assignment requests come from and how it
> >> is
> >>>>>> compacted in a single linearized view.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>>> I think the leader requires a write-through cache to compact
> >> messages
> >>>>>>> based
> >>>>>>>> on the latest states.
> >>>>>>>
> >>>>>>> This brings up an important point that I would like to clarify. If
> >>>>>>> we trust the write ahead log as the source of truth, what happens
> >>>>>>> when a bundle has been validly owned by multiple brokers? As a
> >>>>>>> broker starts and consumes from the compacted topic, how do we
> >>>>>>> prevent it from incorrectly thinking that it owns a bundle for some
> >>>>>>> short time period in the case that the ownership topic hasn't yet
> >>>>>>> been compacted to remove old ownership state?
> >>>>>>>
> >>>>>>
> >>>>>> Since the multi-phase transfer protocol involves the source and
> >>>>>> destination broker's actions, the successful transfer should get the
> >> source
> >>>>>> and destination broker to have the (near) latest state. For example,
> >> if
> >>>>>> some brokers have old ownership states(network partitioned or
> >> delayed),
> >>>>>> they will redirect clients to the source(old) broker. However, by
> the
> >>>>>> transfer protocol, the source broker should have the latest state,
> >> so it
> >>>>>> can redirect the client again to the destination broker.
> >>>>>>
> >>>>>> When a broker restarts, it won't start until its BSC state to the
> >> (near)
> >>>>>> latest (til the last known messageId at that time).
> >>>>>>
> >>>>>>
> >>>>>>>> Pulsar guarantees "a single writer".
> >>>>>>>
> >>>>>>> I didn't think we were using a single writer in the PIP 192 design.
> >> I
> >>>>>>> thought we had many producers sending events to a compacted topic.
> >> My
> >>>>>>> proposal would still have many producers, but the writer to
> >> bookkeeper
> >>>>>>> would act as the single writer. It would technically be distinct
> >> from
> >>>>>>> a normal Pulsar topic producer.
> >>>>>>>
> >>>>>>> I should highlight that I am only proposing "broker filtering
> before
> >>>>>>> write" in the context of PIP 192 and as an alternative to adding
> >>>>>>> pluggable compaction strategies. It would not be a generic feature.
> >>>>>>>
> >>>>>>>
> >>>>>> I was worried about the worst case where two producers(leaders)
> >> happen
> >>>>>> to write the compacted topic (although Pulsar can guarantee "a
> single
> >>>>>> writer" or "a single producer" for a topic in normal situations).
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>>> Could we clarify how to handle
> >>>>>>>> the following(edge cases and failure recovery)?
> >>>>>>>> 0. Is the un-compacted topic a persistent topic or a
> >> non-persistent
> >>>>>>> topic?
> >>>>>>>
> >>>>>>> It is a persistent topic.
> >>>>>>>
> >>>>>>>> 1. How does the leader recover state from the two topics?
> >>>>>>>
> >>>>>>> A leader would recover state by first consuming the whole compacted
> >>>>>>> topic and then by consuming from the current location of a cursor
> on
> >>>>>>> the first input topic. As stated elsewhere, this introduces latency
> >>>>>>> and could be an issue.
> >>>>>>>
> >>>>>>>> 2. How do we handle the case when the leader fails before writing
> >>>>>>> messages
> >>>>>>>> to the compacted topic
> >>>>>>>
> >>>>>>> The leader would not acknowledge the message on the input topic
> >> until
> >>>>>>> it has successfully persisted the event on the compacted topic.
> >>>>>>> Publishing the same event to a compacted topic multiple times is
> >>>>>>> idempotent, so there is no risk of lost state. The real risk is
> >>>>>>> latency. However, I think we might have similar (though not the
> >> same)
> >>>>>>> latency risks in the current solution.
> >>>>>>>
> >>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the
> >> number
> >>>>>>> of
> >>>>>>>> messages to broadcast at the expense of the leader broker
> >> compaction.
> >>>>>>>
> >>>>>>> My primary point is that with this PIP's design, the filter logic
> is
> >>>>>>> run on every broker and again during topic compaction. With the
> >>>>>>> alternative design, the filter is run once.
> >>>>>>>
> >>>>>>> Thank you for the clarification.
> >>>>>>
> >>>>>> I think the difference is that the post-filter is an optimistic
> >> approach
> >>>>>> as it optimistically relies on the "broadcast-filter" effect(brokers
> >> will
> >>>>>> defer client lookups if notified ahead that any assignment is
> >> ongoing for
> >>>>>> bundle x). Yes, in the worst case, if the broadcast is slower, each
> >> broker
> >>>>>> needs to individually compact the conflicting assignment requests.
> >>>>>>
> >>>>>> Conversely, one downside of the pessimistic approach (single leader
> >>>>>> pre-filter) is that when there are not many conflict concurrent
> >> assignment
> >>>>>> requests(assign for bundle a, assign for bundle b, assign for bundle
> >> c...),
> >>>>>> the requests need to redundantly go through the leader compaction.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>>> 3. initially less complex to implement (leaderless conflict
> >>>>>>> resolution and
> >>>>>>>> requires a single topic)
> >>>>>>>
> >>>>>>> PIP 215 has its own complexity too. Coordinating filters
> >>>>>>> on both the client (table view) and the server (compaction) is non
> >>>>>>> trivial. The proposed API includes hard coded client configuration
> >> for
> >>>>>>> each component, which will make upgrading the version of the
> >>>>>>> compaction strategy complicated, and could lead to incorrect
> >>>>>>> interpretation of events in the stream. When a single broker is
> >> doing
> >>>>>>> the filtering, versioning is no longer a distributed problem. That
> >>>>>>> being said, I do not mean to suggest my solution is without
> >>>>>>> complexity.
> >>>>>>>
> >>>>>>>> 4. it is not a "one-way door" decision (we could add the
> >> pre-filter
> >>>>>>> logic
> >>>>>>>> as well later)
> >>>>>>>
> >>>>>>> It's fair to say that we could add it later, but at that point, we
> >>>>>>> will have added this new API for compaction strategy. Are we
> >> confident
> >>>>>>> that pluggable compaction is independently an important addition to
> >>>>>>> Pulsar's
> >>>>>>> features, or would it make sense to make this API only exposed in
> >> the
> >>>>>>> broker?
> >>>>>>>
> >>>>>>>
> >>>>>> The intention is that this compaction feature could be useful for
> >>>>>> complex user applications (if they are trying to do a similar
> >> thing). As I
> >>>>>> mentioned, this feature is closely tied to the PIP-192 now. We are
> >> not
> >>>>>> planning to expose this feature to users soon unless demanded and
> >> proven to
> >>>>>> be stable.
> >>>>>>
> >>>>>>
> >>>>>>> Thanks,
> >>>>>>> Michael
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
> >>>>>>> <heesung.s...@streamnative.io.invalid> wrote:
> >>>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> Also, I thought about some concurrent assignment scenarios between
> >>>>>>>> pre-filter vs post-filter.
> >>>>>>>>
> >>>>>>>> Example 1: When the topic broadcast is slower than the concurrent
> >>>>>>>> assignment requests
> >>>>>>>>
> >>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics)
> >>>>>>>> t1: A -> non-compacted topic // broker A published a message to
> >> the
> >>>>>>>> non-compacted topic, m1: {broker A assigned bundle x to broker A}
> >>>>>>>> t2: B -> non-compacted topic // broker B published a message, m2:
> >>>>>>> {broker B
> >>>>>>>> assigned bundle x to broker C}
> >>>>>>>> t3: C -> non-compacted topic // broker C published a message, m3:
> >>>>>>> {broker C
> >>>>>>>> assigned bundle x to broker B}
> >>>>>>>> t4: non-compacted topic -> L // leader broker consumed the
> >> messages:
> >>>>>>> m1,m2,
> >>>>>>>> and m3
> >>>>>>>> t5: L -> compacted topic // leader compacted the messages and
> >>>>>>> broadcasted
> >>>>>>>> m1 to all consumers
> >>>>>>>> t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1
> >>>>>>>>
> >>>>>>>> With post-filter + a single topic
> >>>>>>>> t1: A -> topic // broker A published a message to the
> >> non-compacted
> >>>>>>> topic,
> >>>>>>>> m1: {broker A assigned bundle x to broker A}
> >>>>>>>> t2: B -> topic // broker B published a message, m2: {broker B
> >> assigned
> >>>>>>>> bundle x to broker C}
> >>>>>>>> t3: C -> topic // broker C published a message, m3: {broker C
> >> assigned
> >>>>>>>> bundle x to broker B}
> >>>>>>>> t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1,m2,
> >>>>>>> and m3
> >>>>>>>> t5: [A,B,C] -> m1 // broker A,B,C individually compacted the
> >> messages
> >>>>>>> to m1.
> >>>>>>>>
> >>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the
> >> number
> >>>>>>> of
> >>>>>>>> messages to broadcast at the expense of the leader broker
> >> compaction.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Example 2: When the topic broadcast is faster than the concurrent
> >>>>>>>> assignment requests
> >>>>>>>>
> >>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics)
> >>>>>>>> t1: A -> non-compacted topic // broker A published a message to
> >> the
> >>>>>>>> non-compacted topic, m1: {broker A assigned bundle x to broker A}
> >>>>>>>> t2: non-compacted topic -> L // leader broker consumed the
> >> messages:
> >>>>>>> m1
> >>>>>>>> t3: L -> compacted topic // leader compacted the message and
> >>>>>>> broadcasted m1
> >>>>>>>> to all consumers
> >>>>>>>> t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1
> >>>>>>>> t5: A-> own bundle // broker A knows that its assignment has been
> >>>>>>> accepted,
> >>>>>>>> so proceeding to own the bundle (meanwhile deferring lookup
> >> requests)
> >>>>>>>> t6: B -> defer client lookups // broker B knows that bundle
> >>>>>>> assignment is
> >>>>>>>> running(meanwhile deferring lookup requests)
> >>>>>>>> t7: C -> defer client lookups // broker C knows that bundle
> >>>>>>> assignment is
> >>>>>>>> running(meanwhile deferring lookup requests)
> >>>>>>>>
> >>>>>>>> With post-filter + a single topic
> >>>>>>>> t1: A -> topic // broker A published a message to the
> >> non-compacted
> >>>>>>> topic,
> >>>>>>>> m1: {broker A assigned bundle x to broker A}
> >>>>>>>> t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
> >>>>>>>> t3:  A-> own bundle // broker A knows that its assignment has been
> >>>>>>>> accepted, so proceeding to own the bundle (meanwhile deferring
> >> lookup
> >>>>>>>> requests)
> >>>>>>>> t4: B -> defer client lookups // broker B knows that bundle
> >>>>>>> assignment is
> >>>>>>>> running(meanwhile deferring lookup requests)
> >>>>>>>> t5: C -> defer client lookups // broker C knows that bundle
> >>>>>>> assignment is
> >>>>>>>> running(meanwhile deferring lookup requests)
> >>>>>>>>
> >>>>>>>> Analysis: The "post-filter + a single topic" can perform ok in
> >> this
> >>>>>>> case
> >>>>>>>> without the additional leader coordination and the secondary topic
> >>>>>>> because
> >>>>>>>> the early broadcast can inform all brokers and prevent them from
> >>>>>>> requesting
> >>>>>>>> other assignments for the same bundle.
> >>>>>>>>
> >>>>>>>> I think the post-filter option is initially not bad because:
> >>>>>>>>
> >>>>>>>> 1. it is safe in the worst case (in case the messages are not
> >>>>>>> correctly
> >>>>>>>> pre-filtered at the leader)
> >>>>>>>> 2. it performs ok because the early broadcast can prevent
> >>>>>>>> concurrent assignment requests.
> >>>>>>>> 3. initially less complex to implement (leaderless conflict
> >>>>>>> resolution and
> >>>>>>>> requires a single topic)
> >>>>>>>> 4. it is not a "one-way door" decision (we could add the
> >> pre-filter
> >>>>>>> logic
> >>>>>>>> as well later)
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Heesung
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <
> >>>>>>> heesung.s...@streamnative.io>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Michael,
> >>>>>>>>>
> >>>>>>>>> For the pre-prefilter(pre-compaction) option,
> >>>>>>>>> I think the leader requires a write-through cache to compact
> >>>>>>> messages
> >>>>>>>>> based on the latest states. Otherwise, the leader needs to wait
> >> for
> >>>>>>> the
> >>>>>>>>> last msg from the (compacted) topic before compacting the next
> >> msg
> >>>>>>> for the
> >>>>>>>>> same bundle.
> >>>>>>>>>
> >>>>>>>>> Pulsar guarantees "a single writer". However, for the worst-case
> >>>>>>>>> scenario(due to network partitions, bugs in zk or etcd leader
> >>>>>>> election,
> >>>>>>>>> bugs in bk, data corruption ), I think it is safe to place the
> >>>>>>> post-filter
> >>>>>>>>> on the consumer side(compaction and table views) as well in
> >> order to
> >>>>>>>>> validate the state changes.
> >>>>>>>>>
> >>>>>>>>> For the two-topic approach,
> >>>>>>>>> I think we lose a single linearized view. Could we clarify how
> >> to
> >>>>>>> handle
> >>>>>>>>> the following(edge cases and failure recovery)?
> >>>>>>>>> 0. Is the un-compacted topic a persistent topic or a
> >> non-persistent
> >>>>>>> topic?
> >>>>>>>>> 1. How does the leader recover state from the two topics?
> >>>>>>>>> 2. How do we handle the case when the leader fails before
> >> writing
> >>>>>>> messages
> >>>>>>>>> to the compacted topic
> >>>>>>>>>
> >>>>>>>>> Regards,
> >>>>>>>>> Heesung
> >>>>>>>>>
> >>>>>>>>> On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <
> >>>>>>> mmarsh...@apache.org>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Sharing some more thoughts. We could alternatively use two
> >> topics
> >>>>>>>>>> instead of one. In this design, the first topic is the
> >> unfiltered
> >>>>>>>>>> write ahead log that represents many writers (brokers) trying
> >> to
> >>>>>>>>>> acquire ownership of bundles. The second topic is the
> >> distilled log
> >>>>>>>>>> that represents the "winners" or the "owners" of the bundles.
> >>>>>>> There is
> >>>>>>>>>> a single writer, the leader broker, that reads from the input
> >> topic
> >>>>>>>>>> and writes to the output topic. The first topic is normal and
> >> the
> >>>>>>>>>> second is compacted.
> >>>>>>>>>>
> >>>>>>>>>> The primary benefit in a two topic solution is that it is easy
> >> for
> >>>>>>> the
> >>>>>>>>>> leader broker to trade off ownership without needing to slow
> >> down
> >>>>>>>>>> writes to the input topic. The leader broker will start
> >> consuming
> >>>>>>> from
> >>>>>>>>>> the input topic when it has fully consumed the table view on
> >> the
> >>>>>>>>>> output topic. In general, I don't think consumers know when
> >> they
> >>>>>>> have
> >>>>>>>>>> "reached the end of a table view", but we should be able to
> >>>>>>> trivially
> >>>>>>>>>> figure this out if we are the topic's only writer and the
> >> topic and
> >>>>>>>>>> writer are collocated on the same broker.
> >>>>>>>>>>
> >>>>>>>>>> In that design, it might make sense to use something like the
> >>>>>>>>>> replication cursor to keep track of this consumer's state.
> >>>>>>>>>>
> >>>>>>>>>> - Michael
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <
> >>>>>>> mmarsh...@apache.org>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for your proposal, Heesung.
> >>>>>>>>>>>
> >>>>>>>>>>> Fundamentally, we have the problems listed in this PIP
> >> because
> >>>>>>> we have
> >>>>>>>>>>> multiple writers instead of just one writer. Can we solve
> >> this
> >>>>>>> problem
> >>>>>>>>>>> by changing our write pattern? What if we use the leader
> >> broker
> >>>>>>> as the
> >>>>>>>>>>> single writer? That broker would intercept attempts to
> >> acquire
> >>>>>>>>>>> ownership on bundles and would grant ownership to the first
> >>>>>>> broker to
> >>>>>>>>>>> claim an unassigned bundle. It could "grant ownership" by
> >>>>>>> letting the
> >>>>>>>>>>> first write to claim an unassigned bundle get written to the
> >>>>>>> ownership
> >>>>>>>>>>> topic. When a bundle is already owned, the leader won't
> >> persist
> >>>>>>> that
> >>>>>>>>>>> event to the bookkeeper. In this design, the log becomes a
> >> true
> >>>>>>>>>>> ownership log, which will correctly work with the existing
> >> topic
> >>>>>>>>>>> compaction and table view solutions. My proposal essentially
> >>>>>>> moves the
> >>>>>>>>>>> conflict resolution to just before the write, and as a
> >>>>>>> consequence, it
> >>>>>>>>>>> greatly reduces the need for post processing of the event
> >> log.
> >>>>>>> One
> >>>>>>>>>>> trade off might be that the leader broker could slow down the
> >>>>>>> write
> >>>>>>>>>>> path, but given that the leader would just need to verify the
> >>>>>>> current
> >>>>>>>>>>> state of the bundle, I think it'd be performant enough.
> >>>>>>>>>>>
> >>>>>>>>>>> Additionally, we'd need the leader broker to be "caught up"
> >> on
> >>>>>>> bundle
> >>>>>>>>>>> ownership in order to grant ownership of topics, but unless
> >> I am
> >>>>>>>>>>> mistaken, that is already a requirement of the current PIP
> >> 192
> >>>>>>>>>>> paradigm.
> >>>>>>>>>>>
> >>>>>>>>>>> Below are some additional thoughts that will be relevant if
> >> we
> >>>>>>> move
> >>>>>>>>>>> forward with the design as it is currently proposed.
> >>>>>>>>>>>
> >>>>>>>>>>> I think it might be helpful to update the title to show that
> >> this
> >>>>>>>>>>> proposal will also affect table view as well. I didn't catch
> >>>>>>> that at
> >>>>>>>>>>> first.
> >>>>>>>>>>>
> >>>>>>>>>>> Do you have any documentation describing how the
> >>>>>>>>>>> TopicCompactionStrategy will determine which states are
> >> valid in
> >>>>>>> the
> >>>>>>>>>>> context of load balancing? I looked at
> >>>>>>>>>>> https://github.com/apache/pulsar/pull/18195, but I couldn't
> >>>>>>> seem to
> >>>>>>>>>>> find anything for it. That would help make this proposal less
> >>>>>>>>>>> abstract.
> >>>>>>>>>>>
> >>>>>>>>>>> The proposed API seems very tied to the needs of PIP 192. For
> >>>>>>> example,
> >>>>>>>>>>> `isValid` is not a term I associate with topic compaction.
> >> The
> >>>>>>>>>>> fundamental question for compaction is which value to keep
> >> (or
> >>>>>>> build a
> >>>>>>>>>>> new value). I think we might be able to simplify the API by
> >>>>>>> replacing
> >>>>>>>>>>> the "isValid", "isMergeEnabled", and "merge" methods with a
> >>>>>>> single
> >>>>>>>>>>> method that lets the implementation handle one or all tasks.
> >> That
> >>>>>>>>>>> would also remove the need to deserialize payloads multiple
> >>>>>>> times too.
> >>>>>>>>>>>
> >>>>>>>>>>> I also feel like mentioning that after working with the PIP
> >> 105
> >>>>>>> broker
> >>>>>>>>>>> side filtering, I think we should avoid running UDFs in the
> >>>>>>> broker as
> >>>>>>>>>>> much as possible. (I do not consider the load balancing
> >> logic to
> >>>>>>> be a
> >>>>>>>>>>> UDF here.) I think it would be worth not making this a user
> >>>>>>> facing
> >>>>>>>>>>> feature unless there is demand for real use cases.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks!
> >>>>>>>>>>> Michael
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bog...@apache.org>
> >> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> +1(non-binding)
> >>>>>>>>>>>>
> >>>>>>>>>>>> thanks,
> >>>>>>>>>>>> bo
> >>>>>>>>>>>>
> >>>>>>>>>>>> Heesung Sohn <heesung.s...@streamnative.io.invalid>
> >>>>>>> 于2022年10月19日周三
> >>>>>>>>>> 07:54写道:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi pulsar-dev community,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I raised a pip to discuss : PIP-215: Configurable Topic
> >>>>>>> Compaction
> >>>>>>>>>> Strategy
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> PIP link: https://github.com/apache/pulsar/issues/18099
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>> Heesung
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>
>
>
