Hi,

The vote is open in this email thread:
https://lists.apache.org/thread/6y8407pw4fv21n2n0cbrvsspg5tok0h7

Regards,
Heesung

On Fri, Nov 4, 2022 at 3:07 PM Heesung Sohn <heesung.s...@streamnative.io> wrote:
> Hi,
>
> I would like to understand what the performance tradeoffs are for the changes that you (as an individual) and others are proposing.
>
> The problem that this PIP is trying to solve is how to support custom topic compaction logic. (Currently, Pulsar only keeps the latest message per key in topic compaction and table view (consumers).)
>
> As Michael and I discussed in the above emails, in the worst case, when there are many conflicting messages, this PIP can incur more repeated custom compaction than the alternative, as individual consumers need to compact messages themselves (in topic compaction and table views). However, one of the advantages of this proposal is that pub/sub is faster since it uses a single topic. For example, in PIP-192, if the "bundle assignment" broadcast is fast enough, conflicting bundle assignment requests can be reduced (broadcast filter effect).
>
> Post-Compaction (what this PIP proposes)
> - Producers publish messages to a single topic.
> - All consumers individually run the custom compaction logic when consuming the topic (by table view).
> - The compactor needs to run the custom compaction logic during compaction.
>
> The alternative that Michael proposed instead compacts messages at an earlier stage via a single writer, using two topics.
>
> Pre-Compaction (the alternative that Michael proposes)
> - Producers publish messages to a non-compacted topic first.
> - Only the leader consumes this non-compacted topic and runs the custom compaction logic.
> - Then, the leader publishes compacted messages to the compacted topic (conflicts are resolved by the single writer).
> - All consumers consume the compacted topic. (No need to compact the messages separately on the consumer side.)
> - The compactor does not need to run the custom compaction logic during compaction.
>
> It really seems that you are proposing a change to the default behavior whether or not a user chooses to use the interface in PIP-192.
>
> The PIP does not change the default behavior of compaction and table view. I updated the goals in the PIP to clarify this.
>
> Thanks,
> Heesung
>
> On Fri, Nov 4, 2022 at 11:11 AM Dave Fisher <w...@apache.org> wrote:
>> > On Nov 4, 2022, at 10:28 AM, Heesung Sohn <heesung.s...@streamnative.io.INVALID> wrote:
>> >
>> > Hi,
>> >
>> > I think `bool shouldKeepLeft(T prev, T cur)` is clearer. I updated the PIP.
>> >
>> > Hopefully, we provided enough context about this PIP and the design trade-off as well.
>>
>> I would like to understand what the performance tradeoffs are for the changes that you (as an individual) and others are proposing.
>>
>> > Goal
>> >
>> > • Create another topic compactor, StrategicTwoPhaseCompactor, where we can configure a compaction strategy, TopicCompactionStrategy.
>> > • Update the TableViewConfigurationData to load and consider the TopicCompactionStrategy when updating the internal key-value map in TableView.
>> > • Add TopicCompactionStrategy to the topic-level policy to run StrategicTwoPhaseCompactor instead of TwoPhaseCompactor when executing compaction.
>>
>> It really seems that you are proposing a change to the default behavior whether or not a user chooses to use the interface in PIP-192.
>>
>> > I will send out a vote email soon.
>> >
>> > Thank you,
>> > Heesung
>> >
>> > On Thu, Nov 3, 2022 at 9:59 PM Michael Marshall <mmarsh...@apache.org> wrote:
>> >
>> >> Thank you for your detailed responses, Heesung.
>> >>
>> >>> We are not planning to expose this feature to users soon unless demanded and proven to be stable.
>> >>
>> >> In that case, I think we should move forward with this PIP. I have a different opinion about the trade-offs for the two designs, but none of my concerns are problems that could not be solved later if we encounter problems.
>> >>
>> >> Just to say it succinctly, my concern is that broadcasting all attempts to acquire ownership of every unclaimed bundle to all brokers will generate a lot of unnecessary traffic.
>> >>
>> >>> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a
>> >>
>> >> Thank you for this reference. I missed it. That is great documentation!
>> >>
>> >>> In fact, with the `compact(T prev, T cur)` api only, it is not clear if prev => cur is a valid transition or not (if invalid, we should filter out the cur message instead of further compacting/merging). I think we still need to keep the `isValid()` and `merge()` separated.
>> >>
>> >> I was thinking that the result of `compact` would be the result put in the table view or written to the compacted topic. The one issue might be about keeping the memory utilization down for use cases that are not updating the message's value but are only selecting "left" or "right". I thought we could infer when to keep the message id vs keep the message value, but that might not be easy to implement.
>> >>
>> >> My final critique is that I think `isValid` could have a better name. In the event this does become a public API, I don't think all use cases will think about which event should be persisted in terms of validity.
>> >>
>> >> The main name that comes to my mind is `bool shouldKeepLeft(T prev, T cur)`. When true, prev wins. When false, cur wins. That nomenclature comes from Akka Streams. It's not perfect, but it is easy to infer what the result will do.
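[Editor's note: to make the `shouldKeepLeft` contract concrete, here is a minimal, hypothetical sketch. The version-based rule, the class, and the table-view-like map are illustrative assumptions, not the PIP-215 API; only the "true => prev wins, false => cur wins" semantics comes from the thread.]

```java
import java.util.HashMap;
import java.util.Map;

public class ShouldKeepLeftDemo {
    // Hypothetical example strategy: keep the entry with the higher version.
    // true  -> prev (left) wins, the new message is filtered out
    // false -> cur (right) wins, the new value replaces the old one
    static boolean shouldKeepLeft(long prevVersion, long curVersion) {
        return prevVersion >= curVersion; // prev wins on ties
    }

    // Apply one incoming message to a table-view-like key/value map.
    static void apply(Map<String, Long> view, String key, long version) {
        Long prev = view.get(key);
        if (prev == null || !shouldKeepLeft(prev, version)) {
            view.put(key, version); // cur wins
        } // else: prev wins, the message is dropped
    }

    public static void main(String[] args) {
        Map<String, Long> view = new HashMap<>();
        apply(view, "bundle-x", 1L);
        apply(view, "bundle-x", 3L); // newer version wins
        apply(view, "bundle-x", 2L); // stale update is filtered out
        System.out.println(view.get("bundle-x")); // prints 3
    }
}
```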
>> >> >> >>> Regarding redundant deserialization, the input type `T` is the type of >> >>> message value, so the input values are already deserialized. >> >> >> >> Great, I should have realized that. That takes care of that concern. >> >> >> >> Thanks, >> >> Michael >> >> >> >> On Thu, Nov 3, 2022 at 7:37 PM Heesung Sohn >> >> <heesung.s...@streamnative.io.invalid> wrote: >> >>> >> >>> Hi, >> >>> >> >>> I have a different thought about my previous comment. >> >>> >> >>> - Agreed with your point that we should merge CompactionStrategy >> APIs. I >> >>> updated the interface proposal in the PIP. I replaced `"isValid", >> >>> "isMergeEnabled", and "merge"` apis with "compact" api. >> >>> >> >>> boolean isValid(T prev, T cur) >> >>> boolean isMergeEnabled() >> >>> T merge(T prev, T cur) >> >>> >> >>> => >> >>> >> >>> T compact(T prev, T cur) >> >>> >> >>> In fact, with the `compact(T prev, T cur)` api only, it is not clear >> if >> >>> prev => cur is a valid transition or not(if invalid, we should filter >> out >> >>> the cur message instead of further compacting/merging). I think we >> still >> >>> need to keep the `isValid()` and `merge()` separated. >> >>> >> >>> Regarding redundant deserialization, the input type `T` is the type of >> >>> message value, so the input values are already deserialized. We don't >> >> want >> >>> to expose the Message<T> interface in this CompactionStrategy to avoid >> >>> message serialization/deserialization dependencies in the >> >>> CompactionStrategy. >> >>> >> >>> The `merge()` functionality is suggested for more complex use cases >> >> (merge >> >>> values instead of just filtering), and to support this `merge()`, we >> need >> >>> to internally create a new msg with the compacted value, metadata, and >> >>> messageId copies. We could initially define `isValid()` only in >> >>> CompactionStrategy, and add `isMergeEnabled() and merge()` later in >> the >> >>> CompactionStrategy interface if requested. 
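[Editor's note: the two interface shapes debated above can be sketched roughly as follows. `CompactionStrategySketch` and the default-method wiring are hypothetical illustrations of the discussion, not the proposed Pulsar interface; only the method names `isValid`, `isMergeEnabled`, `merge`, and `compact` come from the thread.]

```java
// Sketch of the two API shapes discussed in the thread.
public interface CompactionStrategySketch<T> {
    // Shape 1: separate validation and (optional) merging.
    // Returns false if prev => cur is an invalid transition, in which case
    // cur is filtered out rather than merged.
    boolean isValid(T prev, T cur);

    default boolean isMergeEnabled() {
        return false;
    }

    // Only consulted when isMergeEnabled() is true: build a combined value.
    default T merge(T prev, T cur) {
        throw new UnsupportedOperationException("merge not enabled");
    }

    // Shape 2 (the single-method alternative): the return value is what gets
    // written to the compacted topic / table view. Shown here as a default
    // method composed from shape 1, to make the relationship explicit.
    default T compact(T prev, T cur) {
        if (!isValid(prev, cur)) {
            return prev; // invalid transition: keep the previous value
        }
        return isMergeEnabled() ? merge(prev, cur) : cur;
    }
}
```

One way to read the thread's conclusion: shape 1 keeps "is this transition valid?" and "how do we combine values?" as separate questions, while shape 2 folds both into one call.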
>> >>> >> >>> Regards, >> >>> Heesung >> >>> >> >>> >> >>> On Thu, Nov 3, 2022 at 9:50 AM Heesung Sohn < >> >> heesung.s...@streamnative.io> >> >>> wrote: >> >>> >> >>>> Oops! Michael, I apologize for the typo in your name. >> >>>> >> >>>> On Thu, Nov 3, 2022 at 9:47 AM Heesung Sohn < >> >> heesung.s...@streamnative.io> >> >>>> wrote: >> >>>> >> >>>>> Hi Machel, >> >>>>> >> >>>>> Here are my additional comments regarding your earlier email. >> >>>>> >> >>>>> - I updated the PIP title to show that this will impact table view >> as >> >>>>> well. >> >>>>> >> >>>>> - PIP-192 : https://github.com/apache/pulsar/issues/16691 shows the >> >>>>> general idea of the states and their actions, and I defined the >> actual >> >>>>> states here in the PR, >> >>>>> >> >> >> https://github.com/apache/pulsar/pull/18079/files#diff-7f9930a5c7896b411f61901cf38371e23ba69e753f460bf7f520f6f800d8321a >> . >> >> I >> >>>>> will further clarify the bundle state data validation logic when >> >>>>> introducing `BundleStateCompactionStrategy` class. This PIP is to >> >> support >> >>>>> CompactionStrategy in general. >> >>>>> >> >>>>> - Agreed with your point that we should merge CompactionStrategy >> >> APIs. I >> >>>>> updated the interface proposal in the PIP. I replaced `"isValid", >> >>>>> "isMergeEnabled", and "merge"` apis with "compact" api. >> >>>>> >> >>>>> >> >>>>> Thanks, >> >>>>> Heesung >> >>>>> >> >>>>> >> >>>>> On Tue, Nov 1, 2022 at 11:26 AM Heesung Sohn < >> >>>>> heesung.s...@streamnative.io> wrote: >> >>>>> >> >>>>>> Hi, >> >>>>>> Thank you for the great comments. >> >>>>>> Please find my comments inline too. >> >>>>>> >> >>>>>> Regards, >> >>>>>> Heesung >> >>>>>> >> >>>>>> On Mon, Oct 31, 2022 at 10:59 PM Michael Marshall < >> >> mmarsh...@apache.org> >> >>>>>> wrote: >> >>>>>> >> >>>>>>>> I think we lose a single linearized view. >> >>>>>>> >> >>>>>>> Which linearized view are we losing, and what is the role of that >> >>>>>>> linearized view? 
I think I might be missing why it is important. I >> >>>>>>> agree that consumers won't know about each unsuccessful attempted >> >>>>>>> acquisition of a bundle, but that seems like unnecessary >> information >> >>>>>>> to broadcast to every broker in the cluster. >> >>>>>>> >> >>>>>>> >> >>>>>> >> >>>>>> PIP-192 proposes an assignment, transfer, and split >> >> protocol(multi-phase >> >>>>>> state changes), relying on early broadcast across brokers, and all >> >> brokers >> >>>>>> react to their clients according to the state change notifications >> -- >> >>>>>> brokers could defer any client lookups for bundle x if an >> >>>>>> assignment/transfer/split is ongoing for x(broadcasted early in the >> >> topic). >> >>>>>> One early broadcast example is the one that I discussed above, >> `When >> >> the >> >>>>>> topic broadcast is faster than the concurrent assignment requests.` >> >> I think >> >>>>>> the prefilter could delay this early broadcast, as it needs to go >> >> through >> >>>>>> the additional single-leader compaction path. >> >>>>>> >> >>>>>> The bundle state recovery process is simpler by a single linearized >> >> view. >> >>>>>> >> >>>>>> The single linearized view can be easier to debug bundle states. We >> >> can >> >>>>>> more easily track where the assignment requests come from and how >> it >> >> is >> >>>>>> compacted in a single linearized view. >> >>>>>> >> >>>>>> >> >>>>>> >> >>>>>>>> I think the leader requires a write-through cache to compact >> >> messages >> >>>>>>> based >> >>>>>>>> on the latest states. >> >>>>>>> >> >>>>>> This brings up an important point that I would like to clarify. If >> we >> >>>>>>> trust the write ahead log as the source of truth, what happens >> when >> >> a >> >>>>>>> bundle has been validly owned by multiple brokers? 
As a broker >> >> starts >> >>>>>>> and consumes from the compacted topic, how do we prevent it from >> >>>>>>> incorrectly thinking that it owns a bundle for some short time >> >> period >> >>>>>>> in the case that the ownership topic hasn't yet been compacted to >> >>>>>>> remove old ownership state? >> >>>>>>> >> >>>>>> >> >>>>>> Since the multi-phase transfer protocol involves the source and >> >>>>>> destination broker's actions, the successful transfer should get >> the >> >> source >> >>>>>> and destination broker to have the (near) latest state. For >> example, >> >> if >> >>>>>> some brokers have old ownership states(network partitioned or >> >> delayed), >> >>>>>> they will redirect clients to the source(old) broker. However, by >> the >> >>>>>> transfer protocol, the source broker should have the latest state, >> >> so it >> >>>>>> can redirect the client again to the destination broker. >> >>>>>> >> >>>>>> When a broker restarts, it won't start until its BSC state to the >> >> (near) >> >>>>>> latest (til the last known messageId at that time). >> >>>>>> >> >>>>>> >> >>>>>>>> Pulsar guarantees "a single writer". >> >>>>>>> >> >>>>>>> I didn't think we were using a single writer in the PIP 192 >> design. >> >> I >> >>>>>>> thought we had many producers sending events to a compacted topic. >> >> My >> >>>>>>> proposal would still have many producers, but the writer to >> >> bookkeeper >> >>>>>>> would act as the single writer. It would technically be distinct >> >> from >> >>>>>>> a normal Pulsar topic producer. >> >>>>>>> >> >>>>>>> I should highlight that I am only proposing "broker filtering >> before >> >>>>>>> write" in the context of PIP 192 and as an alternative to adding >> >>>>>>> pluggable compaction strategies. It would not be a generic >> feature. 
>> >>>>>>> >> >>>>>>> >> >>>>>> I was worried about the worst case where two producers(leaders) >> >> happen >> >>>>>> to write the compacted topic (although Pulsar can guarantee "a >> single >> >>>>>> writer" or "a single producer" for a topic in normal situations). >> >>>>>> >> >>>>>> >> >>>>>> >> >>>>>>>> Could we clarify how to handle >> >>>>>>>> the following(edge cases and failure recovery)? >> >>>>>>>> 0. Is the un-compacted topic a persistent topic or a >> >> non-persistent >> >>>>>>> topic? >> >>>>>>> >> >>>>>>> It is a persistent topic. >> >>>>>>> >> >>>>>>>> 1. How does the leader recover state from the two topics? >> >>>>>>> >> >>>>>>> A leader would recover state by first consuming the whole >> compacted >> >>>>>>> topic and then by consuming from the current location of a cursor >> on >> >>>>>>> the first input topic. As stated elsewhere, this introduces >> latency >> >>>>>>> and could be an issue. >> >>>>>>> >> >>>>>>>> 2. How do we handle the case when the leader fails before writing >> >>>>>>> messages >> >>>>>>>> to the compacted topic >> >>>>>>> >> >>>>>>> The leader would not acknowledge the message on the input topic >> >> until >> >>>>>>> it has successfully persisted the event on the compacted topic. >> >>>>>>> Publishing the same event to a compacted topic multiple times is >> >>>>>>> idempotent, so there is no risk of lost state. The real risk is >> >>>>>>> latency. However, I think we might have similar (though not the >> >> same) >> >>>>>>> latency risks in the current solution. >> >>>>>>> >> >>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the >> >> number >> >>>>>>> of >> >>>>>>>> messages to broadcast at the expense of the leader broker >> >> compaction. >> >>>>>>> >> >>>>>>> My primary point is that with this PIP's design, the filter logic >> is >> >>>>>>> run on every broker and again during topic compaction. With the >> >>>>>>> alternative design, the filter is run once. 
>> >>>>>>>
>> >>>>>> Thank you for the clarification.
>> >>>>>>
>> >>>>>> I think the difference is that the post-filter is an optimistic approach, as it relies on the "broadcast-filter" effect (brokers will defer client lookups if notified ahead that an assignment is ongoing for bundle x). Yes, in the worst case, if the broadcast is slower, each broker needs to individually compact the conflicting assignment requests.
>> >>>>>>
>> >>>>>> Conversely, one downside of the pessimistic approach (single-leader pre-filter) is that when there are not many conflicting concurrent assignment requests (assign for bundle a, assign for bundle b, assign for bundle c, ...), the requests need to redundantly go through the leader compaction.
>> >>>>>>
>> >>>>>>>> 3. initially less complex to implement (leaderless conflict resolution and requires a single topic)
>> >>>>>>>
>> >>>>>>> PIP 215 has its own complexity too. Coordinating filters on both the client (table view) and the server (compaction) is nontrivial. The proposed API includes hard-coded client configuration for each component, which will make upgrading the version of the compaction strategy complicated and could lead to incorrect interpretation of events in the stream. When a single broker is doing the filtering, versioning is no longer a distributed problem. That being said, I do not mean to suggest my solution is without complexity.
>> >>>>>>>
>> >>>>>>>> 4. it is not a "one-way door" decision (we could add the pre-filter logic as well later)
>> >>>>>>>
>> >>>>>>> It's fair to say that we could add it later, but at that point, we will have added this new API for compaction strategy.
Are we >> >> confident >> >>>>>>> that pluggable compaction is independently an important addition >> to >> >>>>>>> Pulsar's >> >>>>>>> features, or would it make sense to make this API only exposed in >> >> the >> >>>>>>> broker? >> >>>>>>> >> >>>>>>> >> >>>>>> The intention is that this compaction feature could be useful for >> >>>>>> complex user applications (if they are trying to do a similar >> >> thing). As I >> >>>>>> mentioned, this feature is closely tied to the PIP-192 now. We are >> >> not >> >>>>>> planning to expose this feature to users soon unless demanded and >> >> proven to >> >>>>>> be stable. >> >>>>>> >> >>>>>> >> >>>>>>> Thanks, >> >>>>>>> Michael >> >>>>>>> >> >>>>>>> >> >>>>>>> >> >>>>>>> >> >>>>>>> >> >>>>>>> On Sat, Oct 29, 2022 at 10:13 PM Heesung Sohn >> >>>>>>> <heesung.s...@streamnative.io.invalid> wrote: >> >>>>>>>> >> >>>>>>>> Hi, >> >>>>>>>> >> >>>>>>>> Also, I thought about some concurrent assignment scenarios >> between >> >>>>>>>> pre-filter vs post-filter. 
>> >>>>>>>> >> >>>>>>>> Example 1: When the topic broadcast is slower than the concurrent >> >>>>>>>> assignment requests >> >>>>>>>> >> >>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics) >> >>>>>>>> t1: A -> non-compacted topic // broker A published a message to >> >> the >> >>>>>>>> non-compacted topic, m1: {broker A assigned bundle x to broker A} >> >>>>>>>> t2: B -> non-compacted topic // broker B published a message, m2: >> >>>>>>> {broker B >> >>>>>>>> assigned bundle x to broker C} >> >>>>>>>> t3: C -> non-compacted topic // broker C published a message, m3: >> >>>>>>> {broker C >> >>>>>>>> assigned bundle x to broker B} >> >>>>>>>> t4: non-compacted topic -> L // leader broker consumed the >> >> messages: >> >>>>>>> m1,m2, >> >>>>>>>> and m3 >> >>>>>>>> t5: L -> compacted topic // leader compacted the messages and >> >>>>>>> broadcasted >> >>>>>>>> m1 to all consumers >> >>>>>>>> t6: compacted topic-> [A,B,C] // broker A,B,C consumed m1 >> >>>>>>>> >> >>>>>>>> With post-filter + a single topic >> >>>>>>>> t1: A -> topic // broker A published a message to the >> >> non-compacted >> >>>>>>> topic, >> >>>>>>>> m1: {broker A assigned bundle x to broker A} >> >>>>>>>> t2: B -> topic // broker B published a message, m2: {broker B >> >> assigned >> >>>>>>>> bundle x to broker C} >> >>>>>>>> t3: C -> topic // broker C published a message, m3: {broker C >> >> assigned >> >>>>>>>> bundle x to broker B} >> >>>>>>>> t4: topic -> [A,B,C] // broker A,B,C consumed the messages: >> m1,m2, >> >>>>>>> and m3 >> >>>>>>>> t5: [A,B,C] -> m1 // broker A,B,C individually compacted the >> >> messages >> >>>>>>> to m1. >> >>>>>>>> >> >>>>>>>> Analysis: the "pre-filter + two-topics" option can reduce the >> >> number >> >>>>>>> of >> >>>>>>>> messages to broadcast at the expense of the leader broker >> >> compaction. 
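[Editor's note: the post-filter side of Example 1 can be modeled with a toy deterministic compaction. Because every consumer applies the same strategy to the same message order ("first claim for an unassigned bundle wins", per the thread), brokers A, B, and C all converge on m1 without a leader. All class and broker names below are illustrative, not the PIP-192/215 implementation.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PostFilterDemo {
    // One assignment message on the single shared topic.
    record Assign(String bundle, String owner) {}

    // Deterministic per-consumer compaction: the first claim per bundle wins,
    // later conflicting claims are filtered out.
    static Map<String, String> compact(List<Assign> log) {
        Map<String, String> owners = new HashMap<>();
        for (Assign a : log) {
            owners.putIfAbsent(a.bundle(), a.owner());
        }
        return owners;
    }

    public static void main(String[] args) {
        List<Assign> topic = new ArrayList<>();
        topic.add(new Assign("bundle-x", "broker-A")); // m1
        topic.add(new Assign("bundle-x", "broker-C")); // m2 (conflict)
        topic.add(new Assign("bundle-x", "broker-B")); // m3 (conflict)
        // Each of A, B, C runs the same compaction over the same log,
        // so they independently agree on the owner.
        System.out.println(compact(topic).get("bundle-x")); // prints broker-A
    }
}
```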
>> >>>>>>>> >> >>>>>>>> >> >>>>>>>> Example 2: When the topic broadcast is faster than the concurrent >> >>>>>>>> assignment requests >> >>>>>>>> >> >>>>>>>> With pre-filter + two-topics (non-compacted and compacted topics) >> >>>>>>>> t1: A -> non-compacted topic // broker A published a message to >> >> the >> >>>>>>>> non-compacted topic, m1: {broker A assigned bundle x to broker A} >> >>>>>>>> t2: non-compacted topic -> L // leader broker consumed the >> >> messages: >> >>>>>>> m1 >> >>>>>>>> t3: L -> compacted topic // leader compacted the message and >> >>>>>>> broadcasted m1 >> >>>>>>>> to all consumers >> >>>>>>>> t4: compacted topic-> [A,B,C] // broker A,B,C consumed m1 >> >>>>>>>> t5: A-> own bundle // broker A knows that its assignment has been >> >>>>>>> accepted, >> >>>>>>>> so proceeding to own the bundle (meanwhile deferring lookup >> >> requests) >> >>>>>>>> t6: B -> defer client lookups // broker B knows that bundle >> >>>>>>> assignment is >> >>>>>>>> running(meanwhile deferring lookup requests) >> >>>>>>>> t7: C -> defer client lookups // broker C knows that bundle >> >>>>>>> assignment is >> >>>>>>>> running(meanwhile deferring lookup requests) >> >>>>>>>> >> >>>>>>>> With post-filter + a single topic >> >>>>>>>> t1: A -> topic // broker A published a message to the >> >> non-compacted >> >>>>>>> topic, >> >>>>>>>> m1: {broker A assigned bundle x to broker A} >> >>>>>>>> t2: topic -> [A,B,C] // broker A,B,C consumed the message: m1 >> >>>>>>>> t3: A-> own bundle // broker A knows that its assignment has >> been >> >>>>>>>> accepted, so proceeding to own the bundle (meanwhile deferring >> >> lookup >> >>>>>>>> requests) >> >>>>>>>> t4: B -> defer client lookups // broker B knows that bundle >> >>>>>>> assignment is >> >>>>>>>> running(meanwhile deferring lookup requests) >> >>>>>>>> t5: C -> defer client lookups // broker C knows that bundle >> >>>>>>> assignment is >> >>>>>>>> running(meanwhile deferring lookup requests) >> >>>>>>>> >> >>>>>>>> 
Analysis: The "post-filter + a single topic" can perform ok in >> >> this >> >>>>>>> case >> >>>>>>>> without the additional leader coordination and the secondary >> topic >> >>>>>>> because >> >>>>>>>> the early broadcast can inform all brokers and prevent them from >> >>>>>>> requesting >> >>>>>>>> other assignments for the same bundle. >> >>>>>>>> >> >>>>>>>> I think the post-filter option is initially not bad because: >> >>>>>>>> >> >>>>>>>> 1. it is safe in the worst case (in case the messages are not >> >>>>>>> correctly >> >>>>>>>> pre-filtered at the leader) >> >>>>>>>> 2. it performs ok because the early broadcast can prevent >> >>>>>>>> concurrent assignment requests. >> >>>>>>>> 3. initially less complex to implement (leaderless conflict >> >>>>>>> resolution and >> >>>>>>>> requires a single topic) >> >>>>>>>> 4. it is not a "one-way door" decision (we could add the >> >> pre-filter >> >>>>>>> logic >> >>>>>>>> as well later) >> >>>>>>>> >> >>>>>>>> Regards, >> >>>>>>>> Heesung >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> >> >>>>>>>> On Sat, Oct 29, 2022 at 1:03 PM Heesung Sohn < >> >>>>>>> heesung.s...@streamnative.io> >> >>>>>>>> wrote: >> >>>>>>>> >> >>>>>>>>> Hi Michael, >> >>>>>>>>> >> >>>>>>>>> For the pre-prefilter(pre-compaction) option, >> >>>>>>>>> I think the leader requires a write-through cache to compact >> >>>>>>> messages >> >>>>>>>>> based on the latest states. Otherwise, the leader needs to wait >> >> for >> >>>>>>> the >> >>>>>>>>> last msg from the (compacted) topic before compacting the next >> >> msg >> >>>>>>> for the >> >>>>>>>>> same bundle. >> >>>>>>>>> >> >>>>>>>>> Pulsar guarantees "a single writer". 
However, for the worst-case >> >>>>>>>>> scenario(due to network partitions, bugs in zk or etcd leader >> >>>>>>> election, >> >>>>>>>>> bugs in bk, data corruption ), I think it is safe to place the >> >>>>>>> post-filter >> >>>>>>>>> on the consumer side(compaction and table views) as well in >> >> order to >> >>>>>>>>> validate the state changes. >> >>>>>>>>> >> >>>>>>>>> For the two-topic approach, >> >>>>>>>>> I think we lose a single linearized view. Could we clarify how >> >> to >> >>>>>>> handle >> >>>>>>>>> the following(edge cases and failure recovery)? >> >>>>>>>>> 0. Is the un-compacted topic a persistent topic or a >> >> non-persistent >> >>>>>>> topic? >> >>>>>>>>> 1. How does the leader recover state from the two topics? >> >>>>>>>>> 2. How do we handle the case when the leader fails before >> >> writing >> >>>>>>> messages >> >>>>>>>>> to the compacted topic >> >>>>>>>>> >> >>>>>>>>> Regards, >> >>>>>>>>> Heesung >> >>>>>>>>> >> >>>>>>>>> On Fri, Oct 28, 2022 at 6:56 PM Michael Marshall < >> >>>>>>> mmarsh...@apache.org> >> >>>>>>>>> wrote: >> >>>>>>>>> >> >>>>>>>>>> Sharing some more thoughts. We could alternatively use two >> >> topics >> >>>>>>>>>> instead of one. In this design, the first topic is the >> >> unfiltered >> >>>>>>>>>> write ahead log that represents many writers (brokers) trying >> >> to >> >>>>>>>>>> acquire ownership of bundles. The second topic is the >> >> distilled log >> >>>>>>>>>> that represents the "winners" or the "owners" of the bundles. >> >>>>>>> There is >> >>>>>>>>>> a single writer, the leader broker, that reads from the input >> >> topic >> >>>>>>>>>> and writes to the output topic. The first topic is normal and >> >> the >> >>>>>>>>>> second is compacted. >> >>>>>>>>>> >> >>>>>>>>>> The primary benefit in a two topic solution is that it is easy >> >> for >> >>>>>>> the >> >>>>>>>>>> leader broker to trade off ownership without needing to slow >> >> down >> >>>>>>>>>> writes to the input topic. 
The leader broker will start >> >> consuming >> >>>>>>> from >> >>>>>>>>>> the input topic when it has fully consumed the table view on >> >> the >> >>>>>>>>>> output topic. In general, I don't think consumers know when >> >> they >> >>>>>>> have >> >>>>>>>>>> "reached the end of a table view", but we should be able to >> >>>>>>> trivially >> >>>>>>>>>> figure this out if we are the topic's only writer and the >> >> topic and >> >>>>>>>>>> writer are collocated on the same broker. >> >>>>>>>>>> >> >>>>>>>>>> In that design, it might make sense to use something like the >> >>>>>>>>>> replication cursor to keep track of this consumer's state. >> >>>>>>>>>> >> >>>>>>>>>> - Michael >> >>>>>>>>>> >> >>>>>>>>>> On Fri, Oct 28, 2022 at 5:12 PM Michael Marshall < >> >>>>>>> mmarsh...@apache.org> >> >>>>>>>>>> wrote: >> >>>>>>>>>>> >> >>>>>>>>>>> Thanks for your proposal, Heesung. >> >>>>>>>>>>> >> >>>>>>>>>>> Fundamentally, we have the problems listed in this PIP >> >> because >> >>>>>>> we have >> >>>>>>>>>>> multiple writers instead of just one writer. Can we solve >> >> this >> >>>>>>> problem >> >>>>>>>>>>> by changing our write pattern? What if we use the leader >> >> broker >> >>>>>>> as the >> >>>>>>>>>>> single writer? That broker would intercept attempts to >> >> acquire >> >>>>>>>>>>> ownership on bundles and would grant ownership to the first >> >>>>>>> broker to >> >>>>>>>>>>> claim an unassigned bundle. It could "grant ownership" by >> >>>>>>> letting the >> >>>>>>>>>>> first write to claim an unassigned bundle get written to the >> >>>>>>> ownership >> >>>>>>>>>>> topic. When a bundle is already owned, the leader won't >> >> persist >> >>>>>>> that >> >>>>>>>>>>> event to the bookkeeper. In this design, the log becomes a >> >> true >> >>>>>>>>>>> ownership log, which will correctly work with the existing >> >> topic >> >>>>>>>>>>> compaction and table view solutions. 
My proposal essentially >> >>>>>>> moves the >> >>>>>>>>>>> conflict resolution to just before the write, and as a >> >>>>>>> consequence, it >> >>>>>>>>>>> greatly reduces the need for post processing of the event >> >> log. >> >>>>>>> One >> >>>>>>>>>>> trade off might be that the leader broker could slow down the >> >>>>>>> write >> >>>>>>>>>>> path, but given that the leader would just need to verify the >> >>>>>>> current >> >>>>>>>>>>> state of the bundle, I think it'd be performant enough. >> >>>>>>>>>>> >> >>>>>>>>>>> Additionally, we'd need the leader broker to be "caught up" >> >> on >> >>>>>>> bundle >> >>>>>>>>>>> ownership in order to grant ownership of topics, but unless >> >> I am >> >>>>>>>>>>> mistaken, that is already a requirement of the current PIP >> >> 192 >> >>>>>>>>>>> paradigm. >> >>>>>>>>>>> >> >>>>>>>>>>> Below are some additional thoughts that will be relevant if >> >> we >> >>>>>>> move >> >>>>>>>>>>> forward with the design as it is currently proposed. >> >>>>>>>>>>> >> >>>>>>>>>>> I think it might be helpful to update the title to show that >> >> this >> >>>>>>>>>>> proposal will also affect table view as well. I didn't catch >> >>>>>>> that at >> >>>>>>>>>>> first. >> >>>>>>>>>>> >> >>>>>>>>>>> Do you have any documentation describing how the >> >>>>>>>>>>> TopicCompactionStrategy will determine which states are >> >> valid in >> >>>>>>> the >> >>>>>>>>>>> context of load balancing? I looked at >> >>>>>>>>>>> https://github.com/apache/pulsar/pull/18195, but I couldn't >> >>>>>>> seem to >> >>>>>>>>>>> find anything for it. That would help make this proposal less >> >>>>>>>>>>> abstract. >> >>>>>>>>>>> >> >>>>>>>>>>> The proposed API seems very tied to the needs of PIP 192. For >> >>>>>>> example, >> >>>>>>>>>>> `isValid` is not a term I associate with topic compaction. >> >> The >> >>>>>>>>>>> fundamental question for compaction is which value to keep >> >> (or >> >>>>>>> build a >> >>>>>>>>>>> new value). 
I think we might be able to simplify the API by replacing the "isValid", "isMergeEnabled", and "merge" methods with a single method that lets the implementation handle one or all tasks. That would also remove the need to deserialize payloads multiple times.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I also feel like mentioning that, after working with the PIP-105 broker-side filtering, I think we should avoid running UDFs in the broker as much as possible. (I do not consider the load-balancing logic to be a UDF here.) I think it would be worth not making this a user-facing feature unless there is demand for real use cases.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks!
>> >>>>>>>>>>> Michael
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Fri, Oct 28, 2022 at 1:21 AM 丛搏 <bog...@apache.org> wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> +1 (non-binding)
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> thanks,
>> >>>>>>>>>>>> bo
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Heesung Sohn <heesung.s...@streamnative.io.invalid> wrote on Wed, Oct 19, 2022 at 07:54:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Hi pulsar-dev community,
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> I raised a PIP to discuss: PIP-215: Configurable Topic Compaction Strategy
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> PIP link: https://github.com/apache/pulsar/issues/18099
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Regards,
>> >>>>>>>>>>>>> Heesung