Hey John,

Thanks for pointing this out. I wasn't sure how to handle the Scala changes.

I'm not fully versed in the Scala version of Streams, so feel free to correct me if any of my assumptions are wrong. I think logging an error message and then calling the constructor that requires a windowSize is the simplest fix. So instead of calling `TimeWindowedSerde(final Serde<T> inner)`, we could call `TimeWindowedSerde(final Serde<T> inner, final long windowSize)` with `Long.MAX_VALUE` as the window size.

I do think we would want to add an implicit to `Serdes.scala` that takes a serde and a window size, so that users can access the constructor that initializes with the correct window size. I agree with your comment on the KIP-616 PR that the serde needs to be pre-configured when it's passed, but I'm not sure we would need a windowSize config. I think if the constructor is passed the serde and the window size, then the window size should be set within the deserializer. The only catch is if the Scala version of the consumer creates a new deserializer, at which point we'd need a window size config, but I'm not sure if that's the case.

WDYT - is it possible to alter the existing implicit and add a new one?

On Wed, Aug 26, 2020 at 10:00 AM John Roesler <vvcep...@apache.org> wrote:

> Hi Leah,
>
> I was just reviewing the PR for KIP-616 and realized that we
> forgot to mention the Scala API in your KIP. We should
> consider it because `scala.Serdes.timeWindowedSerde` is
> implicitly using the exact constructor you're deprecating.
>
> I had some ideas in the code review:
> https://github.com/apache/kafka/pull/8955#discussion_r477358755
>
> What do you think is the best approach?
>
> Concretely, I think Yuriy can make the call for KIP-616 (for
> the new implicit that he's adding). But I think your KIP-659
> should mention how we modify the existing implicit.
>
> Typically, we'd try to avoid throwing new exceptions or
> causing compile errors, so
> * dropping the implicit is probably off the table (compile
> error).
> * throwing an exception in the deserializer may not be ok,
> although it might still actually be ok since it's adding a
> corruption check.
> * logging an ERROR message and then passing through to the
> underlying deserializer would be more conservative.
>
> What do you think we should do?
>
> Thanks,
> -John
>
> On Fri, 2020-08-21 at 16:05 -0500, Leah Thomas wrote:
> > Thanks for the typo catch, John.
> >
> > Let me know if anyone else has thoughts or ideas.
> >
> > Cheers,
> > Leah
> >
> > On Fri, Aug 21, 2020 at 2:50 PM John Roesler <vvcep...@apache.org> wrote:
> >
> > > Thanks, all,
> > >
> > > Based on my reading of the conversation, it sounds like I
> > > have some legwork to do in KIP-645, but our collective
> > > instinct is that Leah's proposal doesn't need to change to
> > > account for whatever we might decide to do in KIP-645.
> > >
> > > I have no further concerns about KIP-659, and I think it's a
> > > good proposal.
> > >
> > > Thanks,
> > > -John
> > >
> > > P.s., there's still a typo on the wiki that says
> > > "ConsumerConfig" on the code block, even though the text now
> > > says "StreamsConfig".
> > >
> > >
> > > On Fri, 2020-08-21 at 10:56 -0700, Sophie Blee-Goldman wrote:
> > > > Just want to make a quick comment on the question that John raised
> > > > about whether we should introduce a separate config for "key" and
> > > > "value" window sizes:
> > > >
> > > > My short answer is No, I don't think that's necessary. First of all,
> > > > as you said, there is no first-class concept of a "Windowed value" in
> > > > the DSL. Second, to engage in your rhetorical question, if there's no
> > > > default window size for a Streams program then how can there be a
> > > > sensible default for the key AND a separate sensible default for a
> > > > value?
> > > >
> > > > I don't think we need to follow the existing pattern if it doesn't
> > > > make sense, and to be honest I'm a bit skeptical that anyone was even
> > > > using these default windowed inner classes since the config wasn't
> > > > even defined/documented until quite recently. I'd actually be in favor
> > > > of deprecating StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
> > > > but I don't want to drag that into this discussion as well.
> > > >
> > > > My understanding is that these were meant to mirror the default
> > > > key/value serde configs, but the real use of the
> > > > DEFAULT_WINDOWED_SERDE_INNER_CLASS config is actually that you can at
> > > > least use it to configure the inner class for a Consumer, thus making
> > > > the TimeWindowed serdes functional at a basic level. With the window
> > > > size configs, the point is not really to set a default but to make it
> > > > actually work with a Consumer which instantiates the deserializer by
> > > > reflection. So I don't think we should position this new config as a
> > > > "default" (although it may technically behave as one) -- within
> > > > Streams users can and should always supply the window size through the
> > > > constructor. I don't think that's such an inconvenience, vs the amount
> > > > of confusion that will be (and has been) caused by default
> > > > serde-related configs in streams.
> > > >
> > > > Regarding the fixed vs variable sized config, one idea I had was to
> > > > just keep the fixed-size config and constructor and let users of
> > > > enumerable windows override the TimeWindowedSerde class(es) to do
> > > > whatever it is they need. IIUC you already have to override some other
> > > > windows-related classes to get variable-sized windows so doing the
> > > > same for the serdes sounds reasonable to me.
> > > > Just my take on the "simple things should be easy, difficult things
> > > > should be possible" mantra
> > > >
> > > > One last quick side note: the reason we don't really need to discuss
> > > > SessionWindows here is that they already encode both the start and end
> > > > time for the window. This is probably the best way to go for
> > > > TimeWindows as well, but making this change in a backwards compatible
> > > > way is a much larger scope of work. And even then, we might want to
> > > > consider making it possible to still just encode the start time to
> > > > save space, thus requiring this config either way
> > > >
> > > > On Fri, Aug 21, 2020 at 9:26 AM Leah Thomas <ltho...@confluent.io> wrote:
> > > > > Thanks John and Walker for your thoughts.
> > > > >
> > > > > I agree with your two scenarios John, that you configure fully in
> > > > > the constructor, or you don't need to call `init()`. IIUC, if we
> > > > > pass the deserializer to the consumer, we want to make sure its
> > > > > window size is set using the newly required constructor. If we don't
> > > > > pass in the deserializer, the window size will be set through the
> > > > > configs. To answer Walker's question directly, because the configs
> > > > > aren't passed to the constructor, we can't set the window size
> > > > > unless we pass it to the constructor or configure the constructor
> > > > > after initializing it.
> > > > >
> > > > > For users who would rather not set a strict window size (outside of
> > > > > the variable size scenario), they can pass in Long.MAX_VALUE. The
> > > > > way I see this is instead of having the default be for scenarios
> > > > > that don't require a window size, we have the default be the
> > > > > scenarios that *do*, flipping the current implementation to fit with
> > > > > typical use cases.
> > > > >
> > > > > On your points John:
> > > > > 1. I agree that it makes sense to store it in StreamsConfig, this
> > > > > shouldn't cause any issues. I've updated the KIP accordingly.
> > > > >
> > > > > 2. The non-fixed time windows issue is a good point. It seems like
> > > > > calendar windows in particular are quite useful, so I think we want
> > > > > to make sure that this wouldn't inhibit flexible sized windows. I
> > > > > think having two different configs and functions makes sense,
> > > > > although it is slightly messier. While requiring all time windows to
> > > > > use the WindowFunction constructor would work, I think that allowing
> > > > > users to access the WindowSize constructor is preferable because it
> > > > > seems easier to use for people who are not at all interested in
> > > > > delving into variably sized windows. This assumption could be wrong
> > > > > though, and perhaps users would adapt quickly to the new
> > > > > WindowFunction style, but my immediate reaction is to support both
> > > > > configs and constructors.
> > > > >
> > > > > One note on this is that Session Windows are handled separately from
> > > > > time windows and also have variable window sizes. I assume that the
> > > > > TimeWindowed option is preferable for variably sized windows because
> > > > > you still want to access the window end times? But I think one
> > > > > alternative could be separating the variably sized windows from the
> > > > > current implementation of time windows, although I think KIP-645
> > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
> > > > > would make this not strictly necessary.
> > > > >
> > > > > Cheers,
> > > > > Leah
> > > > >
> > > > > On Fri, Aug 21, 2020 at 10:04 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > > Hi Leah,
> > > > > >
> > > > > > Thanks for the KIP!
> > > > > > This has been a real pain for some use
> > > > > > cases, so it's really good to see a proposal to fix it.
> > > > > >
> > > > > > We do need a default constructor so that it can be
> > > > > > dynamically instantiated by the consumer (or any other
> > > > > > component). But I'm +1 on deprecating the constructor you're
> > > > > > proposing to deprecate, which only partially configures the
> > > > > > class. It seems like there are exactly two patterns: either
> > > > > > you fully configure the class in the constructor and don't
> > > > > > call `init()`, or you call the default constructor and then
> > > > > > configure the class by calling `init()`.
> > > > > >
> > > > > > I can appreciate Walker's point, but stepping back, it
> > > > > > doesn't actually seem that useful to partially configure the
> > > > > > class in the constructor and then finish up the
> > > > > > configuration by calling `init()`. I could see the argument
> > > > > > if there were a sensible default, but for this particular
> > > > > > class, there isn't one. Rhetorical question: what is the
> > > > > > default window size for Streams programs?
> > > > > >
> > > > > > I have a couple of concerns to discuss:
> > > > > >
> > > > > > 1. Config Location
> > > > > >
> > > > > > I don't think I would add the new configs to ConsumerConfig,
> > > > > > but would add them to StreamsConfig instead. The deserializer
> > > > > > itself is in Streams (it is
> > > > > > o.a.k.streams.kstream.TimeWindowedDeserializer), so it seems
> > > > > > odd to have one of its configurations in a completely
> > > > > > different module.
> > > > > >
> > > > > > Also, this class already has two configs, which are in
> > > > > > StreamsConfig:
> > > > > > StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
> > > > > > StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
> > > > > >
> > > > > > It seems like the new config belongs right next to the
> > > > > > existing ones.
> > > > > >
> > > > > > For me, it raises a secondary question:
> > > > > > 1b: Should there be a KEY_WINDOW_SIZE and a
> > > > > > VALUE_WINDOW_SIZE? I'm honestly not sure what a "windowed
> > > > > > value" even is, but the fact that we can configure serdes
> > > > > > for it implies that perhaps we should symmetrically
> > > > > > configure its size as well.
> > > > > >
> > > > > > 2. Fixed Size Assumption
> > > > > >
> > > > > > In KIP-645, I'm proposing to lift the assumption that
> > > > > > TimeWindows have a fixed size at all, but KIP-659 is
> > > > > > currently built on that assumption.
> > > > > >
> > > > > > For details on why this is not a good assumption, see:
> > > > > > https://issues.apache.org/jira/browse/KAFKA-10408
> > > > > >
> > > > > > In fact, in my POC PR for KIP-645, I'm dropping the
> > > > > > constructor that takes a "window size" parameter in favor of
> > > > > > one that takes a window function, mapping a window start
> > > > > > time to a full Window(start, end).
> > > > > >
> > > > > > In that context, it seems incongruous to introduce a
> > > > > > configuration that specifies a window size. Of course, my
> > > > > > KIP is also under discussion, so my proposal may not
> > > > > > eventually be accepted. But it is necessary to consider both
> > > > > > of these concerns together.
> > > > > >
> > > > > > One option seems to be to accept both. Namely, we keep the
> > > > > > "fixed size" constructor AND add my new constructor (for
> > > > > > variably sized windows).
> > > > > > Likewise, we accept your proposal,
> > > > > > and KIP-645 would propose to add a new config specifying a
> > > > > > windowing function, such as:
> > > > > >
> > > > > > > StreamsConfig.WINDOW_FUNCTION_CONFIG
> > > > > >
> > > > > > which would be an instance of:
> > > > > >
> > > > > > > public interface WindowFunction extends Function<Long, Window> {}
> > > > > >
> > > > > > I'm not bringing these up for discussion in your KIP right
> > > > > > now, just demonstrating the feasibility of merging both
> > > > > > proposals.
> > > > > >
> > > > > > My question for you: do you think the general strategy of
> > > > > > having two constructors and two configs, one for fixed and
> > > > > > one for variable windows, makes sense? Is it too
> > > > > > complicated? Do you have a better idea?
> > > > > >
> > > > > > Thanks!
> > > > > > -John
> > > > > >
> > > > > > On Thu, 2020-08-20 at 14:49 -0700, Walker Carlson wrote:
> > > > > > > Hi Leah,
> > > > > > >
> > > > > > > Could you explain a bit more why we do not wish to let
> > > > > > > TimeWindowedDeserializer and WindowedSerdes be created without
> > > > > > > a specified time as a parameter?
> > > > > > >
> > > > > > > I understand that Long.MAX_VALUE could cause problems, but
> > > > > > > would it not be a good idea to have a usable default or fetch
> > > > > > > from the config if available? After all, you are proposing to
> > > > > > > add "window.size.ms".
> > > > > > >
> > > > > > > We definitely need a fix to this problem and adding
> > > > > > > "window.size.ms" makes sense to me.
> > > > > > >
> > > > > > > Thanks for the KIP,
> > > > > > > Walker
> > > > > > >
> > > > > > > On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas <ltho...@confluent.io> wrote:
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I'd like to start a discussion for KIP-659:
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> > > > > > > >
> > > > > > > > The goal of the KIP is to ensure that window size is passed to
> > > > > > > > the consumer when needed, which will generally be for testing
> > > > > > > > purposes, and to avoid runtime errors when the
> > > > > > > > *TimeWindowedSerde* is created without a window size.
> > > > > > > >
> > > > > > > > Looking forward to hearing your feedback.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Leah
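
P.S. For anyone skimming the thread, the "window function" idea from John's Aug 21 message (a function mapping a window start time to a full Window(start, end)), together with the fixed-size case, can be sketched in plain Java. This is only an illustration under assumptions: `Window`, `WindowFunction`, and `fixedSize` below are hypothetical stand-ins and not the Kafka Streams classes, and clamping the end time at `Long.MAX_VALUE` is an assumed way of coping with a `Long.MAX_VALUE` window size, not something the KIP specifies.

```java
import java.util.function.Function;

final class WindowFunctionSketch {

    // Hypothetical stand-in for a Streams window: start and end in epoch millis.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Maps a window start time to a full Window(start, end).
    interface WindowFunction extends Function<Long, Window> {}

    // Fixed-size case: end = start + size. Assumes a non-negative size and
    // clamps the end so that a huge size (e.g. Long.MAX_VALUE) cannot overflow.
    static WindowFunction fixedSize(long windowSizeMs) {
        return start -> {
            long end = start + windowSizeMs;
            if (end < start) {
                end = Long.MAX_VALUE; // the addition wrapped around
            }
            return new Window(start, end);
        };
    }

    public static void main(String[] args) {
        Window w = fixedSize(300_000L).apply(1_000L);
        System.out.println(w.start + " -> " + w.end);
    }
}
```

A variably sized (for example calendar-based) window would just be another `WindowFunction` that computes `end` from `start` differently, which is why keeping both a fixed-size constructor and a function-based one looks feasible.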