Hi, I have had second thoughts about one of the points in my previous comment.
- In my previous reply I agreed with your point that we should merge the
CompactionStrategy APIs, and I updated the interface proposal in the PIP
accordingly, replacing the `isValid`, `isMergeEnabled`, and `merge` APIs
with a single `compact` API:

    boolean isValid(T prev, T cur)
    boolean isMergeEnabled()
    T merge(T prev, T cur)
      => T compact(T prev, T cur)

In fact, with only the `compact(T prev, T cur)` API, it is not clear
whether prev => cur is a valid transition (if it is invalid, we should
filter out the cur message instead of compacting/merging it further). I
now think we still need to keep `isValid()` and `merge()` separate. (A
rough sketch of both interface shapes is included further down in this
mail.)

Regarding redundant deserialization: the input type `T` is the type of the
message value, so the input values are already deserialized. We don't want
to expose the Message<T> interface in the CompactionStrategy, in order to
avoid message serialization/deserialization dependencies in the
CompactionStrategy. The `merge()` functionality is intended for more
complex use cases (merging values instead of just filtering), and to
support `merge()` we need to internally create a new message with the
compacted value and copies of the metadata and messageId.

We could initially define only `isValid()` in CompactionStrategy, and add
`isMergeEnabled()` and `merge()` to the interface later if requested.

Regards,
Heesung

On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn <heesung.s...@streamnative.io>
wrote:

> Oops! Michael, I apologize for the typo in your name.
>
> On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <heesung.s...@streamnative.io>
> wrote:
>
>> Hi Machel,
>>
>> Here are my additional comments regarding your earlier email.
>>
>> - I updated the PIP title to show that this will impact table view as
>> well.
>>
>> - PIP-192 : https://github.com/apache/pulsar/issues/16691 shows the
>> general idea of the states and their actions, and I defined the actual
>> states here in the PR,
>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a.
>> I will further clarify the bundle state data validation logic when
>> introducing `BundleStateCompactionStrategy` class. This PIP is to support
>> CompactionStrategy in general.
>>
>> - Agreed with your point that we should merge CompactionStrategy APIs. I
>> updated the interface proposal in the PIP. I replaced `"isValid",
>> "isMergeEnabled", and "merge"` apis with "compact" api.
>>
>>
>> Thanks,
>> Heesung
>>
>>
>> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <
>> heesung.s...@streamnative.io> wrote:
>>
>>> Hi,
>>> Thank you for the great comments.
>>> Please find my comments inline too.
>>>
>>> Regards,
>>> Heesung
>>>
>>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mmarsh...@apache.org>
>>> wrote:
>>>
>>>> > I think we lose a single linearized view.
>>>>
>>>> Which linearized view are we losing, and what is the role of that
>>>> linearized view? I think I might be missing why it is important. I
>>>> agree that consumers won't know about each unsuccessful attempted
>>>> acquisition of a bundle, but that seems like unnecessary information
>>>> to broadcast to every broker in the cluster.
>>>>
>>>>
>>>
>>> PIP-192 proposes an assignment, transfer, and split protocol (multi-phase
>>> state changes), relying on early broadcast across brokers, and all brokers
>>> react to their clients according to the state change notifications --
>>> brokers could defer any client lookups for bundle x if an
>>> assignment/transfer/split is ongoing for x (broadcasted early in the topic).
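[Stepping outside the quoted thread for a moment: to make the interface
question at the top of this mail concrete, here is a minimal sketch of the
two shapes being compared. Only the method names `isValid`,
`isMergeEnabled`, `merge`, and `compact` come from the PIP discussion; the
interface names, generics, and comments are illustrative assumptions, not
the final PIP-215 API.]

    // Shape A: keep validation and merging separate (sketch only).
    public interface TopicCompactionStrategy<T> {
        // True if prev -> cur is a valid state transition. If it is not,
        // the compactor/table view filters out cur and keeps prev as-is.
        boolean isValid(T prev, T cur);

        // True if values should be merged rather than simply replaced.
        boolean isMergeEnabled();

        // Produces the merged value to keep. Supporting this requires the
        // broker to build a new message carrying the merged value plus
        // copies of the original metadata and messageId.
        T merge(T prev, T cur);
    }

    // Shape B: a single method (sketch only).
    public interface SingleMethodCompactionStrategy<T> {
        // Returns the value to keep. If it returns prev, the caller cannot
        // easily tell whether cur was an invalid transition (keep the old
        // message untouched) or whether the merge result just happens to
        // equal prev (write a new merged message).
        T compact(T prev, T cur);
    }

[With shape A, the compactor can keep the original message untouched
whenever `isValid()` returns false, and only pays the cost of building a
new message when `merge()` is actually used.]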
>>> One early broadcast example is the one that I discussed above, `When the >>> topic broadcast is faster than the concurrent assignment requests.` I think >>> the prefilter could delay this early broadcast, as it needs to go through >>> the additional single-leader compaction path. >>> >>> The bundle state recovery process is simpler by a single linearized view. >>> >>> The single linearized view can be easier to debug bundle states. We can >>> more easily track where the assignment requests come from and how it is >>> compacted in a single linearized view. >>> >>> >>> >>>> > I think the leader requires a write-through cache to compact messages >>>> based >>>> > on the latest states. >>>> >>> This brings up an important point that I would like to clarify. If we >>>> trust the write ahead log as the source of truth, what happens when a >>>> bundle has been validly owned by multiple brokers? As a broker starts >>>> and consumes from the compacted topic, how do we prevent it from >>>> incorrectly thinking that it owns a bundle for some short time period >>>> in the case that the ownership topic hasn't yet been compacted to >>>> remove old ownership state? >>>> >>> >>> Since the multi-phase transfer protocol involves the source and >>> destination broker's actions, the successful transfer should get the source >>> and destination broker to have the (near) latest state. For example, if >>> some brokers have old ownership states(network partitioned or delayed), >>> they will redirect clients to the source(old) broker. However, by the >>> transfer protocol, the source broker should have the latest state, so it >>> can redirect the client again to the destination broker. >>> >>> When a broker restarts, it won't start until its BSC state to the (near) >>> latest (til the last known messageId at that time). >>> >>> >>>> > Pulsar guarantees "a single writer". >>>> >>>> I didn't think we were using a single writer in the PIP 192 design. I >>>> thought we had many producers sending events to a compacted topic. My >>>> proposal would still have many producers, but the writer to bookkeeper >>>> would act as the single writer. It would technically be distinct from >>>> a normal Pulsar topic producer. >>>> >>>> I should highlight that I am only proposing "broker filtering before >>>> write" in the context of PIP 192 and as an alternative to adding >>>> pluggable compaction strategies. It would not be a generic feature. >>>> >>>> >>> I was worried about the worst case where two producers(leaders) happen >>> to write the compacted topic (although Pulsar can guarantee "a single >>> writer" or "a single producer" for a topic in normal situations). >>> >>> >>> >>>> > Could we clarify how to handle >>>> > the following(edge cases and failure recovery)? >>>> > 0. Is the un-compacted topic a persistent topic or a non-persistent >>>> topic? >>>> >>>> It is a persistent topic. >>>> >>>> > 1. How does the leader recover state from the two topics? >>>> >>>> A leader would recover state by first consuming the whole compacted >>>> topic and then by consuming from the current location of a cursor on >>>> the first input topic. As stated elsewhere, this introduces latency >>>> and could be an issue. >>>> >>>> > 2. How do we handle the case when the leader fails before writing >>>> messages >>>> > to the compacted topic >>>> >>>> The leader would not acknowledge the message on the input topic until >>>> it has successfully persisted the event on the compacted topic. 
>>>> Publishing the same event to a compacted topic multiple times is >>>> idempotent, so there is no risk of lost state. The real risk is >>>> latency. However, I think we might have similar (though not the same) >>>> latency risks in the current solution. >>>> >>>> > Analysis: the "pre-filter + two-topics" option can reduce the number >>>> of >>>> > messages to broadcast at the expense of the leader broker compaction. >>>> >>>> My primary point is that with this PIP's design, the filter logic is >>>> run on every broker and again during topic compaction. With the >>>> alternative design, the filter is run once. >>>> >>>> Thank you for the clarification. >>> >>> I think the difference is that the post-filter is an optimistic approach >>> as it optimistically relies on the "broadcast-filter" effect(brokers will >>> defer client lookups if notified ahead that any assignment is ongoing for >>> bundle x). Yes, in the worst case, if the broadcast is slower, each broker >>> needs to individually compact the conflicting assignment requests. >>> >>> Conversely, one downside of the pessimistic approach (single leader >>> pre-filter) is that when there are not many conflict concurrent assignment >>> requests(assign for bundle a, assign for bundle b, assign for bundle c...), >>> the requests need to redundantly go through the leader compaction. >>> >>> >>> >>>> > 3. initially less complex to implement (leaderless conflict >>>> resolution and >>>> > requires a single topic) >>>> >>>> PIP 215 has its own complexity too. Coordinating filters >>>> on both the client (table view) and the server (compaction) is non >>>> trivial. The proposed API includes hard coded client configuration for >>>> each component, which will make upgrading the version of the >>>> compaction strategy complicated, and could lead to incorrect >>>> interpretation of events in the stream. When a single broker is doing >>>> the filtering, versioning is no longer a distributed problem. That >>>> being said, I do not mean to suggest my solution is without >>>> complexity. >>>> >>>> > 4. it is not a "one-way door" decision (we could add the pre-filter >>>> logic >>>> > as well later) >>>> >>>> It's fair to say that we could add it later, but at that point, we >>>> will have added this new API for compaction strategy. Are we confident >>>> that pluggable compaction is independently an important addition to >>>> Pulsar's >>>> features, or would it make sense to make this API only exposed in the >>>> broker? >>>> >>>> >>> The intention is that this compaction feature could be useful for >>> complex user applications (if they are trying to do a similar thing). As I >>> mentioned, this feature is closely tied to the PIP-192 now. We are not >>> planning to expose this feature to users soon unless demanded and proven to >>> be stable. >>> >>> >>>> Thanks, >>>> Michael >>>> >>>> >>>> >>>> >>>> >>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn >>>> <heesung.s...@streamnative.io.invalid> wrote: >>>> > >>>> > Hi, >>>> > >>>> > Also, I thought about some concurrent assignment scenarios between >>>> > pre-filter vs post-filter. 
>>>> > >>>> > Example 1: When the topic broadcast is slower than the concurrent >>>> > assignment requests >>>> > >>>> > With pre-filter + two-topics (non-compacted and compacted topics) >>>> > t1: A -> non-compacted topic // broker A published a message to the >>>> > non-compacted topic, m1: {broker A assigned bundle x to broker A} >>>> > t2: B -> non-compacted topic // broker B published a message, m2: >>>> {broker B >>>> > assigned bundle x to broker C} >>>> > t3: C -> non-compacted topic // broker C published a message, m3: >>>> {broker C >>>> > assigned bundle x to broker B} >>>> > t4: non-compacted topic -> L // leader broker consumed the messages: >>>> m1,m2, >>>> > and m3 >>>> > t5: L -> compacted topic // leader compacted the messages and >>>> broadcasted >>>> > m1 to all consumers >>>> > t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1 >>>> > >>>> > With post-filter + a single topic >>>> > t1: A -> topic // broker A published a message to the non-compacted >>>> topic, >>>> > m1: {broker A assigned bundle x to broker A} >>>> > t2: B -> topic // broker B published a message, m2: {broker B assigned >>>> > bundle x to broker C} >>>> > t3: C -> topic // broker C published a message, m3: {broker C assigned >>>> > bundle x to broker B} >>>> > t4: topic -> [A,B,C] // broker A,B,C consumed the messages: m1,m2, >>>> and m3 >>>> > t5: [A,B,C] -> m1 // broker A,B,C individually compacted the messages >>>> to m1. >>>> > >>>> > Analysis: the "pre-filter + two-topics" option can reduce the number >>>> of >>>> > messages to broadcast at the expense of the leader broker compaction. >>>> > >>>> > >>>> > Example 2: When the topic broadcast is faster than the concurrent >>>> > assignment requests >>>> > >>>> > With pre-filter + two-topics (non-compacted and compacted topics) >>>> > t1: A -> non-compacted topic // broker A published a message to the >>>> > non-compacted topic, m1: {broker A assigned bundle x to broker A} >>>> > t2: non-compacted topic -> L // leader broker consumed the messages: >>>> m1 >>>> > t3: L -> compacted topic // leader compacted the message and >>>> broadcasted m1 >>>> > to all consumers >>>> > t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1 >>>> > t5: A-> own bundle // broker A knows that its assignment has been >>>> accepted, >>>> > so proceeding to own the bundle (meanwhile deferring lookup requests) >>>> > t6: B -> defer client lookups // broker B knows that bundle >>>> assignment is >>>> > running(meanwhile deferring lookup requests) >>>> > t7: C -> defer client lookups // broker C knows that bundle >>>> assignment is >>>> > running(meanwhile deferring lookup requests) >>>> > >>>> > With post-filter + a single topic >>>> > t1: A -> topic // broker A published a message to the non-compacted >>>> topic, >>>> > m1: {broker A assigned bundle x to broker A} >>>> > t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1 >>>> > t3: A-> own bundle // broker A knows that its assignment has been >>>> > accepted, so proceeding to own the bundle (meanwhile deferring lookup >>>> > requests) >>>> > t4: B -> defer client lookups // broker B knows that bundle >>>> assignment is >>>> > running(meanwhile deferring lookup requests) >>>> > t5: C -> defer client lookups // broker C knows that bundle >>>> assignment is >>>> > running(meanwhile deferring lookup requests) >>>> > >>>> > Analysis: The "post-filter + a single topic" can perform ok in this >>>> case >>>> > without the additional leader coordination and the secondary topic >>>> because >>>> > the early 
broadcast can inform all brokers and prevent them from >>>> requesting >>>> > other assignments for the same bundle. >>>> > >>>> > I think the post-filter option is initially not bad because: >>>> > >>>> > 1. it is safe in the worst case (in case the messages are not >>>> correctly >>>> > pre-filtered at the leader) >>>> > 2. it performs ok because the early broadcast can prevent >>>> > concurrent assignment requests. >>>> > 3. initially less complex to implement (leaderless conflict >>>> resolution and >>>> > requires a single topic) >>>> > 4. it is not a "one-way door" decision (we could add the pre-filter >>>> logic >>>> > as well later) >>>> > >>>> > Regards, >>>> > Heesung >>>> > >>>> > >>>> > >>>> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn < >>>> heesung.s...@streamnative.io> >>>> > wrote: >>>> > >>>> > > Hi Michael, >>>> > > >>>> > > For the pre-prefilter(pre-compaction) option, >>>> > > I think the leader requires a write-through cache to compact >>>> messages >>>> > > based on the latest states. Otherwise, the leader needs to wait for >>>> the >>>> > > last msg from the (compacted) topic before compacting the next msg >>>> for the >>>> > > same bundle. >>>> > > >>>> > > Pulsar guarantees "a single writer". However, for the worst-case >>>> > > scenario(due to network partitions, bugs in zk or etcd leader >>>> election, >>>> > > bugs in bk, data corruption ), I think it is safe to place the >>>> post-filter >>>> > > on the consumer side(compaction and table views) as well in order to >>>> > > validate the state changes. >>>> > > >>>> > > For the two-topic approach, >>>> > > I think we lose a single linearized view. Could we clarify how to >>>> handle >>>> > > the following(edge cases and failure recovery)? >>>> > > 0. Is the un-compacted topic a persistent topic or a non-persistent >>>> topic? >>>> > > 1. How does the leader recover state from the two topics? >>>> > > 2. How do we handle the case when the leader fails before writing >>>> messages >>>> > > to the compacted topic >>>> > > >>>> > > Regards, >>>> > > Heesung >>>> > > >>>> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall < >>>> mmarsh...@apache.org> >>>> > > wrote: >>>> > > >>>> > >> Sharing some more thoughts. We could alternatively use two topics >>>> > >> instead of one. In this design, the first topic is the unfiltered >>>> > >> write ahead log that represents many writers (brokers) trying to >>>> > >> acquire ownership of bundles. The second topic is the distilled log >>>> > >> that represents the "winners" or the "owners" of the bundles. >>>> There is >>>> > >> a single writer, the leader broker, that reads from the input topic >>>> > >> and writes to the output topic. The first topic is normal and the >>>> > >> second is compacted. >>>> > >> >>>> > >> The primary benefit in a two topic solution is that it is easy for >>>> the >>>> > >> leader broker to trade off ownership without needing to slow down >>>> > >> writes to the input topic. The leader broker will start consuming >>>> from >>>> > >> the input topic when it has fully consumed the table view on the >>>> > >> output topic. In general, I don't think consumers know when they >>>> have >>>> > >> "reached the end of a table view", but we should be able to >>>> trivially >>>> > >> figure this out if we are the topic's only writer and the topic and >>>> > >> writer are collocated on the same broker. >>>> > >> >>>> > >> In that design, it might make sense to use something like the >>>> > >> replication cursor to keep track of this consumer's state. 
>>>> > >> >>>> > >> - Michael >>>> > >> >>>> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall < >>>> mmarsh...@apache.org> >>>> > >> wrote: >>>> > >> > >>>> > >> > Thanks for your proposal, Heesung. >>>> > >> > >>>> > >> > Fundamentally, we have the problems listed in this PIP because >>>> we have >>>> > >> > multiple writers instead of just one writer. Can we solve this >>>> problem >>>> > >> > by changing our write pattern? What if we use the leader broker >>>> as the >>>> > >> > single writer? That broker would intercept attempts to acquire >>>> > >> > ownership on bundles and would grant ownership to the first >>>> broker to >>>> > >> > claim an unassigned bundle. It could "grant ownership" by >>>> letting the >>>> > >> > first write to claim an unassigned bundle get written to the >>>> ownership >>>> > >> > topic. When a bundle is already owned, the leader won't persist >>>> that >>>> > >> > event to the bookkeeper. In this design, the log becomes a true >>>> > >> > ownership log, which will correctly work with the existing topic >>>> > >> > compaction and table view solutions. My proposal essentially >>>> moves the >>>> > >> > conflict resolution to just before the write, and as a >>>> consequence, it >>>> > >> > greatly reduces the need for post processing of the event log. >>>> One >>>> > >> > trade off might be that the leader broker could slow down the >>>> write >>>> > >> > path, but given that the leader would just need to verify the >>>> current >>>> > >> > state of the bundle, I think it'd be performant enough. >>>> > >> > >>>> > >> > Additionally, we'd need the leader broker to be "caught up" on >>>> bundle >>>> > >> > ownership in order to grant ownership of topics, but unless I am >>>> > >> > mistaken, that is already a requirement of the current PIP 192 >>>> > >> > paradigm. >>>> > >> > >>>> > >> > Below are some additional thoughts that will be relevant if we >>>> move >>>> > >> > forward with the design as it is currently proposed. >>>> > >> > >>>> > >> > I think it might be helpful to update the title to show that this >>>> > >> > proposal will also affect table view as well. I didn't catch >>>> that at >>>> > >> > first. >>>> > >> > >>>> > >> > Do you have any documentation describing how the >>>> > >> > TopicCompactionStrategy will determine which states are valid in >>>> the >>>> > >> > context of load balancing? I looked at >>>> > >> > https://github.com/apache/pulsar/pull/18195, but I couldn't >>>> seem to >>>> > >> > find anything for it. That would help make this proposal less >>>> > >> > abstract. >>>> > >> > >>>> > >> > The proposed API seems very tied to the needs of PIP 192. For >>>> example, >>>> > >> > `isValid` is not a term I associate with topic compaction. The >>>> > >> > fundamental question for compaction is which value to keep (or >>>> build a >>>> > >> > new value). I think we might be able to simplify the API by >>>> replacing >>>> > >> > the "isValid", "isMergeEnabled", and "merge" methods with a >>>> single >>>> > >> > method that lets the implementation handle one or all tasks. That >>>> > >> > would also remove the need to deserialize payloads multiple >>>> times too. >>>> > >> > >>>> > >> > I also feel like mentioning that after working with the PIP 105 >>>> broker >>>> > >> > side filtering, I think we should avoid running UDFs in the >>>> broker as >>>> > >> > much as possible. (I do not consider the load balancing logic to >>>> be a >>>> > >> > UDF here.) 
I think it would be worth not making this a user >>>> facing >>>> > >> > feature unless there is demand for real use cases. >>>> > >> > >>>> > >> > Thanks! >>>> > >> > Michael >>>> > >> > >>>> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bog...@apache.org> wrote: >>>> > >> > > >>>> > >> > > +1(non-binding) >>>> > >> > > >>>> > >> > > thanks, >>>> > >> > > bo >>>> > >> > > >>>> > >> > > Heesung Sohn <heesung.s...@streamnative.io.invalid> >>>> 于2022年10月19日周三 >>>> > >> 07:54写道: >>>> > >> > > > >>>> > >> > > > Hi pulsar-dev community, >>>> > >> > > > >>>> > >> > > > I raised a pip to discuss : PIP-215: Configurable Topic >>>> Compaction >>>> > >> Strategy >>>> > >> > > > >>>> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099 >>>> > >> > > > >>>> > >> > > > Regards, >>>> > >> > > > Heesung >>>> > >> >>>> > > >>>> >>>
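P.S. To make the post-filter behavior from Example 1 (quoted above) a
little more concrete, here is a rough sketch of how each broker could apply
the same strategy while consuming the ownership topic, so that the
conflicting assignments m2 and m3 are dropped and every broker converges on
m1. The class, field, and method names below are placeholders for
illustration only, not code from PIP-192 or PIP-215.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative sketch only; names are placeholders.
    final class BundleOwnershipView<T> {
        private final Map<String, T> ownership = new ConcurrentHashMap<>();
        // Shape A of the strategy sketch earlier in this mail.
        private final TopicCompactionStrategy<T> strategy;

        BundleOwnershipView(TopicCompactionStrategy<T> strategy) {
            this.strategy = strategy;
        }

        // Called for every state-change message consumed from the ownership topic.
        void onStateChange(String bundle, T incoming) {
            ownership.compute(bundle, (key, prev) ->
                    prev == null || strategy.isValid(prev, incoming)
                            ? incoming   // accept the state change
                            : prev);     // conflicting/late request: filter it out
        }
    }

Because every broker applies the same deterministic check to the same
ordered log, they all converge on the same owner without a leader-side
pre-filter, which is the "post-filter + a single topic" behavior analyzed
above.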