Thanks for the typo catch, John. Let me know if anyone else has thoughts or ideas.
Cheers,
Leah

On Fri, Aug 21, 2020 at 2:50 PM John Roesler <vvcep...@apache.org> wrote:

> Thanks, all,
>
> Based on my reading of the conversation, it sounds like I
> have some legwork to do in KIP-645, but our collective
> instinct is that Leah's proposal doesn't need to change to
> account for whatever we might decide to do in KIP-645.
>
> I have no further concerns about KIP-659, and I think it's a
> good proposal.
>
> Thanks,
> -John
>
> P.s., there's still a typo on the wiki that says
> "ConsumerConfig" on the code block, even though the text now
> says "StreamsConfig".
>
>
> On Fri, 2020-08-21 at 10:56 -0700, Sophie Blee-Goldman wrote:
> > Just want to make a quick comment on the question that John raised about
> > whether we should introduce a separate config for "key" and "value"
> > window sizes:
> >
> > My short answer is No, I don't think that's necessary. First of all, as
> > you said, there is no first-class concept of a "Windowed value" in the
> > DSL. Second, to engage in your rhetorical question, if there's no default
> > window size for a Streams program then how can there be a sensible
> > default for the key AND a separate sensible default for a value?
> >
> > I don't think we need to follow the existing pattern if it doesn't make
> > sense, and to be honest I'm a bit skeptical that anyone was even using
> > these default windowed inner classes since the config wasn't even
> > defined/documented until quite recently. I'd actually be in favor of
> > deprecating StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
> > but I don't want to drag that into this discussion as well.
> >
> > My understanding is that these were meant to mirror the default key/value
> > serde configs, but the real use of the DEFAULT_WINDOWED_SERDE_INNER_CLASS
> > config is actually that you can at least use it to configure the inner
> > class for a Consumer, thus making the TimeWindowed serdes functional at
> > a basic level.
> > With the window size configs, the point is not really to set a default
> > but to make it actually work with a Consumer which instantiates the
> > deserializer by reflection. So I don't think we should position this new
> > config as a "default" (although it may technically behave as one) --
> > within Streams users can and should always supply the window size
> > through the constructor. I don't think that's such an inconvenience, vs
> > the amount of confusion that will be (and has been) caused by default
> > serde-related configs in streams.
> >
> > Regarding the fixed vs variable sized config, one idea I had was to just
> > keep the fixed-size config and constructor and let users of enumerable
> > windows override the TimeWindowedSerde class(es) to do whatever it is
> > they need. IIUC you already have to override some other windows-related
> > classes to get variable-sized windows so doing the same for the serdes
> > sounds reasonable to me. Just my take on the "simple things should be
> > easy, difficult things should be possible" mantra
> >
> > One last quick side note: the reason we don't really need to discuss
> > SessionWindows here is that they already encode both the start and end
> > time for the window. This is probably the best way to go for TimeWindows
> > as well, but making this change in a backwards compatible way is a much
> > larger scope of work. And even then, we might want to consider making it
> > possible to still just encode the start time to save space, thus
> > requiring this config either way
> >
> > On Fri, Aug 21, 2020 at 9:26 AM Leah Thomas <ltho...@confluent.io> wrote:
> >
> > > Thanks John and Walker for your thoughts.
> > >
> > > I agree with your two scenarios John, that you configure fully in the
> > > constructor, or you don't need to call `init()`.
> > > IIUC, if we pass the deserializer to the consumer, we want to make
> > > sure the window size is set using the newly required constructor. If
> > > we don't pass in the deserializer, the window size will be set through
> > > the configs. To answer Walker's question directly, because the configs
> > > aren't passed to the constructor, we can't set the window size unless
> > > we pass it to the constructor or configure the deserializer after
> > > initializing it.
> > >
> > > For users who would rather not set a strict window size (outside of the
> > > variable size scenario), they can pass in Long.MAX_VALUE. The way I see
> > > this is instead of having the default be for scenarios that don't
> > > require a window size, we have the default be the scenarios that *do*,
> > > flipping the current implementation to fit with typical use cases.
> > >
> > > On your points John:
> > > 1. I agree that it makes sense to store it in StreamsConfig, this
> > > shouldn't cause any issues. I've updated the KIP accordingly.
> > >
> > > 2. The non-fixed time windows issue is a good point. It seems like
> > > calendar windows in particular are quite useful, so I think we want to
> > > make sure that this wouldn't inhibit flexible sized windows. I think
> > > having two different configs and functions makes sense, although it is
> > > slightly messier. While requiring all time windows to use the
> > > WindowFunction constructor would work, I think that allowing users to
> > > access the WindowSize constructor is preferable because it seems easier
> > > to use for people who are not at all interested in delving into
> > > variably sized windows. This assumption could be wrong though, and
> > > perhaps users would adapt quickly to the new WindowFunction style, but
> > > my immediate reaction is to support both configs and constructors.
> > >
> > > One note on this is that Session Windows are handled separately from
> > > time windows and also have variable window sizes. I assume that the
> > > TimeWindowed option is preferable for variably sized windows because
> > > you still want to access the window end times? But I think one
> > > alternative could be separating the variably sized windows from the
> > > current implementation of time windows, although I think KIP-645
> > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
> > > would make this not strictly necessary.
> > >
> > > Cheers,
> > > Leah
> > >
> > > On Fri, Aug 21, 2020 at 10:04 AM John Roesler <vvcep...@apache.org> wrote:
> > >
> > > > Hi Leah,
> > > >
> > > > Thanks for the KIP! This has been a real pain for some use
> > > > cases, so it's really good to see a proposal to fix it.
> > > >
> > > > We do need a default constructor so that it can be
> > > > dynamically instantiated by the consumer (or any other
> > > > component). But I'm +1 on deprecating the constructor you're
> > > > proposing to deprecate, which only partially configures the
> > > > class. It seems like there are exactly two patterns: either
> > > > you fully configure the class in the constructor and don't
> > > > call `init()`, or you call the default constructor and then
> > > > configure the class by calling `init()`.
> > > >
> > > > I can appreciate Walker's point, but stepping back, it
> > > > doesn't actually seem that useful to partially configure the
> > > > class in the constructor and then finish up the
> > > > configuration by calling `init()`. I could see the argument
> > > > if there were a sensible default, but for this particular
> > > > class, there isn't one. Rhetorical question: what is the
> > > > default window size for Streams programs?
> > > >
> > > > I have a couple of concerns to discuss:
> > > >
> > > > 1. Config Location
> > > >
> > > > I don't think I would add the new config to ConsumerConfig,
> > > > but would add it to StreamsConfig instead. The deserializer
> > > > itself is in Streams (it is
> > > > o.a.k.streams.kstream.TimeWindowedDeserializer), so it seems
> > > > odd to have one of its configurations in a completely
> > > > different module.
> > > >
> > > > Also, this class already has two configs, which are in
> > > > StreamsConfig:
> > > > StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
> > > > StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
> > > >
> > > > It seems like the new config belongs right next to the
> > > > existing ones.
> > > >
> > > > For me, it raises a secondary question:
> > > > 1b: Should there be a KEY_WINDOW_SIZE and a
> > > > VALUE_WINDOW_SIZE? I'm honestly not sure what a "windowed
> > > > value" even is, but the fact that we can configure serdes
> > > > for it implies that perhaps we should symmetrically
> > > > configure its size as well.
> > > >
> > > > 2. Fixed Size Assumption
> > > >
> > > > In KIP-645, I'm proposing to lift the assumption that
> > > > TimeWindows have a fixed size at all, but KIP-659 is
> > > > currently built on that assumption.
> > > >
> > > > For details on why this is not a good assumption, see:
> > > > https://issues.apache.org/jira/browse/KAFKA-10408
> > > >
> > > > In fact, in my POC PR for KIP-645, I'm dropping the
> > > > constructor that takes a "window size" parameter in favor of
> > > > one that takes a window function, mapping a window start
> > > > time to a full Window(start, end).
> > > >
> > > > In that context, it seems incongruous to introduce a
> > > > configuration that specifies a window size. Of course, my
> > > > KIP is also under discussion, so my proposal may not
> > > > eventually be accepted. But it is necessary to consider both
> > > > of these concerns together.
> > > >
> > > > One option seems to be to accept both.
> > > > Namely, we keep the
> > > > "fixed size" constructor AND add my new constructor (for
> > > > variably sized windows). Likewise, we accept your proposal,
> > > > and KIP-645 would propose to add a new config specifying a
> > > > windowing function, such as:
> > > >
> > > > > StreamsConfig.WINDOW_FUNCTION_CONFIG
> > > >
> > > > which would be an instance of:
> > > >
> > > > > public interface WindowFunction extends Function<Long, Window> {}
> > > >
> > > > I'm not bringing these up for discussion in your KIP right
> > > > now, just demonstrating the feasibility of merging both
> > > > proposals.
> > > >
> > > > My question for you: do you think the general strategy of
> > > > having two constructors and two configs, one for fixed and
> > > > one for variable windows, makes sense? Is it too
> > > > complicated? Do you have a better idea?
> > > >
> > > > Thanks!
> > > > -John
> > > >
> > > > On Thu, 2020-08-20 at 14:49 -0700, Walker Carlson wrote:
> > > > > Hi Leah,
> > > > >
> > > > > Could you explain a bit more why we do not wish to let
> > > > > TimeWindowedDeserializer and WindowedSerdes be created without a
> > > > > specified time as a parameter?
> > > > >
> > > > > I understand the Long.MAX_VALUE could cause problems but would it not
> > > > > be a good idea to have a usable default or fetch from the config if
> > > > > available? After all you are proposing to add "window.size.ms"
> > > > >
> > > > > We definitely need a fix to this problem and adding "window.size.ms"
> > > > > makes sense to me.
> > > > >
> > > > > Thanks for the KIP,
> > > > > Walker
> > > > >
> > > > > On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas <ltho...@confluent.io> wrote:
> > > > > > Hi all,
> > > > > >
> > > > > > I'd like to start a discussion for KIP-659:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> > > > > >
> > > > > > The goal of the KIP is to ensure that window size is passed to the
> > > > > > consumer when needed, which will generally be for testing purposes,
> > > > > > and to avoid runtime errors when the *TimeWindowedSerde* is created
> > > > > > without a window size.
> > > > > >
> > > > > > Looking forward to hearing your feedback.
> > > > > >
> > > > > > Cheers,
> > > > > > Leah
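
For readers following the thread, here is a minimal sketch of the two configuration patterns discussed above: either the window size is supplied in the constructor, or the class is created through a no-arg constructor (as a consumer would do by reflection) and then configured from a map containing "window.size.ms". The class `StartOnlyWindowDeserializer`, its method names, and the `long[]` return type are all hypothetical stand-ins chosen for illustration; this is not Kafka's TimeWindowedDeserializer and does not reflect what the KIP finally adopted. It also assumes that only the window start time is encoded, and that a constructor-supplied size takes precedence over the config.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;

// Hypothetical stand-in for a deserializer whose payload holds only the
// window start time. NOT Kafka's TimeWindowedDeserializer.
final class StartOnlyWindowDeserializer {
    // Config key borrowed from the "window.size.ms" name used in the thread.
    static final String WINDOW_SIZE_MS = "window.size.ms";

    private Long windowSizeMs; // null until a size is supplied

    // Pattern 2: no-arg constructor for reflective creation; configure() must follow.
    StartOnlyWindowDeserializer() { }

    // Pattern 1: fully configured in the constructor (size assumed non-negative).
    StartOnlyWindowDeserializer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Reads the size from the config map unless the constructor already set it
    // (an assumed precedence rule for this sketch).
    void configure(Map<String, ?> configs) {
        Object value = configs.get(WINDOW_SIZE_MS);
        if (windowSizeMs == null && value != null) {
            windowSizeMs = Long.parseLong(value.toString());
        }
    }

    // Returns {start, end}; fails fast when no window size was ever supplied.
    long[] deserialize(byte[] data) {
        if (windowSizeMs == null) {
            throw new IllegalStateException(
                "window size must be set via the constructor or " + WINDOW_SIZE_MS);
        }
        long start = ByteBuffer.wrap(data).getLong();
        // Clamp so that a size of Long.MAX_VALUE does not overflow the end time.
        long end = start > Long.MAX_VALUE - windowSizeMs
            ? Long.MAX_VALUE
            : start + windowSizeMs;
        return new long[] {start, end};
    }

    // Helper for the demo: encodes a window start time as 8 bytes.
    static byte[] encodeStart(long start) {
        return ByteBuffer.allocate(Long.BYTES).putLong(start).array();
    }
}

final class WindowSizeSketch {
    public static void main(String[] args) {
        byte[] payload = StartOnlyWindowDeserializer.encodeStart(1000L);

        // Pattern 1: size passed to the constructor.
        StartOnlyWindowDeserializer viaConstructor = new StartOnlyWindowDeserializer(500L);
        System.out.println(Arrays.toString(viaConstructor.deserialize(payload)));

        // Pattern 2: default constructor, then configuration from a map.
        StartOnlyWindowDeserializer viaConfig = new StartOnlyWindowDeserializer();
        viaConfig.configure(Map.of(StartOnlyWindowDeserializer.WINDOW_SIZE_MS, "250"));
        System.out.println(Arrays.toString(viaConfig.deserialize(payload)));
    }
}
```

Failing fast in `deserialize` when no size was supplied is one way to read the KIP's goal of avoiding a silently wrong window end; the thread itself also mentions passing Long.MAX_VALUE explicitly for users who do not want a strict size, which the clamp above accommodates.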