> On Nov 4, 2022, at 10:28 AM, Heesung Sohn
> <heesung.s...@streamnative.io.INVALID> wrote:
>
> Hi,
>
> I think `bool shouldKeepLeft(T prev, T cur)` is clearer. I updated the PIP.
>
> Hopefully we have provided enough context about this PIP and its design
> trade-offs.
I would like to understand what the performance tradeoffs are for the changes
that you (as an individual) and others are proposing.
> Goal
>
> • Create another Topic compactor, StrategicTwoPhaseCompactor, where we
> can configure a compaction strategy,
> TopicCompactionStrategy
>
> • Update the TableViewConfigurationData to load and consider the
> TopicCompactionStrategy when updating the internal key-value map in TableView.
>
> • Add TopicCompactionStrategy in Topic-level Policy to run
> StrategicTwoPhaseCompactor instead of TwoPhaseCompactor when executing
> compaction.
It really seems that you are proposing a change to the default behavior whether
or not a user chooses to use the interface in PIP-192.
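
Concretely, my reading of the third goal is that enabling the strategy per
topic would look something like this (a sketch; the method and class names
here are assumptions, not the PIP's exact admin API):

    // Hypothetical: attach a compaction strategy through a topic-level
    // policy so that compaction runs StrategicTwoPhaseCompactor instead
    // of TwoPhaseCompactor for this topic.
    admin.topicPolicies().setTopicCompactionStrategy(
        "persistent://tenant/ns/topic",
        "com.example.MyTopicCompactionStrategy");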
>
> I will send out a vote email soon.
>
> Thank you,
> Heesung
>
>
>
>
>
>
> On Thu, Nov 3, 2022 at 9:59 PM Michael Marshall <mmarsh...@apache.org>
> wrote:
>
>> Thank you for your detailed responses, Heesung.
>>
>>> We are not planning to expose this feature to users
>>> soon unless demanded and proven to be stable.
>>
>> In that case, I think we should move forward with this PIP. I have a
>> different opinion about the trade-offs between the two designs, but none
>> of my concerns are problems that could not be solved later if we
>> encounter them.
>>
>> Just to say it succinctly, my concern is that broadcasting all
>> attempts to acquire ownership of every unclaimed bundle to all brokers
>> will generate a lot of unnecessary traffic.
>>
>>>
>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
>>
>> Thank you for this reference. I missed it. That is great documentation!
>>
>>> In fact, with the `compact(T prev, T cur)` API only, it is not clear if
>>> prev => cur is a valid transition or not (if invalid, we should filter out
>>> the cur message instead of further compacting/merging). I think we still
>>> need to keep `isValid()` and `merge()` separate.
>>
>> I was thinking that the result of `compact` would be the result put in
>> the table view or written to the compacted topic. The one issue might
>> be keeping memory utilization down for use cases that are not updating
>> the message's value but are only selecting "left" or "right". I thought
>> we could infer when to keep the message id vs the message value, but
>> that might not be easy to implement.
>>
>> My final critique is that I think `isValid` could have a better name.
>> In the event this does become a public API, I don't think all use
>> cases will think about which event should be persisted in terms of
>> validity.
>>
>> The main name that comes to my mind is `bool shouldKeepLeft(T prev, T
>> cur)`. When true, prev wins. When false, cur wins. That nomenclature
>> comes from Akka Streams. It's not perfect, but it is easy to infer
>> what the result will do.
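>>
>> In Java terms, that would make the strategy hook roughly the following
>> (a sketch only, not the PIP's final API):
>>
>>     public interface TopicCompactionStrategy<T> {
>>         // true => keep prev ("left" wins); false => keep cur ("right" wins)
>>         boolean shouldKeepLeft(T prev, T cur);
>>     }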
>>
>>> Regarding redundant deserialization, the input type `T` is the type of
>>> message value, so the input values are already deserialized.
>>
>> Great, I should have realized that. That takes care of that concern.
>>
>> Thanks,
>> Michael
>>
>> On Thu, Nov 3, 2022 at 7:37 PM Heesung Sohn
>> <heesung.s...@streamnative.io.invalid> wrote:
>>>
>>> Hi,
>>>
>>> I have a different thought about my previous comment.
>>>
>>> - Agreed with your point that we should merge the CompactionStrategy APIs.
>>> I updated the interface proposal in the PIP, replacing the `isValid`,
>>> `isMergeEnabled`, and `merge` APIs with a single `compact` API.
>>>
>>> boolean isValid(T prev, T cur)
>>> boolean isMergeEnabled()
>>> T merge(T prev, T cur)
>>>
>>> =>
>>>
>>> T compact(T prev, T cur)
>>>
>>> In fact, with the `compact(T prev, T cur)` API only, it is not clear if
>>> prev => cur is a valid transition or not (if invalid, we should filter out
>>> the cur message instead of further compacting/merging). I think we still
>>> need to keep `isValid()` and `merge()` separate.
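>>>
>>> In other words, the update path needs both checks, roughly like this
>>> (a sketch; `strategy` is a hypothetical instance of the interface):
>>>
>>>     // Decide what the key's value becomes when a new message arrives:
>>>     if (!strategy.isValid(prev, cur)) {
>>>         return prev; // invalid transition: filter out cur entirely
>>>     }
>>>     return strategy.isMergeEnabled() ? strategy.merge(prev, cur) : cur;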
>>>
>>> Regarding redundant deserialization, the input type `T` is the type of
>>> message value, so the input values are already deserialized. We don't
>> want
>>> to expose the Message<T> interface in this CompactionStrategy to avoid
>>> message serialization/deserialization dependencies in the
>>> CompactionStrategy.
>>>
>>> The `merge()` functionality is suggested for more complex use cases
>>> (merging values instead of just filtering), and to support `merge()`, we
>>> need to internally create a new message with the compacted value,
>>> metadata, and messageId copies. We could initially define only
>>> `isValid()` in CompactionStrategy, and add `isMergeEnabled()` and
>>> `merge()` later in the interface if requested.
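>>>
>>> As a toy example of such a complex use case (hypothetical, assuming the
>>> three-method form of the interface):
>>>
>>>     // Merges values instead of just filtering, e.g. per-key counters:
>>>     class CounterCompactionStrategy implements TopicCompactionStrategy<Long> {
>>>         public boolean isValid(Long prev, Long cur) { return true; }
>>>         public boolean isMergeEnabled() { return true; }
>>>         public Long merge(Long prev, Long cur) { return prev + cur; }
>>>     }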
>>>
>>> Regards,
>>> Heesung
>>>
>>>
>>> On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <
>> heesung.s...@streamnative.io>
>>> wrote:
>>>
>>>> Oops! Michael, I apologize for the typo in your name.
>>>>
>>>> On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <
>> heesung.s...@streamnative.io>
>>>> wrote:
>>>>
>>>>> Hi Machel,
>>>>>
>>>>> Here are my additional comments regarding your earlier email.
>>>>>
>>>>> - I updated the PIP title to show that this will impact table view as
>>>>> well.
>>>>>
>>>>> - PIP-192: https://github.com/apache/pulsar/issues/16691 shows the
>>>>> general idea of the states and their actions, and I defined the actual
>>>>> states here in the PR:
>>>>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
>>>>> I will further clarify the bundle state data validation logic when
>>>>> introducing the `BundleStateCompactionStrategy` class. This PIP is to
>>>>> support CompactionStrategy in general.
>>>>>
>>>>> - Agreed with your point that we should merge the CompactionStrategy
>>>>> APIs. I updated the interface proposal in the PIP, replacing the
>>>>> `isValid`, `isMergeEnabled`, and `merge` APIs with a single `compact`
>>>>> API.
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Heesung
>>>>>
>>>>>
>>>>> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <
>>>>> heesung.s...@streamnative.io> wrote:
>>>>>
>>>>>> Hi,
>>>>>> Thank you for the great comments.
>>>>>> Please find my comments inline too.
>>>>>>
>>>>>> Regards,
>>>>>> Heesung
>>>>>>
>>>>>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <
>> mmarsh...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>>> I think we lose a single linearized view.
>>>>>>>
>>>>>>> Which linearized view are we losing, and what is the role of that
>>>>>>> linearized view? I think I might be missing why it is important. I
>>>>>>> agree that consumers won't know about each unsuccessful attempted
>>>>>>> acquisition of a bundle, but that seems like unnecessary information
>>>>>>> to broadcast to every broker in the cluster.
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> PIP-192 proposes an assignment, transfer, and split protocol
>>>>>> (multi-phase state changes) that relies on early broadcast across
>>>>>> brokers, and all brokers react to their clients according to the state
>>>>>> change notifications -- brokers can defer any client lookups for bundle
>>>>>> x if an assignment/transfer/split is ongoing for x (broadcast early in
>>>>>> the topic). One early-broadcast example is the one I discussed above,
>>>>>> `When the topic broadcast is faster than the concurrent assignment
>>>>>> requests.` I think the pre-filter could delay this early broadcast, as
>>>>>> it needs to go through the additional single-leader compaction path.
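>>>>>>
>>>>>> Roughly, the deferral each broker applies looks like this (a sketch
>>>>>> with hypothetical names, not the actual PIP-192 code):
>>>>>>
>>>>>>     // On a client lookup for bundle x, consult the local table view:
>>>>>>     BundleState s = bundleStateTableView.get(x);
>>>>>>     if (s != null && s.isTransitioning()) {
>>>>>>         return deferUntilSettled(x); // a broadcast said a change is ongoing
>>>>>>     }
>>>>>>     return lookupResultFor(s); // otherwise answer from the current state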
>>>>>>
>>>>>> The bundle state recovery process is also simpler with a single
>>>>>> linearized view.
>>>>>>
>>>>>> A single linearized view makes bundle states easier to debug as well:
>>>>>> we can more easily track where assignment requests come from and how
>>>>>> they are compacted.
>>>>>>
>>>>>>
>>>>>>
>>>>>>>> I think the leader requires a write-through cache to compact
>> messages
>>>>>>> based
>>>>>>>> on the latest states.
>>>>>>>
>>>>>>> This brings up an important point that I would like to clarify. If we
>>>>>>> trust the write ahead log as the source of truth, what happens when
>> a
>>>>>>> bundle has been validly owned by multiple brokers? As a broker
>> starts
>>>>>>> and consumes from the compacted topic, how do we prevent it from
>>>>>>> incorrectly thinking that it owns a bundle for some short time
>> period
>>>>>>> in the case that the ownership topic hasn't yet been compacted to
>>>>>>> remove old ownership state?
>>>>>>>
>>>>>>
>>>>>> Since the multi-phase transfer protocol involves actions by both the
>>>>>> source and destination brokers, a successful transfer ensures that both
>>>>>> brokers have the (near) latest state. For example, if some brokers have
>>>>>> old ownership states (because they are network-partitioned or delayed),
>>>>>> they will redirect clients to the source (old) broker. However, by the
>>>>>> transfer protocol, the source broker should have the latest state, so
>>>>>> it can redirect the client again to the destination broker.
>>>>>>
>>>>>> When a broker restarts, it won't serve requests until its BSC state has
>>>>>> caught up to the (near) latest state (up to the last known messageId at
>>>>>> that time).
>>>>>>
>>>>>>
>>>>>>>> Pulsar guarantees "a single writer".
>>>>>>>
>>>>>>> I didn't think we were using a single writer in the PIP 192 design.
>> I
>>>>>>> thought we had many producers sending events to a compacted topic.
>> My
>>>>>>> proposal would still have many producers, but the writer to
>> bookkeeper
>>>>>>> would act as the single writer. It would technically be distinct
>> from
>>>>>>> a normal Pulsar topic producer.
>>>>>>>
>>>>>>> I should highlight that I am only proposing "broker filtering before
>>>>>>> write" in the context of PIP 192 and as an alternative to adding
>>>>>>> pluggable compaction strategies. It would not be a generic feature.
>>>>>>>
>>>>>>>
>>>>>> I was worried about the worst case where two producers (leaders)
>>>>>> happen to write to the compacted topic (although Pulsar can guarantee
>>>>>> "a single writer" or "a single producer" for a topic in normal
>>>>>> situations).
>>>>>>
>>>>>>
>>>>>>
>>>>>>>> Could we clarify how to handle
>>>>>>>> the following (edge cases and failure recovery)?
>>>>>>>> 0. Is the un-compacted topic a persistent topic or a non-persistent
>>>>>>>> topic?
>>>>>>>
>>>>>>> It is a persistent topic.
>>>>>>>
>>>>>>>> 1. How does the leader recover state from the two topics?
>>>>>>>
>>>>>>> A leader would recover state by first consuming the whole compacted
>>>>>>> topic and then by consuming from the current location of a cursor on
>>>>>>> the first input topic. As stated elsewhere, this introduces latency
>>>>>>> and could be an issue.
>>>>>>>
>>>>>>>> 2. How do we handle the case when the leader fails before writing
>>>>>>>> messages to the compacted topic?
>>>>>>>
>>>>>>> The leader would not acknowledge the message on the input topic
>> until
>>>>>>> it has successfully persisted the event on the compacted topic.
>>>>>>> Publishing the same event to a compacted topic multiple times is
>>>>>>> idempotent, so there is no risk of lost state. The real risk is
>>>>>>> latency. However, I think we might have similar (though not the
>> same)
>>>>>>> latency risks in the current solution.
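>>>>>>>
>>>>>>> As a sketch of that ack-after-persist invariant (hypothetical names,
>>>>>>> not actual code):
>>>>>>>
>>>>>>>     Message<Event> msg = inputConsumer.receive();
>>>>>>>     if (filter.isWinner(currentState, msg.getValue())) {
>>>>>>>         compactedProducer.send(msg.getValue()); // persist first
>>>>>>>     }
>>>>>>>     inputConsumer.acknowledge(msg); // ack input only after persisting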
>>>>>>>
>>>>>>>> Analysis: the "pre-filter + two topics" option can reduce the number
>>>>>>>> of messages to broadcast, at the expense of the leader broker
>>>>>>>> compaction.
>>>>>>>
>>>>>>> My primary point is that with this PIP's design, the filter logic is
>>>>>>> run on every broker and again during topic compaction. With the
>>>>>>> alternative design, the filter is run once.
>>>>>>>
>>>>>> Thank you for the clarification.
>>>>>>
>>>>>> I think the difference is that the post-filter is an optimistic
>>>>>> approach, as it relies on the "broadcast-filter" effect (brokers will
>>>>>> defer client lookups if notified ahead of time that an assignment is
>>>>>> ongoing for bundle x). Yes, in the worst case, if the broadcast is
>>>>>> slower, each broker needs to individually compact the conflicting
>>>>>> assignment requests.
>>>>>>
>>>>>> Conversely, one downside of the pessimistic approach (single-leader
>>>>>> pre-filter) is that when there are few conflicting concurrent
>>>>>> assignment requests (assign for bundle a, assign for bundle b, assign
>>>>>> for bundle c, ...), the requests still need to redundantly go through
>>>>>> the leader compaction.
>>>>>>
>>>>>>
>>>>>>
>>>>>>>> 3. initially less complex to implement (leaderless conflict
>>>>>>>> resolution and requires a single topic)
>>>>>>>
>>>>>>> PIP 215 has its own complexity too. Coordinating filters
>>>>>>> on both the client (table view) and the server (compaction) is
>>>>>>> non-trivial. The proposed API includes hard-coded client configuration
>>>>>>> for each component, which will make upgrading the version of the
>>>>>>> compaction strategy complicated and could lead to incorrect
>>>>>>> interpretation of events in the stream. When a single broker is doing
>>>>>>> the filtering, versioning is no longer a distributed problem. That
>>>>>>> being said, I do not mean to suggest my solution is without
>>>>>>> complexity.
>>>>>>>
>>>>>>>> 4. it is not a "one-way door" decision (we could add the pre-filter
>>>>>>>> logic as well later)
>>>>>>>
>>>>>>> It's fair to say that we could add it later, but at that point, we
>>>>>>> will have added this new API for compaction strategy. Are we
>> confident
>>>>>>> that pluggable compaction is independently an important addition to
>>>>>>> Pulsar's
>>>>>>> features, or would it make sense to make this API only exposed in
>> the
>>>>>>> broker?
>>>>>>>
>>>>>>>
>>>>>> The intention is that this compaction feature could be useful for
>>>>>> complex user applications (if they are trying to do a similar thing).
>>>>>> As I mentioned, this feature is closely tied to PIP-192 now. We are not
>>>>>> planning to expose this feature to users soon unless it is demanded and
>>>>>> proven to be stable.
>>>>>>
>>>>>>
>>>>>>> Thanks,
>>>>>>> Michael
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
>>>>>>> <heesung.s...@streamnative.io.invalid> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Also, I thought about some concurrent assignment scenarios comparing
>>>>>>>> pre-filter vs. post-filter.
>>>>>>>>
>>>>>>>> Example 1: When the topic broadcast is slower than the concurrent
>>>>>>>> assignment requests
>>>>>>>>
>>>>>>>> With pre-filter + two topics (non-compacted and compacted topics):
>>>>>>>> t1: A -> non-compacted topic // broker A published a message to the
>>>>>>>> non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>>>>>>> t2: B -> non-compacted topic // broker B published a message, m2:
>>>>>>>> {broker B assigned bundle x to broker C}
>>>>>>>> t3: C -> non-compacted topic // broker C published a message, m3:
>>>>>>>> {broker C assigned bundle x to broker B}
>>>>>>>> t4: non-compacted topic -> L // the leader broker consumed the
>>>>>>>> messages m1, m2, and m3
>>>>>>>> t5: L -> compacted topic // the leader compacted the messages and
>>>>>>>> broadcast m1 to all consumers
>>>>>>>> t6: compacted topic -> [A,B,C] // brokers A, B, and C consumed m1
>>>>>>>>
>>>>>>>> With post-filter + a single topic:
>>>>>>>> t1: A -> topic // broker A published a message to the topic, m1:
>>>>>>>> {broker A assigned bundle x to broker A}
>>>>>>>> t2: B -> topic // broker B published a message, m2: {broker B
>>>>>>>> assigned bundle x to broker C}
>>>>>>>> t3: C -> topic // broker C published a message, m3: {broker C
>>>>>>>> assigned bundle x to broker B}
>>>>>>>> t4: topic -> [A,B,C] // brokers A, B, and C consumed the messages
>>>>>>>> m1, m2, and m3
>>>>>>>> t5: [A,B,C] -> m1 // brokers A, B, and C individually compacted the
>>>>>>>> messages to m1
>>>>>>>>
>>>>>>>> Analysis: the "pre-filter + two topics" option can reduce the number
>>>>>>>> of messages to broadcast, at the expense of the leader broker
>>>>>>>> compaction.
>>>>>>>>
>>>>>>>>
>>>>>>>> Example 2: When the topic broadcast is faster than the concurrent
>>>>>>>> assignment requests
>>>>>>>>
>>>>>>>> With pre-filter + two topics (non-compacted and compacted topics):
>>>>>>>> t1: A -> non-compacted topic // broker A published a message to the
>>>>>>>> non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>>>>>>> t2: non-compacted topic -> L // the leader broker consumed the
>>>>>>>> message m1
>>>>>>>> t3: L -> compacted topic // the leader compacted the message and
>>>>>>>> broadcast m1 to all consumers
>>>>>>>> t4: compacted topic -> [A,B,C] // brokers A, B, and C consumed m1
>>>>>>>> t5: A -> own bundle // broker A knows that its assignment has been
>>>>>>>> accepted, so it proceeds to own the bundle (meanwhile deferring
>>>>>>>> lookup requests)
>>>>>>>> t6: B -> defer client lookups // broker B knows that a bundle
>>>>>>>> assignment is running (meanwhile deferring lookup requests)
>>>>>>>> t7: C -> defer client lookups // broker C knows that a bundle
>>>>>>>> assignment is running (meanwhile deferring lookup requests)
>>>>>>>>
>>>>>>>> With post-filter + a single topic:
>>>>>>>> t1: A -> topic // broker A published a message to the topic, m1:
>>>>>>>> {broker A assigned bundle x to broker A}
>>>>>>>> t2: topic -> [A,B,C] // brokers A, B, and C consumed the message m1
>>>>>>>> t3: A -> own bundle // broker A knows that its assignment has been
>>>>>>>> accepted, so it proceeds to own the bundle (meanwhile deferring
>>>>>>>> lookup requests)
>>>>>>>> t4: B -> defer client lookups // broker B knows that a bundle
>>>>>>>> assignment is running (meanwhile deferring lookup requests)
>>>>>>>> t5: C -> defer client lookups // broker C knows that a bundle
>>>>>>>> assignment is running (meanwhile deferring lookup requests)
>>>>>>>>
>>>>>>>> Analysis: the "post-filter + a single topic" option can perform well
>>>>>>>> in this case without the additional leader coordination and the
>>>>>>>> secondary topic, because the early broadcast can inform all brokers
>>>>>>>> and prevent them from requesting other assignments for the same
>>>>>>>> bundle.
>>>>>>>>
>>>>>>>> I think the post-filter option is initially not bad because:
>>>>>>>>
>>>>>>>> 1. it is safe in the worst case (in case the messages are not
>>>>>>>> correctly pre-filtered at the leader)
>>>>>>>> 2. it performs ok because the early broadcast can prevent
>>>>>>>> concurrent assignment requests.
>>>>>>>> 3. initially less complex to implement (leaderless conflict
>>>>>>>> resolution and requires a single topic)
>>>>>>>> 4. it is not a "one-way door" decision (we could add the pre-filter
>>>>>>>> logic as well later)
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Heesung
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <
>>>>>>> heesung.s...@streamnative.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Michael,
>>>>>>>>>
>>>>>>>>> For the pre-filter (pre-compaction) option,
>>>>>>>>> I think the leader requires a write-through cache to compact
>>>>>>>>> messages based on the latest states. Otherwise, the leader needs to
>>>>>>>>> wait for the last message from the (compacted) topic before
>>>>>>>>> compacting the next message for the same bundle.
>>>>>>>>>
>>>>>>>>> Pulsar guarantees "a single writer". However, for the worst-case
>>>>>>>>> scenarios (due to network partitions, bugs in zk or etcd leader
>>>>>>>>> election, bugs in bk, or data corruption), I think it is safe to
>>>>>>>>> place the post-filter on the consumer side (compaction and table
>>>>>>>>> views) as well, in order to validate the state changes.
>>>>>>>>>
>>>>>>>>> For the two-topic approach,
>>>>>>>>> I think we lose a single linearized view. Could we clarify how to
>>>>>>>>> handle the following (edge cases and failure recovery)?
>>>>>>>>> 0. Is the un-compacted topic a persistent topic or a non-persistent
>>>>>>>>> topic?
>>>>>>>>> 1. How does the leader recover state from the two topics?
>>>>>>>>> 2. How do we handle the case when the leader fails before writing
>>>>>>>>> messages to the compacted topic?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Heesung
>>>>>>>>>
>>>>>>>>> On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <
>>>>>>> mmarsh...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Sharing some more thoughts. We could alternatively use two
>> topics
>>>>>>>>>> instead of one. In this design, the first topic is the
>> unfiltered
>>>>>>>>>> write ahead log that represents many writers (brokers) trying
>> to
>>>>>>>>>> acquire ownership of bundles. The second topic is the
>> distilled log
>>>>>>>>>> that represents the "winners" or the "owners" of the bundles.
>>>>>>> There is
>>>>>>>>>> a single writer, the leader broker, that reads from the input
>> topic
>>>>>>>>>> and writes to the output topic. The first topic is normal and
>> the
>>>>>>>>>> second is compacted.
>>>>>>>>>>
>>>>>>>>>> The primary benefit in a two topic solution is that it is easy
>> for
>>>>>>> the
>>>>>>>>>> leader broker to trade off ownership without needing to slow
>> down
>>>>>>>>>> writes to the input topic. The leader broker will start
>> consuming
>>>>>>> from
>>>>>>>>>> the input topic when it has fully consumed the table view on
>> the
>>>>>>>>>> output topic. In general, I don't think consumers know when
>> they
>>>>>>> have
>>>>>>>>>> "reached the end of a table view", but we should be able to
>>>>>>> trivially
>>>>>>>>>> figure this out if we are the topic's only writer and the
>> topic and
>>>>>>>>>> writer are collocated on the same broker.
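>>>>>>>>>>
>>>>>>>>>> One way to get that signal (a sketch, assuming the leader records
>>>>>>>>>> the id of the last message it wrote to the output topic):
>>>>>>>>>>
>>>>>>>>>>     MessageId last = outputProducer.send(winnerEvent);
>>>>>>>>>>     // The table view is "caught up" once reads pass `last`:
>>>>>>>>>>     boolean caughtUp = lastReadId.compareTo(last) >= 0;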
>>>>>>>>>>
>>>>>>>>>> In that design, it might make sense to use something like the
>>>>>>>>>> replication cursor to keep track of this consumer's state.
>>>>>>>>>>
>>>>>>>>>> - Michael
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <
>>>>>>> mmarsh...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your proposal, Heesung.
>>>>>>>>>>>
>>>>>>>>>>> Fundamentally, we have the problems listed in this PIP
>> because
>>>>>>> we have
>>>>>>>>>>> multiple writers instead of just one writer. Can we solve
>> this
>>>>>>> problem
>>>>>>>>>>> by changing our write pattern? What if we use the leader
>> broker
>>>>>>> as the
>>>>>>>>>>> single writer? That broker would intercept attempts to
>> acquire
>>>>>>>>>>> ownership on bundles and would grant ownership to the first
>>>>>>> broker to
>>>>>>>>>>> claim an unassigned bundle. It could "grant ownership" by
>>>>>>> letting the
>>>>>>>>>>> first write to claim an unassigned bundle get written to the
>>>>>>> ownership
>>>>>>>>>>> topic. When a bundle is already owned, the leader won't
>> persist
>>>>>>> that
>>>>>>>>>>> event to the bookkeeper. In this design, the log becomes a
>> true
>>>>>>>>>>> ownership log, which will correctly work with the existing
>> topic
>>>>>>>>>>> compaction and table view solutions. My proposal essentially
>>>>>>> moves the
>>>>>>>>>>> conflict resolution to just before the write, and as a
>>>>>>> consequence, it
>>>>>>>>>>> greatly reduces the need for post processing of the event
>> log.
>>>>>>> One
>>>>>>>>>>> trade off might be that the leader broker could slow down the
>>>>>>> write
>>>>>>>>>>> path, but given that the leader would just need to verify the
>>>>>>> current
>>>>>>>>>>> state of the bundle, I think it'd be performant enough.
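>>>>>>>>>>>
>>>>>>>>>>> The gate itself would be simple, roughly (a sketch with assumed
>>>>>>>>>>> names):
>>>>>>>>>>>
>>>>>>>>>>>     // Leader-side check before persisting an ownership claim:
>>>>>>>>>>>     if (!ownershipView.containsKey(claim.bundle())) {
>>>>>>>>>>>         writeToOwnershipLog(claim); // first claim wins
>>>>>>>>>>>     } // otherwise drop it: the bundle is already owned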
>>>>>>>>>>>
>>>>>>>>>>> Additionally, we'd need the leader broker to be "caught up"
>> on
>>>>>>> bundle
>>>>>>>>>>> ownership in order to grant ownership of topics, but unless
>> I am
>>>>>>>>>>> mistaken, that is already a requirement of the current PIP
>> 192
>>>>>>>>>>> paradigm.
>>>>>>>>>>>
>>>>>>>>>>> Below are some additional thoughts that will be relevant if
>> we
>>>>>>> move
>>>>>>>>>>> forward with the design as it is currently proposed.
>>>>>>>>>>>
>>>>>>>>>>> I think it might be helpful to update the title to show that this
>>>>>>>>>>> proposal will affect table view as well. I didn't catch that at
>>>>>>>>>>> first.
>>>>>>>>>>>
>>>>>>>>>>> Do you have any documentation describing how the
>>>>>>>>>>> TopicCompactionStrategy will determine which states are
>> valid in
>>>>>>> the
>>>>>>>>>>> context of load balancing? I looked at
>>>>>>>>>>> https://github.com/apache/pulsar/pull/18195, but I couldn't
>>>>>>> seem to
>>>>>>>>>>> find anything for it. That would help make this proposal less
>>>>>>>>>>> abstract.
>>>>>>>>>>>
>>>>>>>>>>> The proposed API seems very tied to the needs of PIP 192. For
>>>>>>>>>>> example, `isValid` is not a term I associate with topic compaction.
>>>>>>>>>>> The fundamental question for compaction is which value to keep (or
>>>>>>>>>>> whether to build a new value). I think we might be able to simplify
>>>>>>>>>>> the API by replacing the "isValid", "isMergeEnabled", and "merge"
>>>>>>>>>>> methods with a single method that lets the implementation handle
>>>>>>>>>>> one or all of those tasks. That would also remove the need to
>>>>>>>>>>> deserialize payloads multiple times.
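>>>>>>>>>>>
>>>>>>>>>>> Something like the following (a sketch of the single-method shape):
>>>>>>>>>>>
>>>>>>>>>>>     public interface TopicCompactionStrategy<T> {
>>>>>>>>>>>         // Return the value to keep for the key: prev, cur, or a
>>>>>>>>>>>         // newly built/merged value derived from both.
>>>>>>>>>>>         T compact(T prev, T cur);
>>>>>>>>>>>     }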
>>>>>>>>>>>
>>>>>>>>>>> I should also mention that, after working with the PIP 105
>>>>>>>>>>> broker-side filtering, I think we should avoid running UDFs in the
>>>>>>>>>>> broker as much as possible. (I do not consider the load balancing
>>>>>>>>>>> logic to be a UDF here.) I think it would be worth not making this
>>>>>>>>>>> a user-facing feature unless there is demand from real use cases.
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>> Michael
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bog...@apache.org>
>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> +1(non-binding)
>>>>>>>>>>>>
>>>>>>>>>>>> thanks,
>>>>>>>>>>>> bo
>>>>>>>>>>>>
>>>>>>>>>>>> Heesung Sohn <heesung.s...@streamnative.io.invalid>
>>>>>>>>>>>> wrote on Wednesday, Oct 19, 2022 at 07:54:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi pulsar-dev community,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I raised a pip to discuss : PIP-215: Configurable Topic
>>>>>>> Compaction
>>>>>>>>>> Strategy
>>>>>>>>>>>>>
>>>>>>>>>>>>> PIP link: https://github.com/apache/pulsar/issues/18099
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Heesung
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>