Hi Guozhang,

I see. It seems like if we want to decouple 1) and 2), we need to alter the
definition of the window. Do you think it would close the gap if we added a
"window close" time to the window definition?

Such as:

    builder.stream("input")
        .groupByKey()
        .windowedBy(
            TimeWindows
                .of(60_000)
                .closeAfter(10 * 60)
                .until(30L * 24 * 60 * 60 * 1000)
        )
        .count()
        .suppress(Suppression.finalResultsOnly());

Possibly called "finalResultsAtWindowClose" or something?

Thanks,
-John

On Mon, Jul 2, 2018 at 6:50 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hey John,
>
> Obviously I'm too lazy on email replying diligence compared with you :)
> Will try to reply to them separately:
>
> -----------------------------------------------------------------------------
>
> To reply to your email on "Mon, Jul 2, 2018 at 8:23 AM":
>
> I'm aware of this use case, but again, the concern is that, in this setting,
> in order to let the window be queryable for 30 days, we will actually
> process data as old as 30 days as well, while most of the late updates
> beyond 5 minutes would be discarded anyway. Personally I think for the
> final update scenario, the ideal situation users would want is "do not
> process any data that is more than 5 minutes late, and of course no update
> records to the downstream later than 5 minutes either; but retain the
> window to be queryable for 30 days". And by doing that the final window
> snapshot would also be aligned with the update stream. In other words,
> among these three periods:
>
> 1) the retention length of the window / table.
> 2) the late records acceptance for updating the window.
> 3) the late records update to be sent downstream.
>
> Final update use cases would naturally want 2) = 3), while 1) may be
> different and larger, while what we provide now is that 1) = 2), which
> could be different and in practice larger than 3), hence not the most
> intuitive for their needs.
>
> -----------------------------------------------------------------------------
>
> To reply to your email on "Mon, Jul 2, 2018 at 10:27 AM":
>
> I'd like option 2) over option 1) better as well from programming pov.
> But I'm wondering if option 2) would provide the above semantics, or
> whether it is still coupling 1) with 2) as well?
>
>
> Guozhang
>
>
> On Mon, Jul 2, 2018 at 1:08 PM, John Roesler <j...@confluent.io> wrote:
>
> > In fact, to push the idea further (which IIRC is what Matthias originally
> > proposed), if we can accept "Suppression#finalResultsOnly" in my last
> > email, then we could also consider whether to eliminate
> > "suppressLateEvents" entirely.
> >
> > We could always add it later, but you've both expressed doubt that there
> > are practical use cases for it outside of final-results.
> >
> > -John
> >
> > On Mon, Jul 2, 2018 at 12:27 PM John Roesler <j...@confluent.io> wrote:
> >
> > > Hi again, Guozhang ;) Here's the second part of my response...
> > >
> > > It seems like your main concern is: "if I'm a user who wants final update
> > > semantics, how complicated is it for me to get it?"
> > >
> > > I think we have to assume that people don't always have time to become
> > > deeply familiar with all the nuances of a programming environment before
> > > they use it. Especially if they're evaluating several frameworks for their
> > > use case, it's very valuable to make it as obvious as possible how to
> > > accomplish various computations with Streams.
> > >
> > > To me the biggest question is whether with a fresh perspective, people
> > > would say "oh, I get it, I have to bound my lateness and suppress
> > > intermediate updates, and of course I'll get only the final result!", or
> > > if it's more like "wtf? all I want is the final result, what are all these
> > > parameters?".
> > >
> > > I was talking with Matthias a while back, and he had an idea that I think
> > > can help, which is to essentially set up a final-result recipe in addition
> > > to the raw parameters.
> > > I previously thought that it wouldn't be possible to
> > > restrict its usage to Windowed KTables, but thinking about it again this
> > > weekend, I have a couple of ideas:
> > >
> > > =====================
> > > = 1. Static Wrapper =
> > > =====================
> > > We can define an extra static function that "wraps" a KTable with
> > > final-result semantics.
> > >
> > > public static <K extends Windowed, V> KTable<K, V> finalResultsOnly(
> > >     final KTable<K, V> windowedKTable,
> > >     final Duration maxAllowedLateness,
> > >     final Suppression.BufferFullStrategy bufferFullStrategy) {
> > >   return windowedKTable.suppress(
> > >     Suppression.suppressLateEvents(maxAllowedLateness)
> > >       .suppressIntermediateEvents(
> > >         IntermediateSuppression
> > >           .emitAfter(maxAllowedLateness)
> > >           .bufferFullStrategy(bufferFullStrategy)
> > >       )
> > >   );
> > > }
> > >
> > > Because windowedKTable is a parameter, the static function can easily
> > > impose an extra bound on the key type, that it extends Windowed. This would
> > > make "final results only" only available on windowed ktables.
> > >
> > > Here's how it would look to use:
> > >
> > > final KTable<Windowed<Integer>, Long> windowCounts = ...
> > > final KTable<Windowed<Integer>, Long> finalCounts =
> > >   finalResultsOnly(
> > >     windowCounts,
> > >     Duration.ofMinutes(10),
> > >     Suppression.BufferFullStrategy.SHUT_DOWN
> > >   );
> > >
> > > Trying to use it on a non-windowed KTable yields:
> > >
> > >> Error:(129, 35) java: method finalResultsOnly in class
> > >> org.apache.kafka.streams.kstream.internals.KTableAggregateTest cannot be
> > >> applied to given types;
> > >> required:
> > >> org.apache.kafka.streams.kstream.KTable<K,V>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > >> found:
> > >> org.apache.kafka.streams.kstream.KTable<java.lang.String,java.lang.String>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > >> reason: inference variable K has incompatible bounds
> > >>     equality constraints: java.lang.String
> > >>     upper bounds: org.apache.kafka.streams.kstream.Windowed
> > >
> > >
> > > ============================================================
> > > = 2. Add <K,V> parameters and recipe method to Suppression =
> > > ============================================================
> > >
> > > By adding K,V parameters to Suppression, we can provide a similarly
> > > bounded config method directly on the Suppression class:
> > >
> > > public static <K extends Windowed, V> Suppression<K, V> finalResultsOnly(
> > >     final Duration maxAllowedLateness,
> > >     final BufferFullStrategy bufferFullStrategy) {
> > >   return Suppression
> > >     .<K, V>suppressLateEvents(maxAllowedLateness)
> > >     .suppressIntermediateEvents(IntermediateSuppression
> > >       .emitAfter(maxAllowedLateness)
> > >       .bufferFullStrategy(bufferFullStrategy)
> > >     );
> > > }
> > >
> > > Then, here's how it would look to use it:
> > >
> > > final KTable<Windowed<Integer>, Long> windowCounts = ...
> > > final KTable<Windowed<Integer>, Long> finalCounts =
> > >   windowCounts.suppress(
> > >     Suppression.finalResultsOnly(
> > >       Duration.ofMinutes(10),
> > >       Suppression.BufferFullStrategy.SHUT_DOWN
> > >     )
> > >   );
> > >
> > > Trying to use it on a non-windowed ktable yields:
> > >
> > >> Error:(127, 35) java: method finalResultsOnly in class
> > >> org.apache.kafka.streams.kstream.Suppression<K,V> cannot be applied to
> > >> given types;
> > >> required:
> > >> java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > >> found:
> > >> java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > >> reason: explicit type argument java.lang.String does not conform to
> > >> declared bound(s) org.apache.kafka.streams.kstream.Windowed
> > >
> > >
> > > =============
> > > = Downsides =
> > > =============
> > >
> > > Of course, there's a downside either way:
> > > * for 1: this "wrapper" interaction would be the first in the DSL. Is it
> > >   too strange, and how discoverable would it be?
> > > * for 2: adding those type parameters to Suppression will force all
> > >   callers to provide them in the event of a chained construction because
> > >   Java doesn't do RHS recursive type inference. This is already visible in
> > >   other parts of the Streams DSL. For example, often calls to Materialized
> > >   builders have to provide seemingly obvious type bounds.
> > >
> > > ==============
> > > = Conclusion =
> > > ==============
> > >
> > > I think option 2 is more "normal" and discoverable. It does have a
> > > downside, but it's one that's pre-existing elsewhere in the DSL.
> > >
> > > WDYT? Would the addition of this "recipe" method to Suppression resolve
> > > your concern?
> > >
> > > Thanks again,
> > > -John
> > >
> > > On Sun, Jul 1, 2018 at 11:24 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > >> Hi John,
> > >>
> > >> Regarding the metrics: yeah I think I'm with you that the dropped records
> > >> due to window retention or emit suppression policies should be recorded
> > >> differently, and using this KIP's proposed metric would be fine. If you
> > >> also think we can use this KIP's proposed metrics to cover records
> > >> skipped because of window retention, then we can include the changes in
> > >> this KIP as well.
> > >>
> > >> Regarding the current proposal, I'm actually not too worried about the
> > >> inconsistency between query semantics and downstream emit semantics.
> > >> For queries, we will always return the current running results of the
> > >> windows, be they partial or final results depending on the window
> > >> retention time anyway, which has nothing to do with whether the emitted
> > >> stream should be one final output per key or not. I also agree that
> > >> having a unified operation is generally better, since users can focus on
> > >> leveraging that one only rather than learning about two sets of
> > >> operations. The only question I had is, for final updates of window
> > >> stores, whether it is a bit awkward to understand the configuration
> > >> combo. Thinking about this more, I think my root worry is in the
> > >> "suppressLateEvents" call for windowed tables, since from a user
> > >> perspective: if my retention time is X, which means "pay the cost to
> > >> allow late records up to X to still be applied updating the tables", why
> > >> would I ever want to suppressLateEvents by Y ( < X), to say "do not send
> > >> the updates up to Y, which means the downstream operator or sink topic
> > >> for this stream would actually see a truncated update stream while I've
> > >> paid a larger cost for that"; and of course, Y > X would not make sense
> > >> either, as you would not see any updates later than X anyway. So in all,
> > >> my feeling is that it makes less sense for a windowed table's
> > >> "suppressLateEvents" to take a parameter that is not equal to the window
> > >> retention, and opening the door in the current proposal may confuse
> > >> people with that.
> > >>
> > >> Again, the above is just a subjective opinion, and probably we can also
> > >> bring up some scenarios where users do want to set X != Y.. but
> > >> personally I feel that even if the semantics for this scenario is
> > >> intuitive for users to understand, does that really make sense and
> > >> should we really open the door for it.
> > >> So I think the benefits of separating the final update into a separate
> > >> API may outweigh the advantage of having one uniform definition. And
> > >> for my alternative proposal, the rationale was from both my concern about
> > >> "suppressLateEvents" for windowed stores, and Matthias' question about
> > >> "suppressLateEvents" for non-windowed stores: if it is less meaningful
> > >> for both, we can consider removing it completely and only do
> > >> "IntermediateSuppression" in Suppress instead.
> > >>
> > >> So I'd summarize my thoughts in the following questions:
> > >>
> > >> 1. Does "suppressLateEvents" with parameter Y != X (window retention
> > >>    time) for windowed stores make sense in practice?
> > >> 2. Does "suppressLateEvents" with any parameter Y for non-windowed
> > >>    stores make sense in practice?
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Fri, Jun 29, 2018 at 2:26 PM, Bill Bejeck <bbej...@gmail.com> wrote:
> > >>
> > >> > Thanks for the explanation, that does make sense. I have some
> > >> > questions on operations, but I'll just wait for the PR and tests.
> > >> >
> > >> > Thanks,
> > >> > Bill
> > >> >
> > >> > On Wed, Jun 27, 2018 at 8:14 PM John Roesler <j...@confluent.io> wrote:
> > >> >
> > >> > > Hi Bill,
> > >> > >
> > >> > > Thanks for the review!
> > >> > >
> > >> > > Your question is very much applicable to the KIP and not at all an
> > >> > > implementation detail. Thanks for bringing it up.
> > >> > >
> > >> > > I'm proposing not to change the existing caches and configurations at
> > >> > > all (for now).
> > >> > >
> > >> > > Imagine you have a topology like this:
> > >> > >
> > >> > >   commit.interval.ms = 100
> > >> > >
> > >> > >   (ktable1 (cached)) -> (suppress emitAfter 200)
> > >> > >
> > >> > > The first ktable (ktable1) will respect the commit interval and buffer
> > >> > > events for 100ms before logging, storing, or forwarding them (IIRC).
> > >> > > Therefore, the second ktable (suppress) will only see the events at a
> > >> > > rate of once per 100ms. It will apply its own buffering, and emit once
> > >> > > per 200ms. This case is pretty trivial because the suppress time is a
> > >> > > multiple of the commit interval.
> > >> > >
> > >> > > When it's not an integer multiple, you'll get behavior like in this
> > >> > > marble diagram:
> > >> > >
> > >> > >
> > >> > > <-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)->
> > >> > >
> > >> > > [ KTable caching with commit interval = 2 ]
> > >> > >
> > >> > > <--------(k:2)---------(k:4)---------(k:6)->
> > >> > >
> > >> > > [ suppress with emitAfter = 3 ]
> > >> > >
> > >> > > <---------------(k:2)----------------(k:6)->
> > >> > >
> > >> > >
> > >> > > If this behavior isn't desired (for example, if you wanted to emit
> > >> > > (k:3) at time 3), I'd recommend setting "cache.max.bytes.buffering"
> > >> > > to 0 or modifying the topology to disable caching. Then, the behavior
> > >> > > is more simply determined just by the suppress operator.
> > >> > >
> > >> > > Does that seem right to you?
> > >> > >
> > >> > >
> > >> > > Regarding the changelogs, because the suppression operator hangs onto
> > >> > > events for a while, it will need its own changelog. The changelog
> > >> > > should represent the current state of the buffer at all times. So when
> > >> > > the suppress operator sees (k:2), for example, it will log (k:2). When
> > >> > > it later gets to time 3, it's time to emit (k:2) downstream. Because k
> > >> > > is no longer buffered, the suppress operator will log (k:null). Thus,
> > >> > > when recovering, it can rebuild the buffer by reading its changelog.
> > >> > >
> > >> > > What do you think about this?
> > >> > >
> > >> > > Thanks,
> > >> > > -John
> > >> > >
> > >> > >
> > >> > > On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck <bbej...@gmail.com> wrote:
> > >> > >
> > >> > > > Hi John, thanks for the KIP.
> > >> > > >
> > >> > > > Early on in the KIP, you mention the current approaches for
> > >> > > > controlling the rate of downstream records from a KTable: cache size
> > >> > > > configuration and commit time.
> > >> > > >
> > >> > > > Will these configuration parameters still be in effect for tables
> > >> > > > that don't use suppression? For tables taking advantage of
> > >> > > > suppression, will these configurations have no impact?
> > >> > > > This last question may be too implementation-specific, but if the
> > >> > > > requested suppression time is longer than the specified commit time,
> > >> > > > will the latest record in the suppression buffer get stored in a
> > >> > > > changelog?
> > >> > > >
> > >> > > > Thanks,
> > >> > > > Bill
> > >> > > >
> > >> > > > On Wed, Jun 27, 2018 at 3:04 PM John Roesler <j...@confluent.io> wrote:
> > >> > > >
> > >> > > > > Thanks for the feedback, Matthias,
> > >> > > > >
> > >> > > > > It seems like in straightforward relational processing cases, it
> > >> > > > > would not make sense to bound the lateness of KTables. In general,
> > >> > > > > it seems better to have "guard rails" in place that make it easier
> > >> > > > > to write sensible programs than insensible ones.
> > >> > > > >
> > >> > > > > But I'm still going to argue in favor of keeping it for all
> > >> > > > > KTables ;)
> > >> > > > >
> > >> > > > > 1. I believe it is simpler to understand the operator if it has one
> > >> > > > > uniform definition, regardless of context.
> > >> > > > > It's well defined and intuitive what will happen when you use
> > >> > > > > late-event suppression on a KTable, so I think nothing surprising
> > >> > > > > or dangerous will happen in that case. From my perspective, having
> > >> > > > > two sets of allowed operations is actually an increase in
> > >> > > > > cognitive complexity.
> > >> > > > >
> > >> > > > > 2. To me, it's not crazy to use the operator this way. For example,
> > >> > > > > in lieu of full-featured timestamp semantics, I can implement MVCC
> > >> > > > > behavior when building a KTable by
> > >> > > > > "suppressLateEvents(Duration.ZERO)". I suspect that there are
> > >> > > > > other, non-obvious applications of suppressing late events on
> > >> > > > > KTables.
> > >> > > > >
> > >> > > > > 3. Not to get too much into implementation details in a KIP
> > >> > > > > discussion, but if we did want to make late-event suppression
> > >> > > > > available only on windowed KTables, we have two enforcement options:
> > >> > > > >   a. check when we build the topology - this would be simple to
> > >> > > > >      implement, but would be a runtime check. Hopefully, people
> > >> > > > >      write tests for their topology before deploying them, so the
> > >> > > > >      feedback loop isn't instantaneous, but it's not too long either.
> > >> > > > >   b. add a new WindowedKTable type - this would be a compile time
> > >> > > > >      check, but would also be a substantial increase of both
> > >> > > > >      interface and code complexity.
> > >> > > > >
> > >> > > > > We should definitely strive to have guard rails protecting against
> > >> > > > > surprising or dangerous behavior.
> > >> > > > > Protecting against programs that we don't currently predict is a
> > >> > > > > lesser benefit, and I think we can put up guard rails on a
> > >> > > > > case-by-case basis for that. The increase in cognitive (and
> > >> > > > > potentially code and interface) complexity makes me think we
> > >> > > > > should skip this case.
> > >> > > > >
> > >> > > > > What do you think?
> > >> > > > >
> > >> > > > > Thanks,
> > >> > > > > -John
> > >> > > > >
> > >> > > > > On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax <matth...@confluent.io> wrote:
> > >> > > > >
> > >> > > > > > Thanks for the KIP John.
> > >> > > > > >
> > >> > > > > > One initial comment about the last example "Bounded lateness":
> > >> > > > > > For a non-windowed KTable bounding the lateness does not really
> > >> > > > > > make sense, does it?
> > >> > > > > >
> > >> > > > > > Thus, I am wondering if we should allow `suppressLateEvents()`
> > >> > > > > > for this case? It seems to be better to only allow it for
> > >> > > > > > windowed-KTables.
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > -Matthias
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On 6/27/18 8:53 AM, Ted Yu wrote:
> > >> > > > > > > I noticed this (lack of primary parameter) as well.
> > >> > > > > > >
> > >> > > > > > > What you gave as new example is semantically the same as what
> > >> > > > > > > I suggested. So it is good by me.
> > >> > > > > > >
> > >> > > > > > > Thanks
> > >> > > > > > >
> > >> > > > > > > On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <j...@confluent.io> wrote:
> > >> > > > > > >
> > >> > > > > > >> Thanks for taking a look, Ted,
> > >> > > > > > >>
> > >> > > > > > >> I agree this is a departure from the conventions of Streams DSL.
> > >> > > > > > >>
> > >> > > > > > >> Most of our config objects have one or two "required"
> > >> > > > > > >> parameters, which fit naturally with the static factory method
> > >> > > > > > >> approach. TimeWindows, for example, requires a size parameter,
> > >> > > > > > >> so we can naturally say TimeWindows.of(size).
> > >> > > > > > >>
> > >> > > > > > >> I think in the case of a suppression, there's really no "core"
> > >> > > > > > >> parameter, and "Suppression.of()" seems sillier than
> > >> > > > > > >> "new Suppression()". I think that Suppression.of(duration)
> > >> > > > > > >> would be ambiguous, since there are many durations that we can
> > >> > > > > > >> configure.
> > >> > > > > > >>
> > >> > > > > > >> However, thinking about it again, I suppose that I can give
> > >> > > > > > >> each configuration method a static version, which would let
> > >> > > > > > >> you replace "new Suppression()." with "Suppression." in all
> > >> > > > > > >> the examples. Basically, instead of "of()", we'd support any
> > >> > > > > > >> of the methods I listed.
> > >> > > > > > >>
> > >> > > > > > >> For example:
> > >> > > > > > >>
> > >> > > > > > >> windowCounts
> > >> > > > > > >>     .suppress(
> > >> > > > > > >>         Suppression
> > >> > > > > > >>             .suppressLateEvents(Duration.ofMinutes(10))
> > >> > > > > > >>             .suppressIntermediateEvents(
> > >> > > > > > >>                 IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
> > >> > > > > > >>             )
> > >> > > > > > >>     );
> > >> > > > > > >>
> > >> > > > > > >>
> > >> > > > > > >> Does that seem better?
> > >> > > > > > >>
> > >> > > > > > >> Thanks,
> > >> > > > > > >> -John
> > >> > > > > > >>
> > >> > > > > > >>
> > >> > > > > > >> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu <yuzhih...@gmail.com> wrote:
> > >> > > > > > >>
> > >> > > > > > >>> I started to read this KIP which contains a lot of materials.
> > >> > > > > > >>>
> > >> > > > > > >>> One suggestion:
> > >> > > > > > >>>
> > >> > > > > > >>>     .suppress(
> > >> > > > > > >>>         new Suppression()
> > >> > > > > > >>>
> > >> > > > > > >>>
> > >> > > > > > >>> Do you think it would be more consistent with the rest of
> > >> > > > > > >>> Streams data structures by supporting `of` ?
> > >> > > > > > >>>
> > >> > > > > > >>>     Suppression.of(Duration.ofMinutes(10))
> > >> > > > > > >>>
> > >> > > > > > >>>
> > >> > > > > > >>> Cheers
> > >> > > > > > >>>
> > >> > > > > > >>>
> > >> > > > > > >>> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler <j...@confluent.io> wrote:
> > >> > > > > > >>>
> > >> > > > > > >>>> Hello devs and users,
> > >> > > > > > >>>>
> > >> > > > > > >>>> Please take some time to consider this proposal for Kafka
> > >> > > > > > >>>> Streams:
> > >> > > > > > >>>>
> > >> > > > > > >>>> KIP-328: Ability to suppress updates for KTables
> > >> > > > > > >>>>
> > >> > > > > > >>>> link: https://cwiki.apache.org/confluence/x/sQU0BQ
> > >> > > > > > >>>>
> > >> > > > > > >>>> The basic idea is to provide:
> > >> > > > > > >>>> * more usable control over update rate (vs the current state
> > >> > > > > > >>>>   store caches)
> > >> > > > > > >>>> * the final-result-for-windowed-computations feature which
> > >> > > > > > >>>>   several people have requested
> > >> > > > > > >>>>
> > >> > > > > > >>>> I look forward to your feedback!
> > >> > > > > > >>>>
> > >> > > > > > >>>> Thanks,
> > >> > > > > > >>>> -John
> > >>
> > >> --
> > >> -- Guozhang
>
> --
> -- Guozhang
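
For readers following the changelog part of the thread (the suppress operator logs (k:2) while a record is buffered and (k:null) once it has been emitted), here is a minimal sketch of how a buffer could be rebuilt from such a changelog on recovery. This is not Kafka Streams code: the class name SuppressBufferSketch, the method replay, and the modeling of the changelog as an in-memory list are all assumptions made for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Kafka Streams.
class SuppressBufferSketch {

    // Rebuilds the buffer by replaying changelog entries in order.
    // A non-null value means "this key is buffered with this value".
    // A null value is a tombstone: the key was emitted downstream
    // and is no longer buffered.
    static Map<String, Integer> replay(List<Map.Entry<String, Integer>> changelog) {
        Map<String, Integer> buffer = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : changelog) {
            if (entry.getValue() == null) {
                buffer.remove(entry.getKey());
            } else {
                buffer.put(entry.getKey(), entry.getValue());
            }
        }
        return buffer;
    }

    public static void main(String[] args) {
        // (k:2) buffered, (k:null) after emitting it, then (k:4) buffered.
        List<Map.Entry<String, Integer>> changelog = List.of(
            new SimpleEntry<String, Integer>("k", 2),
            new SimpleEntry<String, Integer>("k", null),
            new SimpleEntry<String, Integer>("k", 4));
        System.out.println(replay(changelog));
    }
}
```

Under these assumptions, only records that were still buffered at the time of the failure survive the replay, which matches the statement in the thread that the changelog should represent the current state of the buffer at all times.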