Oops! Michael, I apologize for the typo in your name.

On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <heesung.s...@streamnative.io>
wrote:

> Hi Machel,
>
> Here are my additional comments regarding your earlier email.
>
> - I updated the PIP title to show that this will impact table view as well.
>
> - PIP-192 : https://github.com/apache/pulsar/issues/16691 shows the
> general idea of the states and their actions, and I defined the actual
> states here in the PR,
> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a.
>  I
> will further clarify the bundle state data validation logic when
> introducing `BundleStateCompactionStrategy` class. This PIP is to support
> CompactionStrategy in general.
>
> - Agreed with your point that we should merge CompactionStrategy APIs. I
> updated the interface proposal in the PIP. I replaced `"isValid",
> "isMergeEnabled", and "merge"` apis with "compact" api.
>
>
> Thanks,
> Heesung
>
>
> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <heesung.s...@streamnative.io>
> wrote:
>
>> Hi,
>> Thank you for the great comments.
>> Please find my comments inline too.
>>
>> Regards,
>> Heesung
>>
>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mmarsh...@apache.org>
>> wrote:
>>
>>> > I think we lose a single linearized view.
>>>
>>> Which linearized view are we losing, and what is the role of that
>>> linearized view? I think I might be missing why it is important. I
>>> agree that consumers won't know about each unsuccessful attempted
>>> acquisition of a bundle, but that seems like unnecessary information
>>> to broadcast to every broker in the cluster.
>>>
>>>
>>
>> PIP-192 proposes an assignment, transfer, and split protocol(multi-phase
>> state changes), relying on early broadcast across brokers, and all brokers
>> react to their clients according to the state change notifications --
>> brokers could defer any client lookups for bundle x if an
>> assignment/transfer/split is ongoing for x(broadcasted early in the topic).
>> One early broadcast example is the one that I discussed above, `When the
>> topic broadcast is faster than the concurrent assignment requests.` I think
>> the prefilter could delay this early broadcast, as it needs to go through
>> the additional single-leader compaction path.
>>
>> The bundle state recovery process is simpler by a single linearized view.
>>
>> The single linearized view can be easier to debug bundle states. We can
>> more easily track where the assignment requests come from and how it is
>> compacted in a single linearized view.
>>
>>
>>
>>> > I think the leader requires a write-through cache to compact messages
>>> based
>>> > on the latest states.
>>>
>> This brings up an important point that I would like to clarify. If we
>>> trust the write ahead log as the source of truth, what happens when a
>>> bundle has been validly owned by multiple brokers? As a broker starts
>>> and consumes from the compacted topic, how do we prevent it from
>>> incorrectly thinking that it owns a bundle for some short time period
>>> in the case that the ownership topic hasn't yet been compacted to
>>> remove old ownership state?
>>>
>>
>> Since the multi-phase transfer protocol involves the source and
>> destination broker's actions, the successful transfer should get the source
>> and destination broker to have the (near) latest state. For example, if
>> some brokers have old ownership states(network partitioned or delayed),
>> they will redirect clients to the source(old) broker. However, by the
>> transfer protocol, the source broker should have the latest state, so it
>> can redirect the client again to the destination broker.
>>
>> When a broker restarts, it won't start until its BSC state to the (near)
>> latest (til the last known messageId at that time).
>>
>>
>>> > Pulsar guarantees "a single writer".
>>>
>>> I didn't think we were using a single writer in the PIP 192 design. I
>>> thought we had many producers sending events to a compacted topic. My
>>> proposal would still have many producers, but the writer to bookkeeper
>>> would act as the single writer. It would technically be distinct from
>>> a normal Pulsar topic producer.
>>>
>>> I should highlight that I am only proposing "broker filtering before
>>> write" in the context of PIP 192 and as an alternative to adding
>>> pluggable compaction strategies. It would not be a generic feature.
>>>
>>>
>> I was worried about the worst case where two producers(leaders) happen to
>> write the compacted topic (although Pulsar can guarantee "a single writer"
>> or "a single producer" for a topic in normal situations).
>>
>>
>>
>>> > Could we clarify how to handle
>>> > the following(edge cases and failure recovery)?
>>> > 0. Is the un-compacted topic a persistent topic or a non-persistent
>>> topic?
>>>
>>> It is a persistent topic.
>>>
>>> > 1. How does the leader recover state from the two topics?
>>>
>>> A leader would recover state by first consuming the whole compacted
>>> topic and then by consuming from the current location of a cursor on
>>> the first input topic. As stated elsewhere, this introduces latency
>>> and could be an issue.
>>>
>>> > 2. How do we handle the case when the leader fails before writing
>>> messages
>>> > to the compacted topic
>>>
>>> The leader would not acknowledge the message on the input topic until
>>> it has successfully persisted the event on the compacted topic.
>>> Publishing the same event to a compacted topic multiple times is
>>> idempotent, so there is no risk of lost state. The real risk is
>>> latency. However, I think we might have similar (though not the same)
>>> latency risks in the current solution.
>>>
>>> > Analysis: the "pre-filter + two-topics" option can reduce the number of
>>> > messages to broadcast at the expense of the leader broker compaction.
>>>
>>> My primary point is that with this PIP's design, the filter logic is
>>> run on every broker and again during topic compaction. With the
>>> alternative design, the filter is run once.
>>>
>>> Thank you for the clarification.
>>
>> I think the difference is that the post-filter is an optimistic approach
>> as it optimistically relies on the "broadcast-filter" effect(brokers will
>> defer client lookups if notified ahead that any assignment is ongoing for
>> bundle x). Yes, in the worst case, if the broadcast is slower, each broker
>> needs to individually compact the conflicting assignment requests.
>>
>> Conversely, one downside of the pessimistic approach (single leader
>> pre-filter) is that when there are not many conflict concurrent assignment
>> requests(assign for bundle a, assign for bundle b, assign for bundle c...),
>> the requests need to redundantly go through the leader compaction.
>>
>>
>>
>>> > 3. initially less complex to implement (leaderless conflict resolution
>>> and
>>> > requires a single topic)
>>>
>>> PIP 215 has its own complexity too. Coordinating filters
>>> on both the client (table view) and the server (compaction) is non
>>> trivial. The proposed API includes hard coded client configuration for
>>> each component, which will make upgrading the version of the
>>> compaction strategy complicated, and could lead to incorrect
>>> interpretation of events in the stream. When a single broker is doing
>>> the filtering, versioning is no longer a distributed problem. That
>>> being said, I do not mean to suggest my solution is without
>>> complexity.
>>>
>>> > 4. it is not a "one-way door" decision (we could add the pre-filter
>>> logic
>>> > as well later)
>>>
>>> It's fair to say that we could add it later, but at that point, we
>>> will have added this new API for compaction strategy. Are we confident
>>> that pluggable compaction is independently an important addition to
>>> Pulsar's
>>> features, or would it make sense to make this API only exposed in the
>>> broker?
>>>
>>>
>> The intention is that this compaction feature could be useful for complex
>> user applications (if they are trying to do a similar thing). As I
>> mentioned, this feature is closely tied to the PIP-192 now. We are not
>> planning to expose this feature to users soon unless demanded and proven to
>> be stable.
>>
>>
>>> Thanks,
>>> Michael
>>>
>>>
>>>
>>>
>>>
>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn
>>> <heesung.s...@streamnative.io.invalid> wrote:
>>> >
>>> > Hi,
>>> >
>>> > Also, I thought about some concurrent assignment scenarios between
>>> > pre-filter vs post-filter.
>>> >
>>> > Example 1: When the topic broadcast is slower than the concurrent
>>> > assignment requests
>>> >
>>> > With pre-filter + two-topics (non-compacted and compacted topics)
>>> > t1: A -> non-compacted topic // broker A published a message to the
>>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>> > t2: B -> non-compacted topic // broker B published a message, m2:
>>> {broker B
>>> > assigned bundle x to broker C}
>>> > t3: C -> non-compacted topic // broker C published a message, m3:
>>> {broker C
>>> > assigned bundle x to broker B}
>>> > t4: non-compacted topic -> L // leader broker consumed the messages:
>>> m1,m2,
>>> > and m3
>>> > t5: L -> compacted topic // leader compacted the messages and
>>> broadcasted
>>> > m1 to all consumers
>>> > t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>>> >
>>> > With post-filter + a single topic
>>> > t1: A -> topic // broker A published a message to the non-compacted
>>> topic,
>>> > m1: {broker A assigned bundle x to broker A}
>>> > t2: B -> topic // broker B published a message, m2: {broker B assigned
>>> > bundle x to broker C}
>>> > t3: C -> topic // broker C published a message, m3: {broker C assigned
>>> > bundle x to broker B}
>>> > t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1,m2, and
>>> m3
>>> > t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages
>>> to m1.
>>> >
>>> > Analysis: the "pre-filter + two-topics" option can reduce the number of
>>> > messages to broadcast at the expense of the leader broker compaction.
>>> >
>>> >
>>> > Example 2: When the topic broadcast is faster than the concurrent
>>> > assignment requests
>>> >
>>> > With pre-filter + two-topics (non-compacted and compacted topics)
>>> > t1: A -> non-compacted topic // broker A published a message to the
>>> > non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>> > t2: non-compacted topic -> L // leader broker consumed the messages: m1
>>> > t3: L -> compacted topic // leader compacted the message and
>>> broadcasted m1
>>> > to all consumers
>>> > t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1
>>> > t5: A-> own bundle // broker A knows that its assignment has been
>>> accepted,
>>> > so proceeding to own the bundle (meanwhile deferring lookup requests)
>>> > t6: B -> defer client lookups // broker B knows that bundle assignment
>>> is
>>> > running(meanwhile deferring lookup requests)
>>> > t7: C -> defer client lookups // broker C knows that bundle assignment
>>> is
>>> > running(meanwhile deferring lookup requests)
>>> >
>>> > With post-filter + a single topic
>>> > t1: A -> topic // broker A published a message to the non-compacted
>>> topic,
>>> > m1: {broker A assigned bundle x to broker A}
>>> > t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1
>>> > t3:  A-> own bundle // broker A knows that its assignment has been
>>> > accepted, so proceeding to own the bundle (meanwhile deferring lookup
>>> > requests)
>>> > t4: B -> defer client lookups // broker B knows that bundle assignment
>>> is
>>> > running(meanwhile deferring lookup requests)
>>> > t5: C -> defer client lookups // broker C knows that bundle assignment
>>> is
>>> > running(meanwhile deferring lookup requests)
>>> >
>>> > Analysis: The "post-filter + a single topic" can perform ok in this
>>> case
>>> > without the additional leader coordination and the secondary topic
>>> because
>>> > the early broadcast can inform all brokers and prevent them from
>>> requesting
>>> > other assignments for the same bundle.
>>> >
>>> > I think the post-filter option is initially not bad because:
>>> >
>>> > 1. it is safe in the worst case (in case the messages are not correctly
>>> > pre-filtered at the leader)
>>> > 2. it performs ok because the early broadcast can prevent
>>> > concurrent assignment requests.
>>> > 3. initially less complex to implement (leaderless conflict resolution
>>> and
>>> > requires a single topic)
>>> > 4. it is not a "one-way door" decision (we could add the pre-filter
>>> logic
>>> > as well later)
>>> >
>>> > Regards,
>>> > Heesung
>>> >
>>> >
>>> >
>>> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <
>>> heesung.s...@streamnative.io>
>>> > wrote:
>>> >
>>> > > Hi Michael,
>>> > >
>>> > > For the pre-prefilter(pre-compaction) option,
>>> > > I think the leader requires a write-through cache to compact messages
>>> > > based on the latest states. Otherwise, the leader needs to wait for
>>> the
>>> > > last msg from the (compacted) topic before compacting the next msg
>>> for the
>>> > > same bundle.
>>> > >
>>> > > Pulsar guarantees "a single writer". However, for the worst-case
>>> > > scenario(due to network partitions, bugs in zk or etcd leader
>>> election,
>>> > > bugs in bk, data corruption ), I think it is safe to place the
>>> post-filter
>>> > > on the consumer side(compaction and table views) as well in order to
>>> > > validate the state changes.
>>> > >
>>> > > For the two-topic approach,
>>> > > I think we lose a single linearized view. Could we clarify how to
>>> handle
>>> > > the following(edge cases and failure recovery)?
>>> > > 0. Is the un-compacted topic a persistent topic or a non-persistent
>>> topic?
>>> > > 1. How does the leader recover state from the two topics?
>>> > > 2. How do we handle the case when the leader fails before writing
>>> messages
>>> > > to the compacted topic
>>> > >
>>> > > Regards,
>>> > > Heesung
>>> > >
>>> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <
>>> mmarsh...@apache.org>
>>> > > wrote:
>>> > >
>>> > >> Sharing some more thoughts. We could alternatively use two topics
>>> > >> instead of one. In this design, the first topic is the unfiltered
>>> > >> write ahead log that represents many writers (brokers) trying to
>>> > >> acquire ownership of bundles. The second topic is the distilled log
>>> > >> that represents the "winners" or the "owners" of the bundles. There
>>> is
>>> > >> a single writer, the leader broker, that reads from the input topic
>>> > >> and writes to the output topic. The first topic is normal and the
>>> > >> second is compacted.
>>> > >>
>>> > >> The primary benefit in a two topic solution is that it is easy for
>>> the
>>> > >> leader broker to trade off ownership without needing to slow down
>>> > >> writes to the input topic. The leader broker will start consuming
>>> from
>>> > >> the input topic when it has fully consumed the table view on the
>>> > >> output topic. In general, I don't think consumers know when they
>>> have
>>> > >> "reached the end of a table view", but we should be able to
>>> trivially
>>> > >> figure this out if we are the topic's only writer and the topic and
>>> > >> writer are collocated on the same broker.
>>> > >>
>>> > >> In that design, it might make sense to use something like the
>>> > >> replication cursor to keep track of this consumer's state.
>>> > >>
>>> > >> - Michael
>>> > >>
>>> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <
>>> mmarsh...@apache.org>
>>> > >> wrote:
>>> > >> >
>>> > >> > Thanks for your proposal, Heesung.
>>> > >> >
>>> > >> > Fundamentally, we have the problems listed in this PIP because we
>>> have
>>> > >> > multiple writers instead of just one writer. Can we solve this
>>> problem
>>> > >> > by changing our write pattern? What if we use the leader broker
>>> as the
>>> > >> > single writer? That broker would intercept attempts to acquire
>>> > >> > ownership on bundles and would grant ownership to the first
>>> broker to
>>> > >> > claim an unassigned bundle. It could "grant ownership" by letting
>>> the
>>> > >> > first write to claim an unassigned bundle get written to the
>>> ownership
>>> > >> > topic. When a bundle is already owned, the leader won't persist
>>> that
>>> > >> > event to the bookkeeper. In this design, the log becomes a true
>>> > >> > ownership log, which will correctly work with the existing topic
>>> > >> > compaction and table view solutions. My proposal essentially
>>> moves the
>>> > >> > conflict resolution to just before the write, and as a
>>> consequence, it
>>> > >> > greatly reduces the need for post processing of the event log. One
>>> > >> > trade off might be that the leader broker could slow down the
>>> write
>>> > >> > path, but given that the leader would just need to verify the
>>> current
>>> > >> > state of the bundle, I think it'd be performant enough.
>>> > >> >
>>> > >> > Additionally, we'd need the leader broker to be "caught up" on
>>> bundle
>>> > >> > ownership in order to grant ownership of topics, but unless I am
>>> > >> > mistaken, that is already a requirement of the current PIP 192
>>> > >> > paradigm.
>>> > >> >
>>> > >> > Below are some additional thoughts that will be relevant if we
>>> move
>>> > >> > forward with the design as it is currently proposed.
>>> > >> >
>>> > >> > I think it might be helpful to update the title to show that this
>>> > >> > proposal will also affect table view as well. I didn't catch that
>>> at
>>> > >> > first.
>>> > >> >
>>> > >> > Do you have any documentation describing how the
>>> > >> > TopicCompactionStrategy will determine which states are valid in
>>> the
>>> > >> > context of load balancing? I looked at
>>> > >> > https://github.com/apache/pulsar/pull/18195, but I couldn't seem
>>> to
>>> > >> > find anything for it. That would help make this proposal less
>>> > >> > abstract.
>>> > >> >
>>> > >> > The proposed API seems very tied to the needs of PIP 192. For
>>> example,
>>> > >> > `isValid` is not a term I associate with topic compaction. The
>>> > >> > fundamental question for compaction is which value to keep (or
>>> build a
>>> > >> > new value). I think we might be able to simplify the API by
>>> replacing
>>> > >> > the "isValid", "isMergeEnabled", and "merge" methods with a single
>>> > >> > method that lets the implementation handle one or all tasks. That
>>> > >> > would also remove the need to deserialize payloads multiple times
>>> too.
>>> > >> >
>>> > >> > I also feel like mentioning that after working with the PIP 105
>>> broker
>>> > >> > side filtering, I think we should avoid running UDFs in the
>>> broker as
>>> > >> > much as possible. (I do not consider the load balancing logic to
>>> be a
>>> > >> > UDF here.) I think it would be worth not making this a user facing
>>> > >> > feature unless there is demand for real use cases.
>>> > >> >
>>> > >> > Thanks!
>>> > >> > Michael
>>> > >> >
>>> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bog...@apache.org> wrote:
>>> > >> > >
>>> > >> > > +1(non-binding)
>>> > >> > >
>>> > >> > > thanks,
>>> > >> > > bo
>>> > >> > >
>>> > >> > > Heesung Sohn <heesung.s...@streamnative.io.invalid>
>>> 于2022年10月19日周三
>>> > >> 07:54写道:
>>> > >> > > >
>>> > >> > > > Hi pulsar-dev community,
>>> > >> > > >
>>> > >> > > > I raised a pip to discuss : PIP-215: Configurable Topic
>>> Compaction
>>> > >> Strategy
>>> > >> > > >
>>> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
>>> > >> > > >
>>> > >> > > > Regards,
>>> > >> > > > Heesung
>>> > >>
>>> > >
>>>
>>

Reply via email to