Hi Guozhang,

That sounds good to me. I'll include that in the KIP.

Thanks,
-John

On Mon, Jul 9, 2018 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Let me clarify a bit on what I meant about moving `retentionPeriod` to WindowStoreBuilder:
>
> In another discussion we had around KIP-319 / 330, we concluded that the "retention period" should not really be a window spec, but only a window store spec, as it only affects how long to retain each window to be queryable, along with the storage cost.
>
> More specifically, today the "maintainMs" returned from Windows is used in three places:
>
> 1) for windowed aggregations, it is passed directly into `Stores.persistentWindows()` as the retention period parameter. For this use case we should just let the WindowStoreBuilder specify this value itself.
>
> NOTE: It is also returned in the KStreamWindowAggregate processor, to determine if a received record should be dropped due to its lateness. We may need to think of another way to get this value inside the processor.
>
> 2) for windowed stream-stream join, it is used as the join range parameter but only to check that "windowSizeMs <= retentionPeriodMs". We can do this check at the store builder level instead of at the processor level.
>
> If we can remove its usage in both 1) and 2), then we should be able to safely remove this from the `Windows` spec.
>
> Guozhang
>
> On Mon, Jul 9, 2018 at 3:53 PM, John Roesler <j...@confluent.io> wrote:
>
> > Thanks for the reply, Guozhang,
> >
> > Good! I agree, that is also a good reason, and I actually made use of that in my tests. I'll update the KIP.
> >
> > By the way, I chose "allowedLateness" as I was trying to pick a better name than "close", but I think it's actually the wrong name. We don't want to bound the lateness of events in general, only with respect to the end of their window.
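To make the distinction concrete, here is a minimal sketch contrasting a bound on lateness in general with a bound measured from the end of the window. The class and method names are hypothetical and are not part of Kafka Streams; the numbers (window [0,10), bound of 5, event timestamp 3) are only an illustration.

```java
/**
 * Illustrative only: two ways to decide whether a late event is still accepted.
 * All names here are hypothetical, not Kafka Streams API.
 */
final class WindowCloseSketch {

    /** Bounds lateness in general: rejects any event older than streamTime - allowedLateness. */
    static boolean acceptedByGeneralLateness(long eventTimestamp, long streamTime, long allowedLateness) {
        return streamTime - eventTimestamp <= allowedLateness;
    }

    /** Bounds lateness relative to the window end: rejects once streamTime reaches windowEnd + close. */
    static boolean acceptedByWindowClose(long windowEnd, long streamTime, long close) {
        return streamTime < windowEnd + close;
    }

    public static void main(String[] args) {
        // Window [0,10), bound of 5, event with timestamp 3.
        System.out.println(acceptedByGeneralLateness(3L, 9L, 5L)); // false: 9 - 3 = 6 exceeds 5
        System.out.println(acceptedByWindowClose(10L, 9L, 5L));    // true: 9 is before 10 + 5
        System.out.println(acceptedByWindowClose(10L, 15L, 5L));   // false: stream time reached 15
    }
}
```

Under the window-end reading, the same event stays acceptable until stream time reaches the window end plus the configured bound, which is the behavior the name "close" is meant to convey.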
> >
> > If we have a window [0,10), with "allowedLateness" of 5, then if we get an event with timestamp 3 at time 9, the name implies we'd reject it, which seems silly. Really, we'd only want to start rejecting that event at stream time 15.
> >
> > What I meant was more like "allowedLatenessAfterWindowEnd", but that's too verbose. I think that "close" + some documentation about what it means will be better.
> >
> > 1: "Close" would be measured from the end of the window, so a reasonable default would be "0". Recall that "close" really only needs to be specified for final results, and a default of 0 would produce the most intuitive results. If folks later discover that they are missing some late events, they can adjust the parameter accordingly. IMHO, any other value would just be a guess on our part.
> >
> > 2a:
> > I think you're saying to re-use "until" instead of adding "close" to the window.
> >
> > The downside here would be that the semantic change could be more confusing than deprecating "until" and introducing window "close" and a "retentionTime" on the store builder. The deprecation is a good, controlled way for us to make sure people are getting the semantics they think they're getting, as well as giving us an opportunity to link people to the API they should use instead.
> >
> > I didn't fully understand the second part, but it sounds like you're suggesting to add a new "retentionTime" setter to Windows to bridge the gap until we add it to the store builder? That seems kind of roundabout to me, if that's what you meant. We could just immediately add it to the store builders in the same PR.
> >
> > 2b: Sounds good to me!
> >
> > Thanks again,
> > -John
> >
> > On Mon, Jul 9, 2018 at 4:55 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > John,
> > >
> > > Thanks for your replies. As for the two options of the API, I think I'm slightly inclined to the first option as well. My motivation is a bit different, as I think the first one may be more flexible, for example:
> > >
> > >     KTable<Windowed<..>> table = ... count();
> > >
> > >     table.toStream().peek(..); // want to peek at the changelog stream, do not care about final results.
> > >
> > >     table.suppress().toStream().to("topic"); // sending to a topic, want to only send the final results.
> > >
> > > --------------
> > >
> > > Besides that, I have a few more minor questions:
> > >
> > > 1. For "allowedLateness", what should be the default value? I.e. if users do not specify "allowedLateness" in TimeWindows, what value should we set?
> > >
> > > 2. For API names, some personal suggestions here:
> > >
> > > 2.a) "allowedLateness" -> "until" (semantics changed, and also value is defined as delta on top of window length), where "until" -> "retentionPeriod", and the latter will be moved from `Windows` to `WindowStoreBuilder` in the future.
> > >
> > > 2.b) "BufferConfig" -> "Buffered" ?
> > >
> > > Guozhang
> > >
> > > On Mon, Jul 9, 2018 at 2:09 PM, John Roesler <j...@confluent.io> wrote:
> > >
> > > > Hey Matthias and Guozhang,
> > > >
> > > > Sorry for the slow reply. I was mulling over your feedback and weighing some ideas in a sketchbook PR: https://github.com/apache/kafka/pull/5337.
> > > >
> > > > Your thought about keeping suppression independent of business logic is a very good one. I agree that it would make more sense to add some kind of "window close" concept to the window definition.
> > > >
> > > > In fact, doing that immediately solves the inconsistency problem Guozhang brought up. There's no need to add a "final results" or "emission" option to the windowed aggregation.
> > > >
> > > > What do you think about an API more like this:
> > > >
> > > >     final StreamsBuilder builder = new StreamsBuilder();
> > > >
> > > >     builder
> > > >         .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
> > > >         .groupBy(
> > > >             (String k1, String v1) -> k1,
> > > >             Serialized.with(STRING_SERDE, STRING_SERDE)
> > > >         )
> > > >         .windowedBy(TimeWindows
> > > >             .of(scaledTime(2L))
> > > >             .until(scaledTime(3L))
> > > >             .allowedLateness(scaledTime(1L))
> > > >         )
> > > >         .count(Materialized.as("counts"))
> > > >         .suppress(
> > > >             emitFinalResultsOnly(
> > > >                 BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(SHUT_DOWN)
> > > >             )
> > > >         )
> > > >         .toStream()
> > > >         .to("output-suppressed", Produced.with(STRING_SERDE, LONG_SERDE));
> > > >
> > > > Note that:
> > > > * "emitFinalResultsOnly" is available *only* on windowed tables (enforced by the type system at compile time), and it determines the time to wait by looking at "allowedLateness" on the TimeWindows config.
> > > > * querying "counts" will produce results (eventually) consistent with what's observable in "output-suppressed".
> > > > * in all cases, "suppress" has no effect on business logic, just on event suppression.
> > > >
> > > > Is this API straightforward? Or do you still prefer the version that you both proposed:
> > > >
> > > >     ...
> > > >     .windowedBy(TimeWindows
> > > >         .of(scaledTime(2L))
> > > >         .until(scaledTime(3L))
> > > >         .allowedLateness(scaledTime(1L))
> > > >     )
> > > >     .count(
> > > >         Materialized.as("counts"),
> > > >         emitFinalResultsOnly(
> > > >             BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(SHUT_DOWN)
> > > >         )
> > > >     )
> > > >     ...
> > > >
> > > > To me, these two are practically identical, and I still vaguely prefer the first one.
> > > >
> > > > The prototype has made it clearer to me that users of "final results for windows" and users of "suppression for table events" both need to configure the suppression buffer.
> > > >
> > > > This buffer configuration consists of:
> > > > 1. how many keys or bytes to keep in memory
> > > > 2. what to do if memory runs out (shut down, start using disk, ...)
> > > >
> > > > So it's not as simple as setting a "final results" flag. We'll either have an "Emit" config object on the windowed aggregators that takes the same BufferConfig as the "Suppress" config on the suppression operator, or we just use the suppression operator for both.
> > > >
> > > > Perhaps it would sweeten the deal a little to point out that we have 2 overloads already for each windowed aggregator (with and without Materialized). Adding "Emitted" or something would mean that we'd add a new overload for each one, taking us up to 4 overloads each for "count", "aggregate" and "reduce". Using "suppress" means that we don't add any new overloads.
> > > >
> > > > Thanks again for helping to hash this out,
> > > > -John
> > > >
> > > > On Fri, Jul 6, 2018 at 6:20 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > I think I agree with Matthias on having dedicated APIs for the windowed operation final output scenario, PLUS separating the window close, which the "final output" would rely on, from the window retention time itself (admittedly it would make this KIP effort larger, but if we believe we need to do this separation anyways we could just do it now).
> > > > >
> > > > > And then we can have the `KTable#suppress()` for intermediate-suppression only, not for late-record-suppression, until we've seen that it becomes a common feature request, because our current design still allows it to be extended for that purpose.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Wed, Jul 4, 2018 at 12:53 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > >
> > > > > > Thanks for the discussion. I am just catching up.
> > > > > >
> > > > > > In general, I think we have different use cases, and non-windowed and windowed are quite different. For the non-windowed case, suppress() has no (useful) close or retention time, no final semantics, and also no business logic impact.
> > > > > >
> > > > > > On the other hand, for windowed aggregations, close time and final result do have a meaning. IMHO, `close()` is part of business logic while retention time is not. Also, suppression of intermediate results is not a business rule, and there might be use cases for which either only "early intermediate" results (before window end time) are suppressed, or all intermediates are suppressed (maybe also something in the middle, i.e., just reducing the load of intermediate updates). Thus, window-suppression is much richer.
> > > > > >
> > > > > > IMHO, a generic `suppress()` operator that can be inserted into the data flow at any point is useful. Maybe we should keep it as generic as possible. However, it might be difficult to use with regard to windowing, as the mental effort to use it is high.
> > > > > >
> > > > > > With regard to Guozhang's comment:
> > > > > >
> > > > > > > we will actually process data as old as 30 days as well, while most of the late updates beyond 5 minutes would be discarded anyways.
> > > > > >
> > > > > > If we use `suppress()` as a standalone operator, this is correct and intended IMHO. To address the issue if the behavior is unwanted, I would suggest adding a "suppress option" directly to the `count()/reduce()/aggregate()` window operators, similar to `Materialized`. This would be an "embedded suppress" and avoid the issue. It would also address the issue about mental effort for the "single final window result" use case.
> > > > > >
> > > > > > I also think that a shorter close time than retention time is useful for window aggregation. If we add close() to the window definition and until() to `Materialized`, we can separate both correctly IMHO.
> > > > > >
> > > > > > About setting `close = min(close,retention)` I am not sure. We might rather throw an exception than reduce the close time automatically. Otherwise, I see many user questions about "I set close to X but it does not get updated for some data that is with delay of X".
> > > > > >
> > > > > > The tricky question might be to design the API in a backward compatible way though.
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 7/3/18 5:38 AM, John Roesler wrote:
> > > > > > > Hi Guozhang,
> > > > > > >
> > > > > > > I see. It seems like if we want to decouple 1) and 2), we need to alter the definition of the window. Do you think it would close the gap if we added a "window close" time to the window definition?
> > > > > > >
> > > > > > > Such as:
> > > > > > >
> > > > > > >     builder.stream("input")
> > > > > > >         .groupByKey()
> > > > > > >         .windowedBy(
> > > > > > >             TimeWindows
> > > > > > >                 .of(60_000)
> > > > > > >                 .closeAfter(10 * 60)
> > > > > > >                 .until(30L * 24 * 60 * 60 * 1000)
> > > > > > >         )
> > > > > > >         .count()
> > > > > > >         .suppress(Suppression.finalResultsOnly());
> > > > > > >
> > > > > > > Possibly called "finalResultsAtWindowClose" or something?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > -John
> > > > > > >
> > > > > > > On Mon, Jul 2, 2018 at 6:50 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > >> Hey John,
> > > > > > >>
> > > > > > >> Obviously I'm too lazy on email replying diligence compared with you :) Will try to reply to them separately:
> > > > > > >>
> > > > > > >> -----------------------------------------------------------------------------
> > > > > > >>
> > > > > > >> To reply to your email on "Mon, Jul 2, 2018 at 8:23 AM":
> > > > > > >>
> > > > > > >> I'm aware of this use case, but again, the concern is that, in this setting, in order to let the window be queryable for 30 days, we will actually process data as old as 30 days as well, while most of the late updates beyond 5 minutes would be discarded anyways. Personally I think for the final update scenario, the ideal situation users would want is "do not process any data that is later than 5 minutes, and of course send no update records downstream later than 5 minutes either; but retain the window to be queryable for 30 days". And by doing that the final window snapshot would also be aligned with the update stream. In other words, among these three periods:
> > > > > > >>
> > > > > > >> 1) the retention length of the window / table.
> > > > > > >> 2) the late records acceptance for updating the window.
> > > > > > >> 3) the late records update to be sent downstream.
> > > > > > >>
> > > > > > >> Final update use cases would naturally want 2) = 3), while 1) may be different and larger, while what we provide now is that 1) = 2), which could be different and in practice larger than 3), hence not the most intuitive for their needs.
> > > > > > >>
> > > > > > >> -----------------------------------------------------------------------------
> > > > > > >>
> > > > > > >> To reply to your email on "Mon, Jul 2, 2018 at 10:27 AM":
> > > > > > >>
> > > > > > >> I like option 2) better than option 1) as well from a programming pov. But I'm wondering if option 2) would provide the above semantics, or is it still coupling 1) with 2) as well?
> > > > > > >>
> > > > > > >> Guozhang
> > > > > > >>
> > > > > > >> On Mon, Jul 2, 2018 at 1:08 PM, John Roesler <j...@confluent.io> wrote:
> > > > > > >>
> > > > > > >>> In fact, to push the idea further (which IIRC is what Matthias originally proposed), if we can accept "Suppression#finalResultsOnly" in my last email, then we could also consider whether to eliminate "suppressLateEvents" entirely.
> > > > > > >>>
> > > > > > >>> We could always add it later, but you've both expressed doubt that there are practical use cases for it outside of final-results.
> > > > > > >>>
> > > > > > >>> -John
> > > > > > >>>
> > > > > > >>> On Mon, Jul 2, 2018 at 12:27 PM John Roesler <j...@confluent.io> wrote:
> > > > > > >>>
> > > > > > >>>> Hi again, Guozhang ;) Here's the second part of my response...
> > > > > > >>>>
> > > > > > >>>> It seems like your main concern is: "if I'm a user who wants final update semantics, how complicated is it for me to get it?"
> > > > > > >>>>
> > > > > > >>>> I think we have to assume that people don't always have time to become deeply familiar with all the nuances of a programming environment before they use it. Especially if they're evaluating several frameworks for their use case, it's very valuable to make it as obvious as possible how to accomplish various computations with Streams.
> > > > > > >>>>
> > > > > > >>>> To me the biggest question is whether with a fresh perspective, people would say "oh, I get it, I have to bound my lateness and suppress intermediate updates, and of course I'll get only the final result!", or if it's more like "wtf? all I want is the final result, what are all these parameters?".
> > > > > > >>>>
> > > > > > >>>> I was talking with Matthias a while back, and he had an idea that I think can help, which is to essentially set up a final-result recipe in addition to the raw parameters. I previously thought that it wouldn't be possible to restrict its usage to Windowed KTables, but thinking about it again this weekend, I have a couple of ideas:
> > > > > > >>>>
> > > > > > >>>> ================
> > > > > > >>>> = 1. Static Wrapper =
> > > > > > >>>> ================
> > > > > > >>>> We can define an extra static function that "wraps" a KTable with final-result semantics.
> > > > > > >>>>
> > > > > > >>>>     public static <K extends Windowed, V> KTable<K, V> finalResultsOnly(
> > > > > > >>>>         final KTable<K, V> windowedKTable,
> > > > > > >>>>         final Duration maxAllowedLateness,
> > > > > > >>>>         final Suppression.BufferFullStrategy bufferFullStrategy) {
> > > > > > >>>>         return windowedKTable.suppress(
> > > > > > >>>>             Suppression.suppressLateEvents(maxAllowedLateness)
> > > > > > >>>>                 .suppressIntermediateEvents(
> > > > > > >>>>                     IntermediateSuppression
> > > > > > >>>>                         .emitAfter(maxAllowedLateness)
> > > > > > >>>>                         .bufferFullStrategy(bufferFullStrategy)
> > > > > > >>>>                 )
> > > > > > >>>>         );
> > > > > > >>>>     }
> > > > > > >>>>
> > > > > > >>>> Because windowedKTable is a parameter, the static function can easily impose an extra bound on the key type, that it extends Windowed. This would make "final results only" available only on windowed ktables.
> > > > > > >>>>
> > > > > > >>>> Here's how it would look to use:
> > > > > > >>>>
> > > > > > >>>>     final KTable<Windowed<Integer>, Long> windowCounts = ...
> > > > > > >>>>     final KTable<Windowed<Integer>, Long> finalCounts =
> > > > > > >>>>         finalResultsOnly(
> > > > > > >>>>             windowCounts,
> > > > > > >>>>             Duration.ofMinutes(10),
> > > > > > >>>>             Suppression.BufferFullStrategy.SHUT_DOWN
> > > > > > >>>>         );
> > > > > > >>>>
> > > > > > >>>> Trying to use it on a non-windowed KTable yields:
> > > > > > >>>>
> > > > > > >>>>> Error:(129, 35) java: method finalResultsOnly in class org.apache.kafka.streams.kstream.internals.KTableAggregateTest cannot be applied to given types;
> > > > > > >>>>>   required: org.apache.kafka.streams.kstream.KTable<K,V>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > > > > > >>>>>   found: org.apache.kafka.streams.kstream.KTable<java.lang.String,java.lang.String>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > > > > > >>>>>   reason: inference variable K has incompatible bounds
> > > > > > >>>>>     equality constraints: java.lang.String
> > > > > > >>>>>     upper bounds: org.apache.kafka.streams.kstream.Windowed
> > > > > > >>>>
> > > > > > >>>> =================================================
> > > > > > >>>> = 2. Add <K,V> parameters and recipe method to Suppression =
> > > > > > >>>> =================================================
> > > > > > >>>>
> > > > > > >>>> By adding K,V parameters to Suppression, we can provide a similarly bounded config method directly on the Suppression class:
> > > > > > >>>>
> > > > > > >>>>     public static <K extends Windowed, V> Suppression<K, V> finalResultsOnly(
> > > > > > >>>>         final Duration maxAllowedLateness,
> > > > > > >>>>         final BufferFullStrategy bufferFullStrategy) {
> > > > > > >>>>         return Suppression
> > > > > > >>>>             .<K, V>suppressLateEvents(maxAllowedLateness)
> > > > > > >>>>             .suppressIntermediateEvents(IntermediateSuppression
> > > > > > >>>>                 .emitAfter(maxAllowedLateness)
> > > > > > >>>>                 .bufferFullStrategy(bufferFullStrategy)
> > > > > > >>>>             );
> > > > > > >>>>     }
> > > > > > >>>>
> > > > > > >>>> Then, here's how it would look to use it:
> > > > > > >>>>
> > > > > > >>>>     final KTable<Windowed<Integer>, Long> windowCounts = ...
> > > > > > >>>>     final KTable<Windowed<Integer>, Long> finalCounts =
> > > > > > >>>>         windowCounts.suppress(
> > > > > > >>>>             Suppression.finalResultsOnly(
> > > > > > >>>>                 Duration.ofMinutes(10),
> > > > > > >>>>                 Suppression.BufferFullStrategy.SHUT_DOWN
> > > > > > >>>>             )
> > > > > > >>>>         );
> > > > > > >>>>
> > > > > > >>>> Trying to use it on a non-windowed ktable yields:
> > > > > > >>>>
> > > > > > >>>>> Error:(127, 35) java: method finalResultsOnly in class org.apache.kafka.streams.kstream.Suppression<K,V> cannot be applied to given types;
> > > > > > >>>>>   required: java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > > > > > >>>>>   found: java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> > > > > > >>>>>   reason: explicit type argument java.lang.String does not conform to declared bound(s) org.apache.kafka.streams.kstream.Windowed
> > > > > > >>>>
> > > > > > >>>> ============
> > > > > > >>>> = Downsides =
> > > > > > >>>> ============
> > > > > > >>>>
> > > > > > >>>> Of course, there's a downside either way:
> > > > > > >>>> * for 1: this "wrapper" interaction would be the first in the DSL. Is it too strange, and how discoverable would it be?
> > > > > > >>>> * for 2: adding those type parameters to Suppression will force all callers to provide them in the event of a chained construction because Java doesn't do RHS recursive type inference. This is already visible in other parts of the Streams DSL. For example, often calls to Materialized builders have to provide seemingly obvious type bounds.
> > > > > > >>>>
> > > > > > >>>> ============
> > > > > > >>>> = Conclusion =
> > > > > > >>>> ============
> > > > > > >>>>
> > > > > > >>>> I think option 2 is more "normal" and discoverable. It does have a downside, but it's one that's pre-existing elsewhere in the DSL.
> > > > > > >>>>
> > > > > > >>>> WDYT? Would the addition of this "recipe" method to Suppression resolve your concern?
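The compile-time restriction that both options rely on is an ordinary bounded type parameter, so it can be tried without the Streams library. The following is a minimal sketch under that assumption: `WindowedKey`, `SketchTable`, and `FinalResultsRecipe` are hypothetical stand-ins, not Kafka Streams types, and the method only tags the table instead of configuring any suppression.

```java
/** Hypothetical stand-in for a windowed key type. */
final class WindowedKey<K> {
    final K key;

    WindowedKey(K key) {
        this.key = key;
    }
}

/** Hypothetical stand-in for a table typed by key and value. */
final class SketchTable<K, V> {
    final String description;

    SketchTable(String description) {
        this.description = description;
    }
}

final class FinalResultsRecipe {

    /** The bound on K means this compiles only when the key type is a WindowedKey. */
    static <K extends WindowedKey<?>, V> SketchTable<K, V> finalResultsOnly(SketchTable<K, V> windowedTable) {
        return new SketchTable<>(windowedTable.description + " -> finalResultsOnly");
    }

    public static void main(String[] args) {
        SketchTable<WindowedKey<Integer>, Long> windowCounts = new SketchTable<>("windowCounts");
        SketchTable<WindowedKey<Integer>, Long> finalCounts = finalResultsOnly(windowCounts);
        System.out.println(finalCounts.description);

        // A non-windowed table is rejected by the compiler, because String is not a WindowedKey:
        // SketchTable<String, String> plain = new SketchTable<>("plain");
        // finalResultsOnly(plain); // does not compile
    }
}
```

Uncommenting the last two lines should produce an "incompatible bounds" style error similar to the one quoted for option 1.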
> > > > > > >>>>
> > > > > > >>>> Thanks again,
> > > > > > >>>> -John
> > > > > > >>>>
> > > > > > >>>> On Sun, Jul 1, 2018 at 11:24 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > >>>>
> > > > > > >>>>> Hi John,
> > > > > > >>>>>
> > > > > > >>>>> Regarding the metrics: yeah I think I'm with you that the dropped records due to window retention or emit suppression policies should be recorded differently, and using this KIP's proposed metric would be fine. If you also think we can use this KIP's proposed metrics to cover records skipped due to window retention, then we can include the changes in this KIP as well.
> > > > > > >>>>>
> > > > > > >>>>> Regarding the current proposal, I'm actually not too worried about the inconsistency between query semantics and downstream emit semantics. For queries, we will always return the current running results of the windows, be they partial or final results depending on the window retention time anyways, which has nothing to do with whether the emitted stream should be one final output per key or not. I also agree that having a unified operation is generally better, so that users can focus on leveraging that one only rather than learning about two sets of operations. The only question I had is, for final updates of window stores, whether it is a bit awkward to understand the configuration combo. Thinking about this more, I think my root worry is the "suppressLateEvents" call for windowed tables, since from a user perspective: if my retention time is X, which means "pay the cost to allow late records up to X to still be applied updating the tables", why would I ever want to suppressLateEvents by Y ( < X), to say "do not send the updates up to Y, which means the downstream operator or sink topic for this stream would actually see a truncated update stream while I've paid a larger cost for that"; and of course, Y > X would not make sense either, as you would not see any updates later than X anyways. So in all, my feeling is that it makes less sense for a windowed table's "suppressLateEvents" to take a parameter that is not equal to the window retention, and opening the door in the current proposal may confuse people with that.
> > > > > > >>>>>
> > > > > > >>>>> Again, the above is just a subjective opinion, and probably we can also bring up some scenarios where users do want to set X != Y.. but personally I feel that even if the semantics for this scenario are intuitive for users to understand, does that really make sense, and should we really open the door for it. So I think maybe the benefits of separating the final update into a separate API may outweigh the advantage of having one uniform definition. And for my alternative proposal, the rationale came from both my concern about "suppressLateEvents" for windowed stores and Matthias' question about "suppressLateEvents" for non-windowed stores: if it is less meaningful for both, we can consider removing it completely and only do "IntermediateSuppression" in Suppress instead.
> > > > > > >>>>>
> > > > > > >>>>> So I'd summarize my thoughts in the following questions:
> > > > > > >>>>>
> > > > > > >>>>> 1. Does "suppressLateEvents" with parameter Y != X (window retention time) for windowed stores make sense in practice?
> > > > > > >>>>> 2. Does "suppressLateEvents" with any parameter Y for non-windowed stores make sense in practice?
> > > > > > >>>>>
> > > > > > >>>>> Guozhang
> > > > > > >>>>>
> > > > > > >>>>> On Fri, Jun 29, 2018 at 2:26 PM, Bill Bejeck <bbej...@gmail.com> wrote:
> > > > > > >>>>>
> > > > > > >>>>>> Thanks for the explanation, that does make sense. I have some questions on operations, but I'll just wait for the PR and tests.
> > > > > > >>>>>>
> > > > > > >>>>>> Thanks,
> > > > > > >>>>>> Bill
> > > > > > >>>>>>
> > > > > > >>>>>> On Wed, Jun 27, 2018 at 8:14 PM John Roesler <j...@confluent.io> wrote:
> > > > > > >>>>>>
> > > > > > >>>>>>> Hi Bill,
> > > > > > >>>>>>>
> > > > > > >>>>>>> Thanks for the review!
> > > > > > >>>>>>>
> > > > > > >>>>>>> Your question is very much applicable to the KIP and not at all an implementation detail. Thanks for bringing it up.
> > > > > > >>>>>>>
> > > > > > >>>>>>> I'm proposing not to change the existing caches and configurations at all (for now).
> > > > > > >>>>>>>
> > > > > > >>>>>>> Imagine you have a topology like this:
> > > > > > >>>>>>>
> > > > > > >>>>>>>     commit.interval.ms = 100
> > > > > > >>>>>>>
> > > > > > >>>>>>>     (ktable1 (cached)) -> (suppress emitAfter 200)
> > > > > > >>>>>>>
> > > > > > >>>>>>> The first ktable (ktable1) will respect the commit interval and buffer events for 100ms before logging, storing, or forwarding them (IIRC). Therefore, the second ktable (suppress) will only see the events at a rate of once per 100ms. It will apply its own buffering, and emit once per 200ms. This case is pretty trivial because the suppress time is a multiple of the commit interval.
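The chaining of the cache and the suppress operator can be modeled with a small simulation. This is a simplified sketch, not the Streams implementation: it assumes a single key, that each stage flushes on a fixed tick and forwards only the latest value it has seen since its previous flush, and that an upstream flush is visible to a downstream flush on the same tick. `PeriodicFlushSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a stage that buffers updates for one key and flushes on a fixed tick. */
final class PeriodicFlushSketch {

    /** Builds `count` updates: update i has value i and arrives at time i * step. Each entry is {time, value}. */
    static List<long[]> arrivals(long step, int count) {
        List<long[]> updates = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            updates.add(new long[] {i * step, i});
        }
        return updates;
    }

    /** Forwards, at every multiple of `interval` up to `endTime`, the latest value received since the last flush. */
    static List<long[]> flushEvery(List<long[]> input, long interval, long endTime) {
        List<long[]> output = new ArrayList<>();
        int next = 0;
        Long latest = null;
        for (long tick = interval; tick <= endTime; tick += interval) {
            // Absorb every update that arrived up to and including this tick.
            while (next < input.size() && input.get(next)[0] <= tick) {
                latest = input.get(next)[1];
                next++;
            }
            if (latest != null) {
                output.add(new long[] {tick, latest});
                latest = null; // the buffer is empty after a flush
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Updates every 50ms, cache flush every 100ms, suppress flush every 200ms.
        List<long[]> cached = flushEvery(arrivals(50, 8), 100, 400);
        List<long[]> suppressed = flushEvery(cached, 200, 400);
        for (long[] update : suppressed) {
            System.out.println("t=" + update[0] + " value=" + update[1]);
        }
    }
}
```

With these inputs the cache stage forwards four updates and the suppress stage forwards two of them, one per 200ms tick. Passing intervals that are not multiples of each other (for example 2 and 3) to `flushEvery` shows the less regular pattern.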
> > > > > > >>>>>>>
> > > > > > >>>>>>> When it's not an integer multiple, you'll get behavior like in
> > > > > > >>>>>>> this marble diagram:
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>> <-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)->
> > > > > > >>>>>>>
> > > > > > >>>>>>> [ KTable caching with commit interval = 2 ]
> > > > > > >>>>>>>
> > > > > > >>>>>>> <--------(k:2)---------(k:4)---------(k:6)->
> > > > > > >>>>>>>
> > > > > > >>>>>>> [ suppress with emitAfter = 3 ]
> > > > > > >>>>>>>
> > > > > > >>>>>>> <---------------(k:2)----------------(k:6)->
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>> If this behavior isn't desired (for example, if you wanted to emit
> > > > > > >>>>>>> (k:3) at time 3), I'd recommend setting "cache.max.bytes.buffering"
> > > > > > >>>>>>> to 0 or modifying the topology to disable caching. Then the
> > > > > > >>>>>>> behavior is determined simply by the suppress operator.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Does that seem right to you?
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>> Regarding the changelogs, because the suppression operator hangs
> > > > > > >>>>>>> onto events for a while, it will need its own changelog. The
> > > > > > >>>>>>> changelog should represent the current state of the buffer at all
> > > > > > >>>>>>> times. So when the suppress operator sees (k:2), for example, it
> > > > > > >>>>>>> will log (k:2). When it later gets to time 3, it's time to emit
> > > > > > >>>>>>> (k:2) downstream. Because k is no longer buffered, the suppress
> > > > > > >>>>>>> operator will log (k:null).
> > > > > > >>>>>>> Thus, when recovering, it can rebuild the buffer by reading its
> > > > > > >>>>>>> changelog.
> > > > > > >>>>>>>
> > > > > > >>>>>>> What do you think about this?
> > > > > > >>>>>>>
> > > > > > >>>>>>> Thanks,
> > > > > > >>>>>>> -John
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>> On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck <bbej...@gmail.com> wrote:
> > > > > > >>>>>>>
> > > > > > >>>>>>>> Hi John, thanks for the KIP.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Early on in the KIP, you mention the current approaches for
> > > > > > >>>>>>>> controlling the rate of downstream records from a KTable, cache
> > > > > > >>>>>>>> size configuration and commit time.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Will these configuration parameters still be in effect for tables
> > > > > > >>>>>>>> that don't use suppression? For tables taking advantage of
> > > > > > >>>>>>>> suppression, will these configurations have no impact?
> > > > > > >>>>>>>> This last question may be too implementation specific, but if the
> > > > > > >>>>>>>> requested suppression time is longer than the specified commit
> > > > > > >>>>>>>> time, will the latest record in the suppression buffer get stored
> > > > > > >>>>>>>> in a changelog?
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Thanks,
> > > > > > >>>>>>>> Bill
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> On Wed, Jun 27, 2018 at 3:04 PM John Roesler <j...@confluent.io> wrote:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>>> Thanks for the feedback, Matthias,
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> It seems like in straightforward relational processing cases, it
> > > > > > >>>>>>>>> would not make sense to bound the lateness of KTables. In
> > > > > > >>>>>>>>> general, it seems better to have "guard rails" in place that
> > > > > > >>>>>>>>> make it easier to write sensible programs than insensible ones.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> But I'm still going to argue in favor of keeping it for all
> > > > > > >>>>>>>>> KTables ;)
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> 1. I believe it is simpler to understand the operator if it has
> > > > > > >>>>>>>>> one uniform definition, regardless of context. It's well defined
> > > > > > >>>>>>>>> and intuitive what will happen when you use late-event
> > > > > > >>>>>>>>> suppression on a KTable, so I think nothing surprising or
> > > > > > >>>>>>>>> dangerous will happen in that case. From my perspective, having
> > > > > > >>>>>>>>> two sets of allowed operations is actually an increase in
> > > > > > >>>>>>>>> cognitive complexity.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> 2. To me, it's not crazy to use the operator this way.
> > > > > > >>>>>>>>> For example, in lieu of full-featured timestamp semantics, I can
> > > > > > >>>>>>>>> implement MVCC behavior when building a KTable by
> > > > > > >>>>>>>>> "suppressLateEvents(Duration.ZERO)". I suspect that there are
> > > > > > >>>>>>>>> other, non-obvious applications of suppressing late events on
> > > > > > >>>>>>>>> KTables.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> 3. Not to get too much into implementation details in a KIP
> > > > > > >>>>>>>>> discussion, but if we did want to make late-event suppression
> > > > > > >>>>>>>>> available only on windowed KTables, we have two enforcement
> > > > > > >>>>>>>>> options:
> > > > > > >>>>>>>>>   a. check when we build the topology - this would be simple to
> > > > > > >>>>>>>>> implement, but would be a runtime check. Hopefully, people write
> > > > > > >>>>>>>>> tests for their topologies before deploying them, so the
> > > > > > >>>>>>>>> feedback loop isn't instantaneous, but it's not too long either.
> > > > > > >>>>>>>>>   b. add a new WindowedKTable type - this would be a compile
> > > > > > >>>>>>>>> time check, but would also be a substantial increase in both
> > > > > > >>>>>>>>> interface and code complexity.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> We should definitely strive to have guard rails protecting
> > > > > > >>>>>>>>> against surprising or dangerous behavior.
> > > > > > >>>>>>>>> Protecting against programs that we don't currently predict is a
> > > > > > >>>>>>>>> lesser benefit, and I think we can put up guard rails on a
> > > > > > >>>>>>>>> case-by-case basis for that. The increase in cognitive (and
> > > > > > >>>>>>>>> potentially code and interface) complexity makes me think we
> > > > > > >>>>>>>>> should skip this case.
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> What do you think?
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> Thanks,
> > > > > > >>>>>>>>> -John
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>> On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax <matth...@confluent.io> wrote:
> > > > > > >>>>>>>>>
> > > > > > >>>>>>>>>> Thanks for the KIP John.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> One initial comment about the last example "Bounded lateness":
> > > > > > >>>>>>>>>> For a non-windowed KTable bounding the lateness does not really
> > > > > > >>>>>>>>>> make sense, does it?
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Thus, I am wondering if we should allow `suppressLateEvents()`
> > > > > > >>>>>>>>>> for this case? It seems to be better to only allow it for
> > > > > > >>>>>>>>>> windowed-KTables.
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> -Matthias
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> On 6/27/18 8:53 AM, Ted Yu wrote:
> > > > > > >>>>>>>>>>> I noticed this (lack of primary parameter) as well.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> What you gave as new example is semantically the same as what
> > > > > > >>>>>>>>>>> I suggested.
> > > > > > >>>>>>>>>>> So it is good by me.
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> Thanks
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>> On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <j...@confluent.io> wrote:
> > > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Thanks for taking a look, Ted,
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> I agree this is a departure from the conventions of Streams
> > > > > > >>>>>>>>>>>> DSL.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Most of our config objects have one or two "required"
> > > > > > >>>>>>>>>>>> parameters, which fit naturally with the static factory
> > > > > > >>>>>>>>>>>> method approach. TimeWindows, for example, requires a size
> > > > > > >>>>>>>>>>>> parameter, so we can naturally say TimeWindows.of(size).
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> I think in the case of a suppression, there's really no
> > > > > > >>>>>>>>>>>> "core" parameter, and "Suppression.of()" seems sillier than
> > > > > > >>>>>>>>>>>> "new Suppression()". I think that Suppression.of(duration)
> > > > > > >>>>>>>>>>>> would be ambiguous, since there are many durations that we
> > > > > > >>>>>>>>>>>> can configure.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> However, thinking about it again, I suppose that I can give
> > > > > > >>>>>>>>>>>> each configuration method a static version, which would let
> > > > > > >>>>>>>>>>>> you replace "new Suppression()." with "Suppression." in all
> > > > > > >>>>>>>>>>>> the examples.
> > > > > > >>>>>>>>>>>> Basically, instead of "of()", we'd support any of the methods
> > > > > > >>>>>>>>>>>> I listed.
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> For example:
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> windowCounts
> > > > > > >>>>>>>>>>>>     .suppress(
> > > > > > >>>>>>>>>>>>         Suppression
> > > > > > >>>>>>>>>>>>             .suppressLateEvents(Duration.ofMinutes(10))
> > > > > > >>>>>>>>>>>>             .suppressIntermediateEvents(
> > > > > > >>>>>>>>>>>>                 IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
> > > > > > >>>>>>>>>>>>             )
> > > > > > >>>>>>>>>>>>     );
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Does that seem better?
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> Thanks,
> > > > > > >>>>>>>>>>>> -John
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu <yuzhih...@gmail.com> wrote:
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> I started to read this KIP which contains a lot of material.
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> One suggestion:
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>     .suppress(
> > > > > > >>>>>>>>>>>>>         new Suppression()
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Do you think it would be more consistent with the rest of
> > > > > > >>>>>>>>>>>>> Streams data structures by supporting `of` ?
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>     Suppression.of(Duration.ofMinutes(10))
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> Cheers
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler <j...@confluent.io> wrote:
> > > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Hello devs and users,
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Please take some time to consider this proposal for Kafka
> > > > > > >>>>>>>>>>>>>> Streams:
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> KIP-328: Ability to suppress updates for KTables
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> link: https://cwiki.apache.org/confluence/x/sQU0BQ
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> The basic idea is to provide:
> > > > > > >>>>>>>>>>>>>> * more usable control over update rate (vs the current
> > > > > > >>>>>>>>>>>>>> state store caches)
> > > > > > >>>>>>>>>>>>>> * the final-result-for-windowed-computations feature which
> > > > > > >>>>>>>>>>>>>> several people have requested
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> I look forward to your feedback!
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Thanks,
> > > > > > >>>>>>>>>>>>>> -John
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>
> > > > > > >>>>> --
> > > > > > >>>>> -- Guozhang
> > > > > > >>>>>
> > > > > > >>
> > > > > > >> --
> > > > > > >> -- Guozhang
> > > > > > >>
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > >
> > > --
> > > -- Guozhang
> > >
>
> --
> -- Guozhang
>
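
Editorial sketch: the marble diagram in John's reply to Bill (KTable caching with commit interval 2, followed by suppress with emitAfter 3) can be reproduced with a small stand-alone simulation. This is not Kafka Streams code and does not use any Kafka API. The class `SuppressTimeline`, the nested `Stage`, and the method `run` are hypothetical names chosen for illustration. It assumes time is an integer tick, one update (k:tick) arrives per tick, each stage keeps only the latest value and flushes it when the tick is a multiple of its interval, and the cache flush is processed before the suppress flush within the same tick.

```java
import java.util.ArrayList;
import java.util.List;

final class SuppressTimeline {

    /** Hypothetical model of one buffering stage: keeps the latest value, flushes on interval boundaries. */
    static final class Stage {
        private final int interval;
        private Integer buffered; // null means nothing is currently buffered

        Stage(int interval) {
            this.interval = interval;
        }

        /** A newer value for the key replaces the buffered one. */
        void offer(int value) {
            buffered = value;
        }

        /** Returns the value to forward at this tick, or null if nothing is due. */
        Integer flushIfDue(int tick) {
            if (tick % interval != 0 || buffered == null) {
                return null;
            }
            Integer out = buffered;
            buffered = null;
            return out;
        }
    }

    /** Feeds k:1..k:n (one per tick) through cache -> suppress and records each downstream emit. */
    static List<String> run(int n, int commitInterval, int emitAfter) {
        Stage cache = new Stage(commitInterval);
        Stage suppress = new Stage(emitAfter);
        List<String> emitted = new ArrayList<>();
        for (int tick = 1; tick <= n; tick++) {
            cache.offer(tick);
            Integer fromCache = cache.flushIfDue(tick);
            if (fromCache != null) {
                suppress.offer(fromCache); // assumption: upstream flush is seen before the downstream flush
            }
            Integer out = suppress.flushIfDue(tick);
            if (out != null) {
                emitted.add("t=" + tick + " k:" + out);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(6, 2, 3)); // prints [t=3 k:2, t=6 k:6]
    }
}
```

Under these assumptions, `run(6, 2, 3)` yields (k:2) at time 3 and (k:6) at time 6, matching the last line of the diagram. Calling `run(6, 1, 3)`, which roughly stands in for disabling the cache, yields (k:3) at time 3 instead, which is the behavior John describes for the uncached topology.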