I had some opportunity to reflect on the default for close time today... Note that the current "close time" is equal to the retention time, and therefore "close" today shares the default retention of 24h.
It would definitely break any application that today specifies a retention time to set close shorter than that time. It's also likely to break apps if they *don't* set the retention time and rely on the 24h default. So it's unfortunate, but I think if "close" isn't set, we should use the retention time instead of a fixed default. When we ultimately remove the retention time parameter ("until"), we will have to set "close" to a default of 24h. Of course, this has a negative impact on the user of "final results", since they won't see any output at all for retentionTime/24h, and may find this confusing. What can we do about this except document it well? Maybe log a warning if we see that close wasn't explicitly set while using "final results"? Thanks, -John On Tue, Jul 10, 2018 at 10:46 AM John Roesler <j...@confluent.io> wrote: > Hi Guozhang, > > That sounds good to me. I'll include that in the KIP. > > Thanks, > -John > > On Mon, Jul 9, 2018 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote: > >> Let me clarify a bit on what I meant about moving `retentionPeriod` to >> WindowStoreBuilder: >> >> In another discussion we had around KIP-319 / 330, that the "retention >> period" should not really be a window spec, but only a window store spec, >> as it only affects how long to retain each window to be queryable along >> with the storage cost. >> >> More specifically, today the "maintainMs" returned from Windows is used in >> three places: >> >> 1) for windowed aggregations, they are passed in directly into >> `Stores.persistentWindows()` as the retention period parameters. For this >> use case we should just let the WindowStoreBuilder to specify this value >> itself. >> >> NOTE: It is also returned in the KStreamWindowAggregate processor, to >> determine if a received record should be dropped due to its lateness. We >> may need to think of another way to get this value inside the processor >> >> 2) for windowed stream-stream join, it is used as the join range parameter >> but only to check that "windowSizeMs <= retentionPeriodMs". We can do this >> check at the store builder lever instead of at the processor level. >> >> >> If we can remove its usage in both 1) and 2), then we should be able to >> safely remove this from the `Windows` spec. >> >> >> Guozhang >> >> >> On Mon, Jul 9, 2018 at 3:53 PM, John Roesler <j...@confluent.io> wrote: >> >> > Thanks for the reply, Guozhang, >> > >> > Good! I agree, that is also a good reason, and I actually made use of >> that >> > in my tests. I'll update the KIP. >> > >> > By the way, I chose "allowedLateness" as I was trying to pick a better >> name >> > than "close", but I think it's actually the wrong name. We don't want to >> > bound the lateness of events in general, only with respect to the end of >> > their window. >> > >> > If we have a window [0,10), with "allowedLateness" of 5, then if we get >> an >> > event with timestamp 3 at time 9, the name implies we'd reject it, which >> > seems silly. Really, we'd only want to start rejecting that event at >> stream >> > time 15. >> > >> > What I meant was more like "allowedLatenessAfterWindowEnd", but that's >> too >> > verbose. I think that "close" + some documentation about what it means >> will >> > be better. >> > >> > 1: "Close" would be measured from the end of the window, so a reasonable >> > default would be "0". Recall that "close" really only needs to be >> specified >> > for final results, and a default of 0 would produce the most intuitive >> > results. If folks later discover that they are missing some late events, >> > they can adjust the parameter accordingly. IMHO, any other value would >> just >> > be a guess on our part. >> > >> > 2a: >> > I think you're saying to re-use "until" instead of adding "close" to the >> > window. >> > >> > The downside here would be that the semantic change could be more >> confusing >> > than deprecating "until" and introducing window "close" and a >> > "retentionTime" on the store builder. The deprecation is a good, >> controlled >> > way for us to make sure people are getting the semantics they think >> they're >> > getting, as well as giving us an opportunity to link people to the API >> they >> > should use instead. >> > >> > I didn't fully understand the second part, but it sounds like you're >> > suggesting to add a new "retentionTime" setter to Windows to bridge the >> gap >> > until we add it to the store builder? That seems kind of roundabout to >> me, >> > if that's what you meant. We could just immediately add it to the store >> > builders in the same PR. >> > >> > 2b: Sounds good to me! >> > >> > Thanks again, >> > -John >> > >> > >> > On Mon, Jul 9, 2018 at 4:55 PM Guozhang Wang <wangg...@gmail.com> >> wrote: >> > >> > > John, >> > > >> > > Thanks for your replies. As for the two options of the API, I think >> I'm >> > > slightly inclined to the first option as well. My motivation is a bit >> > > different, as I think of the first one maybe more flexible, for >> example: >> > > >> > > KTable<Windowed<..>> table = ... count(); >> > > >> > > table.toStream().peek(..); // want to peek at the changelog stream, >> do >> > > not care about final results. >> > > >> > > table.suppress().toStream().to("topic"); // sending to a topic, >> want >> > to >> > > only send the final results. >> > > >> > > -------------- >> > > >> > > Besides that, I have a few more minor questions: >> > > >> > > 1. For "allowedLateness", what should be the default value? I.e. if >> user >> > do >> > > not specify "allowedLateness" in TimeWindows, what value should we >> set? >> > > >> > > 2. For API names, some personal suggestions here: >> > > >> > > 2.a) "allowedLateness" -> "until" (semantics changed, and also value >> is >> > > defined as delta on top of window length), where "until" -> >> > > "retentionPeriod", and the latter will be removed from `Windows` to ` >> > > WindowStoreBuilder` in the future. >> > > >> > > 2.b) "BufferConfig" -> "Buffered" ? >> > > >> > > >> > > >> > > Guozhang >> > > >> > > >> > > On Mon, Jul 9, 2018 at 2:09 PM, John Roesler <j...@confluent.io> >> wrote: >> > > >> > > > Hey Matthias and Guozhang, >> > > > >> > > > Sorry for the slow reply. I was mulling about your feedback and >> > weighing >> > > > some ideas in a sketchbook PR: https://github.com/apache/ >> > kafka/pull/5337 >> > > . >> > > > >> > > > Your thought about keeping suppression independent of business logic >> > is a >> > > > very good one. I agree that it would make more sense to add some >> kind >> > of >> > > > "window close" concept to the window definition. >> > > > >> > > > In fact, doing that immediately solves the inconsistency problem >> > Guozhang >> > > > brought up. There's no need to add a "final results" or "emission" >> > option >> > > > to the windowed aggregation. >> > > > >> > > > What do you think about an API more like this: >> > > > >> > > > final StreamsBuilder builder = new StreamsBuilder(); >> > > > >> > > > builder >> > > > .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE)) >> > > > .groupBy( >> > > > (String k1, String v1) -> k1, >> > > > Serialized.with(STRING_SERDE, STRING_SERDE) >> > > > ) >> > > > .windowedBy(TimeWindows >> > > > .of(scaledTime(2L)) >> > > > .until(scaledTime(3L)) >> > > > .allowedLateness(scaledTime(1L)) >> > > > ) >> > > > .count(Materialized.as("counts")) >> > > > .suppress( >> > > > emitFinalResultsOnly( >> > > > BufferConfig.withBufferKeys(10_000L).bufferFullStrategy( >> > SHUT_DOWN) >> > > > ) >> > > > ) >> > > > .toStream() >> > > > .to("output-suppressed", Produced.with(STRING_SERDE, LONG_SERDE)); >> > > > >> > > > Note that: >> > > > * "emitFinalResultsOnly" is available *only* on windowed tables >> > > (enforced >> > > > by the type system at compile time), and it determines the time to >> wait >> > > by >> > > > looking at "allowedLateness" on the TimeWindows config. >> > > > * querying "counts" will produce results (eventually) consistent >> with >> > > > what's observable in "output-suppressed". >> > > > * in all cases, "suppress" has no effect on business logic, just on >> > > event >> > > > suppression. >> > > > >> > > > Is this API straightforward? Or do you still prefer the version that >> > both >> > > > proposed: >> > > > >> > > > ... >> > > > .windowedBy(TimeWindows >> > > > .of(scaledTime(2L)) >> > > > .until(scaledTime(3L)) >> > > > .allowedLateness(scaledTime(1L)) >> > > > ) >> > > > .count( >> > > > Materialized.as("counts"), >> > > > emitFinalResultsOnly( >> > > > BufferConfig.withBufferKeys(10_000L).bufferFullStrategy( >> > SHUT_DOWN) >> > > > ) >> > > > ) >> > > > ... >> > > > >> > > > To me, these two are practically identical, and I still vaguely >> prefer >> > > the >> > > > first one. >> > > > >> > > > The prototype has made clearer to me that users of "final results >> for >> > > > windows" and users of "suppression for table events" both need to >> > > configure >> > > > the suppression buffer. >> > > > >> > > > This buffer configuration consists of: >> > > > 1. how many keys or bytes to keep in memory >> > > > 2. what to do if memory runs out (shut down, start using disk, ...) >> > > > >> > > > So it's not as simple as setting a "final results" flag. We'll >> either >> > > have >> > > > an "Emit" config object on the windowed aggregators that takes the >> same >> > > > BufferConfig that the "Suppress" config on the suppression >> operator, or >> > > we >> > > > just use the suppression operator for both. >> > > > >> > > > Perhaps it would sweeten the deal a little to point out that we >> have 2 >> > > > overloads already for each windowed aggregator (with and without >> > > > Materialized). Adding "Emitted" or something would mean that we'd >> add a >> > > new >> > > > overload for each one, taking us up to 4 overloads each for "count", >> > > > "aggregate" and "reduce". Using "suppress" means that we don't add >> any >> > > new >> > > > overloads. >> > > > >> > > > Thanks again for helping to hash this out, >> > > > -John >> > > > >> > > > On Fri, Jul 6, 2018 at 6:20 PM Guozhang Wang <wangg...@gmail.com> >> > wrote: >> > > > >> > > > > I think I agree with Matthias for having dedicated APIs for >> windowed >> > > > > operation final output scenario, PLUS separating the window close >> > which >> > > > the >> > > > > "final output" would rely on, from the window retention time >> itself >> > > > > (admittedly it would make this KIP effort larger, but if we >> believe >> > we >> > > > need >> > > > > to do this separation anyways we could just do it now). >> > > > > >> > > > > And then we can have the `KTable#suppress()` for >> > > intermediate-suppression >> > > > > only, not for late-record-suppression, until we've seen that >> becomes >> > a >> > > > > common feature request because our current design still allows to >> be >> > > > > extended for that purpose. >> > > > > >> > > > > >> > > > > Guozhang >> > > > > >> > > > > On Wed, Jul 4, 2018 at 12:53 PM, Matthias J. Sax < >> > > matth...@confluent.io> >> > > > > wrote: >> > > > > >> > > > > > Thanks for the discussion. I am just catching up. >> > > > > > >> > > > > > In general, I think we have different uses cases and >> non-windowed >> > and >> > > > > > windowed is quite different. For the non-windowed case, >> suppress() >> > > has >> > > > > > no (useful) close or retention time, no final semantics, and >> also >> > no >> > > > > > business logic impact. >> > > > > > >> > > > > > On the other hand, for windowed aggregations, close time and >> final >> > > > > > result do have a meaning. IMHO, `close()` is part of business >> logic >> > > > > > while retention time is not. Also, suppression of intermediate >> > result >> > > > is >> > > > > > not a business rule and there might be use case for which either >> > > "early >> > > > > > intermediate" (before window end time) are suppressed only, or >> all >> > > > > > intermediates are suppressed (maybe also something in the >> middle, >> > ie, >> > > > > > just reduce the load of intermediate updates). Thus, >> > > window-suppression >> > > > > > is much richer. >> > > > > > >> > > > > > IMHO, a generic `suppress()` operator that can be inserted into >> the >> > > > data >> > > > > > flow at any point is useful. Maybe we should keep is as generic >> as >> > > > > > possible. However, it might be difficult to use with regard to >> > > > > > windowing, as the mental effort to use it is high. >> > > > > > >> > > > > > With regard to Guozhang's comment: >> > > > > > >> > > > > > > we will actually >> > > > > > > process data as old as 30 days as well, while most of the late >> > > > updates >> > > > > > > beyond 5 minutes would be discarded anyways. >> > > > > > >> > > > > > If we use `suppress()` as a standalone operator, this is correct >> > and >> > > > > > intended IMHO. To address the issue if the behavior is >> unwanted, I >> > > > would >> > > > > > suggest to add a "suppress option" directly to >> > > > > > `count()/reduce()/aggregate()` window operator similar to >> > > > > > `Materialized`. This would be an "embedded suppress" and avoid >> the >> > > > > > issue. It would also address the issue about mental effort for >> > > "single >> > > > > > final window result" use case. >> > > > > > >> > > > > > I also think that a shorter close-time than retention time is >> > useful >> > > > for >> > > > > > window aggregation. If we add close() to the window definition >> and >> > > > > > until() to `Materialized`, we can separate both correctly IMHO. >> > > > > > >> > > > > > About setting `close = min(close,retention)` I am not sure. We >> > might >> > > > > > rather throw an exception than reducing the close time >> > automatically. >> > > > > > Otherwise, I see many user question about "I set close to X but >> it >> > > does >> > > > > > not get updated for some data that is with delay of X". >> > > > > > >> > > > > > The tricky question might be to design the API in a backward >> > > compatible >> > > > > > way though. >> > > > > > >> > > > > > >> > > > > > >> > > > > > -Matthias >> > > > > > >> > > > > > On 7/3/18 5:38 AM, John Roesler wrote: >> > > > > > > Hi Guozhang, >> > > > > > > >> > > > > > > I see. It seems like if we want to decouple 1) and 2), we >> need to >> > > > alter >> > > > > > the >> > > > > > > definition of the window. Do you think it would close the gap >> if >> > we >> > > > > > added a >> > > > > > > "window close" time to the window definition? >> > > > > > > >> > > > > > > Such as: >> > > > > > > >> > > > > > > builder.stream("input") >> > > > > > > .groupByKey() >> > > > > > > .windowedBy( >> > > > > > > TimeWindows >> > > > > > > .of(60_000) >> > > > > > > .closeAfter(10 * 60) >> > > > > > > .until(30L * 24 * 60 * 60 * 1000) >> > > > > > > ) >> > > > > > > .count() >> > > > > > > .suppress(Suppression.finalResultsOnly()); >> > > > > > > >> > > > > > > Possibly called "finalResultsAtWindowClose" or something? >> > > > > > > >> > > > > > > Thanks, >> > > > > > > -John >> > > > > > > >> > > > > > > On Mon, Jul 2, 2018 at 6:50 PM Guozhang Wang < >> wangg...@gmail.com >> > > >> > > > > wrote: >> > > > > > > >> > > > > > >> Hey John, >> > > > > > >> >> > > > > > >> Obviously I'm too lazy on email replying diligence compared >> with >> > > you >> > > > > :) >> > > > > > >> Will try to reply them separately: >> > > > > > >> >> > > > > > >> >> > > > > > >> ------------------------------------------------------------ >> > > > > > ----------------- >> > > > > > >> >> > > > > > >> To reply your email on "Mon, Jul 2, 2018 at 8:23 AM": >> > > > > > >> >> > > > > > >> I'm aware of this use case, but again, the concern is that, >> in >> > > this >> > > > > > setting >> > > > > > >> in order to let the window be queryable for 30 days, we will >> > > > actually >> > > > > > >> process data as old as 30 days as well, while most of the >> late >> > > > updates >> > > > > > >> beyond 5 minutes would be discarded anyways. Personally I >> think >> > > for >> > > > > the >> > > > > > >> final update scenario, the ideal situation users would want >> is >> > > that >> > > > > "do >> > > > > > not >> > > > > > >> process any data that is less than 5 minutes, and of course >> no >> > > > update >> > > > > > >> records to the downstream later than 5 minutes either; but >> > retain >> > > > the >> > > > > > >> window to be queryable for 30 days". And by doing that the >> final >> > > > > window >> > > > > > >> snapshot would also be aligned with the update stream as >> well. >> > In >> > > > > other >> > > > > > >> words, among these three periods: >> > > > > > >> >> > > > > > >> 1) the retention length of the window / table. >> > > > > > >> 2) the late records acceptance for updating the window. >> > > > > > >> 3) the late records update to be sent downstream. >> > > > > > >> >> > > > > > >> Final update use cases would naturally want 2) = 3), while 1) >> > may >> > > be >> > > > > > >> different and larger, while what we provide now is that 1) = >> 2), >> > > > which >> > > > > > >> could be different and in practice larger than 3), hence not >> the >> > > > most >> > > > > > >> intuitive for their needs. >> > > > > > >> >> > > > > > >> >> > > > > > >> >> > > > > > >> ------------------------------------------------------------ >> > > > > > ----------------- >> > > > > > >> >> > > > > > >> To reply your email on "Mon, Jul 2, 2018 at 10:27 AM": >> > > > > > >> >> > > > > > >> I'd like option 2) over option 1) better as well from >> > programming >> > > > pov. >> > > > > > But >> > > > > > >> I'm wondering if option 2) would provide the above semantics >> or >> > it >> > > > is >> > > > > > still >> > > > > > >> coupling 1) with 2) as well ? >> > > > > > >> >> > > > > > >> >> > > > > > >> >> > > > > > >> Guozhang >> > > > > > >> >> > > > > > >> >> > > > > > >> >> > > > > > >> >> > > > > > >> On Mon, Jul 2, 2018 at 1:08 PM, John Roesler < >> j...@confluent.io >> > > >> > > > > wrote: >> > > > > > >> >> > > > > > >>> In fact, to push the idea further (which IIRC is what >> Matthias >> > > > > > originally >> > > > > > >>> proposed), if we can accept "Suppression#finalResultsOnly" >> in >> > my >> > > > last >> > > > > > >>> email, then we could also consider whether to eliminate >> > > > > > >>> "suppressLateEvents" entirely. >> > > > > > >>> >> > > > > > >>> We could always add it later, but you've both expressed >> doubt >> > > that >> > > > > > there >> > > > > > >>> are practical use cases for it outside of final-results. >> > > > > > >>> >> > > > > > >>> -John >> > > > > > >>> >> > > > > > >>> On Mon, Jul 2, 2018 at 12:27 PM John Roesler < >> > j...@confluent.io> >> > > > > > wrote: >> > > > > > >>> >> > > > > > >>>> Hi again, Guozhang ;) Here's the second part of my >> response... >> > > > > > >>>> >> > > > > > >>>> It seems like your main concern is: "if I'm a user who >> wants >> > > final >> > > > > > >> update >> > > > > > >>>> semantics, how complicated is it for me to get it?" >> > > > > > >>>> >> > > > > > >>>> I think we have to assume that people don't always have >> time >> > to >> > > > > become >> > > > > > >>>> deeply familiar with all the nuances of a programming >> > > environment >> > > > > > >> before >> > > > > > >>>> they use it. Especially if they're evaluating several >> > frameworks >> > > > for >> > > > > > >>> their >> > > > > > >>>> use case, it's very valuable to make it as obvious as >> possible >> > > how >> > > > > to >> > > > > > >>>> accomplish various computations with Streams. >> > > > > > >>>> >> > > > > > >>>> To me the biggest question is whether with a fresh >> > perspective, >> > > > > people >> > > > > > >>>> would say "oh, I get it, I have to bound my lateness and >> > > suppress >> > > > > > >>>> intermediate updates, and of course I'll get only the final >> > > > > result!", >> > > > > > >> or >> > > > > > >>> if >> > > > > > >>>> it's more like "wtf? all I want is the final result, what >> are >> > > all >> > > > > > these >> > > > > > >>>> parameters?". >> > > > > > >>>> >> > > > > > >>>> I was talking with Matthias a while back, and he had an >> idea >> > > that >> > > > I >> > > > > > >> think >> > > > > > >>>> can help, which is to essentially set up a final-result >> recipe >> > > in >> > > > > > >>> addition >> > > > > > >>>> to the raw parameters. I previously thought that it >> wouldn't >> > be >> > > > > > >> possible >> > > > > > >>> to >> > > > > > >>>> restrict its usage to Windowed KTables, but thinking about >> it >> > > > again >> > > > > > >> this >> > > > > > >>>> weekend, I have a couple of ideas: >> > > > > > >>>> >> > > > > > >>>> ================ >> > > > > > >>>> = 1. Static Wrapper = >> > > > > > >>>> ================ >> > > > > > >>>> We can define an extra static function that "wraps" a >> KTable >> > > with >> > > > > > >>>> final-result semantics. >> > > > > > >>>> >> > > > > > >>>> public static <K extends Windowed, V> KTable<K, V> >> > > > finalResultsOnly( >> > > > > > >>>> final KTable<K, V> windowedKTable, >> > > > > > >>>> final Duration maxAllowedLateness, >> > > > > > >>>> final Suppression.BufferFullStrategy bufferFullStrategy) >> { >> > > > > > >>>> return windowedKTable.suppress( >> > > > > > >>>> Suppression.suppressLateEvents(maxAllowedLateness) >> > > > > > >>>> .suppressIntermediateEvents( >> > > > > > >>>> IntermediateSuppression >> > > > > > >>>> .emitAfter(maxAllowedLateness) >> > > > > > >>>> .bufferFullStrategy( >> > bufferFullStrategy) >> > > > > > >>>> ) >> > > > > > >>>> ); >> > > > > > >>>> } >> > > > > > >>>> >> > > > > > >>>> Because windowedKTable is a parameter, the static function >> can >> > > > > easily >> > > > > > >>>> impose an extra bound on the key type, that it extends >> > Windowed. >> > > > > This >> > > > > > >>> would >> > > > > > >>>> make "final results only" only available on windowed >> ktables. >> > > > > > >>>> >> > > > > > >>>> Here's how it would look to use: >> > > > > > >>>> >> > > > > > >>>> final KTable<Windowed<Integer>, Long> windowCounts = ... >> > > > > > >>>> final KTable<Windowed<Integer>, Long> finalCounts = >> > > > > > >>>> finalResultsOnly( >> > > > > > >>>> windowCounts, >> > > > > > >>>> Duration.ofMinutes(10), >> > > > > > >>>> Suppression.BufferFullStrategy.SHUT_DOWN >> > > > > > >>>> ); >> > > > > > >>>> >> > > > > > >>>> Trying to use it on a non-windowed KTable yields: >> > > > > > >>>> >> > > > > > >>>>> Error:(129, 35) java: method finalResultsOnly in class >> > > > > > >>>>> org.apache.kafka.streams.kstream.internals. >> > KTableAggregateTest >> > > > > > cannot >> > > > > > >>> be >> > > > > > >>>>> applied to given types; >> > > > > > >>>>> required: >> > > > > > >>>>> org.apache.kafka.streams.kstream.KTable<K,V>,java.time. >> > > > > > >>> Duration,org.apache.kafka.streams.kstream.Suppression. >> > > > > > BufferFullStrategy >> > > > > > >>>>> found: >> > > > > > >>>>> org.apache.kafka.streams.kstream.KTable<java.lang. >> > > > > > >>> String,java.lang.String>,java.time.Duration,org.apache. >> > > > > > >>> kafka.streams.kstream.Suppression.BufferFullStrategy >> > > > > > >>>>> reason: inference variable K has incompatible bounds >> > > > > > >>>>> equality constraints: java.lang.String >> > > > > > >>>>> upper bounds: >> org.apache.kafka.streams.kstream.Windowed >> > > > > > >>>> >> > > > > > >>>> >> > > > > > >>>> >> > > > > > >>>> ================================================= >> > > > > > >>>> = 2. Add <K,V> parameters and recipe method to Suppression >> = >> > > > > > >>>> ================================================= >> > > > > > >>>> >> > > > > > >>>> By adding K,V parameters to Suppression, we can provide a >> > > > similarly >> > > > > > >>>> bounded config method directly on the Suppression class: >> > > > > > >>>> >> > > > > > >>>> public static <K extends Windowed, V> Suppression<K, V> >> > > > > > >>>> finalResultsOnly(final Duration maxAllowedLateness, final >> > > > > > >>>> BufferFullStrategy bufferFullStrategy) { >> > > > > > >>>> return Suppression >> > > > > > >>>> .<K, V>suppressLateEvents(maxAllowedLateness) >> > > > > > >>>> .suppressIntermediateEvents(IntermediateSuppression >> > > > > > >>>> .emitAfter(maxAllowedLateness) >> > > > > > >>>> .bufferFullStrategy(bufferFullStrategy) >> > > > > > >>>> ); >> > > > > > >>>> } >> > > > > > >>>> >> > > > > > >>>> Then, here's how it would look to use it: >> > > > > > >>>> >> > > > > > >>>> final KTable<Windowed<Integer>, Long> windowCounts = ... >> > > > > > >>>> final KTable<Windowed<Integer>, Long> finalCounts = >> > > > > > >>>> windowCounts.suppress( >> > > > > > >>>> Suppression.finalResultsOnly( >> > > > > > >>>> Duration.ofMinutes(10) >> > > > > > >>>> Suppression.BufferFullStrategy.SHUT_DOWN >> > > > > > >>>> ) >> > > > > > >>>> ); >> > > > > > >>>> >> > > > > > >>>> Trying to use it on a non-windowed ktable yields: >> > > > > > >>>> >> > > > > > >>>>> Error:(127, 35) java: method finalResultsOnly in class >> > > > > > >>>>> org.apache.kafka.streams.kstream.Suppression<K,V> cannot >> be >> > > > applied >> > > > > > to >> > > > > > >>>>> given types; >> > > > > > >>>>> required: >> > > > > > >>>>> java.time.Duration,org.apache.kafka.streams.kstream. >> > > > > > >>> Suppression.BufferFullStrategy >> > > > > > >>>>> found: >> > > > > > >>>>> java.time.Duration,org.apache.kafka.streams.kstream. >> > > > > > >>> Suppression.BufferFullStrategy >> > > > > > >>>>> reason: explicit type argument java.lang.String does not >> > > > conform >> > > > > to >> > > > > > >>>>> declared bound(s) >> org.apache.kafka.streams.kstream.Windowed >> > > > > > >>>> >> > > > > > >>>> >> > > > > > >>>> >> > > > > > >>>> ============ >> > > > > > >>>> = Downsides = >> > > > > > >>>> ============ >> > > > > > >>>> >> > > > > > >>>> Of course, there's a downside either way: >> > > > > > >>>> * for 1: this "wrapper" interaction would be the first in >> the >> > > > DSL. >> > > > > Is >> > > > > > >> it >> > > > > > >>>> too strange, and how discoverable would it be? >> > > > > > >>>> * for 2: adding those type parameters to Suppression will >> > force >> > > > all >> > > > > > >>>> callers to provide them in the event of a chained >> construction >> > > > > because >> > > > > > >>> Java >> > > > > > >>>> doesn't do RHS recursive type inference. This is already >> > visible >> > > > in >> > > > > > >> other >> > > > > > >>>> parts of the Streams DSL. For example, often calls to >> > > Materialized >> > > > > > >>> builders >> > > > > > >>>> have to provide seemingly obvious type bounds. >> > > > > > >>>> >> > > > > > >>>> ============ >> > > > > > >>>> = Conclusion = >> > > > > > >>>> ============ >> > > > > > >>>> >> > > > > > >>>> I think option 2 is more "normal" and discoverable. It does >> > > have a >> > > > > > >>>> downside, but it's one that's pre-existing elsewhere in the >> > DSL. >> > > > > > >>>> >> > > > > > >>>> WDYT? Would the addition of this "recipe" method to >> > Suppression >> > > > > > resolve >> > > > > > >>>> your concern? >> > > > > > >>>> >> > > > > > >>>> Thanks again, >> > > > > > >>>> -John >> > > > > > >>>> >> > > > > > >>>> On Sun, Jul 1, 2018 at 11:24 PM Guozhang Wang < >> > > wangg...@gmail.com >> > > > > >> > > > > > >>> wrote: >> > > > > > >>>> >> > > > > > >>>>> Hi John, >> > > > > > >>>>> >> > > > > > >>>>> Regarding the metrics: yeah I think I'm with you that the >> > > dropped >> > > > > > >>> records >> > > > > > >>>>> due to window retention or emit suppression policies >> should >> > be >> > > > > > >> recorded >> > > > > > >>>>> differently, and using this KIP's proposed metric would be >> > > fine. >> > > > If >> > > > > > >> you >> > > > > > >>>>> also think we can use this KIP's proposed metrics to cover >> > the >> > > > > window >> > > > > > >>>>> retention cased skipping records, then we can include the >> > > changes >> > > > > in >> > > > > > >>> this >> > > > > > >>>>> KIP as well. >> > > > > > >>>>> >> > > > > > >>>>> Regarding the current proposal, I'm actually not too >> worried >> > > > about >> > > > > > the >> > > > > > >>>>> inconsistency between query semantics and downstream emit >> > > > > semantics. >> > > > > > >> For >> > > > > > >>>>> queries, we will always return the current running >> results of >> > > the >> > > > > > >>> windows, >> > > > > > >>>>> being it partial or final results depending on the window >> > > > retention >> > > > > > >> time >> > > > > > >>>>> anyways, which has nothing to do whether the emitted >> stream >> > > > should >> > > > > be >> > > > > > >>> one >> > > > > > >>>>> final output per key or not. I also agree that having a >> > unified >> > > > > > >>> operation >> > > > > > >>>>> is generally better for users to focus on leveraging that >> one >> > > > only >> > > > > > >> than >> > > > > > >>>>> learning about two set of operations. The only question I >> had >> > > is, >> > > > > for >> > > > > > >>>>> final >> > > > > > >>>>> updates of window stores, if it is a bit awkward to >> > understand >> > > > the >> > > > > > >>>>> configuration combo. Thinking about this more, I think my >> > root >> > > > > worry >> > > > > > >> in >> > > > > > >>>>> the >> > > > > > >>>>> "suppressLateEvents" call for windowed tables, since from >> a >> > > user >> > > > > > >>>>> perspective: if my retention time is X which means "pay >> the >> > > cost >> > > > to >> > > > > > >>> allow >> > > > > > >>>>> late records up to X to still be applied updating the >> > tables", >> > > > why >> > > > > > >>> would I >> > > > > > >>>>> ever want to suppressLateEvents by Y ( < X), to say "do >> not >> > > send >> > > > > the >> > > > > > >>>>> updates up to Y, which means the downstream operator or >> sink >> > > > topic >> > > > > > for >> > > > > > >>>>> this >> > > > > > >>>>> stream would actually see a truncated update stream while >> > I've >> > > > paid >> > > > > > >>> larger >> > > > > > >>>>> cost for that"; and of course, Y > X would not make sense >> > > either >> > > > as >> > > > > > >> you >> > > > > > >>>>> would not see any updates later than X anyways. So in >> all, my >> > > > > feeling >> > > > > > >> is >> > > > > > >>>>> that it makes less sense for windowed table's >> > > > "suppressLateEvents" >> > > > > > >> with >> > > > > > >>> a >> > > > > > >>>>> parameter that is not equal to the window retention, and >> > > opening >> > > > > the >> > > > > > >>> door >> > > > > > >>>>> in the current proposal may confuse people with that. >> > > > > > >>>>> >> > > > > > >>>>> Again, above is just a subjective opinion and probably we >> can >> > > > also >> > > > > > >> bring >> > > > > > >>>>> up >> > > > > > >>>>> some scenarios that users does want to set X != Y.. but >> > > > personally >> > > > > I >> > > > > > >>> feel >> > > > > > >>>>> that even if the semantics for this scenario if intuitive >> for >> > > > user >> > > > > to >> > > > > > >>>>> understand, doe that really make sense and should we >> really >> > > open >> > > > > the >> > > > > > >>> door >> > > > > > >>>>> for it. So I think maybe separating the final update in a >> > > > separate >> > > > > > >> API's >> > > > > > >>>>> benefits may overwhelm the advantage of having one uniform >> > > > > > definition. >> > > > > > >>> And >> > > > > > >>>>> for my alternative proposal, the rationale was from both >> my >> > > > concern >> > > > > > >>> about >> > > > > > >>>>> "suppressLateEvents" for windowed store, and Matthias' >> > question >> > > > > about >> > > > > > >>>>> "suppressLateEvents" for non-windowed stores, that if it >> is >> > > less >> > > > > > >>>>> meaningful >> > > > > > >>>>> for both, we can consider removing it completely and only >> do >> > > > > > >>>>> "IntermediateSuppression" in Suppress instead. >> > > > > > >>>>> >> > > > > > >>>>> So I'd summarize my thoughts in the following questions: >> > > > > > >>>>> >> > > > > > >>>>> 1. Does "suppressLateEvents" with parameter Y != X (window >> > > > > retention >> > > > > > >>> time) >> > > > > > >>>>> for windowed stores make sense in practice? >> > > > > > >>>>> 2. Does "suppressLateEvents" with any parameter Y for >> > > > non-windowed >> > > > > > >>> stores >> > > > > > >>>>> make sense in practice? >> > > > > > >>>>> >> > > > > > >>>>> >> > > > > > >>>>> >> > > > > > >>>>> Guozhang >> > > > > > >>>>> >> > > > > > >>>>> >> > > > > > >>>>> On Fri, Jun 29, 2018 at 2:26 PM, Bill Bejeck < >> > > bbej...@gmail.com> >> > > > > > >> wrote: >> > > > > > >>>>> >> > > > > > >>>>>> Thanks for the explanation, that does make sense. I have >> > some >> > > > > > >>>>> questions on >> > > > > > >>>>>> operations, but I'll just wait for the PR and tests. >> > > > > > >>>>>> >> > > > > > >>>>>> Thanks, >> > > > > > >>>>>> Bill >> > > > > > >>>>>> >> > > > > > >>>>>> On Wed, Jun 27, 2018 at 8:14 PM John Roesler < >> > > j...@confluent.io >> > > > > >> > > > > > >>> wrote: >> > > > > > >>>>>> >> > > > > > >>>>>>> Hi Bill, >> > > > > > >>>>>>> >> > > > > > >>>>>>> Thanks for the review! >> > > > > > >>>>>>> >> > > > > > >>>>>>> Your question is very much applicable to the KIP and >> not at >> > > all >> > > > > an >> > > > > > >>>>>>> implementation detail. Thanks for bringing it up. >> > > > > > >>>>>>> >> > > > > > >>>>>>> I'm proposing not to change the existing caches and >> > > > > configurations >> > > > > > >>> at >> > > > > > >>>>> all >> > > > > > >>>>>>> (for now). >> > > > > > >>>>>>> >> > > > > > >>>>>>> Imagine you have a topology like this: >> > > > > > >>>>>>> commit.interval.ms = 100 >> > > > > > >>>>>>> >> > > > > > >>>>>>> (ktable1 (cached)) -> (suppress emitAfter 200) >> > > > > > >>>>>>> >> > > > > > >>>>>>> The first ktable (ktable1) will respect the commit >> interval >> > > and >> > > > > > >>> buffer >> > > > > > >>>>>>> events for 100ms before logging, storing, or forwarding >> > them >> > > > > > >> (IIRC). >> > > > > > >>>>>>> Therefore, the second ktable (suppress) will only see >> the >> > > > events >> > > > > > >> at >> > > > > > >>> a >> > > > > > >>>>>> rate >> > > > > > >>>>>>> of once per 100ms. It will apply its own buffering, and >> > emit >> > > > once >> > > > > > >>> per >> > > > > > >>>>>> 200ms >> > > > > > >>>>>>> This case is pretty trivial because the suppress time >> is a >> > > > > > >> multiple >> > > > > > >>> of >> > > > > > >>>>>> the >> > > > > > >>>>>>> commit interval. >> > > > > > >>>>>>> >> > > > > > >>>>>>> When it's not an integer multiple, you'll get behavior >> like >> > > in >> > > > > > >> this >> > > > > > >>>>>> marble >> > > > > > >>>>>>> diagram: >> > > > > > >>>>>>> >> > > > > > >>>>>>> >> > > > > > >>>>>>> <-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)-> >> > > > > > >>>>>>> >> > > > > > >>>>>>> [ KTable caching with commit interval = 2 ] >> > > > > > >>>>>>> >> > > > > > >>>>>>> <--------(k:2)---------(k:4)---------(k:6)-> >> > > > > > >>>>>>> >> > > > > > >>>>>>> [ suppress with emitAfter = 3 ] >> > > > > > >>>>>>> >> > > > > > >>>>>>> <---------------(k:2)----------------(k:6)-> >> > > > > > >>>>>>> >> > > > > > >>>>>>> >> > > > > > >>>>>>> If this behavior isn't desired (for example, if you >> wanted >> > to >> > > > > emit >> > > > > > >>>>> (k:3) >> > > > > > >>>>>> at >> > > > > > >>>>>>> time 3, I'd recommend setting the >> > "cache.max.bytes.buffering" >> > > > to >> > > > > 0 >> > > > > > >>> or >> > > > > > >>>>>>> modifying the topology to disable caching. Then, the >> > behavior >> > > > is >> > > > > > >>> more >> > > > > > >>>>>>> simply determined just by the suppress operator. >> > > > > > >>>>>>> >> > > > > > >>>>>>> Does that seem right to you? >> > > > > > >>>>>>> >> > > > > > >>>>>>> >> > > > > > >>>>>>> Regarding the changelogs, because the suppression >> operator >> > > > hangs >> > > > > > >>> onto >> > > > > > >>>>>>> events for a while, it will need its own changelog. The >> > > > changelog >> > > > > > >>>>>>> should represent the current state of the buffer at all >> > > times. >> > > > So >> > > > > > >>> when >> > > > > > >>>>>> the >> > > > > > >>>>>>> suppress operator sees (k:2), for example, it will log >> > (k:2). >> > > > > When >> > > > > > >>> it >> > > > > > >>>>>>> later gets to time 3, it's time to emit (k:2) >> downstream. >> > > > Because >> > > > > > >> k >> > > > > > >>>>> is no >> > > > > > >>>>>>> longer buffered, the suppress operator will log >> (k:null). >> > > Thus, >> > > > > > >> when >> > > > > > >>>>>>> recovering, >> > > > > > >>>>>>> it can rebuild the buffer by reading its changelog. >> > > > > > >>>>>>> >> > > > > > >>>>>>> What do you think about this? >> > > > > > >>>>>>> >> > > > > > >>>>>>> Thanks, >> > > > > > >>>>>>> -John >> > > > > > >>>>>>> >> > > > > > >>>>>>> >> > > > > > >>>>>>> >> > > > > > >>>>>>> On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck < >> > > bbej...@gmail.com >> > > > > >> > > > > > >>>>> wrote: >> > > > > > >>>>>>> >> > > > > > >>>>>>>> Hi John, thanks for the KIP. >> > > > > > >>>>>>>> >> > > > > > >>>>>>>> Early on in the KIP, you mention the current approaches >> > for >> > > > > > >>>>> controlling >> > > > > > >>>>>>> the >> > > > > > >>>>>>>> rate of downstream records from a KTable, cache size >> > > > > > >> configuration >> > > > > > >>>>> and >> > > > > > >>>>>>>> commit time. >> > > > > > >>>>>>>> >> > > > > > >>>>>>>> Will these configuration parameters still be in effect >> for >> > > > > > >> tables >> > > > > > >>>>> that >> > > > > > >>>>>>>> don't use suppression? For tables taking advantage of >> > > > > > >>> suppression, >> > > > > > >>>>>> will >> > > > > > >>>>>>>> these configurations have no impact? >> > > > > > >>>>>>>> This last question may be to implementation specific >> but >> > if >> > > > the >> > > > > > >>>>>> requested >> > > > > > >>>>>>>> suppression time is longer than the specified commit >> time, >> > > > will >> > > > > > >>> the >> > > > > > >>>>>>> latest >> > > > > > >>>>>>>> record in the suppression buffer get stored in a >> > changelog? >> > > > > > >>>>>>>> >> > > > > > >>>>>>>> Thanks, >> > > > > > >>>>>>>> Bill >> > > > > > >>>>>>>> >> > > > > > >>>>>>>> On Wed, Jun 27, 2018 at 3:04 PM John Roesler < >> > > > j...@confluent.io >> > > > > > >>> >> > > > > > >>>>>> wrote: >> > > > > > >>>>>>>> >> > > > > > >>>>>>>>> Thanks for the feedback, Matthias, >> > > > > > >>>>>>>>> >> > > > > > >>>>>>>>> It seems like in straightforward relational processing >> > > cases, >> > > > > > >> it >> > > > > > >>>>>> would >> > > > > > >>>>>>>> not >> > > > > > >>>>>>>>> make sense to bound the lateness of KTables. In >> general, >> > it >> > > > > > >>> seems >> > > > > > >>>>>>> better >> > > > > > >>>>>>>> to >> > > > > > >>>>>>>>> have "guard rails" in place that make it easier to >> write >> > > > > > >>> sensible >> > > > > > >>>>>>>> programs >> > > > > > >>>>>>>>> than insensible ones. >> > > > > > >>>>>>>>> >> > > > > > >>>>>>>>> But I'm still going to argue in favor of keeping it >> for >> > all >> > > > > > >>>>> KTables >> > > > > > >>>>>> ;) >> > > > > > >>>>>>>>> >> > > > > > >>>>>>>>> 1. I believe it is simpler to understand the operator >> if >> > it >> > > > > > >> has >> > > > > > >>>>> one >> > > > > > >>>>>>>> uniform >> > > > > > >>>>>>>>> definition, regardless of context. It's well defined >> and >> > > > > > >>> intuitive >> > > > > > >>>>>> what >> > > > > > >>>>>>>>> will happen when you use late-event suppression on a >> > > KTable, >> > > > > > >> so >> > > > > > >>> I >> > > > > > >>>>>> think >> > > > > > >>>>>>>>> nothing surprising or dangerous will happen in that >> case. >> > > > From >> > > > > > >>> my >> > > > > > >>>>>>>>> perspective, having two sets of allowed operations is >> > > > actually >> > > > > > >>> an >> > > > > > >>>>>>>> increase >> > > > > > >>>>>>>>> in cognitive complexity. >> > > > > > >>>>>>>>> >> > > > > > >>>>>>>>> 2. To me, it's not crazy to use the operator this way. >> > For >> > > > > > >>>>> example, >> > > > > > >>>>>> in >> > > > > > >>>>>>>> lieu >> > > > > > >>>>>>>>> of full-featured timestamp semantics, I can implement >> > MVCC >> > > > > > >>>>> behavior >> > > > > > >>>>>>> when >> > > > > > >>>>>>>>> building a KTable by >> "suppressLateEvents(Duration.ZERO)". >> > I >> > > > > > >>>>> suspect >> > > > > > >>>>>>> that >> > > > > > >>>>>>>>> there are other, non-obvious applications of >> suppressing >> > > late >> > > > > > >>>>> events >> > > > > > >>>>>> on >> > > > > > >>>>>>>>> KTables. >> > > > > > >>>>>>>>> >> > > > > > >>>>>>>>> 3. Not to get too much into implementation details in >> a >> > KIP >> > > > > > >>>>>> discussion, >> > > > > > >>>>>>>> but >> > > > > > >>>>>>>>> if we did want to make late-event suppression >> available >> > > only >> > > > > > >> on >> > > > > > >>>>>>> windowed >> > > > > > >>>>>>>>> KTables, we have two enforcement options: >> > > > > > >>>>>>>>> a. check when we build the topology - this would be >> > > simple >> > > > > > >> to >> > > > > > >>>>>>>> implement, >> > > > > > >>>>>>>>> but would be a runtime check. Hopefully, people write >> > tests >> > > > > > >> for >> > > > > > >>>>> their >> > > > > > >>>>>>>>> topology before deploying them, so the feedback loop >> > isn't >> > > > > > >>>>>>> instantaneous, >> > > > > > >>>>>>>>> but it's not too long either. >> > > > > > >>>>>>>>> b. add a new WindowedKTable type - this would be a >> > > compile >> > > > > > >>> time >> > > > > > >>>>>>> check, >> > > > > > >>>>>>>>> but would also be substantial increase of both >> interface >> > > and >> > > > > > >>> code >> > > > > > >>>>>>>>> complexity. >> > > > > > >>>>>>>>> >> > > > > > >>>>>>>>> We should definitely strive to have guard rails >> > protecting >> > > > > > >>> against >> > > > > > >>>>>>>>> surprising or dangerous behavior. Protecting against >> > > programs >> > > > > > >>>>> that we >> > > > > > >>>>>>>> don't >> > > > > > >>>>>>>>> currently predict is a lesser benefit, and I think we >> can >> > > put >> > > > > > >> up >> > > > > > >>>>>> guard >> > > > > > >>>>>>>>> rails on a case-by-case basis for that. It seems like >> the >> > > > > > >>>>> increase in >> > > > > > >>>>>>>>> cognitive (and potentially code and interface) >> complexity >> > > > > > >> makes >> > > > > > >>> me >> > > > > > >>>>>>> think >> > > > > > >>>>>>>> we >> > > > > > >>>>>>>>> should skip this case. >> > > > > > >>>>>>>>> >> > > > > > >>>>>>>>> What do you think? >> > > > > > >>>>>>>>> >> > > > > > >>>>>>>>> Thanks, >> > > > > > >>>>>>>>> -John >> > > > > > >>>>>>>>> >> > > > > > >>>>>>>>> On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax < >> > > > > > >>>>>>> matth...@confluent.io> >> > > > > > >>>>>>>>> wrote: >> > > > > > >>>>>>>>> >> > > > > > >>>>>>>>>> Thanks for the KIP John. >> > > > > > >>>>>>>>>> >> > > > > > >>>>>>>>>> One initial comments about the last example "Bounded >> > > > > > >>> lateness": >> > > > > > >>>>>> For a >> > > > > > >>>>>>>>>> non-windowed KTable bounding the lateness does not >> > really >> > > > > > >> make >> > > > > > >>>>>> sense, >> > > > > > >>>>>>>>>> does it? >> > > > > > >>>>>>>>>> >> > > > > > >>>>>>>>>> Thus, I am wondering if we should allow >> > > > > > >> `suppressLateEvents()` >> > > > > > >>>>> for >> > > > > > >>>>>>> this >> > > > > > >>>>>>>>>> case? It seems to be better to only allow it for >> > > > > > >>>>> windowed-KTables. >> > > > > > >>>>>>>>>> >> > > > > > >>>>>>>>>> >> > > > > > >>>>>>>>>> -Matthias >> > > > > > >>>>>>>>>> >> > > > > > >>>>>>>>>> >> > > > > > >>>>>>>>>> On 6/27/18 8:53 AM, Ted Yu wrote: >> > > > > > >>>>>>>>>>> I noticed this (lack of primary parameter) as well. >> > > > > > >>>>>>>>>>> >> > > > > > >>>>>>>>>>> What you gave as new example is semantically the >> same >> > as >> > > > > > >>> what >> > > > > > >>>>> I >> > > > > > >>>>>>>>>> suggested. >> > > > > > >>>>>>>>>>> So it is good by me. >> > > > > > >>>>>>>>>>> >> > > > > > >>>>>>>>>>> Thanks >> > > > > > >>>>>>>>>>> >> > > > > > >>>>>>>>>>> On Wed, Jun 27, 2018 at 7:31 AM, John Roesler < >> > > > > > >>>>> j...@confluent.io >> > > > > > >>>>>>> >> > > > > > >>>>>>>>> wrote: >> > > > > > >>>>>>>>>>> >> > > > > > >>>>>>>>>>>> Thanks for taking look, Ted, >> > > > > > >>>>>>>>>>>> >> > > > > > >>>>>>>>>>>> I agree this is a departure from the conventions of >> > > > > > >> Streams >> > > > > > >>>>> DSL. >> > > > > > >>>>>>>>>>>> >> > > > > > >>>>>>>>>>>> Most of our config objects have one or two >> "required" >> > > > > > >>>>>> parameters, >> > > > > > >>>>>>>>> which >> > > > > > >>>>>>>>>> fit >> > > > > > >>>>>>>>>>>> naturally with the static factory method approach. >> > > > > > >>>>> TimeWindow, >> > > > > > >>>>>> for >> > > > > > >>>>>>>>>> example, >> > > > > > >>>>>>>>>>>> requires a size parameter, so we can naturally say >> > > > > > >>>>>>>>> TimeWindows.of(size). >> > > > > > >>>>>>>>>>>> >> > > > > > >>>>>>>>>>>> I think in the case of a suppression, there's >> really >> > no >> > > > > > >>>>> "core" >> > > > > > >>>>>>>>>> parameter, >> > > > > > >>>>>>>>>>>> and "Suppression.of()" seems sillier than "new >> > > > > > >>>>> Suppression()". I >> > > > > > >>>>>>>> think >> > > > > > >>>>>>>>>> that >> > > > > > >>>>>>>>>>>> Suppression.of(duration) would be ambiguous, since >> > there >> > > > > > >>> are >> > > > > > >>>>>> many >> > > > > > >>>>>>>>>> durations >> > > > > > >>>>>>>>>>>> that we can configure. >> > > > > > >>>>>>>>>>>> >> > > > > > >>>>>>>>>>>> However, thinking about it again, I suppose that I >> can >> > > > > > >> give >> > > > > > >>>>> each >> > > > > > >>>>>>>>>>>> configuration method a static version, which would >> let >> > > > > > >> you >> > > > > > >>>>>> replace >> > > > > > >>>>>>>>> "new >> > > > > > >>>>>>>>>>>> Suppression()." with "Suppression." in all the >> > examples. >> > > > > > >>>>>>> Basically, >> > > > > > >>>>>>>>>> instead >> > > > > > >>>>>>>>>>>> of "of()", we'd support any of the methods I >> listed. >> > > > > > >>>>>>>>>>>> >> > > > > > >>>>>>>>>>>> For example: >> > > > > > >>>>>>>>>>>> >> > > > > > >>>>>>>>>>>> windowCounts >> > > > > > >>>>>>>>>>>> .suppress( >> > > > > > >>>>>>>>>>>> Suppression >> > > > > > >>>>>>>>>>>> .suppressLateEvents(Duration. >> > ofMinutes(10)) >> > > > > > >>>>>>>>>>>> .suppressIntermediateEvents( >> > > > > > >>>>>>>>>>>> >> > > > > > >>>>>>>>>> >> IntermediateSuppression.emitAfter(Duration.ofMinutes( >> > 10)) >> > > > > > >>>>>>>>>>>> ) >> > > > > > >>>>>>>>>>>> ); >> > > > > > >>>>>>>>>>>> >> > > > > > >>>>>>>>>>>> >> > > > > > >>>>>>>>>>>> Does that seem better? >> > > > > > >>>>>>>>>>>> >> > > > > > >>>>>>>>>>>> Thanks, >> > > > > > >>>>>>>>>>>> -John >> > > > > > >>>>>>>>>>>> >> > > > > > >>>>>>>>>>>> >> > > > > > >>>>>>>>>>>> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu < >> > > > > > >>> yuzhih...@gmail.com >> > > > > > >>>>>> >> > > > > > >>>>>>>> wrote: >> > > > > > >>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>> I started to read this KIP which contains a lot of >> > > > > > >>>>> materials. >> > > > > > >>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>> One suggestion: >> > > > > > >>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>> .suppress( >> > > > > > >>>>>>>>>>>>> new Suppression() >> > > > > > >>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>> Do you think it would be more consistent with the >> > rest >> > > > > > >> of >> > > > > > >>>>>> Streams >> > > > > > >>>>>>>>> data >> > > > > > >>>>>>>>>>>>> structures by supporting `of` ? >> > > > > > >>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>> Suppression.of(Duration.ofMinutes(10)) >> > > > > > >>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>> Cheers >> > > > > > >>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler < >> > > > > > >>>>>> j...@confluent.io >> > > > > > >>>>>>>> >> > > > > > >>>>>>>>>> wrote: >> > > > > > >>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>> Hello devs and users, >> > > > > > >>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>> Please take some time to consider this proposal >> for >> > > > > > >> Kafka >> > > > > > >>>>>>> Streams: >> > > > > > >>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>> KIP-328: Ability to suppress updates for KTables >> > > > > > >>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>> link: >> https://cwiki.apache.org/confluence/x/sQU0BQ >> > > > > > >>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>> The basic idea is to provide: >> > > > > > >>>>>>>>>>>>>> * more usable control over update rate (vs the >> > current >> > > > > > >>>>> state >> > > > > > >>>>>>> store >> > > > > > >>>>>>>>>>>>> caches) >> > > > > > >>>>>>>>>>>>>> * the final-result-for-windowed-computations >> > feature >> > > > > > >>> which >> > > > > > >>>>>>> several >> > > > > > >>>>>>>>>>>> people >> > > > > > >>>>>>>>>>>>>> have requested >> > > > > > >>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>> I look forward to your feedback! >> > > > > > >>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>>> Thanks, >> > > > > > >>>>>>>>>>>>>> -John >> > > > > > >>>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>>> >> > > > > > >>>>>>>>>>>> >> > > > > > >>>>>>>>>>> >> > > > > > >>>>>>>>>> >> > > > > > >>>>>>>>>> >> > > > > > >>>>>>>>> >> > > > > > >>>>>>>> >> > > > > > >>>>>>> >> > > > > > >>>>>> >> > > > > > >>>>> >> > > > > > >>>>> >> > > > > > >>>>> >> > > > > > >>>>> -- >> > > > > > >>>>> -- Guozhang >> > > > > > >>>>> >> > > > > > >>>> >> > > > > > >>> >> > > > > > >> >> > > > > > >> >> > > > > > >> >> > > > > > >> -- >> > > > > > >> -- Guozhang >> > > > > > >> >> > > > > > > >> > > > > > >> > > > > > >> > > > > >> > > > > >> > > > > -- >> > > > > -- Guozhang >> > > > > >> > > > >> > > >> > > >> > > >> > > -- >> > > -- Guozhang >> > > >> > >> >> >> >> -- >> -- Guozhang >> >