Hi Bill, Thanks for clarifying, and the KIP looks great!!
Best regards, Aishwarya On Tue, Sep 17, 2019, 6:16 PM Bill Bejeck <bbej...@gmail.com> wrote: > Hi Aishwarya, > > On Tue, Sep 17, 2019 at 1:41 PM aishwarya kumar <ash26...@gmail.com> > wrote: > > > Will this be applicable to Kstream-Ktable joins as well? Or do users > always > > materialize these joins explicitly? > > > > No, this change applies to KStream-KStream joins only. With KStream-KTable > joins KafkaStreams has materialized the KTable already, and we don't need > to do anything with the KStream instance in this case. > > > > I'm not sure if its even necessary (or makes sense), just trying to > > understand why the change is applicable to Kstream joins only? > > > > The full details are in the KIP, but in a nutshell, we needed to break the > naming of the StateStore from `Joined.withName` and provide users a way to > name the StateStore explicitly. While making the changes, we realized it > would be beneficial to give users the ability to use different WindowStore > implementations. The most likely reason to use different WindowStores > would be to enable in-memory joins. > > > > Best, > > Aishwarya > > > > Regards, > Bill > > > > > > On Tue, Sep 17, 2019 at 4:05 PM Bill Bejeck <bbej...@gmail.com> wrote: > > > > > Guozhang, > > > > > > Thanks for the comments. > > > > > > Yes, you are correct in your assessment regarding names, *if* the users > > > provide their own StoreSuppliers When constructing as StoreSupplier, > the > > > name can't be null, and additionally, there is no way to update the > name. > > > Further downstream, the underlying StateStore instances use the > provided > > > name for registration so they must be unique. If users don't provide > > > distinct names for the StoreSuppliers, KafkaStreams will thow a > > > StreamsException when building the topology. > > > > > > Thanks, > > > Bill > > > > > > > > > > > > On Tue, Sep 17, 2019 at 9:39 AM Guozhang Wang <wangg...@gmail.com> > > wrote: > > > > > > > Hello Bill, > > > > > > > > Thanks for the updated KIP. I made a pass on the StreamJoined > section. > > > Just > > > > a quick question from user's perspective: if a user wants to provide > a > > > > customized store-supplier, she is forced to also provide a name via > the > > > > store-supplier. So there's no way to say "I want to provide my own > > store > > > > engine but let the library decide its name", is that right? > > > > > > > > > > > > Guozhang > > > > > > > > > > > > On Tue, Sep 17, 2019 at 8:53 AM Bill Bejeck <bbej...@gmail.com> > wrote: > > > > > > > > > Bumping this discussion as we need to re-vote before the KIP > > deadline. > > > > > > > > > > On Fri, Sep 13, 2019 at 10:29 AM Bill Bejeck <bbej...@gmail.com> > > > wrote: > > > > > > > > > > > Hi All, > > > > > > > > > > > > While working on the implementation of KIP-479, some issues came > to > > > > light > > > > > > that the KIP as written won't work. I have updated the KIP with > a > > > > > solution > > > > > > I believe will solve the original problem as well as address the > > > > > > impediment to the initial approach. > > > > > > > > > > > > This update is a significant change, so please review the updated > > KIP > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join > > > > > and > > > > > > provide feedback. After we conclude the discussion, there will > be > > a > > > > > > re-vote. > > > > > > > > > > > > Thanks! > > > > > > Bill > > > > > > > > > > > > On Wed, Jul 17, 2019 at 7:01 PM Guozhang Wang < > wangg...@gmail.com> > > > > > wrote: > > > > > > > > > > > >> Hi Bill, thanks for your explanations. I'm on board with your > > > decision > > > > > >> too. > > > > > >> > > > > > >> > > > > > >> Guozhang > > > > > >> > > > > > >> On Wed, Jul 17, 2019 at 10:20 AM Bill Bejeck <bbej...@gmail.com > > > > > > wrote: > > > > > >> > > > > > >> > Thanks for the response, John. > > > > > >> > > > > > > >> > > If I can offer my thoughts, it seems better to just document > > on > > > > the > > > > > >> > > Stream join javadoc for the Materialized parameter that it > > will > > > > not > > > > > >> > > make the join result queriable. I'm not opposed to the > > queriable > > > > > flag > > > > > >> > > in general, but introducing it is a much larger > consideration > > > that > > > > > has > > > > > >> > > previously derailed this KIP discussion. In the interest of > > just > > > > > >> > > closing the gap and keeping the API change small, it seems > > > better > > > > to > > > > > >> > > just go with documentation for now. > > > > > >> > > > > > > >> > I agree with your statement here. IMHO the most important > goal > > of > > > > > this > > > > > >> KIP > > > > > >> > is to not breaking existing users and gain some consistency of > > the > > > > > API. > > > > > >> > > > > > > >> > I'll update the KIP accordingly. > > > > > >> > > > > > > >> > -Bill > > > > > >> > > > > > > >> > On Tue, Jul 16, 2019 at 11:55 AM John Roesler < > > j...@confluent.io> > > > > > >> wrote: > > > > > >> > > > > > > >> > > Hi Bill, > > > > > >> > > > > > > > >> > > Thanks for driving this KIP toward a conclusion. I'm on > board > > > with > > > > > >> > > your decision. > > > > > >> > > > > > > > >> > > You didn't mention whether you're still proposing to add the > > > > > >> > > "queriable" flag to the Materialized config object, or just > > > > document > > > > > >> > > that a Stream join is never queriable. Both options have > come > > up > > > > > >> > > earlier in the discussion, so it would be good to pin this > > down. > > > > > >> > > > > > > > >> > > If I can offer my thoughts, it seems better to just document > > on > > > > the > > > > > >> > > Stream join javadoc for the Materialized parameter that it > > will > > > > not > > > > > >> > > make the join result queriable. I'm not opposed to the > > queriable > > > > > flag > > > > > >> > > in general, but introducing it is a much larger > consideration > > > that > > > > > has > > > > > >> > > previously derailed this KIP discussion. In the interest of > > just > > > > > >> > > closing the gap and keeping the API change small, it seems > > > better > > > > to > > > > > >> > > just go with documentation for now. > > > > > >> > > > > > > > >> > > Thanks again, > > > > > >> > > -John > > > > > >> > > > > > > > >> > > On Thu, Jul 11, 2019 at 2:45 PM Bill Bejeck < > > bbej...@gmail.com> > > > > > >> wrote: > > > > > >> > > > > > > > > >> > > > Thanks all for the great discussion so far. > > > > > >> > > > > > > > > >> > > > Everyone has made excellent points, and I appreciate the > > > detail > > > > > >> > everyone > > > > > >> > > > has put into their arguments. > > > > > >> > > > > > > > > >> > > > However, after carefully evaluating all the points made so > > > far, > > > > > >> > creating > > > > > >> > > an > > > > > >> > > > overload with Materialized is still my #1 option. > > > > > >> > > > My reasoning for saying so is two-fold: > > > > > >> > > > > > > > > >> > > > 1. It's a small change, and IMHO since it's consistent > > with > > > > our > > > > > >> > > current > > > > > >> > > > API concerning state store usage, the cognitive load on > > > users > > > > > >> will > > > > > >> > be > > > > > >> > > > minimal. > > > > > >> > > > 2. It achieves the most important goal of this KIP, > > namely > > > to > > > > > >> close > > > > > >> > > the > > > > > >> > > > gap of naming state stores independently of the join > > > operator > > > > > >> name. > > > > > >> > > > > > > > > >> > > > Additionally, I agree with the points made by Matthias > > earlier > > > > (I > > > > > >> > realize > > > > > >> > > > there is some overlap here). > > > > > >> > > > > > > > > >> > > > > - the main purpose of this KIP is to close the naming > gap > > > > what > > > > > we > > > > > >> > > achieve > > > > > >> > > > > - we can allow people to use the new in-memory store > > > > > >> > > > > - we allow people to enable/disable caching > > > > > >> > > > > - we unify the API > > > > > >> > > > > - we decouple querying from naming > > > > > >> > > > > - it's a small API change > > > > > >> > > > > > > > > >> > > > Although it's not a perfect solution, IMHO the positives > of > > > > using > > > > > >> > > > Materialize far outweigh the negatives, and from what > we've > > > > > >> discussed > > > > > >> > so > > > > > >> > > > far, anything we implement seems to involve an additional > > > change > > > > > >> down > > > > > >> > the > > > > > >> > > > road. > > > > > >> > > > > > > > > >> > > > If others are still strongly opposed to using > Materialized, > > my > > > > > other > > > > > >> > > > preferences would be > > > > > >> > > > > > > > > >> > > > 1. Add a "withStoreName" to Joined. Although I agree > > with > > > > > >> Guozhang > > > > > >> > > that > > > > > >> > > > having a parameter that only applies to one use-case > > would > > > be > > > > > >> > clumsy. > > > > > >> > > > 2. Add a String overload for naming the store, but this > > > would > > > > > be > > > > > >> my > > > > > >> > > > least favorite option as IMHO this seems to be a step > > > > backward > > > > > >> from > > > > > >> > > why we > > > > > >> > > > introduced configuration objects in the first place. > > > > > >> > > > > > > > > >> > > > Thanks, > > > > > >> > > > Bill > > > > > >> > > > > > > > > >> > > > On Thu, Jun 27, 2019 at 4:45 PM Matthias J. Sax < > > > > > >> matth...@confluent.io > > > > > >> > > > > > > > >> > > > wrote: > > > > > >> > > > > > > > > >> > > > > Thanks for the KIP Bill! > > > > > >> > > > > > > > > > >> > > > > Great discussion to far. > > > > > >> > > > > > > > > > >> > > > > About John's idea about querying upstream stores and > don't > > > > > >> > materialize > > > > > >> > > a > > > > > >> > > > > store: I agree with Bill that this seems to be an > > orthogonal > > > > > >> > question, > > > > > >> > > > > and it might be better to treat it as an independent > > > > > optimization > > > > > >> and > > > > > >> > > > > exclude from this KIP. > > > > > >> > > > > > > > > > >> > > > > > What should be the behavior if there is no store > > > > > >> > > > > > configured (e.g., if Materialized with only serdes) > and > > > > > >> querying is > > > > > >> > > > > > enabled? > > > > > >> > > > > > > > > > >> > > > > IMHO, this could be an error case. If one wants to > query a > > > > > store, > > > > > >> > they > > > > > >> > > > > need to provide a name -- if you don't know the name, > how > > > > would > > > > > >> you > > > > > >> > > > > actually query the store (even if it would be possible > to > > > get > > > > > the > > > > > >> > name > > > > > >> > > > > from the `TopologyDescription`, it seems clumsy). > > > > > >> > > > > > > > > > >> > > > > If we don't want to throw an error, materializing seems > to > > > be > > > > > the > > > > > >> > right > > > > > >> > > > > option, to exclude "query optimization" from this KIP. I > > > would > > > > > be > > > > > >> ok > > > > > >> > > > > with this option, even if it's clumsy to get the name > from > > > > > >> > > > > `TopologyDescription`; hence, I would prefer to treat it > > as > > > an > > > > > >> error. > > > > > >> > > > > > > > > > >> > > > > > To get back to the current behavior, users would have > to > > > > > >> > > > > > add a "bytes store supplier" to the Materialized to > > > indicate > > > > > >> that, > > > > > >> > > > > > yes, they really want a state store there. > > > > > >> > > > > > > > > > >> > > > > This sound like a quite subtle semantic difference on > how > > to > > > > use > > > > > >> the > > > > > >> > > > > API. Might be hard to explain to users. I would prefer > to > > > not > > > > > >> > > introduce it. > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > About Guozhang's points: > > > > > >> > > > > > > > > > >> > > > > 1a) That is actually a good point. However, I believe we > > > > cannot > > > > > >> get > > > > > >> > > > > around this issue easily, and it seems ok to me, to > expose > > > the > > > > > >> actual > > > > > >> > > > > store type we are using. (More thoughts later.) > > > > > >> > > > > > > > > > >> > > > > 1b) I don't see an issue with allowing users to query > all > > > > > stores? > > > > > >> > What > > > > > >> > > > > is the rational behind it? What do we gain by not > allowing > > > it? > > > > > >> > > > > > > > > > >> > > > > 2) While I understand what you are saying, we also > > want/need > > > > to > > > > > >> have > > > > > >> > a > > > > > >> > > > > way in the PAPI to allow users adding "internal/private" > > > > > >> > non-queryable > > > > > >> > > > > stores to a topology. That's possible via > > > > > >> > > > > `Materialized#withQueryingDisabled()`. We could also > > update > > > > > >> > > > > `Topology#addStateStore(StoreBuilder, boolean > isQueryable, > > > > > >> > String...)` > > > > > >> > > > > to address this. Again, I agree with Bill that the > current > > > API > > > > > is > > > > > >> > built > > > > > >> > > > > in a certain way, and if we want to change it, it should > > be > > > a > > > > > >> > separate > > > > > >> > > > > KIP, as it seems to be an orthogonal concern. > > > > > >> > > > > > > > > > >> > > > > > Instead, we just restrict KIP-307 to NOT > > > > > >> > > > > > use the Joined.name for state store names and always > use > > > > > >> internal > > > > > >> > > names > > > > > >> > > > > as > > > > > >> > > > > > well, which admittedly indeed leaves a hole of not > being > > > > able > > > > > to > > > > > >> > > cover > > > > > >> > > > > all > > > > > >> > > > > > internal names here > > > > > >> > > > > > > > > > >> > > > > I think it's important to close this gap. Naming > entities > > > > seems > > > > > >> to a > > > > > >> > > > > binary feature: if there is a gap, the feature is more > or > > > less > > > > > >> > useless, > > > > > >> > > > > rendering KIP-307 void. > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > I like John's detailed list of required features and > what > > > > > >> > > > > Materialized/WindowByteStoreSuppliers offers. My take > is, > > > that > > > > > >> adding > > > > > >> > > > > Materialized including the required run-time checks is > the > > > > best > > > > > >> > option > > > > > >> > > > > we have, for the following reasons: > > > > > >> > > > > > > > > > >> > > > > - the main purpose of this KIP is to close the naming > gap > > > > what > > > > > we > > > > > >> > > achieve > > > > > >> > > > > - we can allow people to use the new in-memory store > > > > > >> > > > > - we allow people to enable/disable caching > > > > > >> > > > > - we unify the API > > > > > >> > > > > - we decouple querying from naming > > > > > >> > > > > - it's a small API change > > > > > >> > > > > > > > > > >> > > > > Adding an overload and only passing in a name, would > > address > > > > the > > > > > >> main > > > > > >> > > > > purpose of the KIP. However, it falls short on all the > > other > > > > > >> > "goodies". > > > > > >> > > > > As you mentioned, passing in `Materialized` might not be > > > > perfect > > > > > >> and > > > > > >> > > > > maybe we need to deprecate is at some point; but this is > > > also > > > > > true > > > > > >> > for > > > > > >> > > > > passing in just a name. > > > > > >> > > > > > > > > > >> > > > > I am also not convinced, that a `StreamJoinStore` would > > > > resolve > > > > > >> all > > > > > >> > the > > > > > >> > > > > issues. In the end, as long as we are using a > > > `WindowedStore` > > > > > >> > > > > internally, we need to expose this "implemenation > detail" > > to > > > > > >> users to > > > > > >> > > > > allow them to plug in a custom store. Adding > > `Materialized` > > > > seem > > > > > >> to > > > > > >> > be > > > > > >> > > > > the best short-term fix from my point of view. > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > -Matthias > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > On 6/27/19 9:56 AM, Guozhang Wang wrote: > > > > > >> > > > > > Hi John, > > > > > >> > > > > > > > > > > >> > > > > > I actually feels better about a new interface but I'm > > not > > > > sure > > > > > >> if > > > > > >> > we > > > > > >> > > > > would > > > > > >> > > > > > need the full configuration of store / log / cache, > now > > or > > > > in > > > > > >> the > > > > > >> > > future > > > > > >> > > > > > ever for stream-stream join. > > > > > >> > > > > > > > > > > >> > > > > > Right now I feel that 1) we want to improve our > > > > implementation > > > > > >> of > > > > > >> > > > > > stream-stream join, and potentially also allow users > to > > > > > >> customize > > > > > >> > > this > > > > > >> > > > > > implementation but with a more suitable interface than > > the > > > > > >> current > > > > > >> > > > > > WindowStore interface, how to do that is less clear > and > > > > > >> > > execution-wise > > > > > >> > > > > it's > > > > > >> > > > > > (arguably..) not urgent; 2) we want to close the last > > gap > > > > > >> > > (Stream-stream > > > > > >> > > > > > join) of allowing users to specify all internal names > to > > > > help > > > > > on > > > > > >> > > backward > > > > > >> > > > > > compatibility, which is urgent. > > > > > >> > > > > > > > > > > >> > > > > > Therefore if we want to unblock 2) from 1) in the near > > > > term, I > > > > > >> feel > > > > > >> > > > > > slightly inclined to just add overload functions that > > > takes > > > > > in a > > > > > >> > > store > > > > > >> > > > > name > > > > > >> > > > > > for stream-stream joins only -- and admittedly, in the > > > > future > > > > > >> this > > > > > >> > > > > function > > > > > >> > > > > > maybe deprecated -- i.e. if we have to do something > that > > > we > > > > > "may > > > > > >> > > regret" > > > > > >> > > > > in > > > > > >> > > > > > the future, I'd like to pick the least intrusive > option. > > > > > >> > > > > > > > > > > >> > > > > > About `Joined#withStoreName`: since the Joined class > > > itself > > > > is > > > > > >> also > > > > > >> > > used > > > > > >> > > > > in > > > > > >> > > > > > other join types, I feel less comfortable to have a > > > > > >> > > > > `Joined#withStoreName` > > > > > >> > > > > > which is only going to be used by stream-stream join. > Or > > > > > maybe I > > > > > >> > miss > > > > > >> > > > > > something here about the "latter" case that you are > > > > referring > > > > > >> to? > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > Guozhang > > > > > >> > > > > > > > > > > >> > > > > > On Mon, Jun 24, 2019 at 12:16 PM John Roesler < > > > > > >> j...@confluent.io> > > > > > >> > > wrote: > > > > > >> > > > > > > > > > > >> > > > > >> Thanks Guozhang, > > > > > >> > > > > >> > > > > > >> > > > > >> Yep. Maybe we can consider just exactly what the join > > > > needs: > > > > > >> > > > > >> > > > > > >> > > > > >>> the WindowStore<Bytes, byte[]> itself is fine, if > > overly > > > > > >> broad, > > > > > >> > > > > >>> since the only two methods we need are > > `window.put(key, > > > > > value, > > > > > >> > > > > >>> context().timestamp())` and `WindowStoreIterator<V2> > > > iter > > > > = > > > > > >> > > > > >>> window.fetch(key, timeFrom, timeTo)`. > > > > > >> > > > > >> > > > > > >> > > > > >> One "middle ground" would be to extract _this_ into a > > new > > > > > store > > > > > >> > > > > >> interface, which only supports these API calls, like > > > > > >> > > > > >> StreamJoinStore<K, V>. This would give us the > latitude > > we > > > > > need > > > > > >> to > > > > > >> > > > > >> efficiently support the exact operation without > > > concerning > > > > > >> > ourselves > > > > > >> > > > > >> with all the other things a WindowStore can do (which > > are > > > > > >> > > unreachable > > > > > >> > > > > >> for the join use case). It would also let us drop > > "store > > > > > >> > duplicates" > > > > > >> > > > > >> from the main WindowStore interface, since it only > > exists > > > > to > > > > > >> > support > > > > > >> > > > > >> the join use case. > > > > > >> > > > > >> > > > > > >> > > > > >> If we were to add a new StreamJoinStore interface, > then > > > > it'd > > > > > be > > > > > >> > > > > >> straightforward how we could add also > > > > > >> > > > > >> `Materialized.as(StreamJoinBytesStoreSupplier)` and > use > > > > > >> > > Materialized, > > > > > >> > > > > >> or alternatively add the ability to set the bytes > store > > > on > > > > > >> Joined. > > > > > >> > > > > >> > > > > > >> > > > > >> Personally, I'm kind of leaning toward the latter > (and > > > also > > > > > >> doing > > > > > >> > > > > >> `Joined#withStoreName`), since adding the new > interface > > > to > > > > > >> > > > > >> Materialized then also pollutes the interface for its > > > > > _actual_ > > > > > >> use > > > > > >> > > > > >> case of materializing a table view. Of course, to > solve > > > the > > > > > >> > > immediate > > > > > >> > > > > >> problem, all we need is the store name, but we might > > feel > > > > > >> better > > > > > >> > > about > > > > > >> > > > > >> adding the store name to Joined if we _also_ feel > like > > in > > > > the > > > > > >> > > future, > > > > > >> > > > > >> we would add store/log/cache configuration to Joined > as > > > > well. > > > > > >> > > > > >> > > > > > >> > > > > >> -John > > > > > >> > > > > >> > > > > > >> > > > > >> On Mon, Jun 24, 2019 at 12:56 PM Guozhang Wang < > > > > > >> > wangg...@gmail.com> > > > > > >> > > > > wrote: > > > > > >> > > > > >>> > > > > > >> > > > > >>> Hello John, > > > > > >> > > > > >>> > > > > > >> > > > > >>> My main concern is exactly the first point at the > > bottom > > > > of > > > > > >> your > > > > > >> > > > > analysis > > > > > >> > > > > >>> here: "* configure the bytes store". I'm not sure if > > > > using a > > > > > >> > window > > > > > >> > > > > bytes > > > > > >> > > > > >>> store would be ideal for stream-stream windowed > join; > > > e.g. > > > > > we > > > > > >> > could > > > > > >> > > > > >>> consider two dimensional list sorted by timestamps > and > > > > then > > > > > by > > > > > >> > > keys to > > > > > >> > > > > do > > > > > >> > > > > >>> the join, whereas a windowed bytes store is > basically > > > > sorted > > > > > >> by > > > > > >> > key > > > > > >> > > > > >> first, > > > > > >> > > > > >>> then by timestamp. If we expose the Materialized to > > let > > > > user > > > > > >> pass > > > > > >> > > in a > > > > > >> > > > > >>> windowed bytes store, then we would need to change > > that > > > if > > > > > we > > > > > >> > want > > > > > >> > > to > > > > > >> > > > > >>> replace it with a different implementation > interface. > > > > > >> > > > > >>> > > > > > >> > > > > >>> > > > > > >> > > > > >>> Guozhang > > > > > >> > > > > >>> > > > > > >> > > > > >>> On Mon, Jun 24, 2019 at 8:59 AM John Roesler < > > > > > >> j...@confluent.io> > > > > > >> > > > > wrote: > > > > > >> > > > > >>> > > > > > >> > > > > >>>> Hey Guozhang and Bill, > > > > > >> > > > > >>>> > > > > > >> > > > > >>>> For what it's worth, I agree with you both! > > > > > >> > > > > >>>> > > > > > >> > > > > >>>> I think it might help the discussion to look > > concretely > > > > at > > > > > >> what > > > > > >> > > > > >>>> Materialized does: > > > > > >> > > > > >>>> * set a WindowBytesStoreSupplier > > > > > >> > > > > >>>> * set a name > > > > > >> > > > > >>>> * set the key/value serdes > > > > > >> > > > > >>>> * disable/enable/configure change-logging > > > > > >> > > > > >>>> * disable/enable caching > > > > > >> > > > > >>>> * configure retention > > > > > >> > > > > >>>> > > > > > >> > > > > >>>> Further, looking into the WindowBytesStoreSupplier, > > the > > > > > >> > interface > > > > > >> > > lets > > > > > >> > > > > >> you: > > > > > >> > > > > >>>> * get the segment interval > > > > > >> > > > > >>>> * get the window size > > > > > >> > > > > >>>> * get whether "duplicates" are enabled > > > > > >> > > > > >>>> * get the retention period > > > > > >> > > > > >>>> * (obviously) get a WindowStore<Bytes, byte[]> > > > > > >> > > > > >>>> > > > > > >> > > > > >>>> We know that Materialized isn't exactly what we > need > > > for > > > > > >> stream > > > > > >> > > joins, > > > > > >> > > > > >>>> but we can see how close Materialized is to what we > > > need. > > > > > If > > > > > >> it > > > > > >> > is > > > > > >> > > > > >>>> close, maybe we can use it and document the gaps, > and > > > if > > > > it > > > > > >> is > > > > > >> > not > > > > > >> > > > > >>>> close, then maybe we should just add what we need > to > > > > > Joined. > > > > > >> > > > > >>>> Stream Join's requirements for its stores: > > > > > >> > > > > >>>> * a multimap store (i.e., it keeps duplicates) for > > > > storing > > > > > >> > general > > > > > >> > > > > >>>> (not windowed) keyed records associated with their > > > > > insertion > > > > > >> > > time, and > > > > > >> > > > > >>>> allows efficient time-bounded lookups and also > > > efficient > > > > > >> purges > > > > > >> > > of old > > > > > >> > > > > >>>> data. > > > > > >> > > > > >>>> ** Note, a properly configured > > WindowBytesStoreSupplier > > > > > >> > satisfies > > > > > >> > > this > > > > > >> > > > > >>>> requirement, and the interface supports the queries > > we > > > > need > > > > > >> to > > > > > >> > > verify > > > > > >> > > > > >>>> the configuration at run-time > > > > > >> > > > > >>>> * set a name for the store > > > > > >> > > > > >>>> * do _not_ set the serdes (they are already set in > > > > Joined) > > > > > >> > > > > >>>> * logging could be configurable (set to enabled > now) > > > > > >> > > > > >>>> * caching could be configurable (set to enabled > now) > > > > > >> > > > > >>>> * do _not_ configure retention (determined by > > > > JoinWindows) > > > > > >> > > > > >>>> > > > > > >> > > > > >>>> So, out of six capabilities for Materialized, there > > are > > > > two > > > > > >> we > > > > > >> > > don't > > > > > >> > > > > >>>> want (serdes and retention). These would become > > > run-time > > > > > >> checks > > > > > >> > > if we > > > > > >> > > > > >>>> use it. > > > > > >> > > > > >>>> > > > > > >> > > > > >>>> A third questionable capability is to provide a > > > > > >> > > > > >>>> WindowBytesStoreSupplier. Looking at whether the > > > > > >> > > > > >>>> WindowBytesStoreSupplier is the right interface for > > > > Stream > > > > > >> Join: > > > > > >> > > > > >>>> * configuring segment interval is fine > > > > > >> > > > > >>>> * should _not_ configure window size (it's > determined > > > by > > > > > >> > > JoinWindows) > > > > > >> > > > > >>>> * duplicates _must_ be enabled > > > > > >> > > > > >>>> * retention should be _at least_ windowSize + > > > > gracePeriod, > > > > > >> but > > > > > >> > > note > > > > > >> > > > > >>>> that (unlike for Table window stores) there is no > > > utility > > > > > in > > > > > >> > > having a > > > > > >> > > > > >>>> longer retention time. > > > > > >> > > > > >>>> * the WindowStore<Bytes, byte[]> itself is fine, if > > > > overly > > > > > >> > broad, > > > > > >> > > > > >>>> since the only two methods we need are > > `window.put(key, > > > > > >> value, > > > > > >> > > > > >>>> context().timestamp())` and > `WindowStoreIterator<V2> > > > > iter = > > > > > >> > > > > >>>> window.fetch(key, timeFrom, timeTo)`. > > > > > >> > > > > >>>> > > > > > >> > > > > >>>> Thus, flattening out the overlap for > > > > > WindowBytesStoreSupplier > > > > > >> > > onto the > > > > > >> > > > > >>>> overlap for Materialized, we have 9 capabilities > > total > > > > > (note > > > > > >> > > retention > > > > > >> > > > > >>>> is duplicated), we have 4 that we don't want: > > > > > >> > > > > >>>> * do _not_ set the serdes (they are already set in > > > > Joined) > > > > > >> > > > > >>>> * do _not_ configure retention (determined by > > > > JoinWindows) > > > > > >> > > > > >>>> * should _not_ configure window size (it's > determined > > > by > > > > > >> > > JoinWindows) > > > > > >> > > > > >>>> * duplicates _must_ be enabled > > > > > >> > > > > >>>> > > > > > >> > > > > >>>> These gaps would have to be covered with run-time > > > checks > > > > if > > > > > >> we > > > > > >> > > re-use > > > > > >> > > > > >>>> Materialized and WindowStoreBytesStoreSupplier > both. > > > > Maybe > > > > > >> this > > > > > >> > > sounds > > > > > >> > > > > >>>> bad, but consider the other side, that we get 5 new > > > > > >> capabilities > > > > > >> > > we > > > > > >> > > > > >>>> don't require, but are still pretty nice: > > > > > >> > > > > >>>> * configure the bytes store > > > > > >> > > > > >>>> * set a name for the store > > > > > >> > > > > >>>> * configure caching > > > > > >> > > > > >>>> * configure logging > > > > > >> > > > > >>>> * configure segment interval > > > > > >> > > > > >>>> > > > > > >> > > > > >>>> Not sure where this nets us out, but it's food for > > > > thought. > > > > > >> > > > > >>>> -John > > > > > >> > > > > >>>> > > > > > >> > > > > >>>> On Sun, Jun 23, 2019 at 7:52 PM Guozhang Wang < > > > > > >> > wangg...@gmail.com > > > > > >> > > > > > > > > >> > > > > >> wrote: > > > > > >> > > > > >>>>> > > > > > >> > > > > >>>>> Hi Bill, > > > > > >> > > > > >>>>> > > > > > >> > > > > >>>>> I think by giving a Materialized param into > > > > stream-stream > > > > > >> join, > > > > > >> > > it's > > > > > >> > > > > >> okay > > > > > >> > > > > >>>>> (though still ideal) to say "we still would not > > expose > > > > the > > > > > >> > store > > > > > >> > > for > > > > > >> > > > > >>>>> queries", but it would sound a bit awkward to say > > "we > > > > > would > > > > > >> > also > > > > > >> > > > > >> ignore > > > > > >> > > > > >>>>> whatever the passed in store supplier but just use > > our > > > > > >> default > > > > > >> > > ones" > > > > > >> > > > > >> -- > > > > > >> > > > > >>>>> again the concern is that, if in the future we'd > > want > > > to > > > > > >> change > > > > > >> > > the > > > > > >> > > > > >>>> default > > > > > >> > > > > >>>>> implementation of join algorithm which no longer > > rely > > > > on a > > > > > >> > window > > > > > >> > > > > >> store > > > > > >> > > > > >>>>> with deduping enabled, then we need to change this > > API > > > > > >> again by > > > > > >> > > > > >> changing > > > > > >> > > > > >>>>> the store supplier type. > > > > > >> > > > > >>>>> > > > > > >> > > > > >>>>> If we do want to fill this hole for stream-stream > > > join, > > > > I > > > > > >> feel > > > > > >> > > just > > > > > >> > > > > >>>> adding > > > > > >> > > > > >>>>> a String typed store-name would even be less > > > > > >> future-intrusive > > > > > >> > if > > > > > >> > > we > > > > > >> > > > > >>>> expect > > > > > >> > > > > >>>>> this parameter to be modified later. > > > > > >> > > > > >>>>> > > > > > >> > > > > >>>>> Does that makes sense? > > > > > >> > > > > >>>>> > > > > > >> > > > > >>>>> > > > > > >> > > > > >>>>> Guozhang > > > > > >> > > > > >>>>> > > > > > >> > > > > >>>>> On Sat, Jun 22, 2019 at 12:51 PM Bill Bejeck < > > > > > >> > bbej...@gmail.com> > > > > > >> > > > > >> wrote: > > > > > >> > > > > >>>>> > > > > > >> > > > > >>>>>> Thanks for the comments John and Guozhang, I'll > > > address > > > > > >> each > > > > > >> > > one of > > > > > >> > > > > >>>> your > > > > > >> > > > > >>>>>> comments in turn. > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> John, > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>>> I'm wondering about a missing quadrant from the > > > truth > > > > > >> table > > > > > >> > > > > >> involving > > > > > >> > > > > >>>>>>> whether a Materialized is stored or not and > > querying > > > > is > > > > > >> > > > > >>>>>>> enabled/disabled... What should be the behavior > if > > > > there > > > > > >> is > > > > > >> > no > > > > > >> > > > > >> store > > > > > >> > > > > >>>>>>> configured (e.g., if Materialized with only > > serdes) > > > > and > > > > > >> > > querying > > > > > >> > > > > >> is > > > > > >> > > > > >>>>>> enabled? > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>>> It seems we have two choices: > > > > > >> > > > > >>>>>>> 1. we can force creation of a state store in > this > > > > case, > > > > > so > > > > > >> > the > > > > > >> > > > > >> store > > > > > >> > > > > >>>>>>> can be used to serve the queries > > > > > >> > > > > >>>>>>> 2. we can provide just a queriable view, > basically > > > > > >> letting IQ > > > > > >> > > > > >> query > > > > > >> > > > > >>>>>>> into the "KTableValueGetter", which would > > > > transparently > > > > > >> > > > > >> construct the > > > > > >> > > > > >>>>>>> query response by applying the operator logic to > > the > > > > > >> upstream > > > > > >> > > > > >> state > > > > > >> > > > > >>>> if > > > > > >> > > > > >>>>>>> the operator state isn't already stored. > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> I agree with your assertion about a missing > > quadrant > > > > from > > > > > >> the > > > > > >> > > truth > > > > > >> > > > > >>>> table. > > > > > >> > > > > >>>>>> Additionally, I too like the concept of a > queriable > > > > view. > > > > > >> > But I > > > > > >> > > > > >> think > > > > > >> > > > > >>>> that > > > > > >> > > > > >>>>>> goes a bit beyond the scope of this KIP and would > > > like > > > > to > > > > > >> > pursue > > > > > >> > > > > >> that > > > > > >> > > > > >>>>>> feature as follow-on work. Also thinking about > > this > > > > KIP > > > > > >> some > > > > > >> > > > > >> more, I'm > > > > > >> > > > > >>>>>> thinking of the changes to Materialized might be > a > > > > reach > > > > > as > > > > > >> > > well. > > > > > >> > > > > >>>>>> Separating the naming from a store and its > > queryable > > > > > state > > > > > >> > seems > > > > > >> > > > > >> like a > > > > > >> > > > > >>>>>> complex issue in and of itself and should be > > treated > > > > > >> > > accordingly. > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> So here's what I'm thinking now. We add > > Materialzied > > > > to > > > > > >> Join, > > > > > >> > > but > > > > > >> > > > > >> for > > > > > >> > > > > >>>> now, > > > > > >> > > > > >>>>>> we internally disable querying. I know this > breaks > > > our > > > > > >> > current > > > > > >> > > > > >>>> semantic > > > > > >> > > > > >>>>>> approach, but I think it's crucial that we do two > > > > things > > > > > in > > > > > >> > this > > > > > >> > > > > >> KIP > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> 1. Break the naming of the state stores from > > > Joined > > > > to > > > > > >> > > > > >>>> Materialized, so > > > > > >> > > > > >>>>>> the naming of state stores follows our current > > > > pattern > > > > > >> and > > > > > >> > > > > >> enables > > > > > >> > > > > >>>>>> upgrades > > > > > >> > > > > >>>>>> from 2.3 to 2.4 > > > > > >> > > > > >>>>>> 2. Offer the ability to configure the state > > stores > > > > of > > > > > >> the > > > > > >> > > join, > > > > > >> > > > > >> even > > > > > >> > > > > >>>>>> providing a different implementation (i.e. > > > > in-memory) > > > > > if > > > > > >> > > > > >> desired. > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> With that in mind I'm considering changing the > KIP > > to > > > > > >> remove > > > > > >> > the > > > > > >> > > > > >>>> changes to > > > > > >> > > > > >>>>>> Materialized, and we document very clearly that > by > > > > > >> providing a > > > > > >> > > > > >>>> Materialized > > > > > >> > > > > >>>>>> object with a name is only for naming the state > > > store, > > > > > >> hence > > > > > >> > the > > > > > >> > > > > >>>> changelog > > > > > >> > > > > >>>>>> topics and any possible configurations of the > > store, > > > > but > > > > > >> this > > > > > >> > > store > > > > > >> > > > > >>>> *will > > > > > >> > > > > >>>>>> not be available for IQ.* > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> WDYT? > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> Guozhang, > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>>> 1. About not breaking compatibility of > > stream-stream > > > > > join > > > > > >> > > > > >>>> materialized > > > > > >> > > > > >>>>>>> stores: I think this is a valid issue to tackle, > > but > > > > > after > > > > > >> > > > > >> thinking > > > > > >> > > > > >>>> about > > > > > >> > > > > >>>>>>> it once more I'm not sure if exposing > Materialized > > > > would > > > > > >> be a > > > > > >> > > > > >> good > > > > > >> > > > > >>>>>> solution > > > > > >> > > > > >>>>>>> here. My rationles: > > > > > >> > > > > >>>>>>> > > > > > >> > > > > >>>>>>> 1.a For stream-stream join, our current usage of > > > > > >> window-store > > > > > >> > > is > > > > > >> > > > > >> not > > > > > >> > > > > >>>>>> ideal, > > > > > >> > > > > >>>>>>> and we want to modify it in the near future to > be > > > more > > > > > >> > > > > >> efficient. Not > > > > > >> > > > > >>>>>>> allowing users to override such state store > > backend > > > > > gives > > > > > >> us > > > > > >> > > such > > > > > >> > > > > >>>> freedom > > > > > >> > > > > >>>>>>> (which was also considered in the original DSL > > > > design), > > > > > >> > whereas > > > > > >> > > > > >>>> getting a > > > > > >> > > > > >>>>>>> Materialized<WindowStore> basically kicks out > that > > > > > freedom > > > > > >> > out > > > > > >> > > > > >> of the > > > > > >> > > > > >>>>>>> window. > > > > > >> > > > > >>>>>>> 1.b For strema-stream join, in our original > design > > > we > > > > > >> intend > > > > > >> > to > > > > > >> > > > > >>>> "never" > > > > > >> > > > > >>>>>>> want users to query the state, since it is just > > for > > > > > >> buffering > > > > > >> > > the > > > > > >> > > > > >>>>>> upcoming > > > > > >> > > > > >>>>>>> records from the stream. Now I know that some > > users > > > > may > > > > > >> > indeed > > > > > >> > > > > >> want > > > > > >> > > > > >>>> to > > > > > >> > > > > >>>>>>> query it from the debugging perspective, but > > still I > > > > > >> > concerned > > > > > >> > > > > >> about > > > > > >> > > > > >>>>>>> whether leveraging IQ for debugging purposes > would > > > be > > > > > the > > > > > >> > right > > > > > >> > > > > >>>> solution > > > > > >> > > > > >>>>>>> here. And adding Materialized object opens the > > door > > > to > > > > > let > > > > > >> > > users > > > > > >> > > > > >>>> query > > > > > >> > > > > >>>>>>> about it (unless we did something intentionally > to > > > > still > > > > > >> > > forbids > > > > > >> > > > > >> it), > > > > > >> > > > > >>>>>> which > > > > > >> > > > > >>>>>>> also restricts us in the future. > > > > > >> > > > > >>>>>>> > > > > > >> > > > > >>>>>>> 2. About the coupling between > Materialized.name() > > > and > > > > > >> > > queryable: > > > > > >> > > > > >>>> again I > > > > > >> > > > > >>>>>>> think this is a valid issue. But I'm not sure if > > the > > > > > >> current > > > > > >> > > > > >>>>>>> "withQuerryingDisabled / Enabled" at > Materialized > > is > > > > the > > > > > >> best > > > > > >> > > > > >>>> approach. > > > > > >> > > > > >>>>>>> Here I think I agree with John, that generally > > > > speaking > > > > > >> it's > > > > > >> > > > > >> better > > > > > >> > > > > >>>> be a > > > > > >> > > > > >>>>>>> control function on the `KTable` itself, rather > > than > > > > on > > > > > >> > > > > >>>> `Materialized`, > > > > > >> > > > > >>>>>> so > > > > > >> > > > > >>>>>>> fixing it via adding functions through > > > `Materialized` > > > > > >> seems > > > > > >> > > not a > > > > > >> > > > > >>>> natural > > > > > >> > > > > >>>>>> approach either. > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> I understand your thoughts here, and up to a > > point, I > > > > > agree > > > > > >> > with > > > > > >> > > > > >> you. > > > > > >> > > > > >>>>>> But concerning not providing Materialized as it > may > > > > > >> restrict > > > > > >> > us > > > > > >> > > in > > > > > >> > > > > >> the > > > > > >> > > > > >>>>>> future for delivering different implementations, > > I'm > > > > > >> wondering > > > > > >> > > if > > > > > >> > > > > >> we > > > > > >> > > > > >>>> are > > > > > >> > > > > >>>>>> doing some premature optimization here. > > > > > >> > > > > >>>>>> My rationale for saying so > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> 1. I think the cost of not allowing the naming > > of > > > > > state > > > > > >> > > stores > > > > > >> > > > > >> for > > > > > >> > > > > >>>> joins > > > > > >> > > > > >>>>>> is too big of a gap to leave. IMHO for joins > > to > > > > > follow > > > > > >> > the > > > > > >> > > > > >> current > > > > > >> > > > > >>>>>> pattern of using Materialized for naming state > > > > stores > > > > > >> would > > > > > >> > > be > > > > > >> > > > > >> what > > > > > >> > > > > >>>> most > > > > > >> > > > > >>>>>> users would expect to use. As I said in my > > > comments > > > > > >> > above, I > > > > > >> > > > > >> think > > > > > >> > > > > >>>> we > > > > > >> > > > > >>>>>> should *not include* the changes to > Materialized > > > and > > > > > >> > enforce > > > > > >> > > > > >> named > > > > > >> > > > > >>>>>> stores for joins as unavailable for IQ. > > > > > >> > > > > >>>>>> 2. We'll still have the join methods available > > > > > without a > > > > > >> > > > > >>>> Materialized > > > > > >> > > > > >>>>>> allowing us to do something different > internally > > > if > > > > a > > > > > >> > > > > >> Materialized > > > > > >> > > > > >>>> is > > > > > >> > > > > >>>>>> not > > > > > >> > > > > >>>>>> provided. > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>>> Overall, I'm thinking maybe we should still use > > two > > > > > stones > > > > > >> > > rather > > > > > >> > > > > >>>> than > > > > > >> > > > > >>>>>> one > > > > > >> > > > > >>>>>>> to kill these two birds, and probably for this > KIP > > > we > > > > > just > > > > > >> > > focus > > > > > >> > > > > >> on > > > > > >> > > > > >>>> 1) > > > > > >> > > > > >>>>>>> above. And for that I'd like to not expose the > > > > > >> Materialized > > > > > >> > > > > >> either > > > > > >> > > > > >>>> for > > > > > >> > > > > >>>>>>> rationales that I've listed above. Instead, we > > just > > > > > >> restrict > > > > > >> > > > > >> KIP-307 > > > > > >> > > > > >>>> to > > > > > >> > > > > >>>>>> NOT > > > > > >> > > > > >>>>>>> use the Joined.name for state store names and > > always > > > > use > > > > > >> > > internal > > > > > >> > > > > >>>> names > > > > > >> > > > > >>>>>> as > > > > > >> > > > > >>>>>>> well, which admittedly indeed leaves a hole of > not > > > > being > > > > > >> able > > > > > >> > > to > > > > > >> > > > > >>>> cover > > > > > >> > > > > >>>>>> all > > > > > >> > > > > >>>>>>> internal names here, but now I feel this `hole` > > may > > > > > >> better be > > > > > >> > > > > >> filled > > > > > >> > > > > >>>> by, > > > > > >> > > > > >>>>>>> e.g. not creating changelog topics but just use > > the > > > > > >> upstream > > > > > >> > to > > > > > >> > > > > >>>>>>> re-bootstrap the materialized store, more > > > concretely: > > > > > when > > > > > >> > > > > >>>> materializing > > > > > >> > > > > >>>>>>> the store, try to piggy-back the changelog topic > > on > > > an > > > > > >> > existing > > > > > >> > > > > >>>> topic, > > > > > >> > > > > >>>>>> e.g. > > > > > >> > > > > >>>>>>> a) if the stream is coming directly from some > > source > > > > > topic > > > > > >> > > > > >> (including > > > > > >> > > > > >>>>>>> repartition topic), make that as changelog topic > > and > > > > if > > > > > >> it is > > > > > >> > > > > >>>> repartition > > > > > >> > > > > >>>>>>> topic change the retention / data purging policy > > > > > >> necessarily > > > > > >> > as > > > > > >> > > > > >>>> well; b) > > > > > >> > > > > >>>>>> if > > > > > >> > > > > >>>>>>> the stream is coming from some stateless > > operators, > > > > > >> delegate > > > > > >> > > that > > > > > >> > > > > >>>>>> stateless > > > > > >> > > > > >>>>>>> operator to the parent stream similar as a); if > > the > > > > > >> stream is > > > > > >> > > > > >> coming > > > > > >> > > > > >>>> from > > > > > >> > > > > >>>>>> a > > > > > >> > > > > >>>>>>> stream-stream join which is the only stateful > > > operator > > > > > >> that > > > > > >> > can > > > > > >> > > > > >>>> result in > > > > > >> > > > > >>>>>> a > > > > > >> > > > > >>>>>>> stream, consider merging the join into multi-way > > > joins > > > > > >> (yes, > > > > > >> > > > > >> this is > > > > > >> > > > > >>>> a > > > > > >> > > > > >>>>>> very > > > > > >> > > > > >>>>>>> hand-wavy thought, but the point here is that we > > do > > > > not > > > > > >> try > > > > > >> > to > > > > > >> > > > > >>>> tackle it > > > > > >> > > > > >>>>>>> now but leave it for a better solution :). > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> I really like this idea! I agree with you in > that > > > this > > > > > >> > approach > > > > > >> > > > > >> to too > > > > > >> > > > > >>>>>> much for adding in this KIP, but we could pick it > > up > > > > > later > > > > > >> and > > > > > >> > > > > >>>> leverage the > > > > > >> > > > > >>>>>> Optimization framework to accomplish this re-use. > > > > > >> > > > > >>>>>> Again, while I agree we should break the naming > of > > > join > > > > > >> state > > > > > >> > > > > >> stores > > > > > >> > > > > >>>> from > > > > > >> > > > > >>>>>> KIP-307, IMHO it's something we should fix now as > > it > > > > will > > > > > >> be > > > > > >> > the > > > > > >> > > > > >> last > > > > > >> > > > > >>>> piece > > > > > >> > > > > >>>>>> we can provide to give users the ability to > > > completely > > > > > make > > > > > >> > > their > > > > > >> > > > > >>>>>> topologies "upgrade proof" when adding additional > > > > > >> operations. > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> Thanks again to both of you for comments and I > look > > > > > >> forward to > > > > > >> > > > > >> hearing > > > > > >> > > > > >>>> back > > > > > >> > > > > >>>>>> from you. > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> Regards, > > > > > >> > > > > >>>>>> Bill > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>> On Thu, Jun 20, 2019 at 2:33 PM Guozhang Wang < > > > > > >> > > wangg...@gmail.com> > > > > > >> > > > > >>>> wrote: > > > > > >> > > > > >>>>>> > > > > > >> > > > > >>>>>>> Hello Bill, > > > > > >> > > > > >>>>>>> > > > > > >> > > > > >>>>>>> Thanks for the KIP. Glad to see that we can > likely > > > > > >> shooting > > > > > >> > two > > > > > >> > > > > >> birds > > > > > >> > > > > >>>>>> with > > > > > >> > > > > >>>>>>> one stone. I have some concerns though about > those > > > > "two > > > > > >> > birds" > > > > > >> > > > > >>>>>> themselves: > > > > > >> > > > > >>>>>>> > > > > > >> > > > > >>>>>>> 1. About not breaking compatibility of > > stream-stream > > > > > join > > > > > >> > > > > >>>> materialized > > > > > >> > > > > >>>>>>> stores: I think this is a valid issue to tackle, > > but > > > > > after > > > > > >> > > > > >> thinking > > > > > >> > > > > >>>> about > > > > > >> > > > > >>>>>>> it once more I'm not sure if exposing > Materialized > > > > would > > > > > >> be a > > > > > >> > > > > >> good > > > > > >> > > > > >>>>>> solution > > > > > >> > > > > >>>>>>> here. My rationles: > > > > > >> > > > > >>>>>>> > > > > > >> > > > > >>>>>>> 1.a For stream-stream join, our current usage of > > > > > >> window-store > > > > > >> > > is > > > > > >> > > > > >> not > > > > > >> > > > > >>>>>> ideal, > > > > > >> > > > > >>>>>>> and we want to modify it in the near future to > be > > > more > > > > > >> > > > > >> efficient. Not > > > > > >> > > > > >>>>>>> allowing users to override such state store > > backend > > > > > gives > > > > > >> us > > > > > >> > > such > > > > > >> > > > > >>>> freedom > > > > > >> > > > > >>>>>>> (which was also considered in the original DSL > > > > design), > > > > > >> > whereas > > > > > >> > > > > >>>> getting a > > > > > >> > > > > >>>>>>> Materialized<WindowStore> basically kicks out > that > > > > > freedom > > > > > >> > out > > > > > >> > > > > >> of the > > > > > >> > > > > >>>>>>> window. > > > > > >> > > > > >>>>>>> 1.b For strema-stream join, in our original > design > > > we > > > > > >> intend > > > > > >> > to > > > > > >> > > > > >>>> "never" > > > > > >> > > > > >>>>>>> want users to query the state, since it is just > > for > > > > > >> buffering > > > > > >> > > the > > > > > >> > > > > >>>>>> upcoming > > > > > >> > > > > >>>>>>> records from the stream. Now I know that some > > users > > > > may > > > > > >> > indeed > > > > > >> > > > > >> want > > > > > >> > > > > >>>> to > > > > > >> > > > > >>>>>>> query it from the debugging perspective, but > > still I > > > > > >> > concerned > > > > > >> > > > > >> about > > > > > >> > > > > >>>>>>> whether leveraging IQ for debugging purposes > would > > > be > > > > > the > > > > > >> > right > > > > > >> > > > > >>>> solution > > > > > >> > > > > >>>>>>> here. And adding Materialized object opens the > > door > > > to > > > > > let > > > > > >> > > users > > > > > >> > > > > >>>> query > > > > > >> > > > > >>>>>>> about it (unless we did something intentionally > to > > > > still > > > > > >> > > forbids > > > > > >> > > > > >> it), > > > > > >> > > > > >>>>>> which > > > > > >> > > > > >>>>>>> also restricts us in the future. > > > > > >> > > > > >>>>>>> > > > > > >> > > > > >>>>>>> 2. About the coupling between > Materialized.name() > > > and > > > > > >> > > queryable: > > > > > >> > > > > >>>> again I > > > > > >> > > > > >>>>>>> think this is a valid issue. But I'm not sure if > > the > > > > > >> current > > > > > >> > > > > >>>>>>> "withQuerryingDisabled / Enabled" at > Materialized > > is > > > > the > > > > > >> best > > > > > >> > > > > >>>> approach. > > > > > >> > > > > >>>>>>> Here I think I agree with John, that generally > > > > speaking > > > > > >> it's > > > > > >> > > > > >> better > > > > > >> > > > > >>>> be a > > > > > >> > > > > >>>>>>> control function on the `KTable` itself, rather > > than > > > > on > > > > > >> > > > > >>>> `Materialized`, > > > > > >> > > > > >>>>>> so > > > > > >> > > > > >>>>>>> fixing it via adding functions through > > > `Materialized` > > > > > >> seems > > > > > >> > > not a > > > > > >> > > > > >>>> natural > > > > > >> > > > > >>>>>>> approach either. > > > > > >> > > > > >>>>>>> > > > > > >> > > > > >>>>>>> > > > > > >> > > > > >>>>>>> Overall, I'm thinking maybe we should still use > > two > > > > > stones > > > > > >> > > rather > > > > > >> > > > > >>>> than > > > > > >> > > > > >>>>>> one > > > > > >> > > > > >>>>>>> to kill these two birds, and probably for this > KIP > > > we > > > > > just > > > > > >> > > focus > > > > > >> > > > > >> on > > > > > >> > > > > >>>> 1) > > > > > >> > > > > >>>>>>> above. And for that I'd like to not expose the > > > > > >> Materialized > > > > > >> > > > > >> either > > > > > >> > > > > >>>> for > > > > > >> > > > > >>>>>>> rationales that I've listed above. Instead, we > > just > > > > > >> restrict > > > > > >> > > > > >> KIP-307 > > > > > >> > > > > >>>> to > > > > > >> > > > > >>>>>> NOT > > > > > >> > > > > >>>>>>> use the Joined.name for state store names and > > always > > > > use > > > > > >> > > internal > > > > > >> > > > > >>>> names > > > > > >> > > > > >>>>>> as > > > > > >> > > > > >>>>>>> well, which admittedly indeed leaves a hole of > not > > > > being > > > > > >> able > > > > > >> > > to > > > > > >> > > > > >>>> cover > > > > > >> > > > > >>>>>> all > > > > > >> > > > > >>>>>>> internal names here, but now I feel this `hole` > > may > > > > > >> better be > > > > > >> > > > > >> filled > > > > > >> > > > > >>>> by, > > > > > >> > > > > >>>>>>> e.g. not creating changelog topics but just use > > the > > > > > >> upstream > > > > > >> > to > > > > > >> > > > > >>>>>>> re-bootstrap the materialized store, more > > > concretely: > > > > > when > > > > > >> > > > > >>>> materializing > > > > > >> > > > > >>>>>>> the store, try to piggy-back the changelog topic > > on > > > an > > > > > >> > existing > > > > > >> > > > > >>>> topic, > > > > > >> > > > > >>>>>> e.g. > > > > > >> > > > > >>>>>>> a) if the stream is coming directly from some > > source > > > > > topic > > > > > >> > > > > >> (including > > > > > >> > > > > >>>>>>> repartition topic), make that as changelog topic > > and > > > > if > > > > > >> it is > > > > > >> > > > > >>>> repartition > > > > > >> > > > > >>>>>>> topic change the retention / data purging policy > > > > > >> necessarily > > > > > >> > as > > > > > >> > > > > >>>> well; b) > > > > > >> > > > > >>>>>> if > > > > > >> > > > > >>>>>>> the stream is coming from some stateless > > operators, > > > > > >> delegate > > > > > >> > > that > > > > > >> > > > > >>>>>> stateless > > > > > >> > > > > >>>>>>> operator to the parent stream similar as a); if > > the > > > > > >> stream is > > > > > >> > > > > >> coming > > > > > >> > > > > >>>>>> from a > > > > > >> > > > > >>>>>>> stream-stream join which is the only stateful > > > operator > > > > > >> that > > > > > >> > can > > > > > >> > > > > >>>> result > > > > > >> > > > > >>>>>> in a > > > > > >> > > > > >>>>>>> stream, consider merging the join into multi-way > > > joins > > > > > >> (yes, > > > > > >> > > > > >> this is > > > > > >> > > > > >>>> a > > > > > >> > > > > >>>>>> very > > > > > >> > > > > >>>>>>> hand-wavy thought, but the point here is that we > > do > > > > not > > > > > >> try > > > > > >> > to > > > > > >> > > > > >>>> tackle it > > > > > >> > > > > >>>>>>> now but leave it for a better solution :). > > > > > >> > > > > >>>>>>> > > > > > >> > > > > >>>>>>> > > > > > >> > > > > >>>>>>> Guozhang > > > > > >> > > > > >>>>>>> > > > > > >> > > > > >>>>>>> > > > > > >> > > > > >>>>>>> > > > > > >> > > > > >>>>>>> On Wed, Jun 19, 2019 at 11:41 AM John Roesler < > > > > > >> > > j...@confluent.io > > > > > >> > > > > >>> > > > > > >> > > > > >>>> wrote: > > > > > >> > > > > >>>>>>> > > > > > >> > > > > >>>>>>>> Hi Bill, > > > > > >> > > > > >>>>>>>> > > > > > >> > > > > >>>>>>>> Thanks for the KIP! Awesome job catching this > > > > > unexpected > > > > > >> > > > > >>>> consequence > > > > > >> > > > > >>>>>>>> of the prior KIPs before it was released. > > > > > >> > > > > >>>>>>>> > > > > > >> > > > > >>>>>>>> The proposal looks good to me. On top of just > > > fixing > > > > > the > > > > > >> > > > > >> problem, > > > > > >> > > > > >>>> it > > > > > >> > > > > >>>>>>>> seems to address two other pain points: > > > > > >> > > > > >>>>>>>> * that naming a s