Thanks for the update. I did a first pass over the updated KIP and think it makes sense.

-Matthias

On 7/11/18 5:47 PM, John Roesler wrote:
> Hi all,
>
> I have updated KIP-328 with all the feedback I've gotten so far. Please
> take another look and let me know what you think!
>
> Thanks,
> -John
>
> On Wed, Jul 11, 2018 at 12:28 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> That is a good point..
>>
>> I cannot think of a better option than documentation and warning, and also
>> given that we'd probably better not reuse the function name `until` for
>> close time.
>>
>>
>> Guozhang
>>
>>
>> On Tue, Jul 10, 2018 at 3:31 PM, John Roesler <j...@confluent.io> wrote:
>>
>>> I had some opportunity to reflect on the default for close time today...
>>>
>>> Note that the current "close time" is equal to the retention time, and
>>> therefore "close" today shares the default retention of 24h.
>>>
>>> It would definitely break any application that today specifies a retention
>>> time to set close shorter than that time. It's also likely to break apps if
>>> they *don't* set the retention time and rely on the 24h default. So it's
>>> unfortunate, but I think if "close" isn't set, we should use the retention
>>> time instead of a fixed default.
>>>
>>> When we ultimately remove the retention time parameter ("until"), we will
>>> have to set "close" to a default of 24h.
>>>
>>> Of course, this has a negative impact on the user of "final results", since
>>> they won't see any output at all for retentionTime/24h, and may find this
>>> confusing. What can we do about this except document it well? Maybe log a
>>> warning if we see that close wasn't explicitly set while using "final
>>> results"?
>>>
>>> Thanks,
>>> -John
>>>
>>> On Tue, Jul 10, 2018 at 10:46 AM John Roesler <j...@confluent.io> wrote:
>>>
>>>> Hi Guozhang,
>>>>
>>>> That sounds good to me. I'll include that in the KIP.
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Mon, Jul 9, 2018 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>
>>>>> Let me clarify a bit on what I meant about moving `retentionPeriod` to
>>>>> WindowStoreBuilder:
>>>>>
>>>>> In another discussion around KIP-319 / 330, we discussed that the
>>>>> "retention period" should not really be a window spec, but only a window
>>>>> store spec, as it only affects how long to retain each window to be
>>>>> queryable along with the storage cost.
>>>>>
>>>>> More specifically, today the "maintainMs" returned from Windows is used in
>>>>> three places:
>>>>>
>>>>> 1) for windowed aggregations, it is passed directly into
>>>>> `Stores.persistentWindows()` as the retention period parameter. For this
>>>>> use case we should just let the WindowStoreBuilder specify this value
>>>>> itself.
>>>>>
>>>>> NOTE: It is also returned in the KStreamWindowAggregate processor, to
>>>>> determine if a received record should be dropped due to its lateness. We
>>>>> may need to think of another way to get this value inside the processor
>>>>>
>>>>> 2) for windowed stream-stream join, it is used as the join range parameter
>>>>> but only to check that "windowSizeMs <= retentionPeriodMs". We can do this
>>>>> check at the store builder level instead of at the processor level.
>>>>>
>>>>>
>>>>> If we can remove its usage in both 1) and 2), then we should be able to
>>>>> safely remove this from the `Windows` spec.
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Mon, Jul 9, 2018 at 3:53 PM, John Roesler <j...@confluent.io> wrote:
>>>>>
>>>>>> Thanks for the reply, Guozhang,
>>>>>>
>>>>>> Good! I agree, that is also a good reason, and I actually made use of that
>>>>>> in my tests. I'll update the KIP.
>>>>>>
>>>>>> By the way, I chose "allowedLateness" as I was trying to pick a better name
>>>>>> than "close", but I think it's actually the wrong name.
>>>>>> We don't want to bound the lateness of events in general, only with
>>>>>> respect to the end of their window.
>>>>>>
>>>>>> If we have a window [0,10), with "allowedLateness" of 5, then if we get an
>>>>>> event with timestamp 3 at time 9, the name implies we'd reject it, which
>>>>>> seems silly. Really, we'd only want to start rejecting that event at stream
>>>>>> time 15.
>>>>>>
>>>>>> What I meant was more like "allowedLatenessAfterWindowEnd", but that's too
>>>>>> verbose. I think that "close" + some documentation about what it means will
>>>>>> be better.
>>>>>>
>>>>>> 1: "Close" would be measured from the end of the window, so a reasonable
>>>>>> default would be "0". Recall that "close" really only needs to be specified
>>>>>> for final results, and a default of 0 would produce the most intuitive
>>>>>> results. If folks later discover that they are missing some late events,
>>>>>> they can adjust the parameter accordingly. IMHO, any other value would just
>>>>>> be a guess on our part.
>>>>>>
>>>>>> 2a:
>>>>>> I think you're saying to re-use "until" instead of adding "close" to the
>>>>>> window.
>>>>>>
>>>>>> The downside here would be that the semantic change could be more confusing
>>>>>> than deprecating "until" and introducing window "close" and a
>>>>>> "retentionTime" on the store builder. The deprecation is a good, controlled
>>>>>> way for us to make sure people are getting the semantics they think they're
>>>>>> getting, as well as giving us an opportunity to link people to the API they
>>>>>> should use instead.
>>>>>>
>>>>>> I didn't fully understand the second part, but it sounds like you're
>>>>>> suggesting to add a new "retentionTime" setter to Windows to bridge the gap
>>>>>> until we add it to the store builder? That seems kind of roundabout to me,
>>>>>> if that's what you meant.
>>>>>> We could just immediately add it to the store builders in the same PR.
>>>>>>
>>>>>> 2b: Sounds good to me!
>>>>>>
>>>>>> Thanks again,
>>>>>> -John
>>>>>>
>>>>>>
>>>>>> On Mon, Jul 9, 2018 at 4:55 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>
>>>>>>> John,
>>>>>>>
>>>>>>> Thanks for your replies. As for the two options of the API, I think I'm
>>>>>>> slightly inclined to the first option as well. My motivation is a bit
>>>>>>> different, as I think of the first one as maybe more flexible, for example:
>>>>>>>
>>>>>>> KTable<Windowed<..>> table = ... count();
>>>>>>>
>>>>>>> table.toStream().peek(..); // want to peek at the changelog stream, do not care about final results.
>>>>>>>
>>>>>>> table.suppress().toStream().to("topic"); // sending to a topic, want to only send the final results.
>>>>>>>
>>>>>>> --------------
>>>>>>>
>>>>>>> Besides that, I have a few more minor questions:
>>>>>>>
>>>>>>> 1. For "allowedLateness", what should be the default value? I.e. if users do
>>>>>>> not specify "allowedLateness" in TimeWindows, what value should we set?
>>>>>>>
>>>>>>> 2. For API names, some personal suggestions here:
>>>>>>>
>>>>>>> 2.a) "allowedLateness" -> "until" (semantics changed, and also value is
>>>>>>> defined as delta on top of window length), where "until" ->
>>>>>>> "retentionPeriod", and the latter will be moved from `Windows` to
>>>>>>> `WindowStoreBuilder` in the future.
>>>>>>>
>>>>>>> 2.b) "BufferConfig" -> "Buffered" ?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jul 9, 2018 at 2:09 PM, John Roesler <j...@confluent.io> wrote:
>>>>>>>
>>>>>>>> Hey Matthias and Guozhang,
>>>>>>>>
>>>>>>>> Sorry for the slow reply. I was mulling over your feedback and weighing
>>>>>>>> some ideas in a sketchbook PR: https://github.com/apache/kafka/pull/5337 .
>>>>>>>>
>>>>>>>> Your thought about keeping suppression independent of business logic is a
>>>>>>>> very good one. I agree that it would make more sense to add some kind of
>>>>>>>> "window close" concept to the window definition.
>>>>>>>>
>>>>>>>> In fact, doing that immediately solves the inconsistency problem Guozhang
>>>>>>>> brought up. There's no need to add a "final results" or "emission" option
>>>>>>>> to the windowed aggregation.
>>>>>>>>
>>>>>>>> What do you think about an API more like this:
>>>>>>>>
>>>>>>>> final StreamsBuilder builder = new StreamsBuilder();
>>>>>>>>
>>>>>>>> builder
>>>>>>>>   .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
>>>>>>>>   .groupBy(
>>>>>>>>     (String k1, String v1) -> k1,
>>>>>>>>     Serialized.with(STRING_SERDE, STRING_SERDE)
>>>>>>>>   )
>>>>>>>>   .windowedBy(TimeWindows
>>>>>>>>     .of(scaledTime(2L))
>>>>>>>>     .until(scaledTime(3L))
>>>>>>>>     .allowedLateness(scaledTime(1L))
>>>>>>>>   )
>>>>>>>>   .count(Materialized.as("counts"))
>>>>>>>>   .suppress(
>>>>>>>>     emitFinalResultsOnly(
>>>>>>>>       BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(SHUT_DOWN)
>>>>>>>>     )
>>>>>>>>   )
>>>>>>>>   .toStream()
>>>>>>>>   .to("output-suppressed", Produced.with(STRING_SERDE, LONG_SERDE));
>>>>>>>>
>>>>>>>> Note that:
>>>>>>>> * "emitFinalResultsOnly" is available *only* on windowed tables (enforced
>>>>>>>>   by the type system at compile time), and it determines the time to wait by
>>>>>>>>   looking at "allowedLateness" on the TimeWindows config.
>>>>>>>> * querying "counts" will produce results (eventually) consistent with
>>>>>>>>   what's observable in "output-suppressed".
>>>>>>>> * in all cases, "suppress" has no effect on business logic, just on event
>>>>>>>>   suppression.
>>>>>>>>
>>>>>>>> Is this API straightforward? Or do you still prefer the version that you
>>>>>>>> both proposed:
>>>>>>>>
>>>>>>>> ...
>>>>>>>>   .windowedBy(TimeWindows
>>>>>>>>     .of(scaledTime(2L))
>>>>>>>>     .until(scaledTime(3L))
>>>>>>>>     .allowedLateness(scaledTime(1L))
>>>>>>>>   )
>>>>>>>>   .count(
>>>>>>>>     Materialized.as("counts"),
>>>>>>>>     emitFinalResultsOnly(
>>>>>>>>       BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(SHUT_DOWN)
>>>>>>>>     )
>>>>>>>>   )
>>>>>>>> ...
>>>>>>>>
>>>>>>>> To me, these two are practically identical, and I still vaguely prefer the
>>>>>>>> first one.
>>>>>>>>
>>>>>>>> The prototype has made clearer to me that users of "final results for
>>>>>>>> windows" and users of "suppression for table events" both need to configure
>>>>>>>> the suppression buffer.
>>>>>>>>
>>>>>>>> This buffer configuration consists of:
>>>>>>>> 1. how many keys or bytes to keep in memory
>>>>>>>> 2. what to do if memory runs out (shut down, start using disk, ...)
>>>>>>>>
>>>>>>>> So it's not as simple as setting a "final results" flag. We'll either have
>>>>>>>> an "Emit" config object on the windowed aggregators that takes the same
>>>>>>>> BufferConfig as the "Suppress" config on the suppression operator, or we
>>>>>>>> just use the suppression operator for both.
>>>>>>>>
>>>>>>>> Perhaps it would sweeten the deal a little to point out that we have 2
>>>>>>>> overloads already for each windowed aggregator (with and without
>>>>>>>> Materialized). Adding "Emitted" or something would mean that we'd add a new
>>>>>>>> overload for each one, taking us up to 4 overloads each for "count",
>>>>>>>> "aggregate" and "reduce". Using "suppress" means that we don't add any new
>>>>>>>> overloads.
>>>>>>>>
>>>>>>>> Thanks again for helping to hash this out,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Fri, Jul 6, 2018 at 6:20 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I think I agree with Matthias on having dedicated APIs for the windowed
>>>>>>>>> operation final output scenario, PLUS separating the window close which the
>>>>>>>>> "final output" would rely on, from the window retention time itself
>>>>>>>>> (admittedly it would make this KIP effort larger, but if we believe we need
>>>>>>>>> to do this separation anyways we could just do it now).
>>>>>>>>>
>>>>>>>>> And then we can have the `KTable#suppress()` for intermediate-suppression
>>>>>>>>> only, not for late-record-suppression, until we've seen that becomes a
>>>>>>>>> common feature request because our current design still allows it to be
>>>>>>>>> extended for that purpose.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Wed, Jul 4, 2018 at 12:53 PM, Matthias J. Sax <matth...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for the discussion. I am just catching up.
>>>>>>>>>>
>>>>>>>>>> In general, I think we have different use cases and non-windowed and
>>>>>>>>>> windowed is quite different. For the non-windowed case, suppress() has
>>>>>>>>>> no (useful) close or retention time, no final semantics, and also no
>>>>>>>>>> business logic impact.
>>>>>>>>>>
>>>>>>>>>> On the other hand, for windowed aggregations, close time and final
>>>>>>>>>> result do have a meaning. IMHO, `close()` is part of business logic
>>>>>>>>>> while retention time is not.
>>>>>>>>>> Also, suppression of intermediate results is
>>>>>>>>>> not a business rule and there might be use cases for which either "early
>>>>>>>>>> intermediate" (before window end time) are suppressed only, or all
>>>>>>>>>> intermediates are suppressed (maybe also something in the middle, ie,
>>>>>>>>>> just reduce the load of intermediate updates). Thus, window-suppression
>>>>>>>>>> is much richer.
>>>>>>>>>>
>>>>>>>>>> IMHO, a generic `suppress()` operator that can be inserted into the data
>>>>>>>>>> flow at any point is useful. Maybe we should keep it as generic as
>>>>>>>>>> possible. However, it might be difficult to use with regard to
>>>>>>>>>> windowing, as the mental effort to use it is high.
>>>>>>>>>>
>>>>>>>>>> With regard to Guozhang's comment:
>>>>>>>>>>
>>>>>>>>>>> we will actually
>>>>>>>>>>> process data as old as 30 days as well, while most of the late updates
>>>>>>>>>>> beyond 5 minutes would be discarded anyways.
>>>>>>>>>>
>>>>>>>>>> If we use `suppress()` as a standalone operator, this is correct and
>>>>>>>>>> intended IMHO. To address the issue if the behavior is unwanted, I would
>>>>>>>>>> suggest to add a "suppress option" directly to
>>>>>>>>>> `count()/reduce()/aggregate()` window operator similar to
>>>>>>>>>> `Materialized`. This would be an "embedded suppress" and avoid the
>>>>>>>>>> issue. It would also address the issue about mental effort for "single
>>>>>>>>>> final window result" use case.
>>>>>>>>>>
>>>>>>>>>> I also think that a shorter close-time than retention time is useful for
>>>>>>>>>> window aggregation. If we add close() to the window definition and
>>>>>>>>>> until() to `Materialized`, we can separate both correctly IMHO.
>>>>>>>>>>
>>>>>>>>>> About setting `close = min(close,retention)` I am not sure.
>>>>>>>>>> We might
>>>>>>>>>> rather throw an exception than reduce the close time automatically.
>>>>>>>>>> Otherwise, I see many user questions about "I set close to X but it does
>>>>>>>>>> not get updated for some data that is with delay of X".
>>>>>>>>>>
>>>>>>>>>> The tricky question might be to design the API in a backward compatible
>>>>>>>>>> way though.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 7/3/18 5:38 AM, John Roesler wrote:
>>>>>>>>>>> Hi Guozhang,
>>>>>>>>>>>
>>>>>>>>>>> I see. It seems like if we want to decouple 1) and 2), we need to alter the
>>>>>>>>>>> definition of the window. Do you think it would close the gap if we added a
>>>>>>>>>>> "window close" time to the window definition?
>>>>>>>>>>>
>>>>>>>>>>> Such as:
>>>>>>>>>>>
>>>>>>>>>>> builder.stream("input")
>>>>>>>>>>>   .groupByKey()
>>>>>>>>>>>   .windowedBy(
>>>>>>>>>>>     TimeWindows
>>>>>>>>>>>       .of(60_000)
>>>>>>>>>>>       .closeAfter(10 * 60)
>>>>>>>>>>>       .until(30L * 24 * 60 * 60 * 1000)
>>>>>>>>>>>   )
>>>>>>>>>>>   .count()
>>>>>>>>>>>   .suppress(Suppression.finalResultsOnly());
>>>>>>>>>>>
>>>>>>>>>>> Possibly called "finalResultsAtWindowClose" or something?
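
To make the difference between "window close" and retention concrete, here is a minimal sketch in plain Java. It is not the Kafka Streams API: the class WindowCloseSketch and its methods are hypothetical, the closeAfter value from the example above is read as ten minutes, and both periods are assumed to be measured from the window end.

```java
import java.time.Duration;

/** Hypothetical model (not Kafka Streams code) of a window with separate close and retention periods. */
final class WindowCloseSketch {
    private final long sizeMs;
    private final long closeMs;
    private final long retentionMs;

    WindowCloseSketch(Duration size, Duration close, Duration retention) {
        this.sizeMs = size.toMillis();
        this.closeMs = close.toMillis();
        this.retentionMs = retention.toMillis();
    }

    /** A late record may still update the window until stream time reaches window end + close. */
    boolean acceptsUpdate(long windowStartMs, long streamTimeMs) {
        return streamTimeMs < windowStartMs + sizeMs + closeMs;
    }

    /** Assumption: the window stays queryable until stream time reaches window end + retention. */
    boolean isQueryable(long windowStartMs, long streamTimeMs) {
        return streamTimeMs < windowStartMs + sizeMs + retentionMs;
    }

    public static void main(String[] args) {
        // 1-minute window, closed 10 minutes after its end, retained for 30 days.
        WindowCloseSketch w = new WindowCloseSketch(
            Duration.ofMinutes(1), Duration.ofMinutes(10), Duration.ofDays(30));

        System.out.println(w.acceptsUpdate(0L, 659_999L)); // true: just before close
        System.out.println(w.acceptsUpdate(0L, 660_000L)); // false: window is closed
        System.out.println(w.isQueryable(0L, 660_000L));   // true: closed but still retained
    }
}
```

With these numbers the window [0, 60000) stops accepting updates at stream time 660000 but remains queryable long after that, which is the decoupling of periods 1) and 2) asked about in the thread.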
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> -John
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jul 2, 2018 at 6:50 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey John,
>>>>>>>>>>>>
>>>>>>>>>>>> Obviously I'm too lazy on email replying diligence compared with you :)
>>>>>>>>>>>> Will try to reply to them separately:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -----------------------------------------------------------------------------
>>>>>>>>>>>>
>>>>>>>>>>>> To reply to your email on "Mon, Jul 2, 2018 at 8:23 AM":
>>>>>>>>>>>>
>>>>>>>>>>>> I'm aware of this use case, but again, the concern is that, in this setting
>>>>>>>>>>>> in order to let the window be queryable for 30 days, we will actually
>>>>>>>>>>>> process data as old as 30 days as well, while most of the late updates
>>>>>>>>>>>> beyond 5 minutes would be discarded anyways. Personally I think for the
>>>>>>>>>>>> final update scenario, the ideal situation users would want is that "do not
>>>>>>>>>>>> process any data that is later than 5 minutes, and of course no update
>>>>>>>>>>>> records to the downstream later than 5 minutes either; but retain the
>>>>>>>>>>>> window to be queryable for 30 days". And by doing that the final window
>>>>>>>>>>>> snapshot would also be aligned with the update stream as well. In other
>>>>>>>>>>>> words, among these three periods:
>>>>>>>>>>>>
>>>>>>>>>>>> 1) the retention length of the window / table.
>>>>>>>>>>>> 2) the late records acceptance for updating the window.
>>>>>>>>>>>> 3) the late records update to be sent downstream.
>>>>>>>>>>>>
>>>>>>>>>>>> Final update use cases would naturally want 2) = 3), while 1) may be
>>>>>>>>>>>> different and larger, while what we provide now is that 1) = 2), which
>>>>>>>>>>>> could be different and in practice larger than 3), hence not the most
>>>>>>>>>>>> intuitive for their needs.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -----------------------------------------------------------------------------
>>>>>>>>>>>>
>>>>>>>>>>>> To reply your email on "Mon, Jul 2, 2018 at 10:27 AM":
>>>>>>>>>>>>
>>>>>>>>>>>> I'd like option 2) over option 1) better as well from programming pov. But
>>>>>>>>>>>> I'm wondering if option 2) would provide the above semantics or it is still
>>>>>>>>>>>> coupling 1) with 2) as well ?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Jul 2, 2018 at 1:08 PM, John Roesler <j...@confluent.io> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> In fact, to push the idea further (which IIRC is what Matthias originally
>>>>>>>>>>>>> proposed), if we can accept "Suppression#finalResultsOnly" in my last
>>>>>>>>>>>>> email, then we could also consider whether to eliminate
>>>>>>>>>>>>> "suppressLateEvents" entirely.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We could always add it later, but you've both expressed doubt that there
>>>>>>>>>>>>> are practical use cases for it outside of final-results.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -John
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Jul 2, 2018 at 12:27 PM John Roesler <j...@confluent.io> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi again, Guozhang ;) Here's the second part of my response...
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It seems like your main concern is: "if I'm a user who wants final update
>>>>>>>>>>>>>> semantics, how complicated is it for me to get it?"
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think we have to assume that people don't always have time to become
>>>>>>>>>>>>>> deeply familiar with all the nuances of a programming environment before
>>>>>>>>>>>>>> they use it. Especially if they're evaluating several frameworks for their
>>>>>>>>>>>>>> use case, it's very valuable to make it as obvious as possible how to
>>>>>>>>>>>>>> accomplish various computations with Streams.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> To me the biggest question is whether with a fresh perspective, people
>>>>>>>>>>>>>> would say "oh, I get it, I have to bound my lateness and suppress
>>>>>>>>>>>>>> intermediate updates, and of course I'll get only the final result!", or if
>>>>>>>>>>>>>> it's more like "wtf? all I want is the final result, what are all these
>>>>>>>>>>>>>> parameters?".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I was talking with Matthias a while back, and he had an idea that I think
>>>>>>>>>>>>>> can help, which is to essentially set up a final-result recipe in addition
>>>>>>>>>>>>>> to the raw parameters. I previously thought that it wouldn't be possible to
>>>>>>>>>>>>>> restrict its usage to Windowed KTables, but thinking about it again this
>>>>>>>>>>>>>> weekend, I have a couple of ideas:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> =====================
>>>>>>>>>>>>>> = 1. Static Wrapper =
>>>>>>>>>>>>>> =====================
>>>>>>>>>>>>>> We can define an extra static function that "wraps" a KTable with
>>>>>>>>>>>>>> final-result semantics.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public static <K extends Windowed, V> KTable<K, V> finalResultsOnly(
>>>>>>>>>>>>>>   final KTable<K, V> windowedKTable,
>>>>>>>>>>>>>>   final Duration maxAllowedLateness,
>>>>>>>>>>>>>>   final Suppression.BufferFullStrategy bufferFullStrategy) {
>>>>>>>>>>>>>>     return windowedKTable.suppress(
>>>>>>>>>>>>>>       Suppression.suppressLateEvents(maxAllowedLateness)
>>>>>>>>>>>>>>         .suppressIntermediateEvents(
>>>>>>>>>>>>>>           IntermediateSuppression
>>>>>>>>>>>>>>             .emitAfter(maxAllowedLateness)
>>>>>>>>>>>>>>             .bufferFullStrategy(bufferFullStrategy)
>>>>>>>>>>>>>>         )
>>>>>>>>>>>>>>     );
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Because windowedKTable is a parameter, the static function can easily
>>>>>>>>>>>>>> impose an extra bound on the key type, that it extends Windowed. This would
>>>>>>>>>>>>>> make "final results only" only available on windowed ktables.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here's how it would look to use:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> final KTable<Windowed<Integer>, Long> windowCounts = ...
>>>>>>>>>>>>>> final KTable<Windowed<Integer>, Long> finalCounts =
>>>>>>>>>>>>>>   finalResultsOnly(
>>>>>>>>>>>>>>     windowCounts,
>>>>>>>>>>>>>>     Duration.ofMinutes(10),
>>>>>>>>>>>>>>     Suppression.BufferFullStrategy.SHUT_DOWN
>>>>>>>>>>>>>>   );
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Trying to use it on a non-windowed KTable yields:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Error:(129, 35) java: method finalResultsOnly in class
>>>>>>>>>>>>>>> org.apache.kafka.streams.kstream.internals.KTableAggregateTest cannot be
>>>>>>>>>>>>>>> applied to given types;
>>>>>>>>>>>>>>> required:
>>>>>>>>>>>>>>> org.apache.kafka.streams.kstream.KTable<K,V>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
>>>>>>>>>>>>>>> found:
>>>>>>>>>>>>>>> org.apache.kafka.streams.kstream.KTable<java.lang.String,java.lang.String>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
>>>>>>>>>>>>>>> reason: inference variable K has incompatible bounds
>>>>>>>>>>>>>>> equality constraints: java.lang.String
>>>>>>>>>>>>>>> upper bounds: org.apache.kafka.streams.kstream.Windowed
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ============================================================
>>>>>>>>>>>>>> = 2. Add <K,V> parameters and recipe method to Suppression =
>>>>>>>>>>>>>> ============================================================
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> By adding K,V parameters to Suppression, we can provide a similarly
>>>>>>>>>>>>>> bounded config method directly on the Suppression class:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public static <K extends Windowed, V> Suppression<K, V> finalResultsOnly(
>>>>>>>>>>>>>>   final Duration maxAllowedLateness,
>>>>>>>>>>>>>>   final BufferFullStrategy bufferFullStrategy) {
>>>>>>>>>>>>>>     return Suppression
>>>>>>>>>>>>>>       .<K, V>suppressLateEvents(maxAllowedLateness)
>>>>>>>>>>>>>>       .suppressIntermediateEvents(IntermediateSuppression
>>>>>>>>>>>>>>         .emitAfter(maxAllowedLateness)
>>>>>>>>>>>>>>         .bufferFullStrategy(bufferFullStrategy)
>>>>>>>>>>>>>>       );
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Then, here's how it would look to use it:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> final KTable<Windowed<Integer>, Long> windowCounts = ...
>>>>>>>>>>>>>> final KTable<Windowed<Integer>, Long> finalCounts =
>>>>>>>>>>>>>>   windowCounts.suppress(
>>>>>>>>>>>>>>     Suppression.finalResultsOnly(
>>>>>>>>>>>>>>       Duration.ofMinutes(10),
>>>>>>>>>>>>>>       Suppression.BufferFullStrategy.SHUT_DOWN
>>>>>>>>>>>>>>     )
>>>>>>>>>>>>>>   );
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Trying to use it on a non-windowed ktable yields:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Error:(127, 35) java: method finalResultsOnly in class
>>>>>>>>>>>>>>> org.apache.kafka.streams.kstream.Suppression<K,V> cannot be applied to
>>>>>>>>>>>>>>> given types;
>>>>>>>>>>>>>>> required:
>>>>>>>>>>>>>>> java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
>>>>>>>>>>>>>>> found:
>>>>>>>>>>>>>>> java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
>>>>>>>>>>>>>>> reason: explicit type argument java.lang.String does not conform to
>>>>>>>>>>>>>>> declared bound(s) org.apache.kafka.streams.kstream.Windowed
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> =============
>>>>>>>>>>>>>> = Downsides =
>>>>>>>>>>>>>> =============
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Of course, there's a downside either way:
>>>>>>>>>>>>>> * for 1: this "wrapper" interaction would be the first in the DSL. Is it
>>>>>>>>>>>>>>   too strange, and how discoverable would it be?
>>>>>>>>>>>>>> * for 2: adding those type parameters to Suppression will force all
>>>>>>>>>>>>>>   callers to provide them in the event of a chained construction because Java
>>>>>>>>>>>>>>   doesn't do RHS recursive type inference. This is already visible in other
>>>>>>>>>>>>>>   parts of the Streams DSL. For example, often calls to Materialized builders
>>>>>>>>>>>>>>   have to provide seemingly obvious type bounds.
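
The type-inference downside of option 2 can be reproduced with plain Java and no Kafka dependency. The sketch below uses a hypothetical Config<K, V> class as a stand-in for a generic config object such as the proposed Suppression<K, V>; none of these names come from the KIP.

```java
/** Shows why a chained call on a generic static factory needs explicit type arguments. */
final class ChainedInferenceSketch {

    /** Hypothetical stand-in for a generic config class; not part of Kafka Streams. */
    static final class Config<K, V> {
        private final String description;

        private Config(String description) {
            this.description = description;
        }

        static <K, V> Config<K, V> create() {
            return new Config<>("base");
        }

        Config<K, V> withOption(String option) {
            return new Config<>(description + "+" + option);
        }

        String description() {
            return description;
        }
    }

    public static void main(String[] args) {
        // Unchained: the assignment target lets the compiler infer K and V.
        Config<String, Long> plain = Config.create();

        // Chained: the receiver of withOption() gets no target type, so K and V
        // are inferred as Object and the following line would not compile:
        // Config<String, Long> broken = Config.create().withOption("x");

        // The caller has to spell out the type arguments instead.
        Config<String, Long> chained = Config.<String, Long>create().withOption("x");

        System.out.println(plain.description());   // base
        System.out.println(chained.description()); // base+x
    }
}
```

This is the same pattern as the explicit `.<K, V>suppressLateEvents(...)` call in the option 2 method body above.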
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ==============
>>>>>>>>>>>>>> = Conclusion =
>>>>>>>>>>>>>> ==============
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think option 2 is more "normal" and discoverable. It does have a
>>>>>>>>>>>>>> downside, but it's one that's pre-existing elsewhere in the DSL.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> WDYT? Would the addition of this "recipe" method to Suppression resolve
>>>>>>>>>>>>>> your concern?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks again,
>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Jul 1, 2018 at 11:24 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi John,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regarding the metrics: yeah I think I'm with you that the dropped records
>>>>>>>>>>>>>>> due to window retention or emit suppression policies should be recorded
>>>>>>>>>>>>>>> differently, and using this KIP's proposed metric would be fine. If you
>>>>>>>>>>>>>>> also think we can use this KIP's proposed metrics to cover the records
>>>>>>>>>>>>>>> skipped due to window retention, then we can include the changes in this
>>>>>>>>>>>>>>> KIP as well.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regarding the current proposal, I'm actually not too worried about the
>>>>>>>>>>>>>>> inconsistency between query semantics and downstream emit semantics. For
>>>>>>>>>>>>>>> queries, we will always return the current running results of the windows,
>>>>>>>>>>>>>>> be it partial or final results depending on the window retention time
>>>>>>>>>>>>>>> anyways, which has nothing to do with whether the emitted stream should be
>>>>>>>>>>>>>>> one final output per key or not.
>>>>>>>>>>>>>>> I also agree that having a unified operation
>>>>>>>>>>>>>>> is generally better for users to focus on leveraging that one only than
>>>>>>>>>>>>>>> learning about two sets of operations. The only question I had is, for
>>>>>>>>>>>>>>> final updates of window stores, if it is a bit awkward to understand the
>>>>>>>>>>>>>>> configuration combo. Thinking about this more, I think my root worry is in
>>>>>>>>>>>>>>> the "suppressLateEvents" call for windowed tables, since from a user
>>>>>>>>>>>>>>> perspective: if my retention time is X which means "pay the cost to allow
>>>>>>>>>>>>>>> late records up to X to still be applied updating the tables", why would I
>>>>>>>>>>>>>>> ever want to suppressLateEvents by Y ( < X), to say "do not send the
>>>>>>>>>>>>>>> updates up to Y, which means the downstream operator or sink topic for this
>>>>>>>>>>>>>>> stream would actually see a truncated update stream while I've paid larger
>>>>>>>>>>>>>>> cost for that"; and of course, Y > X would not make sense either as you
>>>>>>>>>>>>>>> would not see any updates later than X anyways. So in all, my feeling is
>>>>>>>>>>>>>>> that it makes less sense for windowed table's "suppressLateEvents" with a
>>>>>>>>>>>>>>> parameter that is not equal to the window retention, and opening the door
>>>>>>>>>>>>>>> in the current proposal may confuse people with that.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Again, above is just a subjective opinion and probably we can also bring up
>>>>>>>>>>>>>>> some scenarios where users do want to set X != Y.. but personally I feel
>>>>>>>>>>>>>>> that even if the semantics for this scenario is intuitive for users to
>>>>>>>>>>>>>>> understand, does that really make sense and should we really open the door
>>>>>>>>>>>>>>> for it? So I think maybe the benefits of separating the final update into a
>>>>>>>>>>>>>>> separate API may outweigh the advantage of having one uniform definition.
>>>>>>>>>>>>>>> And for my alternative proposal, the rationale was from both my concern
>>>>>>>>>>>>>>> about "suppressLateEvents" for windowed store, and Matthias' question about
>>>>>>>>>>>>>>> "suppressLateEvents" for non-windowed stores, that if it is less meaningful
>>>>>>>>>>>>>>> for both, we can consider removing it completely and only do
>>>>>>>>>>>>>>> "IntermediateSuppression" in Suppress instead.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So I'd summarize my thoughts in the following questions:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1. Does "suppressLateEvents" with parameter Y != X (window retention time)
>>>>>>>>>>>>>>> for windowed stores make sense in practice?
>>>>>>>>>>>>>>> 2. Does "suppressLateEvents" with any parameter Y for non-windowed stores
>>>>>>>>>>>>>>> make sense in practice?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Jun 29, 2018 at 2:26 PM, Bill Bejeck <bbej...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the explanation, that does make sense. I have some questions on operations, but I'll just wait for the PR and tests.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Jun 27, 2018 at 8:14 PM John Roesler <j...@confluent.io> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Bill,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the review!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Your question is very much applicable to the KIP and not at all an implementation detail. Thanks for bringing it up.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm proposing not to change the existing caches and configurations at all (for now).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Imagine you have a topology like this:
>>>>>>>>>>>>>>>>> commit.interval.ms = 100
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> (ktable1 (cached)) -> (suppress emitAfter 200)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The first ktable (ktable1) will respect the commit interval and buffer events for 100ms before logging, storing, or forwarding them (IIRC).
>>>>>>>>>>>>>>>>> Therefore, the second ktable (suppress) will only see the events at a rate of once per 100ms.
>>>>>>>>>>>>>>>>> It will apply its own buffering, and emit once per 200ms.
>>>>>>>>>>>>>>>>> This case is pretty trivial because the suppress time is a multiple of the commit interval.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When it's not an integer multiple, you'll get behavior like in this marble diagram:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> <-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)->
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [ KTable caching with commit interval = 2 ]
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> <--------(k:2)---------(k:4)---------(k:6)->
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [ suppress with emitAfter = 3 ]
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> <---------------(k:2)----------------(k:6)->
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If this behavior isn't desired (for example, if you wanted to emit (k:3) at time 3), I'd recommend setting "cache.max.bytes.buffering" to 0 or modifying the topology to disable caching. Then the behavior is determined more simply, by the suppress operator alone.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Does that seem right to you?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regarding the changelogs, because the suppression operator hangs onto events for a while, it will need its own changelog. The changelog should represent the current state of the buffer at all times. So when the suppress operator sees (k:2), for example, it will log (k:2).
>>>>>>>>>>>>>>>>> When it later gets to time 3, it's time to emit (k:2) downstream. Because k is no longer buffered, the suppress operator will log (k:null). Thus, when recovering, it can rebuild the buffer by reading its changelog.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What do you think about this?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck <bbej...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi John, thanks for the KIP.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Early on in the KIP, you mention the current approaches for controlling the rate of downstream records from a KTable: cache size configuration and commit time.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Will these configuration parameters still be in effect for tables that don't use suppression? For tables taking advantage of suppression, will these configurations have no impact?
>>>>>>>>>>>>>>>>>> This last question may be too implementation-specific, but if the requested suppression time is longer than the specified commit time, will the latest record in the suppression buffer get stored in a changelog?
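The interaction between the cache and the suppress operator in John's marble diagram can be reproduced with a toy model. This is a minimal sketch in plain Java, not the Kafka Streams API: `SuppressionSketch` and `emitEvery` are hypothetical names, time is a logical integer clock, and it assumes that both the cache and the suppress operator simply forward the latest buffered value at every multiple of their interval (the `emitAfter` semantics eventually specified in the KIP may differ). It also assumes `interval` is positive.

```java
import java.util.SortedMap;
import java.util.TreeMap;

/** Toy model of the marble diagram; not the Kafka Streams API. */
public final class SuppressionSketch {

    private SuppressionSketch() {}

    /**
     * Models one rate-limiting stage for a single key.
     * Input and output map a logical timestamp to the value seen at that time.
     * At every multiple of {@code interval} up to {@code endTime}, the stage
     * forwards only the latest value that arrived since its previous emission.
     */
    public static SortedMap<Integer, Integer> emitEvery(
            SortedMap<Integer, Integer> input, int interval, int endTime) {
        SortedMap<Integer, Integer> output = new TreeMap<>();
        int previousTick = 0;
        for (int tick = interval; tick <= endTime; tick += interval) {
            // Updates that arrived in the half-open range (previousTick, tick].
            SortedMap<Integer, Integer> arrived = input.subMap(previousTick + 1, tick + 1);
            if (!arrived.isEmpty()) {
                output.put(tick, arrived.get(arrived.lastKey()));
            }
            previousTick = tick;
        }
        return output;
    }

    public static void main(String[] args) {
        SortedMap<Integer, Integer> source = new TreeMap<>();
        for (int t = 1; t <= 6; t++) {
            source.put(t, t); // (k:1) at time 1, (k:2) at time 2, ...
        }
        // Stage 1: cache with commit interval = 2.
        SortedMap<Integer, Integer> cached = emitEvery(source, 2, 6);
        // Stage 2: suppress with emitAfter = 3, fed by the cache.
        SortedMap<Integer, Integer> suppressed = emitEvery(cached, 3, 6);
        System.out.println(cached);     // prints {2=2, 4=4, 6=6}
        System.out.println(suppressed); // prints {3=2, 6=6}
        // Caching disabled: suppress sees every update and emits (k:3) at time 3.
        System.out.println(emitEvery(source, 3, 6)); // prints {3=3, 6=6}
    }
}
```

Feeding the source directly into the second stage stands in for setting "cache.max.bytes.buffering" to 0: the output is then determined by the suppress interval alone.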
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Jun 27, 2018 at 3:04 PM John Roesler <j...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for the feedback, Matthias,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> It seems like in straightforward relational processing cases, it would not make sense to bound the lateness of KTables. In general, it seems better to have "guard rails" in place that make it easier to write sensible programs than insensible ones.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> But I'm still going to argue in favor of keeping it for all KTables ;)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1. I believe it is simpler to understand the operator if it has one uniform definition, regardless of context. It's well defined and intuitive what will happen when you use late-event suppression on a KTable, so I think nothing surprising or dangerous will happen in that case. From my perspective, having two sets of allowed operations is actually an increase in cognitive complexity.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2. To me, it's not crazy to use the operator this way.
>>>>>>>>>>>>>>>>>>> For example, in lieu of full-featured timestamp semantics, I can implement MVCC behavior when building a KTable by "suppressLateEvents(Duration.ZERO)". I suspect that there are other, non-obvious applications of suppressing late events on KTables.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 3. Not to get too much into implementation details in a KIP discussion, but if we did want to make late-event suppression available only on windowed KTables, we have two enforcement options:
>>>>>>>>>>>>>>>>>>>   a. check when we build the topology - this would be simple to implement, but would be a runtime check. Hopefully, people write tests for their topology before deploying them, so the feedback loop isn't instantaneous, but it's not too long either.
>>>>>>>>>>>>>>>>>>>   b. add a new WindowedKTable type - this would be a compile time check, but would also be a substantial increase of both interface and code complexity.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> We should definitely strive to have guard rails protecting against surprising or dangerous behavior.
>>>>>>>>>>>>>>>>>>> Protecting against programs that we don't currently predict is a lesser benefit, and I think we can put up guard rails on a case-by-case basis for that. The increase in cognitive (and potentially code and interface) complexity makes me think we should skip this case.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for the KIP John.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> One initial comment about the last example "Bounded lateness": For a non-windowed KTable bounding the lateness does not really make sense, does it?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thus, I am wondering if we should allow `suppressLateEvents()` for this case? It seems to be better to only allow it for windowed-KTables.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 6/27/18 8:53 AM, Ted Yu wrote:
>>>>>>>>>>>>>>>>>>>>> I noticed this (lack of primary parameter) as well.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> What you gave as new example is semantically the same as what I suggested.
>>>>>>>>>>>>>>>>>>>>> So it is good by me.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <j...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks for taking a look, Ted,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I agree this is a departure from the conventions of Streams DSL.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Most of our config objects have one or two "required" parameters, which fit naturally with the static factory method approach. TimeWindows, for example, requires a size parameter, so we can naturally say TimeWindows.of(size).
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I think in the case of a suppression, there's really no "core" parameter, and "Suppression.of()" seems sillier than "new Suppression()". I think that Suppression.of(duration) would be ambiguous, since there are many durations that we can configure.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> However, thinking about it again, I suppose that I can give each configuration method a static version, which would let you replace "new Suppression()." with "Suppression." in all the examples. Basically, instead of "of()", we'd support any of the methods I listed.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> For example:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> windowCounts
>>>>>>>>>>>>>>>>>>>>>>     .suppress(
>>>>>>>>>>>>>>>>>>>>>>         Suppression
>>>>>>>>>>>>>>>>>>>>>>             .suppressLateEvents(Duration.ofMinutes(10))
>>>>>>>>>>>>>>>>>>>>>>             .suppressIntermediateEvents(
>>>>>>>>>>>>>>>>>>>>>>                 IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
>>>>>>>>>>>>>>>>>>>>>>             )
>>>>>>>>>>>>>>>>>>>>>>     );
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Does that seem better?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I started to read this KIP which contains a lot of materials.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> One suggestion:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> .suppress(
>>>>>>>>>>>>>>>>>>>>>>>     new Suppression()