Oops! Michael, I apologize for the typo in your name.

On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn <heesung.s...@streamnative.io> wrote:
> Hi Machel,
>
> Here are my additional comments regarding your earlier email.
>
> - I updated the PIP title to show that this will impact table view as well.
>
> - PIP-192: https://github.com/apache/pulsar/issues/16691 shows the general idea of the states and their actions, and I defined the actual states here in the PR, https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a. I will further clarify the bundle state data validation logic when introducing the `BundleStateCompactionStrategy` class. This PIP is to support CompactionStrategy in general.
>
> - Agreed with your point that we should merge the CompactionStrategy APIs. I updated the interface proposal in the PIP and replaced the "isValid", "isMergeEnabled", and "merge" APIs with a single "compact" API.
>
> Thanks,
> Heesung
>
> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <heesung.s...@streamnative.io> wrote:
>
>> Hi,
>> Thank you for the great comments.
>> Please find my comments inline too.
>>
>> Regards,
>> Heesung
>>
>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mmarsh...@apache.org> wrote:
>>
>>> > I think we lose a single linearized view.
>>>
>>> Which linearized view are we losing, and what is the role of that linearized view? I think I might be missing why it is important. I agree that consumers won't know about each unsuccessful attempted acquisition of a bundle, but that seems like unnecessary information to broadcast to every broker in the cluster.
>>
>> PIP-192 proposes an assignment, transfer, and split protocol (multi-phase state changes) that relies on early broadcast across brokers, and all brokers react to their clients according to the state change notifications -- brokers can defer client lookups for bundle x if an assignment/transfer/split is ongoing for x (broadcast early in the topic). One early-broadcast example is the one I discussed above, "When the topic broadcast is faster than the concurrent assignment requests." I think the pre-filter could delay this early broadcast, as it needs to go through the additional single-leader compaction path.
>>
>> The bundle state recovery process is simpler with a single linearized view.
>>
>> A single linearized view also makes bundle states easier to debug: we can more easily track where the assignment requests come from and how they are compacted.
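To make the early-broadcast deferral concrete, here is a minimal sketch of how a broker could defer client lookups for a bundle whose assignment/transfer/split is still in flight. The class, state names, and methods are hypothetical, not the actual PIP-192 code:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class BundleStateView {
    enum State { ASSIGNING, OWNED, RELEASING, SPLITTING }
    record BundleState(State state, String ownerBroker) { }

    // Populated from the broadcast bundle-state topic (e.g., by a table view listener).
    private final Map<String, BundleState> states = new ConcurrentHashMap<>();
    // Client lookups waiting for an in-flight assignment to settle.
    private final Map<String, CompletableFuture<String>> deferredLookups = new ConcurrentHashMap<>();

    // Called for every state-change message observed on the broadcast topic.
    void onStateChange(String bundle, BundleState next) {
        states.put(bundle, next);
        if (next.state() == State.OWNED) {
            // The assignment settled: answer any lookups deferred for this bundle.
            CompletableFuture<String> waiting = deferredLookups.remove(bundle);
            if (waiting != null) {
                waiting.complete(next.ownerBroker());
            }
        }
    }

    // Called when a client looks up the owner of a bundle on this broker.
    CompletableFuture<String> lookup(String bundle) {
        BundleState current = states.get(bundle);
        if (current != null && current.state() == State.OWNED) {
            return CompletableFuture.completedFuture(current.ownerBroker());
        }
        // An assignment/transfer/split is in flight (or unknown): defer the lookup
        // until the broadcast state settles instead of starting a competing assignment.
        return deferredLookups.computeIfAbsent(bundle, b -> new CompletableFuture<>());
    }
}
```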
>>> > I think the leader requires a write-through cache to compact messages based on the latest states.
>>>
>>> This brings up an important point that I would like to clarify. If we trust the write-ahead log as the source of truth, what happens when a bundle has been validly owned by multiple brokers? As a broker starts and consumes from the compacted topic, how do we prevent it from incorrectly thinking that it owns a bundle for some short time period in the case that the ownership topic hasn't yet been compacted to remove old ownership state?
>>
>> Since the multi-phase transfer protocol involves the source and destination brokers' actions, a successful transfer should get the source and destination brokers to the (near) latest state. For example, if some brokers have old ownership states (network partitioned or delayed), they will redirect clients to the source (old) broker. However, by the transfer protocol, the source broker should have the latest state, so it can redirect the client again to the destination broker.
>>
>> When a broker restarts, it won't start until its BSC state is caught up to the (near) latest state (up to the last known messageId at that time).
>>
>>> > Pulsar guarantees "a single writer".
>>>
>>> I didn't think we were using a single writer in the PIP 192 design. I thought we had many producers sending events to a compacted topic. My proposal would still have many producers, but the writer to bookkeeper would act as the single writer. It would technically be distinct from a normal Pulsar topic producer.
>>>
>>> I should highlight that I am only proposing "broker filtering before write" in the context of PIP 192 and as an alternative to adding pluggable compaction strategies. It would not be a generic feature.
>>
>> I was worried about the worst case where two producers (leaders) happen to write to the compacted topic (although Pulsar can guarantee "a single writer" or "a single producer" for a topic in normal situations).
>>
>>> > Could we clarify how to handle the following (edge cases and failure recovery)?
>>> > 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
>>>
>>> It is a persistent topic.
>>>
>>> > 1. How does the leader recover state from the two topics?
>>>
>>> A leader would recover state by first consuming the whole compacted topic and then by consuming from the current location of a cursor on the first input topic. As stated elsewhere, this introduces latency and could be an issue.
>>>
>>> > 2. How do we handle the case when the leader fails before writing messages to the compacted topic?
>>>
>>> The leader would not acknowledge the message on the input topic until it has successfully persisted the event on the compacted topic. Publishing the same event to a compacted topic multiple times is idempotent, so there is no risk of lost state. The real risk is latency. However, I think we might have similar (though not the same) latency risks in the current solution.
>>>
>>> > Analysis: the "pre-filter + two-topics" option can reduce the number of messages to broadcast at the expense of the leader broker compaction.
>>>
>>> My primary point is that with this PIP's design, the filter logic is run on every broker and again during topic compaction. With the alternative design, the filter is run once.
>>
>> Thank you for the clarification.
>>
>> I think the difference is that the post-filter is an optimistic approach, as it optimistically relies on the "broadcast-filter" effect (brokers will defer client lookups if notified ahead that an assignment is ongoing for bundle x). Yes, in the worst case, if the broadcast is slower, each broker needs to individually compact the conflicting assignment requests.
>>
>> Conversely, one downside of the pessimistic approach (single-leader pre-filter) is that when there are not many conflicting concurrent assignment requests (assign for bundle a, assign for bundle b, assign for bundle c, ...), the requests still need to redundantly go through the leader compaction.
>>
>>> > 3. initially less complex to implement (leaderless conflict resolution and requires a single topic)
>>>
>>> PIP 215 has its own complexity too. Coordinating filters on both the client (table view) and the server (compaction) is non trivial.
>>> The proposed API includes hard-coded client configuration for each component, which will make upgrading the version of the compaction strategy complicated and could lead to incorrect interpretation of events in the stream. When a single broker is doing the filtering, versioning is no longer a distributed problem. That being said, I do not mean to suggest my solution is without complexity.
>>>
>>> > 4. it is not a "one-way door" decision (we could add the pre-filter logic as well later)
>>>
>>> It's fair to say that we could add it later, but at that point, we will have added this new API for compaction strategy. Are we confident that pluggable compaction is independently an important addition to Pulsar's features, or would it make sense to make this API only exposed in the broker?
>>
>> The intention is that this compaction feature could be useful for complex user applications (if they are trying to do a similar thing). As I mentioned, this feature is closely tied to PIP-192 now. We are not planning to expose this feature to users soon unless it is demanded and proven to be stable.
>>
>>> Thanks,
>>> Michael
>>>
>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn <heesung.s...@streamnative.io.invalid> wrote:
>>> >
>>> > Hi,
>>> >
>>> > Also, I thought about some concurrent assignment scenarios between pre-filter vs post-filter.
>>> >
>>> > Example 1: When the topic broadcast is slower than the concurrent assignment requests
>>> >
>>> > With pre-filter + two topics (non-compacted and compacted topics):
>>> > t1: A -> non-compacted topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>> > t2: B -> non-compacted topic // broker B published a message, m2: {broker B assigned bundle x to broker C}
>>> > t3: C -> non-compacted topic // broker C published a message, m3: {broker C assigned bundle x to broker B}
>>> > t4: non-compacted topic -> L // the leader broker consumed the messages m1, m2, and m3
>>> > t5: L -> compacted topic // the leader compacted the messages and broadcast m1 to all consumers
>>> > t6: compacted topic -> [A,B,C] // brokers A, B, C consumed m1
>>> >
>>> > With post-filter + a single topic:
>>> > t1: A -> topic // broker A published a message to the topic, m1: {broker A assigned bundle x to broker A}
>>> > t2: B -> topic // broker B published a message, m2: {broker B assigned bundle x to broker C}
>>> > t3: C -> topic // broker C published a message, m3: {broker C assigned bundle x to broker B}
>>> > t4: topic -> [A,B,C] // brokers A, B, C consumed the messages m1, m2, and m3
>>> > t5: [A,B,C] -> m1 // brokers A, B, C individually compacted the messages to m1
>>> >
>>> > Analysis: the "pre-filter + two-topics" option can reduce the number of messages to broadcast at the expense of the leader broker compaction.
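To illustrate the post-filter flow above (t5, where brokers A, B, and C each independently compact m1, m2, m3 down to m1), a minimal sketch of a per-broker conflict-resolution rule might look like the following. The types and the "first valid assignment wins" rule are illustrative assumptions, not the actual PIP-192/PIP-215 logic:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PostFilterAssignmentResolver {
    record Assignment(String bundle, String dstBroker) { }

    private final Map<String, Assignment> accepted = new ConcurrentHashMap<>();

    /**
     * Applied by every broker to the same, totally ordered message stream.
     * Returns true if the event wins (first valid assignment for the bundle),
     * false if it conflicts with an earlier assignment and should be dropped.
     */
    boolean accept(Assignment event) {
        // Because all brokers read the single topic in the same order, applying this
        // rule independently yields the same result everywhere: m1 wins, m2 and m3
        // are discarded locally without any leader coordination.
        return accepted.putIfAbsent(event.bundle(), event) == null;
    }
}
```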
>>> > Example 2: When the topic broadcast is faster than the concurrent assignment requests
>>> >
>>> > With pre-filter + two topics (non-compacted and compacted topics):
>>> > t1: A -> non-compacted topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
>>> > t2: non-compacted topic -> L // the leader broker consumed the message m1
>>> > t3: L -> compacted topic // the leader compacted the message and broadcast m1 to all consumers
>>> > t4: compacted topic -> [A,B,C] // brokers A, B, C consumed m1
>>> > t5: A -> own bundle // broker A knows that its assignment has been accepted, so it proceeds to own the bundle (meanwhile deferring lookup requests)
>>> > t6: B -> defer client lookups // broker B knows that a bundle assignment is running (meanwhile deferring lookup requests)
>>> > t7: C -> defer client lookups // broker C knows that a bundle assignment is running (meanwhile deferring lookup requests)
>>> >
>>> > With post-filter + a single topic:
>>> > t1: A -> topic // broker A published a message to the topic, m1: {broker A assigned bundle x to broker A}
>>> > t2: topic -> [A,B,C] // brokers A, B, C consumed the message m1
>>> > t3: A -> own bundle // broker A knows that its assignment has been accepted, so it proceeds to own the bundle (meanwhile deferring lookup requests)
>>> > t4: B -> defer client lookups // broker B knows that a bundle assignment is running (meanwhile deferring lookup requests)
>>> > t5: C -> defer client lookups // broker C knows that a bundle assignment is running (meanwhile deferring lookup requests)
>>> >
>>> > Analysis: the "post-filter + a single topic" option can perform OK in this case, without the additional leader coordination and the secondary topic, because the early broadcast can inform all brokers and prevent them from requesting other assignments for the same bundle.
>>> >
>>> > I think the post-filter option is initially not bad because:
>>> >
>>> > 1. it is safe in the worst case (in case the messages are not correctly pre-filtered at the leader)
>>> > 2. it performs OK because the early broadcast can prevent concurrent assignment requests
>>> > 3. it is initially less complex to implement (leaderless conflict resolution and requires a single topic)
>>> > 4. it is not a "one-way door" decision (we could add the pre-filter logic as well later)
>>> >
>>> > Regards,
>>> > Heesung
>>> >
>>> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <heesung.s...@streamnative.io> wrote:
>>> >
>>> > > Hi Michael,
>>> > >
>>> > > For the pre-filter (pre-compaction) option, I think the leader requires a write-through cache to compact messages based on the latest states. Otherwise, the leader needs to wait for the last msg from the (compacted) topic before compacting the next msg for the same bundle.
>>> > >
>>> > > Pulsar guarantees "a single writer". However, for the worst-case scenario (due to network partitions, bugs in zk or etcd leader election, bugs in bk, data corruption), I think it is safe to place the post-filter on the consumer side (compaction and table views) as well in order to validate the state changes.
>>> > >
>>> > > For the two-topic approach, I think we lose a single linearized view.
>>> > > Could we clarify how to handle the following (edge cases and failure recovery)?
>>> > > 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
>>> > > 1. How does the leader recover state from the two topics?
>>> > > 2. How do we handle the case when the leader fails before writing messages to the compacted topic?
>>> > >
>>> > > Regards,
>>> > > Heesung
>>> > >
>>> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <mmarsh...@apache.org> wrote:
>>> > >
>>> > >> Sharing some more thoughts. We could alternatively use two topics instead of one. In this design, the first topic is the unfiltered write-ahead log that represents many writers (brokers) trying to acquire ownership of bundles. The second topic is the distilled log that represents the "winners" or the "owners" of the bundles. There is a single writer, the leader broker, that reads from the input topic and writes to the output topic. The first topic is normal and the second is compacted.
>>> > >>
>>> > >> The primary benefit in a two-topic solution is that it is easy for the leader broker to trade off ownership without needing to slow down writes to the input topic. The leader broker will start consuming from the input topic when it has fully consumed the table view on the output topic. In general, I don't think consumers know when they have "reached the end of a table view", but we should be able to trivially figure this out if we are the topic's only writer and the topic and writer are collocated on the same broker.
>>> > >>
>>> > >> In that design, it might make sense to use something like the replication cursor to keep track of this consumer's state.
>>> > >>
>>> > >> - Michael
>>> > >>
>>> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <mmarsh...@apache.org> wrote:
>>> > >> >
>>> > >> > Thanks for your proposal, Heesung.
>>> > >> >
>>> > >> > Fundamentally, we have the problems listed in this PIP because we have multiple writers instead of just one writer. Can we solve this problem by changing our write pattern? What if we use the leader broker as the single writer? That broker would intercept attempts to acquire ownership of bundles and would grant ownership to the first broker to claim an unassigned bundle. It could "grant ownership" by letting the first write that claims an unassigned bundle get written to the ownership topic. When a bundle is already owned, the leader won't persist that event to BookKeeper. In this design, the log becomes a true ownership log, which will correctly work with the existing topic compaction and table view solutions. My proposal essentially moves the conflict resolution to just before the write, and as a consequence, it greatly reduces the need for post-processing of the event log. One trade-off might be that the leader broker could slow down the write path, but given that the leader would just need to verify the current state of the bundle, I think it'd be performant enough.
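A minimal sketch of this single-writer pre-filter, assuming hypothetical topic names and a plain key/value message format (it is not the actual PIP-192 implementation), could look like the following:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LeaderOwnershipFilter {
    // Write-through view of current ownership: bundle -> owner broker.
    private final Map<String, String> owners = new ConcurrentHashMap<>();

    void run(PulsarClient client) throws PulsarClientException {
        Consumer<String> input = client.newConsumer(Schema.STRING)
                .topic("bundle-ownership-input")      // unfiltered write-ahead log (many writers)
                .subscriptionName("leader-filter")    // durable cursor survives leader fail-over
                .subscribe();
        Producer<String> output = client.newProducer(Schema.STRING)
                .topic("bundle-ownership-owners")     // compacted log of winning claims (single writer)
                .create();

        while (true) {
            Message<String> msg = input.receive();
            String bundle = msg.getKey();
            String claimingBroker = msg.getValue();
            // Grant ownership only to the first claim for an unassigned bundle.
            if (owners.putIfAbsent(bundle, claimingBroker) == null) {
                // Persist the winning claim before acknowledging the input message, so a
                // leader crash can only cause an idempotent re-publish, never a lost grant.
                output.newMessage().key(bundle).value(claimingBroker).send();
            }
            // Losing (conflicting) claims are acknowledged without being persisted.
            input.acknowledge(msg);
        }
    }
}
```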
>>> > >> > Additionally, we'd need the leader broker to be "caught up" on bundle ownership in order to grant ownership of topics, but unless I am mistaken, that is already a requirement of the current PIP 192 paradigm.
>>> > >> >
>>> > >> > Below are some additional thoughts that will be relevant if we move forward with the design as it is currently proposed.
>>> > >> >
>>> > >> > I think it might be helpful to update the title to show that this proposal will also affect table view. I didn't catch that at first.
>>> > >> >
>>> > >> > Do you have any documentation describing how the TopicCompactionStrategy will determine which states are valid in the context of load balancing? I looked at https://github.com/apache/pulsar/pull/18195, but I couldn't seem to find anything for it. That would help make this proposal less abstract.
>>> > >> >
>>> > >> > The proposed API seems very tied to the needs of PIP 192. For example, `isValid` is not a term I associate with topic compaction. The fundamental question for compaction is which value to keep (or whether to build a new value). I think we might be able to simplify the API by replacing the "isValid", "isMergeEnabled", and "merge" methods with a single method that lets the implementation handle one or all of those tasks. That would also remove the need to deserialize payloads multiple times.
>>> > >> >
>>> > >> > I also feel like mentioning that, after working with the PIP 105 broker-side filtering, I think we should avoid running UDFs in the broker as much as possible. (I do not consider the load balancing logic to be a UDF here.) I think it would be worth not making this a user-facing feature unless there is demand for real use cases.
>>> > >> >
>>> > >> > Thanks!
>>> > >> > Michael
>>> > >> >
>>> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bog...@apache.org> wrote:
>>> > >> > >
>>> > >> > > +1 (non-binding)
>>> > >> > >
>>> > >> > > thanks,
>>> > >> > > bo
>>> > >> > >
>>> > >> > > Heesung Sohn <heesung.s...@streamnative.io.invalid> wrote on Wed, Oct 19, 2022 at 07:54:
>>> > >> > > >
>>> > >> > > > Hi pulsar-dev community,
>>> > >> > > >
>>> > >> > > > I raised a PIP to discuss: PIP-215: Configurable Topic Compaction Strategy
>>> > >> > > >
>>> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
>>> > >> > > >
>>> > >> > > > Regards,
>>> > >> > > > Heesung
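A rough sketch of what the merged, single-method strategy interface discussed in this thread could look like; the signature below is an assumption for illustration, not the actual API defined in PIP-215:

```java
/**
 * Hypothetical single-method form of the strategy: the implementation decides which
 * value compaction (and table views) should keep, instead of exposing separate
 * isValid/isMergeEnabled/merge hooks.
 */
public interface TopicCompactionStrategy<T> {

    /**
     * Given the currently kept value and a newly observed value for the same key,
     * return the value to retain. Returning {@code current} drops the new value,
     * returning {@code candidate} accepts it, and returning a freshly built object
     * merges the two -- one deserialization, one decision point.
     */
    T compact(T current, T candidate);
}
```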