> On Nov 4, 2022, at 10:28 AM, Heesung Sohn 
> <heesung.s...@streamnative.io.INVALID> wrote:
> 
> Hi,
> 
> I think `bool shouldKeepLeft(T prev, T cur)` is clearer. I updated the PIP.
> 
> Hopefully, we have provided enough context about this PIP, as well as
> about the design trade-offs.

I would like to understand what the performance trade-offs are for the
changes that you (as an individual) and others are proposing.

> Goal
> 
>       • Create another topic compactor, StrategicTwoPhaseCompactor, where
> we can configure a compaction strategy, TopicCompactionStrategy.
> 
>       • Update TableViewConfigurationData to load and consider the
> TopicCompactionStrategy when updating the internal key-value map in
> TableView.
> 
>       • Add TopicCompactionStrategy to the topic-level policy to run
> StrategicTwoPhaseCompactor instead of TwoPhaseCompactor when executing
> compaction.
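> 
> For reference, the strategy interface under discussion looks roughly like
> this (a sketch assembled from the signatures discussed in this thread;
> the authoritative definition lives in the PIP):
> 
>     public interface TopicCompactionStrategy<T> {
>         // Decide which side of a key collision survives compaction.
>         // true  -> keep prev ("left" wins); false -> keep cur.
>         boolean shouldKeepLeft(T prev, T cur);
>     }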

It really seems that you are proposing a change to the default behavior whether 
or not a user chooses to use the interface in PIP-192.

> 
> I will send out a vote email soon.
> 
> Thank you,
> Heesung
> 
> 
> 
> 
> 
> 
> On Thu, Nov 3, 2022 at 9:59 PM Michael Marshall <mmarsh...@apache.org>
> wrote:
> 
>> Thank you for your detailed responses, Heesung.
>> 
>>> We are not planning to expose this feature to users
>>> soon unless demanded and proven to be stable.
>> 
>> In that case, I think we should move forward with this PIP. I have a
>> different opinion about the trade-offs of the two designs, but none of
>> my concerns are problems that could not be solved later if we encounter
>> them.
>> 
>> Just to say it succinctly, my concern is that broadcasting all
>> attempts to acquire ownership of every unclaimed bundle to all brokers
>> will generate a lot of unnecessary traffic.
>> 
>>> 
>>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
>> 
>> Thank you for this reference. I missed it. That is great documentation!
>> 
>>> In fact, with the `compact(T prev, T cur)` API only, it is not clear
>>> whether prev => cur is a valid transition (if invalid, we should filter
>>> out the cur message instead of further compacting/merging). I think we
>>> still need to keep `isValid()` and `merge()` separate.
>> 
>> I was thinking that the result of `compact` would be the result put in
>> the table view or written to the compacted topic. The one issue might be
>> keeping the memory utilization down for use cases that are not updating
>> the message's value but are only selecting "left" or "right". I thought
>> we could infer when to keep the message id versus the message value, but
>> that might not be easy to implement.
>> 
>> My final critique is that I think `isValid` could have a better name.
>> In the event this does become a public API, I don't think all use
>> cases will think about which event should be persisted in terms of
>> validity.
>> 
>> The main name that comes to my mind is `bool shouldKeepLeft(T prev, T
>> cur)`. When true, prev wins. When false, cur wins. That nomenclature
>> comes from Akka Streams. It's not perfect, but it is easy to infer
>> what the result will do.
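>> 
>> For example, a last-writer-wins strategy under that naming would be
>> trivial (a sketch against the generic strategy interface proposed in
>> this PIP):
>> 
>>     public class LastWriterWins<T> implements TopicCompactionStrategy<T> {
>>         @Override
>>         public boolean shouldKeepLeft(T prev, T cur) {
>>             return false; // cur always wins: plain latest-value semantics
>>         }
>>     }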
>> 
>>> Regarding redundant deserialization, the input type `T` is the type of
>>> message value, so the input values are already deserialized.
>> 
>> Great, I should have realized that. That takes care of that concern.
>> 
>> Thanks,
>> Michael
>> 
>> On Thu, Nov 3, 2022 at 7:37 PM Heesung Sohn
>> <heesung.s...@streamnative.io.invalid> wrote:
>>> 
>>> Hi,
>>> 
>>> I have a different thought about my previous comment.
>>> 
>>> - Agreed with your point that we should merge the CompactionStrategy
>>> APIs. I updated the interface proposal in the PIP. I replaced the
>>> `isValid`, `isMergeEnabled`, and `merge` APIs with a `compact` API.
>>> 
>>> boolean isValid(T prev, T cur)
>>> boolean isMergeEnabled()
>>> T merge(T prev, T cur)
>>> 
>>> =>
>>> 
>>> T compact(T prev, T cur)
>>> 
>>> In fact, with the `compact(T prev, T cur)` API only, it is not clear
>>> whether prev => cur is a valid transition (if invalid, we should filter
>>> out the cur message instead of further compacting/merging). I think we
>>> still need to keep `isValid()` and `merge()` separate.
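>>> 
>>> To illustrate the distinction (a sketch, not the final PIP interface):
>>> 
>>>     boolean isValid(T prev, T cur); // false -> filter out cur entirely
>>>     boolean isMergeEnabled();       // opt in to value merging
>>>     T merge(T prev, T cur);         // build the combined value to keep
>>> 
>>> A single compact(prev, cur) would have to encode "drop cur" in its
>>> return value (for example, by returning prev), which conflates the
>>> filtering and merging concerns.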
>>> 
>>> Regarding redundant deserialization, the input type `T` is the type of
>>> the message value, so the input values are already deserialized. We
>>> don't want to expose the Message<T> interface in this
>>> CompactionStrategy, to avoid message serialization/deserialization
>>> dependencies in the CompactionStrategy.
>>> 
>>> The `merge()` functionality is suggested for more complex use cases
>>> (merging values instead of just filtering), and to support `merge()`,
>>> we need to internally create a new message with the compacted value,
>>> metadata, and messageId copies. We could initially define `isValid()`
>>> only in CompactionStrategy, and add `isMergeEnabled()` and `merge()` to
>>> the CompactionStrategy interface later if requested.
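>>> 
>>> For example, the compactor's per-key handling could drive the strategy
>>> roughly like this (a sketch; the cache and variable names are
>>> illustrative):
>>> 
>>>     T prev = cache.get(key);
>>>     if (prev != null && !strategy.isValid(prev, cur)) {
>>>         return; // invalid transition: filter out cur, keep prev
>>>     }
>>>     T keep = (prev != null && strategy.isMergeEnabled())
>>>             ? strategy.merge(prev, cur) // requires building a new message
>>>             : cur;
>>>     cache.put(key, keep);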
>>> 
>>> Regards,
>>> Heesung
>>> 
>>> 
>>> On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn
>>> <heesung.s...@streamnative.io> wrote:
>>> 
>>>> Oops! Michael, I apologize for the typo in your name.
>>>> 
>>>> On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn
>>>> <heesung.s...@streamnative.io> wrote:
>>>> 
>>>>> Hi Machel,
>>>>> 
>>>>> Here are my additional comments regarding your earlier email.
>>>>> 
>>>>> - I updated the PIP title to show that this will impact table view as
>>>>> well.
>>>>> 
>>>>> - PIP-192 (https://github.com/apache/pulsar/issues/16691) shows the
>>>>> general idea of the states and their actions, and I defined the actual
>>>>> states in the PR:
>>>>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a.
>>>>> I will further clarify the bundle state data validation logic when
>>>>> introducing the `BundleStateCompactionStrategy` class. This PIP is to
>>>>> support CompactionStrategy in general.
>>>>> 
>>>>> - Agreed with your point that we should merge the CompactionStrategy
>>>>> APIs. I updated the interface proposal in the PIP. I replaced the
>>>>> `isValid`, `isMergeEnabled`, and `merge` APIs with a `compact` API.
>>>>> 
>>>>> 
>>>>> Thanks,
>>>>> Heesung
>>>>> 
>>>>> 
>>>>> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn
>>>>> <heesung.s...@streamnative.io> wrote:
>>>>> 
>>>>>> Hi,
>>>>>> Thank you for the great comments.
>>>>>> Please find my comments inline too.
>>>>>> 
>>>>>> Regards,
>>>>>> Heesung
>>>>>> 
>>>>>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall
>>>>>> <mmarsh...@apache.org> wrote:
>>>>>> 
>>>>>>>> I think we lose a single linearized view.
>>>>>>> 
>>>>>>> Which linearized view are we losing, and what is the role of that
>>>>>>> linearized view? I think I might be missing why it is important. I
>>>>>>> agree that consumers won't know about each unsuccessful attempted
>>>>>>> acquisition of a bundle, but that seems like unnecessary information
>>>>>>> to broadcast to every broker in the cluster.
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> PIP-192 proposes an assignment, transfer, and split protocol
>>>>>> (multi-phase state changes) that relies on early broadcast across
>>>>>> brokers, and all brokers react to their clients according to the
>>>>>> state change notifications -- brokers can defer any client lookups
>>>>>> for bundle x if an assignment/transfer/split is ongoing for x
>>>>>> (broadcast early in the topic). One early-broadcast example is the
>>>>>> one I discussed above, `When the topic broadcast is faster than the
>>>>>> concurrent assignment requests.` I think the pre-filter could delay
>>>>>> this early broadcast, as it needs to go through the additional
>>>>>> single-leader compaction path.
>>>>>> 
>>>>>> The bundle state recovery process is also simpler with a single
>>>>>> linearized view.
>>>>>> 
>>>>>> A single linearized view also makes bundle states easier to debug: we
>>>>>> can more easily track where the assignment requests come from and how
>>>>>> they are compacted.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>>> I think the leader requires a write-through cache to compact
>>>>>>>> messages based on the latest states.
>>>>>>> 
>>>>>>> This brings up an important point that I would like to clarify. If we
>>>>>>> trust the write ahead log as the source of truth, what happens when
>> a
>>>>>>> bundle has been validly owned by multiple brokers? As a broker
>> starts
>>>>>>> and consumes from the compacted topic, how do we prevent it from
>>>>>>> incorrectly thinking that it owns a bundle for some short time
>> period
>>>>>>> in the case that the ownership topic hasn't yet been compacted to
>>>>>>> remove old ownership state?
>>>>>>> 
>>>>>> 
>>>>>> Since the multi-phase transfer protocol involves actions by both the
>>>>>> source and destination brokers, a successful transfer should leave
>>>>>> both brokers with the (near) latest state. For example, if some
>>>>>> brokers have old ownership states (network-partitioned or delayed),
>>>>>> they will redirect clients to the source (old) broker. However, by
>>>>>> the transfer protocol, the source broker should have the latest
>>>>>> state, so it can redirect the client again to the destination broker.
>>>>>> 
>>>>>> When a broker restarts, it won't start serving until its BSC state is
>>>>>> caught up to the (near) latest state (up to the last known messageId
>>>>>> at that time).
>>>>>> 
>>>>>> 
>>>>>>>> Pulsar guarantees "a single writer".
>>>>>>> 
>>>>>>> I didn't think we were using a single writer in the PIP 192 design.
>>>>>>> I thought we had many producers sending events to a compacted topic.
>>>>>>> My proposal would still have many producers, but the writer to
>>>>>>> BookKeeper would act as the single writer. It would technically be
>>>>>>> distinct from a normal Pulsar topic producer.
>>>>>>> 
>>>>>>> I should highlight that I am only proposing "broker filtering before
>>>>>>> write" in the context of PIP 192 and as an alternative to adding
>>>>>>> pluggable compaction strategies. It would not be a generic feature.
>>>>>>> 
>>>>>>> 
>>>>>> I was worried about the worst case where two producers (leaders)
>>>>>> happen to write to the compacted topic (although Pulsar can guarantee
>>>>>> "a single writer" or "a single producer" for a topic in normal
>>>>>> situations).
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>>> Could we clarify how to handle the following (edge cases and
>>>>>>>> failure recovery)?
>>>>>>>> 0. Is the un-compacted topic a persistent topic or a non-persistent
>>>>>>>> topic?
>>>>>>> 
>>>>>>> It is a persistent topic.
>>>>>>> 
>>>>>>>> 1. How does the leader recover state from the two topics?
>>>>>>> 
>>>>>>> A leader would recover state by first consuming the whole compacted
>>>>>>> topic and then by consuming from the current location of a cursor on
>>>>>>> the first input topic. As stated elsewhere, this introduces latency
>>>>>>> and could be an issue.
>>>>>>> 
>>>>>>>> 2. How do we handle the case when the leader fails before writing
>>>>>>>> messages to the compacted topic?
>>>>>>> 
>>>>>>> The leader would not acknowledge the message on the input topic
>>>>>>> until it has successfully persisted the event on the compacted
>>>>>>> topic. Publishing the same event to a compacted topic multiple times
>>>>>>> is idempotent, so there is no risk of lost state. The real risk is
>>>>>>> latency. However, I think we might have similar (though not the
>>>>>>> same) latency risks in the current solution.
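>>>>>>> 
>>>>>>> Roughly, the leader's forwarding loop would look like this (a sketch
>>>>>>> with illustrative names, using the standard Pulsar client calls):
>>>>>>> 
>>>>>>>     Message<Event> msg = inputConsumer.receive();
>>>>>>>     if (isFirstClaim(msg)) {                 // filter before write
>>>>>>>         outputProducer.send(msg.getValue()); // persist the decision first
>>>>>>>     }
>>>>>>>     inputConsumer.acknowledge(msg);          // ack last; replay after a
>>>>>>>                                              // crash is idempotent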
>>>>>>> 
>>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the
>>>>>>>> number of messages to broadcast at the expense of leader-broker
>>>>>>>> compaction.
>>>>>>> 
>>>>>>> My primary point is that with this PIP's design, the filter logic is
>>>>>>> run on every broker and again during topic compaction. With the
>>>>>>> alternative design, the filter is run once.
>>>>>>> 
>>>>>> Thank you for the clarification.
>>>>>> 
>>>>>> I think the difference is that the post-filter is an optimistic
>>>>>> approach, as it relies on the "broadcast-filter" effect (brokers will
>>>>>> defer client lookups if notified ahead of time that an assignment is
>>>>>> ongoing for bundle x). Yes, in the worst case, if the broadcast is
>>>>>> slower, each broker needs to compact the conflicting assignment
>>>>>> requests individually.
>>>>>> 
>>>>>> Conversely, one downside of the pessimistic approach (single-leader
>>>>>> pre-filter) is that even when there are few conflicting concurrent
>>>>>> assignment requests (assign for bundle a, assign for bundle b, assign
>>>>>> for bundle c, ...), the requests still need to redundantly go through
>>>>>> the leader compaction.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>>> 3. initially less complex to implement (leaderless conflict
>>>>>>>> resolution and requires a single topic)
>>>>>>> 
>>>>>>> PIP 215 has its own complexity too. Coordinating filters on both
>>>>>>> the client (table view) and the server (compaction) is non-trivial.
>>>>>>> The proposed API includes hard-coded client configuration for each
>>>>>>> component, which will make upgrading the version of the compaction
>>>>>>> strategy complicated and could lead to incorrect interpretation of
>>>>>>> events in the stream. When a single broker is doing the filtering,
>>>>>>> versioning is no longer a distributed problem. That being said, I do
>>>>>>> not mean to suggest my solution is without complexity.
>>>>>>> 
>>>>>>>> 4. it is not a "one-way door" decision (we could add the pre-filter
>>>>>>>> logic as well later)
>>>>>>> 
>>>>>>> It's fair to say that we could add it later, but at that point, we
>>>>>>> will have added this new API for compaction strategy. Are we
>>>>>>> confident that pluggable compaction is independently an important
>>>>>>> addition to Pulsar's features, or would it make sense to expose this
>>>>>>> API only in the broker?
>>>>>>> 
>>>>>>> 
>>>>>> The intention is that this compaction feature could be useful for
>>>>>> complex user applications (if they are trying to do a similar thing).
>>>>>> As I mentioned, this feature is closely tied to PIP-192 now. We are
>>>>>> not planning to expose this feature to users soon unless it is
>>>>>> demanded and proven to be stable.
>>>>>> 
>>>>>> 
>>>>>>> Thanks,
>>>>>>> Michael
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
>>>>>>> <heesung.s...@streamnative.io.invalid> wrote:
>>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>> Also, I thought about some concurrent assignment scenarios,
>>>>>>>> comparing pre-filter vs. post-filter.
>>>>>>>> 
>>>>>>>> Example 1: When the topic broadcast is slower than the concurrent
>>>>>>>> assignment requests
>>>>>>>> 
>>>>>>>> With pre-filter + two topics (non-compacted and compacted topics):
>>>>>>>> t1: A -> non-compacted topic // broker A published m1: {broker A assigned bundle x to broker A}
>>>>>>>> t2: B -> non-compacted topic // broker B published m2: {broker B assigned bundle x to broker C}
>>>>>>>> t3: C -> non-compacted topic // broker C published m3: {broker C assigned bundle x to broker B}
>>>>>>>> t4: non-compacted topic -> L // the leader broker consumed m1, m2, and m3
>>>>>>>> t5: L -> compacted topic // the leader compacted the messages and broadcast m1 to all consumers
>>>>>>>> t6: compacted topic -> [A,B,C] // brokers A, B, C consumed m1
>>>>>>>> 
>>>>>>>> With post-filter + a single topic:
>>>>>>>> t1: A -> topic // broker A published m1: {broker A assigned bundle x to broker A}
>>>>>>>> t2: B -> topic // broker B published m2: {broker B assigned bundle x to broker C}
>>>>>>>> t3: C -> topic // broker C published m3: {broker C assigned bundle x to broker B}
>>>>>>>> t4: topic -> [A,B,C] // brokers A, B, C consumed m1, m2, and m3
>>>>>>>> t5: [A,B,C] -> m1 // brokers A, B, C individually compacted the messages to m1
>>>>>>>> 
>>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the
>>>>>>>> number of messages to broadcast at the expense of leader-broker
>>>>>>>> compaction.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Example 2: When the topic broadcast is faster than the concurrent
>>>>>>>> assignment requests
>>>>>>>> 
>>>>>>>> With pre-filter + two topics (non-compacted and compacted topics):
>>>>>>>> t1: A -> non-compacted topic // broker A published m1: {broker A assigned bundle x to broker A}
>>>>>>>> t2: non-compacted topic -> L // the leader broker consumed m1
>>>>>>>> t3: L -> compacted topic // the leader compacted the message and broadcast m1 to all consumers
>>>>>>>> t4: compacted topic -> [A,B,C] // brokers A, B, C consumed m1
>>>>>>>> t5: A -> own bundle // broker A knows its assignment has been accepted, so it proceeds to own the bundle (meanwhile deferring lookup requests)
>>>>>>>> t6: B -> defer client lookups // broker B knows that a bundle assignment is running (meanwhile deferring lookup requests)
>>>>>>>> t7: C -> defer client lookups // broker C knows that a bundle assignment is running (meanwhile deferring lookup requests)
>>>>>>>> 
>>>>>>>> With post-filter + a single topic:
>>>>>>>> t1: A -> topic // broker A published m1: {broker A assigned bundle x to broker A}
>>>>>>>> t2: topic -> [A,B,C] // brokers A, B, C consumed m1
>>>>>>>> t3: A -> own bundle // broker A knows its assignment has been accepted, so it proceeds to own the bundle (meanwhile deferring lookup requests)
>>>>>>>> t4: B -> defer client lookups // broker B knows that a bundle assignment is running (meanwhile deferring lookup requests)
>>>>>>>> t5: C -> defer client lookups // broker C knows that a bundle assignment is running (meanwhile deferring lookup requests)
>>>>>>>> 
>>>>>>>> Analysis: the "post-filter + a single topic" option can perform OK
>>>>>>>> in this case, without the additional leader coordination and the
>>>>>>>> secondary topic, because the early broadcast can inform all brokers
>>>>>>>> and prevent them from requesting other assignments for the same
>>>>>>>> bundle.
>>>>>>>> 
>>>>>>>> I think the post-filter option is initially not bad because:
>>>>>>>> 
>>>>>>>> 1. it is safe in the worst case (in case the messages are not
>>>>>>>> correctly pre-filtered at the leader)
>>>>>>>> 2. it performs OK because the early broadcast can prevent concurrent
>>>>>>>> assignment requests
>>>>>>>> 3. it is initially less complex to implement (leaderless conflict
>>>>>>>> resolution and requires a single topic)
>>>>>>>> 4. it is not a "one-way door" decision (we could add the pre-filter
>>>>>>>> logic as well later)
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Heesung
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn
>>>>>>>> <heesung.s...@streamnative.io> wrote:
>>>>>>>> 
>>>>>>>>> Hi Michael,
>>>>>>>>> 
>>>>>>>>> For the pre-filter (pre-compaction) option, I think the leader
>>>>>>>>> requires a write-through cache to compact messages based on the
>>>>>>>>> latest states. Otherwise, the leader needs to wait for the last
>>>>>>>>> message from the (compacted) topic before compacting the next
>>>>>>>>> message for the same bundle.
>>>>>>>>> 
>>>>>>>>> Pulsar guarantees "a single writer". However, for worst-case
>>>>>>>>> scenarios (network partitions, bugs in the ZooKeeper or etcd leader
>>>>>>>>> election, bugs in BookKeeper, data corruption), I think it is safe
>>>>>>>>> to place the post-filter on the consumer side (compaction and table
>>>>>>>>> views) as well, in order to validate the state changes.
>>>>>>>>> 
>>>>>>>>> For the two-topic approach, I think we lose a single linearized
>>>>>>>>> view. Could we clarify how to handle the following (edge cases and
>>>>>>>>> failure recovery)?
>>>>>>>>> 0. Is the un-compacted topic a persistent topic or a non-persistent
>>>>>>>>> topic?
>>>>>>>>> 1. How does the leader recover state from the two topics?
>>>>>>>>> 2. How do we handle the case when the leader fails before writing
>>>>>>>>> messages to the compacted topic?
>>>>>>>>> 
>>>>>>>>> Regards,
>>>>>>>>> Heesung
>>>>>>>>> 
>>>>>>>>> On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall
>>>>>>>>> <mmarsh...@apache.org> wrote:
>>>>>>>>> 
>>>>>>>>>> Sharing some more thoughts. We could alternatively use two topics
>>>>>>>>>> instead of one. In this design, the first topic is the unfiltered
>>>>>>>>>> write-ahead log that represents many writers (brokers) trying to
>>>>>>>>>> acquire ownership of bundles. The second topic is the distilled
>>>>>>>>>> log that represents the "winners" or the "owners" of the bundles.
>>>>>>>>>> There is a single writer, the leader broker, that reads from the
>>>>>>>>>> input topic and writes to the output topic. The first topic is
>>>>>>>>>> normal and the second is compacted.
>>>>>>>>>> 
>>>>>>>>>> The primary benefit of a two-topic solution is that it is easy
>>>>>>>>>> for the leader broker to trade off ownership without needing to
>>>>>>>>>> slow down writes to the input topic. The leader broker will start
>>>>>>>>>> consuming from the input topic when it has fully consumed the
>>>>>>>>>> table view on the output topic. In general, I don't think
>>>>>>>>>> consumers know when they have "reached the end of a table view",
>>>>>>>>>> but we should be able to trivially figure this out if we are the
>>>>>>>>>> topic's only writer and the topic and writer are collocated on the
>>>>>>>>>> same broker.
>>>>>>>>>> 
>>>>>>>>>> In that design, it might make sense to use something like the
>>>>>>>>>> replication cursor to keep track of this consumer's state.
>>>>>>>>>> 
>>>>>>>>>> - Michael
>>>>>>>>>> 
>>>>>>>>>> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall
>>>>>>>>>> <mmarsh...@apache.org> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for your proposal, Heesung.
>>>>>>>>>>> 
>>>>>>>>>>> Fundamentally, we have the problems listed in this PIP because
>>>>>>>>>>> we have multiple writers instead of just one writer. Can we solve
>>>>>>>>>>> this problem by changing our write pattern? What if we use the
>>>>>>>>>>> leader broker as the single writer? That broker would intercept
>>>>>>>>>>> attempts to acquire ownership of bundles and would grant
>>>>>>>>>>> ownership to the first broker to claim an unassigned bundle. It
>>>>>>>>>>> could "grant ownership" by letting the first write that claims an
>>>>>>>>>>> unassigned bundle get written to the ownership topic. When a
>>>>>>>>>>> bundle is already owned, the leader won't persist that event to
>>>>>>>>>>> BookKeeper. In this design, the log becomes a true ownership log,
>>>>>>>>>>> which will work correctly with the existing topic compaction and
>>>>>>>>>>> table view solutions. My proposal essentially moves the conflict
>>>>>>>>>>> resolution to just before the write, and as a consequence, it
>>>>>>>>>>> greatly reduces the need for post-processing of the event log.
>>>>>>>>>>> One trade-off might be that the leader broker could slow down the
>>>>>>>>>>> write path, but given that the leader would just need to verify
>>>>>>>>>>> the current state of the bundle, I think it'd be performant
>>>>>>>>>>> enough.
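>>>>>>>>>>> 
>>>>>>>>>>> In rough Java terms, the leader-side intercept could be as simple
>>>>>>>>>>> as this (a sketch; the map and method names are illustrative):
>>>>>>>>>>> 
>>>>>>>>>>>     // owners: a ConcurrentHashMap<String, String> (bundle -> broker).
>>>>>>>>>>>     // Only granted claims get persisted to the ownership topic.
>>>>>>>>>>>     boolean tryGrant(String bundle, String broker) {
>>>>>>>>>>>         String prev = owners.putIfAbsent(bundle, broker);
>>>>>>>>>>>         return prev == null || prev.equals(broker);
>>>>>>>>>>>     }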
>>>>>>>>>>> 
>>>>>>>>>>> Additionally, we'd need the leader broker to be "caught up" on
>>>>>>>>>>> bundle ownership in order to grant ownership of topics, but
>>>>>>>>>>> unless I am mistaken, that is already a requirement of the
>>>>>>>>>>> current PIP 192 paradigm.
>>>>>>>>>>> 
>>>>>>>>>>> Below are some additional thoughts that will be relevant if we
>>>>>>>>>>> move forward with the design as it is currently proposed.
>>>>>>>>>>> 
>>>>>>>>>>> I think it might be helpful to update the title to show that
>>>>>>>>>>> this proposal will affect the table view as well. I didn't catch
>>>>>>>>>>> that at first.
>>>>>>>>>>> 
>>>>>>>>>>> Do you have any documentation describing how the
>>>>>>>>>>> TopicCompactionStrategy will determine which states are valid in
>>>>>>>>>>> the context of load balancing? I looked at
>>>>>>>>>>> https://github.com/apache/pulsar/pull/18195, but I couldn't seem
>>>>>>>>>>> to find anything for it. That would help make this proposal less
>>>>>>>>>>> abstract.
>>>>>>>>>>> 
>>>>>>>>>>> The proposed API seems very tied to the needs of PIP 192. For
>>>>>>>>>>> example, `isValid` is not a term I associate with topic
>>>>>>>>>>> compaction. The fundamental question for compaction is which
>>>>>>>>>>> value to keep (or build a new value). I think we might be able to
>>>>>>>>>>> simplify the API by replacing the "isValid", "isMergeEnabled",
>>>>>>>>>>> and "merge" methods with a single method that lets the
>>>>>>>>>>> implementation handle one or all tasks. That would also remove
>>>>>>>>>>> the need to deserialize payloads multiple times.
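>>>>>>>>>>> 
>>>>>>>>>>> Something like the following (a sketch of the idea, not a final
>>>>>>>>>>> signature):
>>>>>>>>>>> 
>>>>>>>>>>>     // Returns the value to keep for the key: prev, cur, or a newly
>>>>>>>>>>>     // built merged value; one callback covers filtering and merging.
>>>>>>>>>>>     T compact(T prev, T cur);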
>>>>>>>>>>> 
>>>>>>>>>>> I also feel like mentioning that, after working with the PIP 105
>>>>>>>>>>> broker-side filtering, I think we should avoid running UDFs in
>>>>>>>>>>> the broker as much as possible. (I do not consider the load
>>>>>>>>>>> balancing logic to be a UDF here.) I think it would be worth not
>>>>>>>>>>> making this a user-facing feature unless there is demand for real
>>>>>>>>>>> use cases.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks!
>>>>>>>>>>> Michael
>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bog...@apache.org> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> +1(non-binding)
>>>>>>>>>>>> 
>>>>>>>>>>>> thanks,
>>>>>>>>>>>> bo
>>>>>>>>>>>> 
>>>>>>>>>>>> On Wed, Oct 19, 2022 at 07:54, Heesung Sohn
>>>>>>>>>>>> <heesung.s...@streamnative.io.invalid> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi pulsar-dev community,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I raised a PIP to discuss: PIP-215, Configurable Topic
>>>>>>>>>>>>> Compaction Strategy.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> PIP link: https://github.com/apache/pulsar/issues/18099
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Heesung
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>> 
