Hi Michael,

Here are my additional comments regarding your earlier email.
- I updated the PIP title to show that this will impact the table view as well.
- PIP-192 (https://github.com/apache/pulsar/issues/16691) shows the general idea of the states and their actions, and I defined the actual states in the PR: https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a. I will further clarify the bundle state data validation logic when introducing the `BundleStateCompactionStrategy` class. This PIP is meant to support CompactionStrategy in general.
- Agreed with your point that we should merge the CompactionStrategy APIs. I updated the interface proposal in the PIP and replaced the `isValid`, `isMergeEnabled`, and `merge` APIs with a single `compact` API.

Thanks,
Heesung

On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn <heesung.s...@streamnative.io> wrote:

> Hi,
> Thank you for the great comments.
> Please find my comments inline too.
>
> Regards,
> Heesung
>
> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall <mmarsh...@apache.org> wrote:
>
>> > I think we lose a single linearized view.
>>
>> Which linearized view are we losing, and what is the role of that linearized view? I think I might be missing why it is important. I agree that consumers won't know about each unsuccessful attempted acquisition of a bundle, but that seems like unnecessary information to broadcast to every broker in the cluster.
>
> PIP-192 proposes an assignment, transfer, and split protocol (multi-phase state changes) that relies on an early broadcast across brokers, and all brokers react to their clients according to the state-change notifications -- brokers could defer any client lookups for bundle x if an assignment/transfer/split is ongoing for x (broadcast early in the topic). One early-broadcast example is the one I discussed above, `When the topic broadcast is faster than the concurrent assignment requests.` I think the pre-filter could delay this early broadcast, as it needs to go through the additional single-leader compaction path.
>
> The bundle state recovery process is also simpler with a single linearized view.
>
> A single linearized view also makes it easier to debug bundle states: we can more easily track where the assignment requests come from and how they are compacted.
>
>> > I think the leader requires a write-through cache to compact messages based on the latest states.
>>
>> This brings up an important point that I would like to clarify. If we trust the write-ahead log as the source of truth, what happens when a bundle has been validly owned by multiple brokers? As a broker starts and consumes from the compacted topic, how do we prevent it from incorrectly thinking that it owns a bundle for some short time period in the case that the ownership topic hasn't yet been compacted to remove old ownership state?
>
> Since the multi-phase transfer protocol involves both the source and destination brokers' actions, a successful transfer should leave the source and destination brokers with the (near) latest state. For example, if some brokers have old ownership states (network-partitioned or delayed), they will redirect clients to the source (old) broker. However, by the transfer protocol, the source broker should have the latest state, so it can redirect the client again to the destination broker.
>
> When a broker restarts, it won't start serving until its BSC state has caught up to the (near) latest state (up to the last known messageId at that time).
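For illustration, a minimal sketch of the merged, single-method strategy mentioned at the top of this email could look like the block below. The generics and exact signature are assumptions made for this example; the authoritative definition is in the PIP and PR linked above.

    import org.apache.pulsar.client.api.Schema;

    /**
     * Illustrative sketch only: a pluggable strategy that topic compaction and
     * the table view can share to decide which value to keep for a key.
     */
    public interface TopicCompactionStrategy<T> {

        /** Schema used to deserialize payloads before applying the strategy. */
        Schema<T> getSchema();

        /**
         * Given the value currently kept for a key (prev, possibly null) and a
         * newly arrived value (next), return the value compaction should keep.
         * A single method like this subsumes isValid/isMergeEnabled/merge and
         * avoids deserializing the payload multiple times.
         */
        T compact(T prev, T next);
    }

A `BundleStateCompactionStrategy` would then implement `compact` to accept only bundle-state transitions that are valid for the assignment/transfer/split protocol.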
>
>> > Pulsar guarantees "a single writer".
>>
>> I didn't think we were using a single writer in the PIP 192 design. I thought we had many producers sending events to a compacted topic. My proposal would still have many producers, but the writer to BookKeeper would act as the single writer. It would technically be distinct from a normal Pulsar topic producer.
>>
>> I should highlight that I am only proposing "broker filtering before write" in the context of PIP 192 and as an alternative to adding pluggable compaction strategies. It would not be a generic feature.
>
> I was worried about the worst case where two producers (leaders) happen to write to the compacted topic (although Pulsar can guarantee "a single writer" or "a single producer" for a topic in normal situations).
>
>> > Could we clarify how to handle the following (edge cases and failure recovery)?
>> > 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
>>
>> It is a persistent topic.
>>
>> > 1. How does the leader recover state from the two topics?
>>
>> A leader would recover state by first consuming the whole compacted topic and then by consuming from the current location of a cursor on the first input topic. As stated elsewhere, this introduces latency and could be an issue.
>>
>> > 2. How do we handle the case when the leader fails before writing messages to the compacted topic?
>>
>> The leader would not acknowledge the message on the input topic until it has successfully persisted the event on the compacted topic. Publishing the same event to a compacted topic multiple times is idempotent, so there is no risk of lost state. The real risk is latency. However, I think we might have similar (though not the same) latency risks in the current solution.
>>
>> > Analysis: the "pre-filter + two-topics" option can reduce the number of messages to broadcast at the expense of the leader broker compaction.
>>
>> My primary point is that with this PIP's design, the filter logic is run on every broker and again during topic compaction. With the alternative design, the filter is run once.
>>
>> Thank you for the clarification.
>
> I think the difference is that the post-filter is an optimistic approach, as it optimistically relies on the "broadcast-filter" effect (brokers will defer client lookups if notified ahead of time that an assignment is ongoing for bundle x). Yes, in the worst case, if the broadcast is slower, each broker needs to individually compact the conflicting assignment requests.
>
> Conversely, one downside of the pessimistic approach (single-leader pre-filter) is that when there are not many conflicting concurrent assignment requests (assign bundle a, assign bundle b, assign bundle c, ...), the requests still need to redundantly go through the leader compaction.
>
>> > 3. initially less complex to implement (leaderless conflict resolution and requires a single topic)
>>
>> PIP 215 has its own complexity too. Coordinating filters on both the client (table view) and the server (compaction) is non-trivial. The proposed API includes hard-coded client configuration for each component, which will make upgrading the version of the compaction strategy complicated and could lead to incorrect interpretation of events in the stream. When a single broker is doing the filtering, versioning is no longer a distributed problem.
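To make the failure-handling point above concrete, the forward-then-acknowledge behavior of the leader could look roughly like the sketch below. The class/method names and the consumer/producer parameters are made up for this example and are not code from the PIP.

    import org.apache.pulsar.client.api.*;

    // Illustrative only: one iteration of the leader's forward-then-ack loop.
    class LeaderForwarder {

        void forwardOne(Consumer<byte[]> inputConsumer,
                        Producer<byte[]> compactedProducer) throws PulsarClientException {
            Message<byte[]> msg = inputConsumer.receive();
            try {
                // Persist the (filtered) event on the compacted output topic first.
                compactedProducer.newMessage()
                        .key(msg.getKey())
                        .value(msg.getValue())
                        .send();
            } catch (PulsarClientException e) {
                // The write failed: leave the input message unacknowledged so it
                // is redelivered and retried; re-publishing it later is idempotent
                // on a compacted topic.
                inputConsumer.negativeAcknowledge(msg);
                return;
            }
            // Ack the input topic only after the compacted-topic write succeeded,
            // so a leader crash in between never loses state.
            inputConsumer.acknowledge(msg);
        }
    }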
>> That being said, I do not mean to suggest my solution is without complexity.
>>
>> > 4. it is not a "one-way door" decision (we could add the pre-filter logic as well later)
>>
>> It's fair to say that we could add it later, but at that point, we will have added this new API for the compaction strategy. Are we confident that pluggable compaction is independently an important addition to Pulsar's features, or would it make sense to expose this API only in the broker?
>
> The intention is that this compaction feature could be useful for complex user applications (if they are trying to do a similar thing). As I mentioned, this feature is closely tied to PIP-192 now. We are not planning to expose this feature to users soon unless it is demanded and proven to be stable.
>
>> Thanks,
>> Michael
>>
>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn <heesung.s...@streamnative.io.invalid> wrote:
>> >
>> > Hi,
>> >
>> > Also, I thought about some concurrent assignment scenarios comparing pre-filter vs. post-filter.
>> >
>> > Example 1: When the topic broadcast is slower than the concurrent assignment requests
>> >
>> > With pre-filter + two topics (non-compacted and compacted topics):
>> > t1: A -> non-compacted topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
>> > t2: B -> non-compacted topic // broker B published a message, m2: {broker B assigned bundle x to broker C}
>> > t3: C -> non-compacted topic // broker C published a message, m3: {broker C assigned bundle x to broker B}
>> > t4: non-compacted topic -> L // the leader broker consumed the messages m1, m2, and m3
>> > t5: L -> compacted topic // the leader compacted the messages and broadcast m1 to all consumers
>> > t6: compacted topic -> [A,B,C] // brokers A, B, C consumed m1
>> >
>> > With post-filter + a single topic:
>> > t1: A -> topic // broker A published a message to the topic, m1: {broker A assigned bundle x to broker A}
>> > t2: B -> topic // broker B published a message, m2: {broker B assigned bundle x to broker C}
>> > t3: C -> topic // broker C published a message, m3: {broker C assigned bundle x to broker B}
>> > t4: topic -> [A,B,C] // brokers A, B, C consumed the messages m1, m2, and m3
>> > t5: [A,B,C] -> m1 // brokers A, B, C individually compacted the messages down to m1
>> >
>> > Analysis: the "pre-filter + two-topics" option can reduce the number of messages to broadcast at the expense of the leader broker compaction.
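As an illustrative aside, the post-filter compaction in Example 1 (keep m1, drop the conflicting m2 and m3) could be expressed with the single-method strategy sketched earlier in this message. The `BundleState` and `BundleStateData` types below are simplified stand-ins, not the actual state classes in the PR.

    import org.apache.pulsar.client.api.Schema;

    // Illustrative bundle-state types; the real ones are defined in the PR
    // linked at the top of this email and may differ.
    enum BundleState { ASSIGNED, OWNED }
    record BundleStateData(String broker, BundleState state) {}

    // Hypothetical post-filter strategy showing how Example 1 resolves:
    // m1 is kept, and the conflicting m2/m3 are dropped by every consumer.
    public class BundleStateCompactionStrategy
            implements TopicCompactionStrategy<BundleStateData> {

        @Override
        public Schema<BundleStateData> getSchema() {
            return Schema.JSON(BundleStateData.class);
        }

        @Override
        public BundleStateData compact(BundleStateData prev, BundleStateData next) {
            if (prev == null) {
                // First assignment observed for the bundle: accept it (m1).
                return next;
            }
            if (prev.state() == BundleState.ASSIGNED && next.state() == BundleState.ASSIGNED) {
                // A conflicting assignment for an already-assigned bundle is
                // invalid, so keep the existing state (this drops m2 and m3).
                return prev;
            }
            return next;
        }
    }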
>> >
>> > Example 2: When the topic broadcast is faster than the concurrent assignment requests
>> >
>> > With pre-filter + two topics (non-compacted and compacted topics):
>> > t1: A -> non-compacted topic // broker A published a message to the non-compacted topic, m1: {broker A assigned bundle x to broker A}
>> > t2: non-compacted topic -> L // the leader broker consumed the message m1
>> > t3: L -> compacted topic // the leader compacted the message and broadcast m1 to all consumers
>> > t4: compacted topic -> [A,B,C] // brokers A, B, C consumed m1
>> > t5: A -> own bundle // broker A knows that its assignment has been accepted, so it proceeds to own the bundle (meanwhile deferring lookup requests)
>> > t6: B -> defer client lookups // broker B knows that a bundle assignment is running (meanwhile deferring lookup requests)
>> > t7: C -> defer client lookups // broker C knows that a bundle assignment is running (meanwhile deferring lookup requests)
>> >
>> > With post-filter + a single topic:
>> > t1: A -> topic // broker A published a message to the topic, m1: {broker A assigned bundle x to broker A}
>> > t2: topic -> [A,B,C] // brokers A, B, C consumed the message m1
>> > t3: A -> own bundle // broker A knows that its assignment has been accepted, so it proceeds to own the bundle (meanwhile deferring lookup requests)
>> > t4: B -> defer client lookups // broker B knows that a bundle assignment is running (meanwhile deferring lookup requests)
>> > t5: C -> defer client lookups // broker C knows that a bundle assignment is running (meanwhile deferring lookup requests)
>> >
>> > Analysis: The "post-filter + a single topic" option can perform OK in this case without the additional leader coordination and the secondary topic, because the early broadcast can inform all brokers and prevent them from requesting other assignments for the same bundle.
>> >
>> > I think the post-filter option is initially not bad because:
>> >
>> > 1. it is safe in the worst case (in case messages are not correctly pre-filtered at the leader)
>> > 2. it performs OK because the early broadcast can prevent concurrent assignment requests
>> > 3. it is initially less complex to implement (leaderless conflict resolution and requires a single topic)
>> > 4. it is not a "one-way door" decision (we could add the pre-filter logic as well later)
>> >
>> > Regards,
>> > Heesung
>> >
>> > On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn <heesung.s...@streamnative.io> wrote:
>> >
>> > > Hi Michael,
>> > >
>> > > For the pre-filter (pre-compaction) option, I think the leader requires a write-through cache to compact messages based on the latest states. Otherwise, the leader needs to wait for the last message from the (compacted) topic before compacting the next message for the same bundle.
>> > >
>> > > Pulsar guarantees "a single writer". However, for the worst-case scenario (due to network partitions, bugs in ZooKeeper or etcd leader election, bugs in BookKeeper, data corruption, ...), I think it is safe to place the post-filter on the consumer side (compaction and table views) as well in order to validate the state changes.
>> > >
>> > > For the two-topic approach, I think we lose a single linearized view. Could we clarify how to handle the following (edge cases and failure recovery)?
>> > > 0. Is the un-compacted topic a persistent topic or a non-persistent topic?
>> > > 1. How does the leader recover state from the two topics?
>> > > 2. How do we handle the case when the leader fails before writing messages to the compacted topic?
>> > >
>> > > Regards,
>> > > Heesung
>> > >
>> > > On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall <mmarsh...@apache.org> wrote:
>> > >
>> > >> Sharing some more thoughts. We could alternatively use two topics instead of one. In this design, the first topic is the unfiltered write-ahead log that represents many writers (brokers) trying to acquire ownership of bundles. The second topic is the distilled log that represents the "winners" or the "owners" of the bundles. There is a single writer, the leader broker, that reads from the input topic and writes to the output topic. The first topic is normal and the second is compacted.
>> > >>
>> > >> The primary benefit of a two-topic solution is that it is easy for the leader broker to trade off ownership without needing to slow down writes to the input topic. The leader broker will start consuming from the input topic when it has fully consumed the table view on the output topic. In general, I don't think consumers know when they have "reached the end of a table view", but we should be able to trivially figure this out if we are the topic's only writer and the topic and writer are collocated on the same broker.
>> > >>
>> > >> In that design, it might make sense to use something like the replication cursor to keep track of this consumer's state.
>> > >>
>> > >> - Michael
>> > >>
>> > >> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall <mmarsh...@apache.org> wrote:
>> > >> >
>> > >> > Thanks for your proposal, Heesung.
>> > >> >
>> > >> > Fundamentally, we have the problems listed in this PIP because we have multiple writers instead of just one writer. Can we solve this problem by changing our write pattern? What if we use the leader broker as the single writer? That broker would intercept attempts to acquire ownership of bundles and would grant ownership to the first broker to claim an unassigned bundle. It could "grant ownership" by letting the first write that claims an unassigned bundle get written to the ownership topic. When a bundle is already owned, the leader won't persist that event to BookKeeper. In this design, the log becomes a true ownership log, which will work correctly with the existing topic compaction and table view solutions. My proposal essentially moves the conflict resolution to just before the write, and as a consequence, it greatly reduces the need for post-processing of the event log. One trade-off might be that the leader broker could slow down the write path, but given that the leader would just need to verify the current state of the bundle, I think it'd be performant enough.
>> > >> >
>> > >> > Additionally, we'd need the leader broker to be "caught up" on bundle ownership in order to grant ownership of topics, but unless I am mistaken, that is already a requirement of the current PIP 192 paradigm.
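A rough sketch of how the leader could detect that it has "reached the end" of the output topic before draining the input topic is shown below. It is reliable only under the stated assumption that the leader is the output topic's single writer; the `BundleEvent` class, class/method names, and topic parameter are assumptions made for this example.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pulsar.client.api.*;

    // Illustrative only: rebuild the leader's view from the compacted output
    // topic and stop once no more messages are available. The "end of topic"
    // check works here only because the leader is assumed to be the output
    // topic's single writer and is not writing while it recovers.
    class LeaderRecovery {

        // BundleEvent is an assumed POJO holding a bundle's latest state.
        Map<String, BundleEvent> recoverOwnership(PulsarClient client, String outputTopic)
                throws IOException {
            Map<String, BundleEvent> ownership = new HashMap<>();
            try (Reader<BundleEvent> reader = client.newReader(Schema.JSON(BundleEvent.class))
                    .topic(outputTopic)
                    .startMessageId(MessageId.earliest)
                    .readCompacted(true)
                    .create()) {
                while (reader.hasMessageAvailable()) {
                    Message<BundleEvent> msg = reader.readNext();
                    ownership.put(msg.getKey(), msg.getValue());
                }
            }
            // The view is now caught up; the leader can start draining the input topic.
            return ownership;
        }
    }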
>> > >> >
>> > >> > Below are some additional thoughts that will be relevant if we move forward with the design as it is currently proposed.
>> > >> >
>> > >> > I think it might be helpful to update the title to show that this proposal will affect the table view as well. I didn't catch that at first.
>> > >> >
>> > >> > Do you have any documentation describing how the TopicCompactionStrategy will determine which states are valid in the context of load balancing? I looked at https://github.com/apache/pulsar/pull/18195, but I couldn't seem to find anything for it. That would help make this proposal less abstract.
>> > >> >
>> > >> > The proposed API seems very tied to the needs of PIP 192. For example, `isValid` is not a term I associate with topic compaction. The fundamental question for compaction is which value to keep (or whether to build a new value). I think we might be able to simplify the API by replacing the "isValid", "isMergeEnabled", and "merge" methods with a single method that lets the implementation handle one or all of those tasks. That would also remove the need to deserialize payloads multiple times.
>> > >> >
>> > >> > I also feel it is worth mentioning that, after working with the PIP 105 broker-side filtering, I think we should avoid running UDFs in the broker as much as possible. (I do not consider the load-balancing logic to be a UDF here.) I think it would be worth not making this a user-facing feature unless there is demand from real use cases.
>> > >> >
>> > >> > Thanks!
>> > >> > Michael
>> > >> >
>> > >> > On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bog...@apache.org> wrote:
>> > >> > >
>> > >> > > +1 (non-binding)
>> > >> > >
>> > >> > > thanks,
>> > >> > > bo
>> > >> > >
>> > >> > > Heesung Sohn <heesung.s...@streamnative.io.invalid> wrote on Wed, Oct 19, 2022 at 07:54:
>> > >> > > >
>> > >> > > > Hi pulsar-dev community,
>> > >> > > >
>> > >> > > > I raised a PIP to discuss: PIP-215: Configurable Topic Compaction Strategy
>> > >> > > >
>> > >> > > > PIP link: https://github.com/apache/pulsar/issues/18099
>> > >> > > >
>> > >> > > > Regards,
>> > >> > > > Heesung