Hi all, I have updated KIP-328 with all the feedback I've gotten so far. Please take another look and let me know what you think!
Thanks,
-John

On Wed, Jul 11, 2018 at 12:28 AM Guozhang Wang <wangg...@gmail.com> wrote:

That is a good point.

I cannot think of a better option than documentation and a warning, especially since we'd better not reuse the function name `until` for close time.

Guozhang

On Tue, Jul 10, 2018 at 3:31 PM, John Roesler <j...@confluent.io> wrote:

I had some opportunity to reflect on the default for close time today...

Note that the current "close time" is equal to the retention time, and therefore "close" today shares the default retention of 24h.

Setting close shorter than that time would definitely break any application that specifies a retention time today. It's also likely to break apps if they *don't* set the retention time and rely on the 24h default. So it's unfortunate, but I think if "close" isn't set, we should use the retention time instead of a fixed default.

When we ultimately remove the retention time parameter ("until"), we will have to set "close" to a default of 24h.

Of course, this has a negative impact on users of "final results", since they won't see any output at all until the retention time (or the 24h default) has elapsed, and they may find this confusing. What can we do about this except document it well? Maybe log a warning if we see that close wasn't explicitly set while using "final results"?

Thanks,
-John

On Tue, Jul 10, 2018 at 10:46 AM John Roesler <j...@confluent.io> wrote:

Hi Guozhang,

That sounds good to me. I'll include that in the KIP.
Thanks,
-John

On Mon, Jul 9, 2018 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote:

Let me clarify a bit what I meant about moving `retentionPeriod` to WindowStoreBuilder:

In another discussion we had around KIP-319 / 330, we noted that the "retention period" should not really be a window spec, but only a window store spec, as it only affects how long each window is retained to be queryable, along with the storage cost.

More specifically, today the "maintainMs" returned from Windows is used in three places:

1) For windowed aggregations, it is passed directly into `Stores.persistentWindows()` as the retention period parameter. For this use case we should just let the WindowStoreBuilder specify this value itself.

NOTE: It is also used in the KStreamWindowAggregate processor to determine whether a received record should be dropped due to its lateness. We may need to think of another way to get this value inside the processor.

2) For windowed stream-stream joins, it is used as the join range parameter, but only to check that "windowSizeMs <= retentionPeriodMs". We can do this check at the store builder level instead of at the processor level.

If we can remove its usage in both 1) and 2), then we should be able to safely remove it from the `Windows` spec.

Guozhang

On Mon, Jul 9, 2018 at 3:53 PM, John Roesler <j...@confluent.io> wrote:

Thanks for the reply, Guozhang,

Good! I agree, that is also a good reason, and I actually made use of that in my tests. I'll update the KIP.
By the way, I chose "allowedLateness" as I was trying to pick a better name than "close", but I think it's actually the wrong name. We don't want to bound the lateness of events in general, only with respect to the end of their window.

If we have a window [0,10) with an "allowedLateness" of 5, and we get an event with timestamp 3 at stream time 9, the name implies we'd reject it, which seems silly. Really, we'd only want to start rejecting that event at stream time 15.

What I meant was more like "allowedLatenessAfterWindowEnd", but that's too verbose. I think that "close" plus some documentation about what it means will be better.

1: "Close" would be measured from the end of the window, so a reasonable default would be "0". Recall that "close" really only needs to be specified for final results, and a default of 0 would produce the most intuitive results. If folks later discover that they are missing some late events, they can adjust the parameter accordingly. IMHO, any other value would just be a guess on our part.

2a: I think you're saying to re-use "until" instead of adding "close" to the window.

The downside here would be that the semantic change could be more confusing than deprecating "until" and introducing a window "close" and a "retentionTime" on the store builder. Deprecation is a good, controlled way for us to make sure people are getting the semantics they think they're getting, and it also gives us an opportunity to point people to the API they should use instead.
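The "close" semantics John describes (measured from the end of the window, not from each event's timestamp) fit in a small predicate. This is a hypothetical, self-contained Java helper, not part of the Kafka Streams API; `WindowCloseCheck`, `effectiveCloseMs`, and `isClosed` are illustrative names. The fallback follows John's later Jul 10 proposal (an unset close uses the retention time); the default of 0 suggested here would simply replace that fallback.

```java
import java.util.OptionalLong;

// Hypothetical helper (not a Kafka Streams API) illustrating a window "close"
// that is measured from the window end rather than from each event's timestamp.
final class WindowCloseCheck {

    private WindowCloseCheck() {}

    // Backward-compatible default proposed on Jul 10: if no close time was
    // configured, fall back to the retention time.
    static long effectiveCloseMs(OptionalLong configuredCloseMs, long retentionMs) {
        return configuredCloseMs.orElse(retentionMs);
    }

    // A window [start, end) is closed once stream time reaches end + close.
    // The record's own timestamp only decides which window it belongs to.
    static boolean isClosed(long windowEndMs, long closeMs, long streamTimeMs) {
        return streamTimeMs >= windowEndMs + closeMs;
    }
}
```

With the window [0,10) and a close of 5, `isClosed(10, 5, 9)` is false, so the event with timestamp 3 arriving at stream time 9 is still accepted, while `isClosed(10, 5, 15)` is true, matching "start rejecting that event at stream time 15".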
I didn't fully understand the second part, but it sounds like you're suggesting adding a new "retentionTime" setter to Windows to bridge the gap until we add it to the store builder? If that's what you meant, it seems kind of roundabout to me. We could just add it to the store builders immediately, in the same PR.

2b: Sounds good to me!

Thanks again,
-John

On Mon, Jul 9, 2018 at 4:55 PM Guozhang Wang <wangg...@gmail.com> wrote:

John,

Thanks for your replies. As for the two API options, I'm slightly inclined toward the first option as well. My motivation is a bit different: I think the first one may be more flexible, for example:

    KTable<Windowed<..>> table = ... count();

    table.toStream().peek(..);  // want to peek at the changelog stream, do not care about final results.

    table.suppress().toStream().to("topic");  // sending to a topic, want to only send the final results.

--------------

Besides that, I have a few more minor questions:

1. For "allowedLateness", what should the default value be? I.e., if users do not specify "allowedLateness" in TimeWindows, what value should we set?

2. For API names, some personal suggestions:

2.a) "allowedLateness" -> "until" (semantics changed, and the value is also defined as a delta on top of the window length), where "until" -> "retentionPeriod", and the latter will be moved from `Windows` to `WindowStoreBuilder` in the future.
2.b) "BufferConfig" -> "Buffered"?

Guozhang

On Mon, Jul 9, 2018 at 2:09 PM, John Roesler <j...@confluent.io> wrote:

Hey Matthias and Guozhang,

Sorry for the slow reply. I was mulling over your feedback and weighing some ideas in a sketchbook PR: https://github.com/apache/kafka/pull/5337.

Your thought about keeping suppression independent of business logic is a very good one. I agree that it would make more sense to add some kind of "window close" concept to the window definition.

In fact, doing that immediately solves the inconsistency problem Guozhang brought up. There's no need to add a "final results" or "emission" option to the windowed aggregation.
What do you think about an API more like this:

    final StreamsBuilder builder = new StreamsBuilder();

    builder
        .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
        .groupBy(
            (String k1, String v1) -> k1,
            Serialized.with(STRING_SERDE, STRING_SERDE)
        )
        .windowedBy(TimeWindows
            .of(scaledTime(2L))
            .until(scaledTime(3L))
            .allowedLateness(scaledTime(1L))
        )
        .count(Materialized.as("counts"))
        .suppress(
            emitFinalResultsOnly(
                BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(SHUT_DOWN)
            )
        )
        .toStream()
        .to("output-suppressed", Produced.with(STRING_SERDE, LONG_SERDE));

Note that:
* "emitFinalResultsOnly" is available *only* on windowed tables (enforced by the type system at compile time), and it determines how long to wait by looking at "allowedLateness" on the TimeWindows config.
* querying "counts" will produce results (eventually) consistent with what's observable in "output-suppressed".
* in all cases, "suppress" has no effect on business logic, only on event suppression.

Is this API straightforward? Or do you still prefer the version you both proposed:

    ...
        .windowedBy(TimeWindows
            .of(scaledTime(2L))
            .until(scaledTime(3L))
            .allowedLateness(scaledTime(1L))
        )
        .count(
            Materialized.as("counts"),
            emitFinalResultsOnly(
                BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(SHUT_DOWN)
            )
        )
        ...

To me, these two are practically identical, and I still vaguely prefer the first one.

The prototype has made it clearer to me that users of "final results for windows" and users of "suppression for table events" both need to configure the suppression buffer.

This buffer configuration consists of:
1. how many keys or bytes to keep in memory
2. what to do if memory runs out (shut down, start using disk, ...)

So it's not as simple as setting a "final results" flag. Either we'll have an "Emit" config object on the windowed aggregators that takes the same BufferConfig as the "Suppress" config on the suppression operator, or we just use the suppression operator for both.

Perhaps it would sweeten the deal a little to point out that we already have 2 overloads for each windowed aggregator (with and without Materialized). Adding "Emitted" or something similar would mean adding a new overload for each one, taking us up to 4 overloads each for "count", "aggregate" and "reduce". Using "suppress" means that we don't add any new overloads.
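To make the two buffer settings concrete (how many keys to hold, and what to do when that bound is hit), here is a minimal in-memory sketch of a "final results only" suppression buffer. It is hypothetical: `FinalResultsBuffer`, its nested `BufferFullStrategy`, and the method names are illustrative and are not the proposed KIP-328 API. Only the SHUT_DOWN strategy from the thread is modeled (spilling to disk is not), and "shutting down" is represented by throwing an exception.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the KIP-328 API: buffers the latest value per
// (key, window) and emits it only after the window closes.
final class FinalResultsBuffer {

    // Only the strategy named in the thread is modeled; "start using disk" is not.
    enum BufferFullStrategy { SHUT_DOWN }

    private final int maxKeys;
    private final long closeMs;
    private final BufferFullStrategy strategy;
    private final Map<String, Long> latestValue = new HashMap<>();
    // Insertion-ordered so results for closed windows are emitted deterministically.
    private final Map<String, Long> windowEndMs = new LinkedHashMap<>();

    FinalResultsBuffer(int maxKeys, long closeMs, BufferFullStrategy strategy) {
        this.maxKeys = maxKeys;
        this.closeMs = closeMs;
        this.strategy = strategy;
    }

    // An intermediate update overwrites the buffered value and is not forwarded.
    void update(String key, long windowEnd, long value) {
        String id = key + "@" + windowEnd;
        boolean isNewKey = !latestValue.containsKey(id);
        if (isNewKey && latestValue.size() >= maxKeys
                && strategy == BufferFullStrategy.SHUT_DOWN) {
            // Stop rather than emit a result that might not be final.
            throw new IllegalStateException("suppression buffer is full: " + maxKeys + " keys");
        }
        latestValue.put(id, value);
        windowEndMs.put(id, windowEnd);
    }

    // Advance stream time; return "key@windowEnd=value" for every window that closed.
    List<String> advanceStreamTime(long streamTimeMs) {
        List<String> finalResults = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = windowEndMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (streamTimeMs >= entry.getValue() + closeMs) {
                finalResults.add(entry.getKey() + "=" + latestValue.remove(entry.getKey()));
                it.remove();
            }
        }
        return finalResults;
    }
}
```

The sketch keeps the key-count bound separate from the close time, which mirrors John's point that final-results users and table-suppression users both need the buffer settings, independently of when a window closes.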
Thanks again for helping to hash this out,
-John

On Fri, Jul 6, 2018 at 6:20 PM Guozhang Wang <wangg...@gmail.com> wrote:

I think I agree with Matthias on having dedicated APIs for the windowed-operation final-output scenario, PLUS separating the window close, which the "final output" would rely on, from the window retention time itself (admittedly this would make the KIP effort larger, but if we believe we need to do this separation anyway, we could just do it now).

Then we can have `KTable#suppress()` for intermediate suppression only, not for late-record suppression, until we see that late-record suppression becomes a common feature request; our current design can still be extended for that purpose.

Guozhang

On Wed, Jul 4, 2018 at 12:53 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for the discussion. I am just catching up.

In general, I think we have different use cases, and the non-windowed and windowed cases are quite different. For the non-windowed case, suppress() has no (useful) close or retention time, no final semantics, and also no business logic impact.

On the other hand, for windowed aggregations, close time and final result do have a meaning. IMHO, `close()` is part of business logic while retention time is not.
Also, suppression of intermediate results is not a business rule, and there might be use cases in which only "early intermediates" (before the window end time) are suppressed, or all intermediates are suppressed (or maybe something in between, i.e., just reducing the load of intermediate updates). Thus, window suppression is much richer.

IMHO, a generic `suppress()` operator that can be inserted into the data flow at any point is useful. Maybe we should keep it as generic as possible. However, it might be difficult to use with regard to windowing, as the mental effort to use it is high.

With regard to Guozhang's comment:

> we will actually process data as old as 30 days as well, while most of the late updates beyond 5 minutes would be discarded anyways.

If we use `suppress()` as a standalone operator, this is correct and intended IMHO. To address the issue if the behavior is unwanted, I would suggest adding a "suppress option" directly to the `count()/reduce()/aggregate()` window operators, similar to `Materialized`. This would be an "embedded suppress" and would avoid the issue. It would also address the mental-effort issue for the "single final window result" use case.

I also think that a close time shorter than the retention time is useful for window aggregation.
If we add close() to the window definition and until() to `Materialized`, we can separate the two correctly IMHO.

About setting `close = min(close, retention)`, I am not sure. We might rather throw an exception than reduce the close time automatically. Otherwise, I foresee many user questions like "I set close to X, but the window does not get updated for some data that arrives with a delay of X".

The tricky question might be how to design the API in a backward-compatible way, though.

-Matthias

On 7/3/18 5:38 AM, John Roesler wrote:

Hi Guozhang,

I see. It seems like if we want to decouple 1) and 2), we need to alter the definition of the window. Do you think it would close the gap if we added a "window close" time to the window definition?

Such as:

    builder.stream("input")
        .groupByKey()
        .windowedBy(
            TimeWindows
                .of(60_000)
                .closeAfter(10 * 60)
                .until(30L * 24 * 60 * 60 * 1000)
        )
        .count()
        .suppress(Suppression.finalResultsOnly());

Possibly called "finalResultsAtWindowClose" or something?
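Matthias's preference for failing fast over silently computing `min(close, retention)` could look like the following hypothetical validation, where the window definition carries size and close and a separate store spec carries retention, as Guozhang suggested. `WindowSpec`, `StoreSpec`, and `validateAgainst` are illustrative names, and the exact comparison rules are an assumption taken from the `min(close, retention)` and `windowSizeMs <= retentionPeriodMs` checks mentioned in this thread.

```java
// Hypothetical sketch: window definition (size, close) kept separate from
// store configuration (retention), with a fail-fast compatibility check.
final class WindowSpec {
    final long sizeMs;
    final long closeMs;

    WindowSpec(long sizeMs, long closeMs) {
        if (sizeMs <= 0 || closeMs < 0) {
            throw new IllegalArgumentException("size must be > 0 and close must be >= 0");
        }
        this.sizeMs = sizeMs;
        this.closeMs = closeMs;
    }
}

final class StoreSpec {
    final long retentionMs;

    StoreSpec(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Throw instead of silently shrinking close to min(close, retention),
    // so users are not surprised when data within "close" is dropped.
    void validateAgainst(WindowSpec window) {
        if (window.sizeMs > retentionMs) {
            throw new IllegalArgumentException(
                "window size " + window.sizeMs + " ms exceeds retention " + retentionMs + " ms");
        }
        if (window.closeMs > retentionMs) {
            throw new IllegalArgumentException(
                "close " + window.closeMs + " ms exceeds retention " + retentionMs + " ms");
        }
    }
}
```

For instance, a one-minute window with a ten-minute close passes against a 30-day retention but is rejected at configuration time against a five-minute retention, instead of quietly losing late updates at runtime.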
Thanks,
-John

On Mon, Jul 2, 2018 at 6:50 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hey John,

Obviously I'm too lazy about replying to email compared with you :) I'll try to reply to your emails separately:

------------------------------------------------------------

To reply to your email on "Mon, Jul 2, 2018 at 8:23 AM":

I'm aware of this use case, but again, the concern is that in this setting, in order to let the window be queryable for 30 days, we will actually process data as old as 30 days as well, while most of the late updates beyond 5 minutes would be discarded anyway. Personally, I think that for the final-update scenario, the ideal situation users would want is: "do not process any data that is more than 5 minutes late, and of course send no update records downstream later than 5 minutes either; but retain the window to be queryable for 30 days". By doing that, the final window snapshot would also be aligned with the update stream. In other words, among these three periods:

1) the retention length of the window / table.
2) the late records acceptance for updating the window.
3) the late records update to be sent downstream.

Final-update use cases would naturally want 2) = 3), while 1) may be different and larger. What we provide now is 1) = 2), which could be different from, and in practice larger than, 3); hence it is not the most intuitive for their needs.

------------------------------------------------------------

To reply to your email on "Mon, Jul 2, 2018 at 10:27 AM":

I also like option 2) better than option 1) from a programming point of view. But I'm wondering whether option 2) would provide the above semantics, or whether it still couples 1) with 2)?

Guozhang

On Mon, Jul 2, 2018 at 1:08 PM, John Roesler <j...@confluent.io> wrote:

In fact, to push the idea further (which IIRC is what Matthias originally proposed), if we can accept "Suppression#finalResultsOnly" from my last email, then we could also consider eliminating "suppressLateEvents" entirely.
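Guozhang's three periods can be modeled as a small classifier to show why 2) = 3) avoids a "truncated" update stream. This is a hypothetical Java sketch (`LatePeriods` and `Outcome` are illustrative names, not Kafka Streams types), and it assumes lateness is measured as how far stream time has advanced past the end of the record's window when the record arrives.

```java
// Hypothetical model of the three periods in Guozhang's Jul 2 message.
// Assumption: "lateness" is how far stream time has advanced past the end of
// the record's window when the record arrives.
final class LatePeriods {

    enum Outcome { DROPPED, STORE_ONLY, STORE_AND_DOWNSTREAM }

    private final long retentionMs; // 1) how long a window stays queryable
    private final long acceptMs;    // 2) how late a record may be and still update the window
    private final long emitMs;      // 3) how late an update may be and still go downstream

    LatePeriods(long retentionMs, long acceptMs, long emitMs) {
        this.retentionMs = retentionMs;
        this.acceptMs = acceptMs;
        this.emitMs = emitMs;
    }

    Outcome classify(long latenessMs) {
        if (latenessMs > acceptMs) {
            return Outcome.DROPPED;            // beyond 2): the window is not updated
        }
        if (latenessMs > emitMs) {
            return Outcome.STORE_ONLY;         // within 2) but beyond 3): store changes silently
        }
        return Outcome.STORE_AND_DOWNSTREAM;   // within both: store and downstream agree
    }

    boolean isQueryable(long windowAgeMs) {
        return windowAgeMs <= retentionMs;
    }
}
```

With today's coupling (1) = 2) = 30 days, 3) = 5 minutes), a record 10 minutes late is `STORE_ONLY`: the store changes but downstream never hears about it. With 2) = 3) = 5 minutes and 1) = 30 days, the same record is `DROPPED`, `STORE_ONLY` can never occur, and the window remains queryable for 30 days.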
We could always add it later, but you've both expressed doubt that there are practical use cases for it outside of final results.

-John

On Mon, Jul 2, 2018 at 12:27 PM John Roesler <j...@confluent.io> wrote:

Hi again, Guozhang ;) Here's the second part of my response...

It seems like your main concern is: "if I'm a user who wants final-update semantics, how complicated is it for me to get it?"

I think we have to assume that people don't always have time to become deeply familiar with all the nuances of a programming environment before they use it. Especially if they're evaluating several frameworks for their use case, it's very valuable to make it as obvious as possible how to accomplish various computations with Streams.

To me the biggest question is whether, with a fresh perspective, people would say "oh, I get it, I have to bound my lateness and suppress intermediate updates, and of course I'll get only the final result!", or if it's more like "wtf? all I want is the final result, what are all these parameters?".
I was talking with Matthias a while back, and he had an idea that I think can help, which is to essentially set up a final-result recipe in addition to the raw parameters. I previously thought that it wouldn't be possible to restrict its usage to Windowed KTables, but thinking about it again this weekend, I have a couple of ideas:

=====================
= 1. Static Wrapper =
=====================

We can define an extra static function that "wraps" a KTable with final-result semantics.

    public static <K extends Windowed, V> KTable<K, V> finalResultsOnly(
        final KTable<K, V> windowedKTable,
        final Duration maxAllowedLateness,
        final Suppression.BufferFullStrategy bufferFullStrategy) {
      return windowedKTable.suppress(
          Suppression.suppressLateEvents(maxAllowedLateness)
              .suppressIntermediateEvents(
                  IntermediateSuppression
                      .emitAfter(maxAllowedLateness)
                      .bufferFullStrategy(bufferFullStrategy)
              )
      );
    }

Because windowedKTable is a parameter, the static function can easily impose an extra bound on the key type, namely that it extends Windowed.
This would make "final results only" available only on windowed KTables.

Here's how it would look to use:

    final KTable<Windowed<Integer>, Long> windowCounts = ...
    final KTable<Windowed<Integer>, Long> finalCounts =
        finalResultsOnly(
            windowCounts,
            Duration.ofMinutes(10),
            Suppression.BufferFullStrategy.SHUT_DOWN
        );

Trying to use it on a non-windowed KTable yields:

    Error:(129, 35) java: method finalResultsOnly in class
    org.apache.kafka.streams.kstream.internals.KTableAggregateTest cannot be
    applied to given types;
    required:
    org.apache.kafka.streams.kstream.KTable<K,V>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
    found:
    org.apache.kafka.streams.kstream.KTable<java.lang.String,java.lang.String>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
    reason: inference variable K has incompatible bounds
    equality constraints: java.lang.String
    upper bounds: org.apache.kafka.streams.kstream.Windowed

=============================================================
= 2. Add <K,V> parameters and recipe method to Suppression =
=============================================================

By adding K,V parameters to Suppression, we can provide a similarly bounded config method directly on the Suppression class:

    public static <K extends Windowed, V> Suppression<K, V> finalResultsOnly(
        final Duration maxAllowedLateness,
        final BufferFullStrategy bufferFullStrategy) {
      return Suppression
          .<K, V>suppressLateEvents(maxAllowedLateness)
          .suppressIntermediateEvents(IntermediateSuppression
              .emitAfter(maxAllowedLateness)
              .bufferFullStrategy(bufferFullStrategy)
          );
    }

Then, here's how it would look to use it:

    final KTable<Windowed<Integer>, Long> windowCounts = ...
    final KTable<Windowed<Integer>, Long> finalCounts =
        windowCounts.suppress(
            Suppression.finalResultsOnly(
                Duration.ofMinutes(10),
                Suppression.BufferFullStrategy.SHUT_DOWN
            )
        );

Trying to use it on a non-windowed KTable yields:

    Error:(127, 35) java: method finalResultsOnly in class
    org.apache.kafka.streams.kstream.Suppression<K,V> cannot be applied to
    given types;
    required:
    java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
    found:
    java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
    reason: explicit type argument java.lang.String does not conform to
    declared bound(s) org.apache.kafka.streams.kstream.Windowed

=============
= Downsides =
=============

Of course, there's a downside either way:
* for 1: this "wrapper" interaction would be the first in the DSL. Is it too strange, and how discoverable would it be?
* for 2: adding those type parameters to Suppression will force all callers to provide them in the event of a chained construction, because Java doesn't do RHS recursive type inference. This is already visible in other parts of the Streams DSL. For example, calls to Materialized builders often have to provide seemingly obvious type bounds.

==============
= Conclusion =
==============

I think option 2 is more "normal" and discoverable. It does have a downside, but it's one that's pre-existing elsewhere in the DSL.

WDYT?
Would the addition of this "recipe" method to Suppression resolve your concern?

Thanks again,
-John

On Sun, Jul 1, 2018 at 11:24 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hi John,

Regarding the metrics: yeah, I think I'm with you that records dropped due to window retention or emit suppression policies should be recorded differently, and using this KIP's proposed metric would be fine. If you also think we can use this KIP's proposed metrics to cover records skipped because of window retention, then we can include those changes in this KIP as well.

Regarding the current proposal, I'm actually not too worried about the inconsistency between query semantics and downstream emit semantics. For queries, we will always return the current running results of the windows, be they partial or final results depending on the window retention time, which has nothing to do with whether the emitted stream should have one final output per key or not.
I also agree that having a unified operation is generally better, since users can focus on learning that one operation instead of two sets of operations. The only question I had is whether, for final updates of window stores, the configuration combination is a bit awkward to understand. Thinking about this more, I think my root worry is the "suppressLateEvents" call for windowed tables, since from a user's perspective: if my retention time is X, which means "pay the cost to allow late records up to X to still be applied to update the tables", why would I ever want to suppressLateEvents by Y (< X), to say "do not send the updates later than Y", which means the downstream operator or sink topic for this stream would actually see a truncated update stream while I've paid a larger cost for it? And of course, Y > X would not make sense either, as you would not see any updates later than X anyway.
So all in all, my feeling is that "suppressLateEvents" on a windowed table makes less sense with a parameter that is not equal to the window retention, and opening the door to it in the current proposal may confuse people.

Again, the above is just a subjective opinion, and we can probably also come up with some scenarios where users do want to set X != Y. But personally I feel that even if the semantics for such a scenario are intuitive for users to understand, does it really make sense, and should we really open the door for it? So I think the benefits of separating the final update into a separate API may outweigh the advantage of having one uniform definition. As for my alternative proposal, the rationale came from both my concern about "suppressLateEvents" for windowed stores and Matthias' question about "suppressLateEvents" for non-windowed stores: if it is less meaningful for both, we can consider removing it completely and only doing "IntermediateSuppression" in Suppress instead.
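A minimal Java sketch can make the X-versus-Y question concrete. It models late-event suppression as a bounded-lateness filter under an assumption not spelled out in this thread: a record counts as late when its timestamp is more than the bound behind stream time, taken here as the largest timestamp seen so far across all keys. LateEventFilter is a hypothetical class for illustration, not part of Kafka Streams. With a bound of zero, the filter rejects any update older than the newest one seen, which is roughly the MVCC-like use of suppressLateEvents(Duration.ZERO) that John describes elsewhere in this thread.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of late-event suppression; not the Kafka Streams implementation.
class LateEventFilter {
    private final long boundMs;               // maximum allowed lateness
    private long streamTime = Long.MIN_VALUE; // largest timestamp observed so far (all keys)

    LateEventFilter(Duration bound) {
        this.boundMs = bound.toMillis();
    }

    // Returns true if the record is forwarded, false if it is suppressed as too late.
    boolean accept(long timestampMs) {
        if (timestampMs > streamTime) {
            streamTime = timestampMs; // newer records advance stream time
        }
        return timestampMs >= streamTime - boundMs;
    }

    // Runs a sequence of record timestamps through a fresh filter.
    static List<Boolean> run(Duration bound, long... timestampsMs) {
        LateEventFilter filter = new LateEventFilter(bound);
        List<Boolean> forwarded = new ArrayList<>();
        for (long ts : timestampsMs) {
            forwarded.add(filter.accept(ts));
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Out-of-order input: 5 and 9 arrive after newer records.
        System.out.println(run(Duration.ZERO, 10, 5, 12, 9, 12));        // [true, false, true, false, true]
        System.out.println(run(Duration.ofMillis(3), 10, 5, 12, 9, 12)); // [true, false, true, true, true]
    }
}
```

Because stream time is global in this model, a bound of zero also drops an out-of-order update for a key that has not changed recently; a per-key variant would track the latest timestamp per key instead.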

So I'd summarize my thoughts in the following questions:

1. Does "suppressLateEvents" with parameter Y != X (window retention time) for windowed stores make sense in practice?
2. Does "suppressLateEvents" with any parameter Y for non-windowed stores make sense in practice?

Guozhang

On Fri, Jun 29, 2018 at 2:26 PM, Bill Bejeck <bbej...@gmail.com> wrote:

Thanks for the explanation, that does make sense. I have some questions on operations, but I'll just wait for the PR and tests.

Thanks,
Bill

On Wed, Jun 27, 2018 at 8:14 PM John Roesler <j...@confluent.io> wrote:

Hi Bill,

Thanks for the review!

Your question is very much applicable to the KIP and not at all an implementation detail. Thanks for bringing it up.

I'm proposing not to change the existing caches and configurations at all (for now).

Imagine you have a topology like this:

commit.interval.ms = 100

(ktable1 (cached)) -> (suppress emitAfter 200)

The first KTable (ktable1) will respect the commit interval and buffer events for 100ms before logging, storing, or forwarding them (IIRC). Therefore, the second KTable (suppress) will only see the events at a rate of once per 100ms. It will apply its own buffering and emit once per 200ms. This case is pretty trivial because the suppress time is a multiple of the commit interval.
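The cache-then-suppress interaction John describes here can be checked with a small tick-based simulation. It is a sketch under stated assumptions, not the proposed implementation: time advances in discrete ticks with one update (k:t) per tick, the cache forwards its latest value per key on every commitInterval-th tick, the suppress buffer emits everything it holds on every emitAfter-th tick, and a commitInterval of 0 stands for caching disabled. The buffer also keeps a changelog in the style described later in this message: every buffered update is logged, and an emitted key is logged as null, so replaying the log rebuilds the buffer. CacheSuppressSim and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical tick-based model of (ktable1 (cached)) -> (suppress emitAfter N).
// Both stages flush on a fixed schedule, which is how the marble diagram reads;
// this is an illustration, not the KIP's implementation.
class CacheSuppressSim {
    final List<String> emitted = new ArrayList<>();   // what suppress forwards downstream
    final List<String> changelog = new ArrayList<>(); // suppress buffer changelog, in write order

    private final Map<String, Integer> cache = new LinkedHashMap<>();
    private final Map<String, Integer> buffer = new LinkedHashMap<>();

    // One update (k:t) arrives at each tick t. A commitInterval <= 0 models disabled caching.
    void run(int ticks, int commitInterval, int emitAfter) {
        for (int t = 1; t <= ticks; t++) {
            if (commitInterval <= 0) {
                bufferUpdate("k", t);                 // no cache: suppress sees every update
            } else {
                cache.put("k", t);                    // cache keeps only the latest value per key
                if (t % commitInterval == 0) {
                    for (Map.Entry<String, Integer> e : cache.entrySet()) {
                        bufferUpdate(e.getKey(), e.getValue());
                    }
                    cache.clear();
                }
            }
            if (t % emitAfter == 0) {
                for (Map.Entry<String, Integer> e : buffer.entrySet()) {
                    emitted.add("t=" + t + " " + e.getKey() + ":" + e.getValue());
                    changelog.add(e.getKey() + ":null"); // key is no longer buffered
                }
                buffer.clear();
            }
        }
    }

    // Buffers an update and logs it, so the changelog always mirrors the buffer.
    private void bufferUpdate(String key, int value) {
        buffer.put(key, value);
        changelog.add(key + ":" + value);
    }

    // Rebuilds the buffer contents by replaying a changelog; a null value removes the key.
    static Map<String, String> restore(List<String> log) {
        Map<String, String> restored = new LinkedHashMap<>();
        for (String entry : log) {
            String[] kv = entry.split(":", 2);
            if (kv[1].equals("null")) {
                restored.remove(kv[0]);
            } else {
                restored.put(kv[0], kv[1]);
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        CacheSuppressSim cached = new CacheSuppressSim();
        cached.run(6, 2, 3);
        System.out.println(cached.emitted);   // [t=3 k:2, t=6 k:6]
        System.out.println(cached.changelog); // [k:2, k:null, k:4, k:6, k:null]

        CacheSuppressSim uncached = new CacheSuppressSim();
        uncached.run(6, 0, 3);
        System.out.println(uncached.emitted); // [t=3 k:3, t=6 k:6]

        // Stopping after tick 4 stands in for a crash: replaying the changelog recovers (k:4).
        CacheSuppressSim crashed = new CacheSuppressSim();
        crashed.run(4, 2, 3);
        System.out.println(restore(crashed.changelog)); // {k=4}
    }
}
```

With commitInterval = 2 and emitAfter = 3 the output matches the bottom row of the marble diagram that follows ((k:2) at time 3, (k:6) at time 6), and with the cache disabled (k:3) reaches the suppress buffer and is emitted at time 3.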

When it's not an integer multiple, you'll get behavior like in this marble diagram:

<-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)->

[ KTable caching with commit interval = 2 ]

<--------(k:2)---------(k:4)---------(k:6)->

[ suppress with emitAfter = 3 ]

<---------------(k:2)----------------(k:6)->

If this behavior isn't desired (for example, if you wanted to emit (k:3) at time 3), I'd recommend setting "cache.max.bytes.buffering" to 0 or modifying the topology to disable caching. Then the behavior is determined simply by the suppress operator.

Does that seem right to you?

Regarding the changelogs: because the suppression operator hangs onto events for a while, it will need its own changelog. The changelog should represent the current state of the buffer at all times. So when the suppress operator sees (k:2), for example, it will log (k:2).
When it later gets to time 3, it's time to emit (k:2) downstream. Because k is no longer buffered, the suppress operator will log (k:null). Thus, when recovering, it can rebuild the buffer by reading its changelog.

What do you think about this?

Thanks,
-John

On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck <bbej...@gmail.com> wrote:

Hi John, thanks for the KIP.

Early on in the KIP, you mention the current approaches for controlling the rate of downstream records from a KTable: cache size configuration and commit time.

Will these configuration parameters still be in effect for tables that don't use suppression? For tables taking advantage of suppression, will these configurations have no impact?
This last question may be too implementation-specific, but if the requested suppression time is longer than the specified commit time, will the latest record in the suppression buffer get stored in a changelog?

Thanks,
Bill

On Wed, Jun 27, 2018 at 3:04 PM John Roesler <j...@confluent.io> wrote:

Thanks for the feedback, Matthias,

It seems like in straightforward relational processing cases, it would not make sense to bound the lateness of KTables. In general, it seems better to have "guard rails" in place that make it easier to write sensible programs than insensible ones.

But I'm still going to argue in favor of keeping it for all KTables ;)

1. I believe it is simpler to understand the operator if it has one uniform definition, regardless of context.
It's well defined and intuitive what will happen when you use late-event suppression on a KTable, so I think nothing surprising or dangerous will happen in that case. From my perspective, having two sets of allowed operations is actually an increase in cognitive complexity.

2. To me, it's not crazy to use the operator this way. For example, in lieu of full-featured timestamp semantics, I can implement MVCC behavior when building a KTable by "suppressLateEvents(Duration.ZERO)". I suspect that there are other, non-obvious applications of suppressing late events on KTables.

3. Not to get too much into implementation details in a KIP discussion, but if we did want to make late-event suppression available only on windowed KTables, we have two enforcement options:
a. check when we build the topology - this would be simple to implement, but would be a runtime check. Hopefully, people write tests for their topology before deploying them, so the feedback loop isn't instantaneous, but it's not too long either.
b. add a new WindowedKTable type - this would be a compile-time check, but would also be a substantial increase in both interface and code complexity.

We should definitely strive to have guard rails protecting against surprising or dangerous behavior. Protecting against programs that we don't currently predict is a lesser benefit, and I think we can put up guard rails on a case-by-case basis for that. The increase in cognitive (and potentially code and interface) complexity makes me think we should skip this case.

What do you think?

Thanks,
-John

On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for the KIP John.

One initial comment about the last example "Bounded lateness": for a non-windowed KTable, bounding the lateness does not really make sense, does it?

Thus, I am wondering if we should allow `suppressLateEvents()` for this case? It seems to be better to only allow it for windowed KTables.

-Matthias

On 6/27/18 8:53 AM, Ted Yu wrote:

I noticed this (lack of primary parameter) as well.

What you gave as the new example is semantically the same as what I suggested. So it is good by me.

Thanks

On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <j...@confluent.io> wrote:

Thanks for taking a look, Ted,

I agree this is a departure from the conventions of the Streams DSL.

Most of our config objects have one or two "required" parameters, which fit naturally with the static factory method approach. TimeWindows, for example, requires a size parameter, so we can naturally say TimeWindows.of(size).

I think in the case of a suppression, there's really no "core" parameter, and "Suppression.of()" seems sillier than "new Suppression()". I think that Suppression.of(duration) would be ambiguous, since there are many durations that we can configure.

However, thinking about it again, I suppose that I can give each configuration method a static version, which would let you replace "new Suppression()." with "Suppression." in all the examples. Basically, instead of "of()", we'd support any of the methods I listed.
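A hypothetical Java sketch of the "static version of each configuration method" idea follows; SuppressionSketch and its method names are illustrative only and are neither the API proposed in the KIP nor any Kafka class. Two language rules shape it. First, Java rejects a static method and an instance method with the same name and parameter types in one class (their signatures are override-equivalent), so the chained instance methods here carry a "with" prefix. Second, if the config type carries key and value type parameters (option 2 in the Downsides section above), a chain that starts from a generic static method needs an explicit type witness, because the assignment target does not inform inference for the receiver of a chained call.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical, immutable config object; NOT a Kafka Streams class.
// K and V stand in for the key/value type parameters discussed under "Downsides".
final class SuppressionSketch<K, V> {
    private final Duration lateBoundOrNull; // null means late-event suppression is not configured
    private final Duration emitAfterOrNull; // null means intermediate suppression is not configured

    private SuppressionSketch(Duration lateBound, Duration emitAfter) {
        this.lateBoundOrNull = lateBound;
        this.emitAfterOrNull = emitAfter;
    }

    // Static entry points, one per option, so callers need neither "of()" nor "new".
    static <K, V> SuppressionSketch<K, V> suppressLateEvents(Duration bound) {
        return new SuppressionSketch<>(bound, null);
    }

    static <K, V> SuppressionSketch<K, V> suppressIntermediateEvents(Duration emitAfter) {
        return new SuppressionSketch<>(null, emitAfter);
    }

    // Chained instance methods. Reusing the static names with the same parameter
    // types would not compile, so these use a "with" prefix instead.
    SuppressionSketch<K, V> withLateEvents(Duration bound) {
        return new SuppressionSketch<>(bound, emitAfterOrNull);
    }

    SuppressionSketch<K, V> withIntermediateEvents(Duration emitAfter) {
        return new SuppressionSketch<>(lateBoundOrNull, emitAfter);
    }

    Optional<Duration> lateBound() { return Optional.ofNullable(lateBoundOrNull); }

    Optional<Duration> emitAfter() { return Optional.ofNullable(emitAfterOrNull); }

    public static void main(String[] args) {
        // The explicit witness <String, Long> is required. Without it, K and V are inferred
        // as Object for the first call, the chain yields SuppressionSketch<Object, Object>,
        // and the assignment below fails to compile.
        SuppressionSketch<String, Long> config =
                SuppressionSketch.<String, Long>suppressLateEvents(Duration.ofMinutes(10))
                        .withIntermediateEvents(Duration.ofMinutes(10));
        System.out.println(config.lateBound() + " " + config.emitAfter());
    }
}
```

Returning a new immutable instance from each "with" method keeps a partially built config safe to share; a mutable builder would work equally well for the static-entry-point question itself.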

For example:

windowCounts
    .suppress(
        Suppression
            .suppressLateEvents(Duration.ofMinutes(10))
            .suppressIntermediateEvents(
                IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
            )
    );

Does that seem better?

Thanks,
-John

On Wed, Jun 27, 2018 at 12:44 AM Ted Yu <yuzhih...@gmail.com> wrote:

I started to read this KIP, which contains a lot of material.

One suggestion:

.suppress(
    new Suppression()