Hi Machel,

Here are my additional comments regarding your earlier email.

- I updated the PIP title to show that this will impact table view as well.

- PIP-192 : https://github.com/apache/pulsar/issues/16691 shows the general
idea of the states and their actions, and I defined the actual states here
in the PR,
https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a.
I
will further clarify the bundle state data validation logic when
introducing `BundleStateCompactionStrategy` class. This PIP is to support
CompactionStrategy in general.

- Agreed with your point that we should merge CompactionStrategy APIs. I
updated the interface proposal in the PIP. I replaced `"isValid",
"isMergeEnabled", and "merge"` apis with "compact" api.


Thanks,
Heesung


On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <heesung.s...@streamnative.io>
wrote:

> Hi,
> Thank you for the great comments.
> Please find my comments inline too.
>
> Regards,
> Heesung
>
> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mmarsh...@apache.org>
> wrote:
>
>> > I think we lose a single linearized view.
>>
>> Which linearized view are we losing, and what is the role of that
>> linearized view? I think I might be missing why it is important. I
>> agree that consumers won't know about each unsuccessful attempted
>> acquisition of a bundle, but that seems like unnecessary information
>> to broadcast to every broker in the cluster.
>>
>>
>
> PIP-192 proposes an assignment, transfer, and split protocol(multi-phase
> state changes), relying on early broadcast across brokers, and all brokers
> react to their clients according to the state change notifications --
> brokers could defer any client lookups for bundle x if an
> assignment/transfer/split is ongoing for x(broadcasted early in the topic).
> One early broadcast example is the one that I discussed above, `When the
> topic broadcast is faster than the concurrent assignment requests.` I think
> the prefilter could delay this early broadcast, as it needs to go through
> the additional single-leader compaction path.
>
> The bundle state recovery process is simpler by a single linearized view.
>
> The single linearized view can be easier to debug bundle states. We can
> more easily track where the assignment requests come from and how it is
> compacted in a single linearized view.
>
>
>
>> > I think the leader requires a write-through cache to compact messages
>> based
>> > on the latest states.
>>
> This brings up an important point that I would like to clarify. If we
>> trust the write ahead log as the source of truth, what happens when a
>> bundle has been validly owned by multiple brokers? As a broker starts
>> and consumes from the compacted topic, how do we prevent it from
>> incorrectly thinking that it owns a bundle for some short time period
>> in the case that the ownership topic hasn't yet been compacted to
>> remove old ownership state?
>>
>
> Since the multi-phase transfer protocol involves the source and
> destination broker's actions, the successful transfer should get the source
> and destination broker to have the (near) latest state. For example, if
> some brokers have old ownership states(network partitioned or delayed),
> they will redirect clients to the source(old) broker. However, by the
> transfer protocol, the source broker should have the latest state, so it
> can redirect the client again to the destination broker.
>
> When a broker restarts, it won't start until its BSC state to the (near)
> latest (til the last known messageId at that time).
>
>
>> > Pulsar guarantees "a single writer".
>>
>> I didn't think we were using a single writer in the PIP 192 design. I
>> thought we had many producers sending events to a compacted topic. My
>> proposal would still have many producers, but the writer to bookkeeper
>> would act as the single writer. It would technically be distinct from
>> a normal Pulsar topic producer.
>>
>> I should highlight that I am only proposing "broker filtering before
>> write" in the context of PIP 192 and as an alternative to adding
>> pluggable compaction strategies. It would not be a generic feature.
>>
>>
> I was worried about the worst case where two producers(leaders) happen to
> write the compacted topic (although Pulsar can guarantee "a single writer"
> or "a single producer" for a topic in normal situations).
>
>
>
>> > Could we clarify how to handle
>> > the following(edge cases and failure recovery)?
>> > 0. Is the un-compacted topic a persistent topic or a non-persistent
>> topic?
>>
>> It is a persistent topic.
>>
>> > 1. How does the leader recover state from the two topics?
>>
>> A leader would recover state by first consuming the whole compacted
>> topic and then by consuming from the current location of a cursor on
>> the first input topic. As stated elsewhere, this introduces latency
>> and could be an issue.
>>
>> > 2. How do we handle the case when the leader fails before writing
>> messages
>> > to the compacted topic
>>
>> The leader would not acknowledge the message on the input topic until
>> it has successfully persisted the event on the compacted topic.
>> Publishing the same event to a compacted topic multiple times is
>> idempotent, so there is no risk of lost state. The real risk is
>> latency. However, I think we might have similar (though not the same)
>> latency risks in the current solution.
>>
>> > Analysis: the "pre-filter + two-topics" option can reduce the number of
>> > messages to broadcast at the expense of the leader broker compaction.
>>
>> My primary point is that with this PIP's design, the filter logic is
>> run on every broker and again during topic compaction. With the
>> alternative design, the filter is run once.
>>
>> Thank you for the clarification.
>
> I think the difference is that the post-filter is an optimistic approach
> as it optimistically relies on the "broadcast-filter" effect(brokers will
> defer client lookups if notified ahead that any assignment is ongoing for
> bundle x). Yes, in the worst case, if the broadcast is slower, each broker
> needs to individually compact the conflicting assignment requests.
>
> Conversely, one downside of the pessimistic approach (single leader
> pre-filter) is that when there are not many conflict concurrent assignment
> requests(assign for bundle a, assign for bundle b, assign for bundle c...),
> the requests need to redundantly go through the leader compaction.
>
>
>
>> > 3. initially less complex to implement (leaderless conflict resolution
>> and
>> > requires a single topic)
>>
>> PIP 215 has its own complexity too. Coordinating filters
>> on both the client (table view) and the server (compaction) is non
>> trivial. The proposed API includes hard coded client configuration for
>> each component, which will make upgrading the version of the
>> compaction strategy complicated, and could lead to incorrect
>> interpretation of events in the stream. When a single broker is doing
>> the filtering, versioning is no longer a distributed problem. That
>> being said, I do not mean to suggest my solution is without
>> complexity.
>>
>> > 4. it is not a "one-way door" decision (we could add the pre-filter
>> logic
>> > as well later)
>>
>> It's fair to say that we could add it later, but at that point, we
>> will have added this new API for compaction strategy. Are we confident
>> that pluggable compaction is independently an important addition to
>> Pulsar's
>> features, or would it make sense to make this API only exposed in the
>> broker?
>>
>>
> The intention is that this compaction feature could be useful for complex
> user applications (if they are trying to do a similar thing). As I
> mentioned, this feature is closely tied to the PIP-192 now. We are not
> planning to expose this feature to users soon unless demanded and proven to
> be stable.
>
>
>> Thanks,
>> Michael
>>
>>
>>
>>
>>
>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
>> <heesung.s...@streamnative.io.invalid> wrote:
>> >
>> > Hi,
>> >
>> > Also, I thought about some concurrent assignment scenarios between
>> > pre-filter vs post-filter.
>> >
>> > Example 1: When the topic broadcast is slower than the concurrent
>> > assignment requests
>> >
>> > With pre-filter + two-topics (non-compacted and compacted topics)
>> > t1: A -> non-compacted topic // broker A published a message to the
>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
>> > t2: B -> non-compacted topic // broker B published a message, m2:
>> {broker B
>> > assigned bundle x to broker C}
>> > t3: C -> non-compacted topic // broker C published a message, m3:
>> {broker C
>> > assigned bundle x to broker B}
>> > t4: non-compacted topic -> L // leader broker consumed the messages:
>> m1,m2,
>> > and m3
>> > t5: L -> compacted topic // leader compacted the messages and
>> broadcasted
>> > m1 to all consumers
>> > t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>> >
>> > With post-filter + a single topic
>> > t1: A -> topic // broker A published a message to the non-compacted
>> topic,
>> > m1: {broker A assigned bundle x to broker A}
>> > t2: B -> topic // broker B published a message, m2: {broker B assigned
>> > bundle x to broker C}
>> > t3: C -> topic // broker C published a message, m3: {broker C assigned
>> > bundle x to broker B}
>> > t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1,m2, and
>> m3
>> > t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages
>> to m1.
>> >
>> > Analysis: the "pre-filter + two-topics" option can reduce the number of
>> > messages to broadcast at the expense of the leader broker compaction.
>> >
>> >
>> > Example 2: When the topic broadcast is faster than the concurrent
>> > assignment requests
>> >
>> > With pre-filter + two-topics (non-compacted and compacted topics)
>> > t1: A -> non-compacted topic // broker A published a message to the
>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
>> > t2: non-compacted topic -> L // leader broker consumed the messages: m1
>> > t3: L -> compacted topic // leader compacted the message and
>> broadcasted m1
>> > to all consumers
>> > t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>> > t5: A-> own bundle // broker A knows that its assignment has been
>> accepted,
>> > so proceeding to own the bundle (meanwhile deferring lookup requests)
>> > t6: B -> defer client lookups // broker B knows that bundle assignment
>> is
>> > running(meanwhile deferring lookup requests)
>> > t7: C -> defer client lookups // broker C knows that bundle assignment
>> is
>> > running(meanwhile deferring lookup requests)
>> >
>> > With post-filter + a single topic
>> > t1: A -> topic // broker A published a message to the non-compacted
>> topic,
>> > m1: {broker A assigned bundle x to broker A}
>> > t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
>> > t3:  A-> own bundle // broker A knows that its assignment has been
>> > accepted, so proceeding to own the bundle (meanwhile deferring lookup
>> > requests)
>> > t4: B -> defer client lookups // broker B knows that bundle assignment
>> is
>> > running(meanwhile deferring lookup requests)
>> > t5: C -> defer client lookups // broker C knows that bundle assignment
>> is
>> > running(meanwhile deferring lookup requests)
>> >
>> > Analysis: The "post-filter + a single topic" can perform ok in this case
>> > without the additional leader coordination and the secondary topic
>> because
>> > the early broadcast can inform all brokers and prevent them from
>> requesting
>> > other assignments for the same bundle.
>> >
>> > I think the post-filter option is initially not bad because:
>> >
>> > 1. it is safe in the worst case (in case the messages are not correctly
>> > pre-filtered at the leader)
>> > 2. it performs ok because the early broadcast can prevent
>> > concurrent assignment requests.
>> > 3. initially less complex to implement (leaderless conflict resolution
>> and
>> > requires a single topic)
>> > 4. it is not a "one-way door" decision (we could add the pre-filter
>> logic
>> > as well later)
>> >
>> > Regards,
>> > Heesung
>> >
>> >
>> >
>> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <
>> heesung.s...@streamnative.io>
>> > wrote:
>> >
>> > > Hi Michael,
>> > >
>> > > For the pre-prefilter(pre-compaction) option,
>> > > I think the leader requires a write-through cache to compact messages
>> > > based on the latest states. Otherwise, the leader needs to wait for
>> the
>> > > last msg from the (compacted) topic before compacting the next msg
>> for the
>> > > same bundle.
>> > >
>> > > Pulsar guarantees "a single writer". However, for the worst-case
>> > > scenario(due to network partitions, bugs in zk or etcd leader
>> election,
>> > > bugs in bk, data corruption ), I think it is safe to place the
>> post-filter
>> > > on the consumer side(compaction and table views) as well in order to
>> > > validate the state changes.
>> > >
>> > > For the two-topic approach,
>> > > I think we lose a single linearized view. Could we clarify how to
>> handle
>> > > the following(edge cases and failure recovery)?
>> > > 0. Is the un-compacted topic a persistent topic or a non-persistent
>> topic?
>> > > 1. How does the leader recover state from the two topics?
>> > > 2. How do we handle the case when the leader fails before writing
>> messages
>> > > to the compacted topic
>> > >
>> > > Regards,
>> > > Heesung
>> > >
>> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <
>> mmarsh...@apache.org>
>> > > wrote:
>> > >
>> > >> Sharing some more thoughts. We could alternatively use two topics
>> > >> instead of one. In this design, the first topic is the unfiltered
>> > >> write ahead log that represents many writers (brokers) trying to
>> > >> acquire ownership of bundles. The second topic is the distilled log
>> > >> that represents the "winners" or the "owners" of the bundles. There
>> is
>> > >> a single writer, the leader broker, that reads from the input topic
>> > >> and writes to the output topic. The first topic is normal and the
>> > >> second is compacted.
>> > >>
>> > >> The primary benefit in a two topic solution is that it is easy for
>> the
>> > >> leader broker to trade off ownership without needing to slow down
>> > >> writes to the input topic. The leader broker will start consuming
>> from
>> > >> the input topic when it has fully consumed the table view on the
>> > >> output topic. In general, I don't think consumers know when they have
>> > >> "reached the end of a table view", but we should be able to trivially
>> > >> figure this out if we are the topic's only writer and the topic and
>> > >> writer are collocated on the same broker.
>> > >>
>> > >> In that design, it might make sense to use something like the
>> > >> replication cursor to keep track of this consumer's state.
>> > >>
>> > >> - Michael
>> > >>
>> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <
>> mmarsh...@apache.org>
>> > >> wrote:
>> > >> >
>> > >> > Thanks for your proposal, Heesung.
>> > >> >
>> > >> > Fundamentally, we have the problems listed in this PIP because we
>> have
>> > >> > multiple writers instead of just one writer. Can we solve this
>> problem
>> > >> > by changing our write pattern? What if we use the leader broker as
>> the
>> > >> > single writer? That broker would intercept attempts to acquire
>> > >> > ownership on bundles and would grant ownership to the first broker
>> to
>> > >> > claim an unassigned bundle. It could "grant ownership" by letting
>> the
>> > >> > first write to claim an unassigned bundle get written to the
>> ownership
>> > >> > topic. When a bundle is already owned, the leader won't persist
>> that
>> > >> > event to the bookkeeper. In this design, the log becomes a true
>> > >> > ownership log, which will correctly work with the existing topic
>> > >> > compaction and table view solutions. My proposal essentially moves
>> the
>> > >> > conflict resolution to just before the write, and as a
>> consequence, it
>> > >> > greatly reduces the need for post processing of the event log. One
>> > >> > trade off might be that the leader broker could slow down the write
>> > >> > path, but given that the leader would just need to verify the
>> current
>> > >> > state of the bundle, I think it'd be performant enough.
>> > >> >
>> > >> > Additionally, we'd need the leader broker to be "caught up" on
>> bundle
>> > >> > ownership in order to grant ownership of topics, but unless I am
>> > >> > mistaken, that is already a requirement of the current PIP 192
>> > >> > paradigm.
>> > >> >
>> > >> > Below are some additional thoughts that will be relevant if we move
>> > >> > forward with the design as it is currently proposed.
>> > >> >
>> > >> > I think it might be helpful to update the title to show that this
>> > >> > proposal will also affect table view as well. I didn't catch that
>> at
>> > >> > first.
>> > >> >
>> > >> > Do you have any documentation describing how the
>> > >> > TopicCompactionStrategy will determine which states are valid in
>> the
>> > >> > context of load balancing? I looked at
>> > >> > https://github.com/apache/pulsar/pull/18195, but I couldn't seem
>> to
>> > >> > find anything for it. That would help make this proposal less
>> > >> > abstract.
>> > >> >
>> > >> > The proposed API seems very tied to the needs of PIP 192. For
>> example,
>> > >> > `isValid` is not a term I associate with topic compaction. The
>> > >> > fundamental question for compaction is which value to keep (or
>> build a
>> > >> > new value). I think we might be able to simplify the API by
>> replacing
>> > >> > the "isValid", "isMergeEnabled", and "merge" methods with a single
>> > >> > method that lets the implementation handle one or all tasks. That
>> > >> > would also remove the need to deserialize payloads multiple times
>> too.
>> > >> >
>> > >> > I also feel like mentioning that after working with the PIP 105
>> broker
>> > >> > side filtering, I think we should avoid running UDFs in the broker
>> as
>> > >> > much as possible. (I do not consider the load balancing logic to
>> be a
>> > >> > UDF here.) I think it would be worth not making this a user facing
>> > >> > feature unless there is demand for real use cases.
>> > >> >
>> > >> > Thanks!
>> > >> > Michael
>> > >> >
>> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bog...@apache.org> wrote:
>> > >> > >
>> > >> > > +1(non-binding)
>> > >> > >
>> > >> > > thanks,
>> > >> > > bo
>> > >> > >
>> > >> > > Heesung Sohn <heesung.s...@streamnative.io.invalid>
>> 于2022年10月19日周三
>> > >> 07:54写道:
>> > >> > > >
>> > >> > > > Hi pulsar-dev community,
>> > >> > > >
>> > >> > > > I raised a pip to discuss : PIP-215: Configurable Topic
>> Compaction
>> > >> Strategy
>> > >> > > >
>> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
>> > >> > > >
>> > >> > > > Regards,
>> > >> > > > Heesung
>> > >>
>> > >
>>
>

Reply via email to