> Just to clarify I'll update `as(StoreSupplier, StoreSupplier)` to > `with(..., ...)` and change the class name from `StreamJoined` to > `StreamJoin`
Thanks Bill. SGTM. -Matthias On 9/17/19 4:52 PM, aishwarya kumar wrote: > Hi Bill, > > Thanks for clarifying, and the KIP looks great!! > > Best regards, > Aishwarya > > On Tue, Sep 17, 2019, 6:16 PM Bill Bejeck <bbej...@gmail.com> wrote: > >> Hi Aishwarya, >> >> On Tue, Sep 17, 2019 at 1:41 PM aishwarya kumar <ash26...@gmail.com> >> wrote: >> >>> Will this be applicable to Kstream-Ktable joins as well? Or do users >> always >>> materialize these joins explicitly? >>> >> >> No, this change applies to KStream-KStream joins only. With KStream-KTable >> joins KafkaStreams has materialized the KTable already, and we don't need >> to do anything with the KStream instance in this case. >> >> >>> I'm not sure if its even necessary (or makes sense), just trying to >>> understand why the change is applicable to Kstream joins only? >>> >> >> The full details are in the KIP, but in a nutshell, we needed to break the >> naming of the StateStore from `Joined.withName` and provide users a way to >> name the StateStore explicitly. While making the changes, we realized it >> would be beneficial to give users the ability to use different WindowStore >> implementations. The most likely reason to use different WindowStores >> would be to enable in-memory joins. >> >> >>> Best, >>> Aishwarya >>> >> >> Regards, >> Bill >> >> >>> >>> On Tue, Sep 17, 2019 at 4:05 PM Bill Bejeck <bbej...@gmail.com> wrote: >>> >>>> Guozhang, >>>> >>>> Thanks for the comments. >>>> >>>> Yes, you are correct in your assessment regarding names, *if* the users >>>> provide their own StoreSuppliers When constructing as StoreSupplier, >> the >>>> name can't be null, and additionally, there is no way to update the >> name. >>>> Further downstream, the underlying StateStore instances use the >> provided >>>> name for registration so they must be unique. If users don't provide >>>> distinct names for the StoreSuppliers, KafkaStreams will thow a >>>> StreamsException when building the topology. >>>> >>>> Thanks, >>>> Bill >>>> >>>> >>>> >>>> On Tue, Sep 17, 2019 at 9:39 AM Guozhang Wang <wangg...@gmail.com> >>> wrote: >>>> >>>>> Hello Bill, >>>>> >>>>> Thanks for the updated KIP. I made a pass on the StreamJoined >> section. >>>> Just >>>>> a quick question from user's perspective: if a user wants to provide >> a >>>>> customized store-supplier, she is forced to also provide a name via >> the >>>>> store-supplier. So there's no way to say "I want to provide my own >>> store >>>>> engine but let the library decide its name", is that right? >>>>> >>>>> >>>>> Guozhang >>>>> >>>>> >>>>> On Tue, Sep 17, 2019 at 8:53 AM Bill Bejeck <bbej...@gmail.com> >> wrote: >>>>> >>>>>> Bumping this discussion as we need to re-vote before the KIP >>> deadline. >>>>>> >>>>>> On Fri, Sep 13, 2019 at 10:29 AM Bill Bejeck <bbej...@gmail.com> >>>> wrote: >>>>>> >>>>>>> Hi All, >>>>>>> >>>>>>> While working on the implementation of KIP-479, some issues came >> to >>>>> light >>>>>>> that the KIP as written won't work. I have updated the KIP with >> a >>>>>> solution >>>>>>> I believe will solve the original problem as well as address the >>>>>>> impediment to the initial approach. >>>>>>> >>>>>>> This update is a significant change, so please review the updated >>> KIP >>>>>>> >>>>>> >>>>> >>>> >>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join >>>>>> and >>>>>>> provide feedback. After we conclude the discussion, there will >> be >>> a >>>>>>> re-vote. >>>>>>> >>>>>>> Thanks! >>>>>>> Bill >>>>>>> >>>>>>> On Wed, Jul 17, 2019 at 7:01 PM Guozhang Wang < >> wangg...@gmail.com> >>>>>> wrote: >>>>>>> >>>>>>>> Hi Bill, thanks for your explanations. I'm on board with your >>>> decision >>>>>>>> too. >>>>>>>> >>>>>>>> >>>>>>>> Guozhang >>>>>>>> >>>>>>>> On Wed, Jul 17, 2019 at 10:20 AM Bill Bejeck <bbej...@gmail.com >>> >>>>> wrote: >>>>>>>> >>>>>>>>> Thanks for the response, John. >>>>>>>>> >>>>>>>>>> If I can offer my thoughts, it seems better to just document >>> on >>>>> the >>>>>>>>>> Stream join javadoc for the Materialized parameter that it >>> will >>>>> not >>>>>>>>>> make the join result queriable. I'm not opposed to the >>> queriable >>>>>> flag >>>>>>>>>> in general, but introducing it is a much larger >> consideration >>>> that >>>>>> has >>>>>>>>>> previously derailed this KIP discussion. In the interest of >>> just >>>>>>>>>> closing the gap and keeping the API change small, it seems >>>> better >>>>> to >>>>>>>>>> just go with documentation for now. >>>>>>>>> >>>>>>>>> I agree with your statement here. IMHO the most important >> goal >>> of >>>>>> this >>>>>>>> KIP >>>>>>>>> is to not breaking existing users and gain some consistency of >>> the >>>>>> API. >>>>>>>>> >>>>>>>>> I'll update the KIP accordingly. >>>>>>>>> >>>>>>>>> -Bill >>>>>>>>> >>>>>>>>> On Tue, Jul 16, 2019 at 11:55 AM John Roesler < >>> j...@confluent.io> >>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi Bill, >>>>>>>>>> >>>>>>>>>> Thanks for driving this KIP toward a conclusion. I'm on >> board >>>> with >>>>>>>>>> your decision. >>>>>>>>>> >>>>>>>>>> You didn't mention whether you're still proposing to add the >>>>>>>>>> "queriable" flag to the Materialized config object, or just >>>>> document >>>>>>>>>> that a Stream join is never queriable. Both options have >> come >>> up >>>>>>>>>> earlier in the discussion, so it would be good to pin this >>> down. >>>>>>>>>> >>>>>>>>>> If I can offer my thoughts, it seems better to just document >>> on >>>>> the >>>>>>>>>> Stream join javadoc for the Materialized parameter that it >>> will >>>>> not >>>>>>>>>> make the join result queriable. I'm not opposed to the >>> queriable >>>>>> flag >>>>>>>>>> in general, but introducing it is a much larger >> consideration >>>> that >>>>>> has >>>>>>>>>> previously derailed this KIP discussion. In the interest of >>> just >>>>>>>>>> closing the gap and keeping the API change small, it seems >>>> better >>>>> to >>>>>>>>>> just go with documentation for now. >>>>>>>>>> >>>>>>>>>> Thanks again, >>>>>>>>>> -John >>>>>>>>>> >>>>>>>>>> On Thu, Jul 11, 2019 at 2:45 PM Bill Bejeck < >>> bbej...@gmail.com> >>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>> Thanks all for the great discussion so far. >>>>>>>>>>> >>>>>>>>>>> Everyone has made excellent points, and I appreciate the >>>> detail >>>>>>>>> everyone >>>>>>>>>>> has put into their arguments. >>>>>>>>>>> >>>>>>>>>>> However, after carefully evaluating all the points made so >>>> far, >>>>>>>>> creating >>>>>>>>>> an >>>>>>>>>>> overload with Materialized is still my #1 option. >>>>>>>>>>> My reasoning for saying so is two-fold: >>>>>>>>>>> >>>>>>>>>>> 1. It's a small change, and IMHO since it's consistent >>> with >>>>> our >>>>>>>>>> current >>>>>>>>>>> API concerning state store usage, the cognitive load on >>>> users >>>>>>>> will >>>>>>>>> be >>>>>>>>>>> minimal. >>>>>>>>>>> 2. It achieves the most important goal of this KIP, >>> namely >>>> to >>>>>>>> close >>>>>>>>>> the >>>>>>>>>>> gap of naming state stores independently of the join >>>> operator >>>>>>>> name. >>>>>>>>>>> >>>>>>>>>>> Additionally, I agree with the points made by Matthias >>> earlier >>>>> (I >>>>>>>>> realize >>>>>>>>>>> there is some overlap here). >>>>>>>>>>> >>>>>>>>>>>> - the main purpose of this KIP is to close the naming >> gap >>>>> what >>>>>> we >>>>>>>>>> achieve >>>>>>>>>>>> - we can allow people to use the new in-memory store >>>>>>>>>>>> - we allow people to enable/disable caching >>>>>>>>>>>> - we unify the API >>>>>>>>>>>> - we decouple querying from naming >>>>>>>>>>>> - it's a small API change >>>>>>>>>>> >>>>>>>>>>> Although it's not a perfect solution, IMHO the positives >> of >>>>> using >>>>>>>>>>> Materialize far outweigh the negatives, and from what >> we've >>>>>>>> discussed >>>>>>>>> so >>>>>>>>>>> far, anything we implement seems to involve an additional >>>> change >>>>>>>> down >>>>>>>>> the >>>>>>>>>>> road. >>>>>>>>>>> >>>>>>>>>>> If others are still strongly opposed to using >> Materialized, >>> my >>>>>> other >>>>>>>>>>> preferences would be >>>>>>>>>>> >>>>>>>>>>> 1. Add a "withStoreName" to Joined. Although I agree >>> with >>>>>>>> Guozhang >>>>>>>>>> that >>>>>>>>>>> having a parameter that only applies to one use-case >>> would >>>> be >>>>>>>>> clumsy. >>>>>>>>>>> 2. Add a String overload for naming the store, but this >>>> would >>>>>> be >>>>>>>> my >>>>>>>>>>> least favorite option as IMHO this seems to be a step >>>>> backward >>>>>>>> from >>>>>>>>>> why we >>>>>>>>>>> introduced configuration objects in the first place. >>>>>>>>>>> >>>>>>>>>>> Thanks, >>>>>>>>>>> Bill >>>>>>>>>>> >>>>>>>>>>> On Thu, Jun 27, 2019 at 4:45 PM Matthias J. Sax < >>>>>>>> matth...@confluent.io >>>>>>>>>> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Thanks for the KIP Bill! >>>>>>>>>>>> >>>>>>>>>>>> Great discussion to far. >>>>>>>>>>>> >>>>>>>>>>>> About John's idea about querying upstream stores and >> don't >>>>>>>>> materialize >>>>>>>>>> a >>>>>>>>>>>> store: I agree with Bill that this seems to be an >>> orthogonal >>>>>>>>> question, >>>>>>>>>>>> and it might be better to treat it as an independent >>>>>> optimization >>>>>>>> and >>>>>>>>>>>> exclude from this KIP. >>>>>>>>>>>> >>>>>>>>>>>>> What should be the behavior if there is no store >>>>>>>>>>>>> configured (e.g., if Materialized with only serdes) >> and >>>>>>>> querying is >>>>>>>>>>>>> enabled? >>>>>>>>>>>> >>>>>>>>>>>> IMHO, this could be an error case. If one wants to >> query a >>>>>> store, >>>>>>>>> they >>>>>>>>>>>> need to provide a name -- if you don't know the name, >> how >>>>> would >>>>>>>> you >>>>>>>>>>>> actually query the store (even if it would be possible >> to >>>> get >>>>>> the >>>>>>>>> name >>>>>>>>>>>> from the `TopologyDescription`, it seems clumsy). >>>>>>>>>>>> >>>>>>>>>>>> If we don't want to throw an error, materializing seems >> to >>>> be >>>>>> the >>>>>>>>> right >>>>>>>>>>>> option, to exclude "query optimization" from this KIP. I >>>> would >>>>>> be >>>>>>>> ok >>>>>>>>>>>> with this option, even if it's clumsy to get the name >> from >>>>>>>>>>>> `TopologyDescription`; hence, I would prefer to treat it >>> as >>>> an >>>>>>>> error. >>>>>>>>>>>> >>>>>>>>>>>>> To get back to the current behavior, users would have >> to >>>>>>>>>>>>> add a "bytes store supplier" to the Materialized to >>>> indicate >>>>>>>> that, >>>>>>>>>>>>> yes, they really want a state store there. >>>>>>>>>>>> >>>>>>>>>>>> This sound like a quite subtle semantic difference on >> how >>> to >>>>> use >>>>>>>> the >>>>>>>>>>>> API. Might be hard to explain to users. I would prefer >> to >>>> not >>>>>>>>>> introduce it. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> About Guozhang's points: >>>>>>>>>>>> >>>>>>>>>>>> 1a) That is actually a good point. However, I believe we >>>>> cannot >>>>>>>> get >>>>>>>>>>>> around this issue easily, and it seems ok to me, to >> expose >>>> the >>>>>>>> actual >>>>>>>>>>>> store type we are using. (More thoughts later.) >>>>>>>>>>>> >>>>>>>>>>>> 1b) I don't see an issue with allowing users to query >> all >>>>>> stores? >>>>>>>>> What >>>>>>>>>>>> is the rational behind it? What do we gain by not >> allowing >>>> it? >>>>>>>>>>>> >>>>>>>>>>>> 2) While I understand what you are saying, we also >>> want/need >>>>> to >>>>>>>> have >>>>>>>>> a >>>>>>>>>>>> way in the PAPI to allow users adding "internal/private" >>>>>>>>> non-queryable >>>>>>>>>>>> stores to a topology. That's possible via >>>>>>>>>>>> `Materialized#withQueryingDisabled()`. We could also >>> update >>>>>>>>>>>> `Topology#addStateStore(StoreBuilder, boolean >> isQueryable, >>>>>>>>> String...)` >>>>>>>>>>>> to address this. Again, I agree with Bill that the >> current >>>> API >>>>>> is >>>>>>>>> built >>>>>>>>>>>> in a certain way, and if we want to change it, it should >>> be >>>> a >>>>>>>>> separate >>>>>>>>>>>> KIP, as it seems to be an orthogonal concern. >>>>>>>>>>>> >>>>>>>>>>>>> Instead, we just restrict KIP-307 to NOT >>>>>>>>>>>>> use the Joined.name for state store names and always >> use >>>>>>>> internal >>>>>>>>>> names >>>>>>>>>>>> as >>>>>>>>>>>>> well, which admittedly indeed leaves a hole of not >> being >>>>> able >>>>>> to >>>>>>>>>> cover >>>>>>>>>>>> all >>>>>>>>>>>>> internal names here >>>>>>>>>>>> >>>>>>>>>>>> I think it's important to close this gap. Naming >> entities >>>>> seems >>>>>>>> to a >>>>>>>>>>>> binary feature: if there is a gap, the feature is more >> or >>>> less >>>>>>>>> useless, >>>>>>>>>>>> rendering KIP-307 void. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> I like John's detailed list of required features and >> what >>>>>>>>>>>> Materialized/WindowByteStoreSuppliers offers. My take >> is, >>>> that >>>>>>>> adding >>>>>>>>>>>> Materialized including the required run-time checks is >> the >>>>> best >>>>>>>>> option >>>>>>>>>>>> we have, for the following reasons: >>>>>>>>>>>> >>>>>>>>>>>> - the main purpose of this KIP is to close the naming >> gap >>>>> what >>>>>> we >>>>>>>>>> achieve >>>>>>>>>>>> - we can allow people to use the new in-memory store >>>>>>>>>>>> - we allow people to enable/disable caching >>>>>>>>>>>> - we unify the API >>>>>>>>>>>> - we decouple querying from naming >>>>>>>>>>>> - it's a small API change >>>>>>>>>>>> >>>>>>>>>>>> Adding an overload and only passing in a name, would >>> address >>>>> the >>>>>>>> main >>>>>>>>>>>> purpose of the KIP. However, it falls short on all the >>> other >>>>>>>>> "goodies". >>>>>>>>>>>> As you mentioned, passing in `Materialized` might not be >>>>> perfect >>>>>>>> and >>>>>>>>>>>> maybe we need to deprecate is at some point; but this is >>>> also >>>>>> true >>>>>>>>> for >>>>>>>>>>>> passing in just a name. >>>>>>>>>>>> >>>>>>>>>>>> I am also not convinced, that a `StreamJoinStore` would >>>>> resolve >>>>>>>> all >>>>>>>>> the >>>>>>>>>>>> issues. In the end, as long as we are using a >>>> `WindowedStore` >>>>>>>>>>>> internally, we need to expose this "implemenation >> detail" >>> to >>>>>>>> users to >>>>>>>>>>>> allow them to plug in a custom store. Adding >>> `Materialized` >>>>> seem >>>>>>>> to >>>>>>>>> be >>>>>>>>>>>> the best short-term fix from my point of view. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -Matthias >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On 6/27/19 9:56 AM, Guozhang Wang wrote: >>>>>>>>>>>>> Hi John, >>>>>>>>>>>>> >>>>>>>>>>>>> I actually feels better about a new interface but I'm >>> not >>>>> sure >>>>>>>> if >>>>>>>>> we >>>>>>>>>>>> would >>>>>>>>>>>>> need the full configuration of store / log / cache, >> now >>> or >>>>> in >>>>>>>> the >>>>>>>>>> future >>>>>>>>>>>>> ever for stream-stream join. >>>>>>>>>>>>> >>>>>>>>>>>>> Right now I feel that 1) we want to improve our >>>>> implementation >>>>>>>> of >>>>>>>>>>>>> stream-stream join, and potentially also allow users >> to >>>>>>>> customize >>>>>>>>>> this >>>>>>>>>>>>> implementation but with a more suitable interface than >>> the >>>>>>>> current >>>>>>>>>>>>> WindowStore interface, how to do that is less clear >> and >>>>>>>>>> execution-wise >>>>>>>>>>>> it's >>>>>>>>>>>>> (arguably..) not urgent; 2) we want to close the last >>> gap >>>>>>>>>> (Stream-stream >>>>>>>>>>>>> join) of allowing users to specify all internal names >> to >>>>> help >>>>>> on >>>>>>>>>> backward >>>>>>>>>>>>> compatibility, which is urgent. >>>>>>>>>>>>> >>>>>>>>>>>>> Therefore if we want to unblock 2) from 1) in the near >>>>> term, I >>>>>>>> feel >>>>>>>>>>>>> slightly inclined to just add overload functions that >>>> takes >>>>>> in a >>>>>>>>>> store >>>>>>>>>>>> name >>>>>>>>>>>>> for stream-stream joins only -- and admittedly, in the >>>>> future >>>>>>>> this >>>>>>>>>>>> function >>>>>>>>>>>>> maybe deprecated -- i.e. if we have to do something >> that >>>> we >>>>>> "may >>>>>>>>>> regret" >>>>>>>>>>>> in >>>>>>>>>>>>> the future, I'd like to pick the least intrusive >> option. >>>>>>>>>>>>> >>>>>>>>>>>>> About `Joined#withStoreName`: since the Joined class >>>> itself >>>>> is >>>>>>>> also >>>>>>>>>> used >>>>>>>>>>>> in >>>>>>>>>>>>> other join types, I feel less comfortable to have a >>>>>>>>>>>> `Joined#withStoreName` >>>>>>>>>>>>> which is only going to be used by stream-stream join. >> Or >>>>>> maybe I >>>>>>>>> miss >>>>>>>>>>>>> something here about the "latter" case that you are >>>>> referring >>>>>>>> to? >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Guozhang >>>>>>>>>>>>> >>>>>>>>>>>>> On Mon, Jun 24, 2019 at 12:16 PM John Roesler < >>>>>>>> j...@confluent.io> >>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks Guozhang, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Yep. Maybe we can consider just exactly what the join >>>>> needs: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> the WindowStore<Bytes, byte[]> itself is fine, if >>> overly >>>>>>>> broad, >>>>>>>>>>>>>>> since the only two methods we need are >>> `window.put(key, >>>>>> value, >>>>>>>>>>>>>>> context().timestamp())` and `WindowStoreIterator<V2> >>>> iter >>>>> = >>>>>>>>>>>>>>> window.fetch(key, timeFrom, timeTo)`. >>>>>>>>>>>>>> >>>>>>>>>>>>>> One "middle ground" would be to extract _this_ into a >>> new >>>>>> store >>>>>>>>>>>>>> interface, which only supports these API calls, like >>>>>>>>>>>>>> StreamJoinStore<K, V>. This would give us the >> latitude >>> we >>>>>> need >>>>>>>> to >>>>>>>>>>>>>> efficiently support the exact operation without >>>> concerning >>>>>>>>> ourselves >>>>>>>>>>>>>> with all the other things a WindowStore can do (which >>> are >>>>>>>>>> unreachable >>>>>>>>>>>>>> for the join use case). It would also let us drop >>> "store >>>>>>>>> duplicates" >>>>>>>>>>>>>> from the main WindowStore interface, since it only >>> exists >>>>> to >>>>>>>>> support >>>>>>>>>>>>>> the join use case. >>>>>>>>>>>>>> >>>>>>>>>>>>>> If we were to add a new StreamJoinStore interface, >> then >>>>> it'd >>>>>> be >>>>>>>>>>>>>> straightforward how we could add also >>>>>>>>>>>>>> `Materialized.as(StreamJoinBytesStoreSupplier)` and >> use >>>>>>>>>> Materialized, >>>>>>>>>>>>>> or alternatively add the ability to set the bytes >> store >>>> on >>>>>>>> Joined. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Personally, I'm kind of leaning toward the latter >> (and >>>> also >>>>>>>> doing >>>>>>>>>>>>>> `Joined#withStoreName`), since adding the new >> interface >>>> to >>>>>>>>>>>>>> Materialized then also pollutes the interface for its >>>>>> _actual_ >>>>>>>> use >>>>>>>>>>>>>> case of materializing a table view. Of course, to >> solve >>>> the >>>>>>>>>> immediate >>>>>>>>>>>>>> problem, all we need is the store name, but we might >>> feel >>>>>>>> better >>>>>>>>>> about >>>>>>>>>>>>>> adding the store name to Joined if we _also_ feel >> like >>> in >>>>> the >>>>>>>>>> future, >>>>>>>>>>>>>> we would add store/log/cache configuration to Joined >> as >>>>> well. >>>>>>>>>>>>>> >>>>>>>>>>>>>> -John >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Mon, Jun 24, 2019 at 12:56 PM Guozhang Wang < >>>>>>>>> wangg...@gmail.com> >>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hello John, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> My main concern is exactly the first point at the >>> bottom >>>>> of >>>>>>>> your >>>>>>>>>>>> analysis >>>>>>>>>>>>>>> here: "* configure the bytes store". I'm not sure if >>>>> using a >>>>>>>>> window >>>>>>>>>>>> bytes >>>>>>>>>>>>>>> store would be ideal for stream-stream windowed >> join; >>>> e.g. >>>>>> we >>>>>>>>> could >>>>>>>>>>>>>>> consider two dimensional list sorted by timestamps >> and >>>>> then >>>>>> by >>>>>>>>>> keys to >>>>>>>>>>>> do >>>>>>>>>>>>>>> the join, whereas a windowed bytes store is >> basically >>>>> sorted >>>>>>>> by >>>>>>>>> key >>>>>>>>>>>>>> first, >>>>>>>>>>>>>>> then by timestamp. If we expose the Materialized to >>> let >>>>> user >>>>>>>> pass >>>>>>>>>> in a >>>>>>>>>>>>>>> windowed bytes store, then we would need to change >>> that >>>> if >>>>>> we >>>>>>>>> want >>>>>>>>>> to >>>>>>>>>>>>>>> replace it with a different implementation >> interface. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Mon, Jun 24, 2019 at 8:59 AM John Roesler < >>>>>>>> j...@confluent.io> >>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hey Guozhang and Bill, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> For what it's worth, I agree with you both! >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I think it might help the discussion to look >>> concretely >>>>> at >>>>>>>> what >>>>>>>>>>>>>>>> Materialized does: >>>>>>>>>>>>>>>> * set a WindowBytesStoreSupplier >>>>>>>>>>>>>>>> * set a name >>>>>>>>>>>>>>>> * set the key/value serdes >>>>>>>>>>>>>>>> * disable/enable/configure change-logging >>>>>>>>>>>>>>>> * disable/enable caching >>>>>>>>>>>>>>>> * configure retention >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Further, looking into the WindowBytesStoreSupplier, >>> the >>>>>>>>> interface >>>>>>>>>> lets >>>>>>>>>>>>>> you: >>>>>>>>>>>>>>>> * get the segment interval >>>>>>>>>>>>>>>> * get the window size >>>>>>>>>>>>>>>> * get whether "duplicates" are enabled >>>>>>>>>>>>>>>> * get the retention period >>>>>>>>>>>>>>>> * (obviously) get a WindowStore<Bytes, byte[]> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> We know that Materialized isn't exactly what we >> need >>>> for >>>>>>>> stream >>>>>>>>>> joins, >>>>>>>>>>>>>>>> but we can see how close Materialized is to what we >>>> need. >>>>>> If >>>>>>>> it >>>>>>>>> is >>>>>>>>>>>>>>>> close, maybe we can use it and document the gaps, >> and >>>> if >>>>> it >>>>>>>> is >>>>>>>>> not >>>>>>>>>>>>>>>> close, then maybe we should just add what we need >> to >>>>>> Joined. >>>>>>>>>>>>>>>> Stream Join's requirements for its stores: >>>>>>>>>>>>>>>> * a multimap store (i.e., it keeps duplicates) for >>>>> storing >>>>>>>>> general >>>>>>>>>>>>>>>> (not windowed) keyed records associated with their >>>>>> insertion >>>>>>>>>> time, and >>>>>>>>>>>>>>>> allows efficient time-bounded lookups and also >>>> efficient >>>>>>>> purges >>>>>>>>>> of old >>>>>>>>>>>>>>>> data. >>>>>>>>>>>>>>>> ** Note, a properly configured >>> WindowBytesStoreSupplier >>>>>>>>> satisfies >>>>>>>>>> this >>>>>>>>>>>>>>>> requirement, and the interface supports the queries >>> we >>>>> need >>>>>>>> to >>>>>>>>>> verify >>>>>>>>>>>>>>>> the configuration at run-time >>>>>>>>>>>>>>>> * set a name for the store >>>>>>>>>>>>>>>> * do _not_ set the serdes (they are already set in >>>>> Joined) >>>>>>>>>>>>>>>> * logging could be configurable (set to enabled >> now) >>>>>>>>>>>>>>>> * caching could be configurable (set to enabled >> now) >>>>>>>>>>>>>>>> * do _not_ configure retention (determined by >>>>> JoinWindows) >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> So, out of six capabilities for Materialized, there >>> are >>>>> two >>>>>>>> we >>>>>>>>>> don't >>>>>>>>>>>>>>>> want (serdes and retention). These would become >>>> run-time >>>>>>>> checks >>>>>>>>>> if we >>>>>>>>>>>>>>>> use it. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> A third questionable capability is to provide a >>>>>>>>>>>>>>>> WindowBytesStoreSupplier. Looking at whether the >>>>>>>>>>>>>>>> WindowBytesStoreSupplier is the right interface for >>>>> Stream >>>>>>>> Join: >>>>>>>>>>>>>>>> * configuring segment interval is fine >>>>>>>>>>>>>>>> * should _not_ configure window size (it's >> determined >>>> by >>>>>>>>>> JoinWindows) >>>>>>>>>>>>>>>> * duplicates _must_ be enabled >>>>>>>>>>>>>>>> * retention should be _at least_ windowSize + >>>>> gracePeriod, >>>>>>>> but >>>>>>>>>> note >>>>>>>>>>>>>>>> that (unlike for Table window stores) there is no >>>> utility >>>>>> in >>>>>>>>>> having a >>>>>>>>>>>>>>>> longer retention time. >>>>>>>>>>>>>>>> * the WindowStore<Bytes, byte[]> itself is fine, if >>>>> overly >>>>>>>>> broad, >>>>>>>>>>>>>>>> since the only two methods we need are >>> `window.put(key, >>>>>>>> value, >>>>>>>>>>>>>>>> context().timestamp())` and >> `WindowStoreIterator<V2> >>>>> iter = >>>>>>>>>>>>>>>> window.fetch(key, timeFrom, timeTo)`. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Thus, flattening out the overlap for >>>>>> WindowBytesStoreSupplier >>>>>>>>>> onto the >>>>>>>>>>>>>>>> overlap for Materialized, we have 9 capabilities >>> total >>>>>> (note >>>>>>>>>> retention >>>>>>>>>>>>>>>> is duplicated), we have 4 that we don't want: >>>>>>>>>>>>>>>> * do _not_ set the serdes (they are already set in >>>>> Joined) >>>>>>>>>>>>>>>> * do _not_ configure retention (determined by >>>>> JoinWindows) >>>>>>>>>>>>>>>> * should _not_ configure window size (it's >> determined >>>> by >>>>>>>>>> JoinWindows) >>>>>>>>>>>>>>>> * duplicates _must_ be enabled >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> These gaps would have to be covered with run-time >>>> checks >>>>> if >>>>>>>> we >>>>>>>>>> re-use >>>>>>>>>>>>>>>> Materialized and WindowStoreBytesStoreSupplier >> both. >>>>> Maybe >>>>>>>> this >>>>>>>>>> sounds >>>>>>>>>>>>>>>> bad, but consider the other side, that we get 5 new >>>>>>>> capabilities >>>>>>>>>> we >>>>>>>>>>>>>>>> don't require, but are still pretty nice: >>>>>>>>>>>>>>>> * configure the bytes store >>>>>>>>>>>>>>>> * set a name for the store >>>>>>>>>>>>>>>> * configure caching >>>>>>>>>>>>>>>> * configure logging >>>>>>>>>>>>>>>> * configure segment interval >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Not sure where this nets us out, but it's food for >>>>> thought. >>>>>>>>>>>>>>>> -John >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Sun, Jun 23, 2019 at 7:52 PM Guozhang Wang < >>>>>>>>> wangg...@gmail.com >>>>>>>>>>> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi Bill, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I think by giving a Materialized param into >>>>> stream-stream >>>>>>>> join, >>>>>>>>>> it's >>>>>>>>>>>>>> okay >>>>>>>>>>>>>>>>> (though still ideal) to say "we still would not >>> expose >>>>> the >>>>>>>>> store >>>>>>>>>> for >>>>>>>>>>>>>>>>> queries", but it would sound a bit awkward to say >>> "we >>>>>> would >>>>>>>>> also >>>>>>>>>>>>>> ignore >>>>>>>>>>>>>>>>> whatever the passed in store supplier but just use >>> our >>>>>>>> default >>>>>>>>>> ones" >>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>>> again the concern is that, if in the future we'd >>> want >>>> to >>>>>>>> change >>>>>>>>>> the >>>>>>>>>>>>>>>> default >>>>>>>>>>>>>>>>> implementation of join algorithm which no longer >>> rely >>>>> on a >>>>>>>>> window >>>>>>>>>>>>>> store >>>>>>>>>>>>>>>>> with deduping enabled, then we need to change this >>> API >>>>>>>> again by >>>>>>>>>>>>>> changing >>>>>>>>>>>>>>>>> the store supplier type. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> If we do want to fill this hole for stream-stream >>>> join, >>>>> I >>>>>>>> feel >>>>>>>>>> just >>>>>>>>>>>>>>>> adding >>>>>>>>>>>>>>>>> a String typed store-name would even be less >>>>>>>> future-intrusive >>>>>>>>> if >>>>>>>>>> we >>>>>>>>>>>>>>>> expect >>>>>>>>>>>>>>>>> this parameter to be modified later. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Does that makes sense? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Sat, Jun 22, 2019 at 12:51 PM Bill Bejeck < >>>>>>>>> bbej...@gmail.com> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for the comments John and Guozhang, I'll >>>> address >>>>>>>> each >>>>>>>>>> one of >>>>>>>>>>>>>>>> your >>>>>>>>>>>>>>>>>> comments in turn. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> John, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I'm wondering about a missing quadrant from the >>>> truth >>>>>>>> table >>>>>>>>>>>>>> involving >>>>>>>>>>>>>>>>>>> whether a Materialized is stored or not and >>> querying >>>>> is >>>>>>>>>>>>>>>>>>> enabled/disabled... What should be the behavior >> if >>>>> there >>>>>>>> is >>>>>>>>> no >>>>>>>>>>>>>> store >>>>>>>>>>>>>>>>>>> configured (e.g., if Materialized with only >>> serdes) >>>>> and >>>>>>>>>> querying >>>>>>>>>>>>>> is >>>>>>>>>>>>>>>>>> enabled? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> It seems we have two choices: >>>>>>>>>>>>>>>>>>> 1. we can force creation of a state store in >> this >>>>> case, >>>>>> so >>>>>>>>> the >>>>>>>>>>>>>> store >>>>>>>>>>>>>>>>>>> can be used to serve the queries >>>>>>>>>>>>>>>>>>> 2. we can provide just a queriable view, >> basically >>>>>>>> letting IQ >>>>>>>>>>>>>> query >>>>>>>>>>>>>>>>>>> into the "KTableValueGetter", which would >>>>> transparently >>>>>>>>>>>>>> construct the >>>>>>>>>>>>>>>>>>> query response by applying the operator logic to >>> the >>>>>>>> upstream >>>>>>>>>>>>>> state >>>>>>>>>>>>>>>> if >>>>>>>>>>>>>>>>>>> the operator state isn't already stored. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I agree with your assertion about a missing >>> quadrant >>>>> from >>>>>>>> the >>>>>>>>>> truth >>>>>>>>>>>>>>>> table. >>>>>>>>>>>>>>>>>> Additionally, I too like the concept of a >> queriable >>>>> view. >>>>>>>>> But I >>>>>>>>>>>>>> think >>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>> goes a bit beyond the scope of this KIP and would >>>> like >>>>> to >>>>>>>>> pursue >>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>> feature as follow-on work. Also thinking about >>> this >>>>> KIP >>>>>>>> some >>>>>>>>>>>>>> more, I'm >>>>>>>>>>>>>>>>>> thinking of the changes to Materialized might be >> a >>>>> reach >>>>>> as >>>>>>>>>> well. >>>>>>>>>>>>>>>>>> Separating the naming from a store and its >>> queryable >>>>>> state >>>>>>>>> seems >>>>>>>>>>>>>> like a >>>>>>>>>>>>>>>>>> complex issue in and of itself and should be >>> treated >>>>>>>>>> accordingly. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> So here's what I'm thinking now. We add >>> Materialzied >>>>> to >>>>>>>> Join, >>>>>>>>>> but >>>>>>>>>>>>>> for >>>>>>>>>>>>>>>> now, >>>>>>>>>>>>>>>>>> we internally disable querying. I know this >> breaks >>>> our >>>>>>>>> current >>>>>>>>>>>>>>>> semantic >>>>>>>>>>>>>>>>>> approach, but I think it's crucial that we do two >>>>> things >>>>>> in >>>>>>>>> this >>>>>>>>>>>>>> KIP >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> 1. Break the naming of the state stores from >>>> Joined >>>>> to >>>>>>>>>>>>>>>> Materialized, so >>>>>>>>>>>>>>>>>> the naming of state stores follows our current >>>>> pattern >>>>>>>> and >>>>>>>>>>>>>> enables >>>>>>>>>>>>>>>>>> upgrades >>>>>>>>>>>>>>>>>> from 2.3 to 2.4 >>>>>>>>>>>>>>>>>> 2. Offer the ability to configure the state >>> stores >>>>> of >>>>>>>> the >>>>>>>>>> join, >>>>>>>>>>>>>> even >>>>>>>>>>>>>>>>>> providing a different implementation (i.e. >>>>> in-memory) >>>>>> if >>>>>>>>>>>>>> desired. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> With that in mind I'm considering changing the >> KIP >>> to >>>>>>>> remove >>>>>>>>> the >>>>>>>>>>>>>>>> changes to >>>>>>>>>>>>>>>>>> Materialized, and we document very clearly that >> by >>>>>>>> providing a >>>>>>>>>>>>>>>> Materialized >>>>>>>>>>>>>>>>>> object with a name is only for naming the state >>>> store, >>>>>>>> hence >>>>>>>>> the >>>>>>>>>>>>>>>> changelog >>>>>>>>>>>>>>>>>> topics and any possible configurations of the >>> store, >>>>> but >>>>>>>> this >>>>>>>>>> store >>>>>>>>>>>>>>>> *will >>>>>>>>>>>>>>>>>> not be available for IQ.* >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> WDYT? >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Guozhang, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 1. About not breaking compatibility of >>> stream-stream >>>>>> join >>>>>>>>>>>>>>>> materialized >>>>>>>>>>>>>>>>>>> stores: I think this is a valid issue to tackle, >>> but >>>>>> after >>>>>>>>>>>>>> thinking >>>>>>>>>>>>>>>> about >>>>>>>>>>>>>>>>>>> it once more I'm not sure if exposing >> Materialized >>>>> would >>>>>>>> be a >>>>>>>>>>>>>> good >>>>>>>>>>>>>>>>>> solution >>>>>>>>>>>>>>>>>>> here. My rationles: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 1.a For stream-stream join, our current usage of >>>>>>>> window-store >>>>>>>>>> is >>>>>>>>>>>>>> not >>>>>>>>>>>>>>>>>> ideal, >>>>>>>>>>>>>>>>>>> and we want to modify it in the near future to >> be >>>> more >>>>>>>>>>>>>> efficient. Not >>>>>>>>>>>>>>>>>>> allowing users to override such state store >>> backend >>>>>> gives >>>>>>>> us >>>>>>>>>> such >>>>>>>>>>>>>>>> freedom >>>>>>>>>>>>>>>>>>> (which was also considered in the original DSL >>>>> design), >>>>>>>>> whereas >>>>>>>>>>>>>>>> getting a >>>>>>>>>>>>>>>>>>> Materialized<WindowStore> basically kicks out >> that >>>>>> freedom >>>>>>>>> out >>>>>>>>>>>>>> of the >>>>>>>>>>>>>>>>>>> window. >>>>>>>>>>>>>>>>>>> 1.b For strema-stream join, in our original >> design >>>> we >>>>>>>> intend >>>>>>>>> to >>>>>>>>>>>>>>>> "never" >>>>>>>>>>>>>>>>>>> want users to query the state, since it is just >>> for >>>>>>>> buffering >>>>>>>>>> the >>>>>>>>>>>>>>>>>> upcoming >>>>>>>>>>>>>>>>>>> records from the stream. Now I know that some >>> users >>>>> may >>>>>>>>> indeed >>>>>>>>>>>>>> want >>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>> query it from the debugging perspective, but >>> still I >>>>>>>>> concerned >>>>>>>>>>>>>> about >>>>>>>>>>>>>>>>>>> whether leveraging IQ for debugging purposes >> would >>>> be >>>>>> the >>>>>>>>> right >>>>>>>>>>>>>>>> solution >>>>>>>>>>>>>>>>>>> here. And adding Materialized object opens the >>> door >>>> to >>>>>> let >>>>>>>>>> users >>>>>>>>>>>>>>>> query >>>>>>>>>>>>>>>>>>> about it (unless we did something intentionally >> to >>>>> still >>>>>>>>>> forbids >>>>>>>>>>>>>> it), >>>>>>>>>>>>>>>>>> which >>>>>>>>>>>>>>>>>>> also restricts us in the future. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 2. About the coupling between >> Materialized.name() >>>> and >>>>>>>>>> queryable: >>>>>>>>>>>>>>>> again I >>>>>>>>>>>>>>>>>>> think this is a valid issue. But I'm not sure if >>> the >>>>>>>> current >>>>>>>>>>>>>>>>>>> "withQuerryingDisabled / Enabled" at >> Materialized >>> is >>>>> the >>>>>>>> best >>>>>>>>>>>>>>>> approach. >>>>>>>>>>>>>>>>>>> Here I think I agree with John, that generally >>>>> speaking >>>>>>>> it's >>>>>>>>>>>>>> better >>>>>>>>>>>>>>>> be a >>>>>>>>>>>>>>>>>>> control function on the `KTable` itself, rather >>> than >>>>> on >>>>>>>>>>>>>>>> `Materialized`, >>>>>>>>>>>>>>>>>> so >>>>>>>>>>>>>>>>>>> fixing it via adding functions through >>>> `Materialized` >>>>>>>> seems >>>>>>>>>> not a >>>>>>>>>>>>>>>> natural >>>>>>>>>>>>>>>>>> approach either. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I understand your thoughts here, and up to a >>> point, I >>>>>> agree >>>>>>>>> with >>>>>>>>>>>>>> you. >>>>>>>>>>>>>>>>>> But concerning not providing Materialized as it >> may >>>>>>>> restrict >>>>>>>>> us >>>>>>>>>> in >>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>> future for delivering different implementations, >>> I'm >>>>>>>> wondering >>>>>>>>>> if >>>>>>>>>>>>>> we >>>>>>>>>>>>>>>> are >>>>>>>>>>>>>>>>>> doing some premature optimization here. >>>>>>>>>>>>>>>>>> My rationale for saying so >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> 1. I think the cost of not allowing the naming >>> of >>>>>> state >>>>>>>>>> stores >>>>>>>>>>>>>> for >>>>>>>>>>>>>>>> joins >>>>>>>>>>>>>>>>>> is too big of a gap to leave. IMHO for joins >>> to >>>>>> follow >>>>>>>>> the >>>>>>>>>>>>>> current >>>>>>>>>>>>>>>>>> pattern of using Materialized for naming state >>>>> stores >>>>>>>> would >>>>>>>>>> be >>>>>>>>>>>>>> what >>>>>>>>>>>>>>>> most >>>>>>>>>>>>>>>>>> users would expect to use. As I said in my >>>> comments >>>>>>>>> above, I >>>>>>>>>>>>>> think >>>>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>>> should *not include* the changes to >> Materialized >>>> and >>>>>>>>> enforce >>>>>>>>>>>>>> named >>>>>>>>>>>>>>>>>> stores for joins as unavailable for IQ. >>>>>>>>>>>>>>>>>> 2. We'll still have the join methods available >>>>>> without a >>>>>>>>>>>>>>>> Materialized >>>>>>>>>>>>>>>>>> allowing us to do something different >> internally >>>> if >>>>> a >>>>>>>>>>>>>> Materialized >>>>>>>>>>>>>>>> is >>>>>>>>>>>>>>>>>> not >>>>>>>>>>>>>>>>>> provided. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Overall, I'm thinking maybe we should still use >>> two >>>>>> stones >>>>>>>>>> rather >>>>>>>>>>>>>>>> than >>>>>>>>>>>>>>>>>> one >>>>>>>>>>>>>>>>>>> to kill these two birds, and probably for this >> KIP >>>> we >>>>>> just >>>>>>>>>> focus >>>>>>>>>>>>>> on >>>>>>>>>>>>>>>> 1) >>>>>>>>>>>>>>>>>>> above. And for that I'd like to not expose the >>>>>>>> Materialized >>>>>>>>>>>>>> either >>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>> rationales that I've listed above. Instead, we >>> just >>>>>>>> restrict >>>>>>>>>>>>>> KIP-307 >>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>> NOT >>>>>>>>>>>>>>>>>>> use the Joined.name for state store names and >>> always >>>>> use >>>>>>>>>> internal >>>>>>>>>>>>>>>> names >>>>>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>>>>> well, which admittedly indeed leaves a hole of >> not >>>>> being >>>>>>>> able >>>>>>>>>> to >>>>>>>>>>>>>>>> cover >>>>>>>>>>>>>>>>>> all >>>>>>>>>>>>>>>>>>> internal names here, but now I feel this `hole` >>> may >>>>>>>> better be >>>>>>>>>>>>>> filled >>>>>>>>>>>>>>>> by, >>>>>>>>>>>>>>>>>>> e.g. not creating changelog topics but just use >>> the >>>>>>>> upstream >>>>>>>>> to >>>>>>>>>>>>>>>>>>> re-bootstrap the materialized store, more >>>> concretely: >>>>>> when >>>>>>>>>>>>>>>> materializing >>>>>>>>>>>>>>>>>>> the store, try to piggy-back the changelog topic >>> on >>>> an >>>>>>>>> existing >>>>>>>>>>>>>>>> topic, >>>>>>>>>>>>>>>>>> e.g. >>>>>>>>>>>>>>>>>>> a) if the stream is coming directly from some >>> source >>>>>> topic >>>>>>>>>>>>>> (including >>>>>>>>>>>>>>>>>>> repartition topic), make that as changelog topic >>> and >>>>> if >>>>>>>> it is >>>>>>>>>>>>>>>> repartition >>>>>>>>>>>>>>>>>>> topic change the retention / data purging policy >>>>>>>> necessarily >>>>>>>>> as >>>>>>>>>>>>>>>> well; b) >>>>>>>>>>>>>>>>>> if >>>>>>>>>>>>>>>>>>> the stream is coming from some stateless >>> operators, >>>>>>>> delegate >>>>>>>>>> that >>>>>>>>>>>>>>>>>> stateless >>>>>>>>>>>>>>>>>>> operator to the parent stream similar as a); if >>> the >>>>>>>> stream is >>>>>>>>>>>>>> coming >>>>>>>>>>>>>>>> from >>>>>>>>>>>>>>>>>> a >>>>>>>>>>>>>>>>>>> stream-stream join which is the only stateful >>>> operator >>>>>>>> that >>>>>>>>> can >>>>>>>>>>>>>>>> result in >>>>>>>>>>>>>>>>>> a >>>>>>>>>>>>>>>>>>> stream, consider merging the join into multi-way >>>> joins >>>>>>>> (yes, >>>>>>>>>>>>>> this is >>>>>>>>>>>>>>>> a >>>>>>>>>>>>>>>>>> very >>>>>>>>>>>>>>>>>>> hand-wavy thought, but the point here is that we >>> do >>>>> not >>>>>>>> try >>>>>>>>> to >>>>>>>>>>>>>>>> tackle it >>>>>>>>>>>>>>>>>>> now but leave it for a better solution :). >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I really like this idea! I agree with you in >> that >>>> this >>>>>>>>> approach >>>>>>>>>>>>>> to too >>>>>>>>>>>>>>>>>> much for adding in this KIP, but we could pick it >>> up >>>>>> later >>>>>>>> and >>>>>>>>>>>>>>>> leverage the >>>>>>>>>>>>>>>>>> Optimization framework to accomplish this re-use. >>>>>>>>>>>>>>>>>> Again, while I agree we should break the naming >> of >>>> join >>>>>>>> state >>>>>>>>>>>>>> stores >>>>>>>>>>>>>>>> from >>>>>>>>>>>>>>>>>> KIP-307, IMHO it's something we should fix now as >>> it >>>>> will >>>>>>>> be >>>>>>>>> the >>>>>>>>>>>>>> last >>>>>>>>>>>>>>>> piece >>>>>>>>>>>>>>>>>> we can provide to give users the ability to >>>> completely >>>>>> make >>>>>>>>>> their >>>>>>>>>>>>>>>>>> topologies "upgrade proof" when adding additional >>>>>>>> operations. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks again to both of you for comments and I >> look >>>>>>>> forward to >>>>>>>>>>>>>> hearing >>>>>>>>>>>>>>>> back >>>>>>>>>>>>>>>>>> from you. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>> Bill >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Thu, Jun 20, 2019 at 2:33 PM Guozhang Wang < >>>>>>>>>> wangg...@gmail.com> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hello Bill, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thanks for the KIP. Glad to see that we can >> likely >>>>>>>> shooting >>>>>>>>> two >>>>>>>>>>>>>> birds >>>>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>>>> one stone. I have some concerns though about >> those >>>>> "two >>>>>>>>> birds" >>>>>>>>>>>>>>>>>> themselves: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 1. About not breaking compatibility of >>> stream-stream >>>>>> join >>>>>>>>>>>>>>>> materialized >>>>>>>>>>>>>>>>>>> stores: I think this is a valid issue to tackle, >>> but >>>>>> after >>>>>>>>>>>>>> thinking >>>>>>>>>>>>>>>> about >>>>>>>>>>>>>>>>>>> it once more I'm not sure if exposing >> Materialized >>>>> would >>>>>>>> be a >>>>>>>>>>>>>> good >>>>>>>>>>>>>>>>>> solution >>>>>>>>>>>>>>>>>>> here. My rationles: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 1.a For stream-stream join, our current usage of >>>>>>>> window-store >>>>>>>>>> is >>>>>>>>>>>>>> not >>>>>>>>>>>>>>>>>> ideal, >>>>>>>>>>>>>>>>>>> and we want to modify it in the near future to >> be >>>> more >>>>>>>>>>>>>> efficient. Not >>>>>>>>>>>>>>>>>>> allowing users to override such state store >>> backend >>>>>> gives >>>>>>>> us >>>>>>>>>> such >>>>>>>>>>>>>>>> freedom >>>>>>>>>>>>>>>>>>> (which was also considered in the original DSL >>>>> design), >>>>>>>>> whereas >>>>>>>>>>>>>>>> getting a >>>>>>>>>>>>>>>>>>> Materialized<WindowStore> basically kicks out >> that >>>>>> freedom >>>>>>>>> out >>>>>>>>>>>>>> of the >>>>>>>>>>>>>>>>>>> window. >>>>>>>>>>>>>>>>>>> 1.b For strema-stream join, in our original >> design >>>> we >>>>>>>> intend >>>>>>>>> to >>>>>>>>>>>>>>>> "never" >>>>>>>>>>>>>>>>>>> want users to query the state, since it is just >>> for >>>>>>>> buffering >>>>>>>>>> the >>>>>>>>>>>>>>>>>> upcoming >>>>>>>>>>>>>>>>>>> records from the stream. Now I know that some >>> users >>>>> may >>>>>>>>> indeed >>>>>>>>>>>>>> want >>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>> query it from the debugging perspective, but >>> still I >>>>>>>>> concerned >>>>>>>>>>>>>> about >>>>>>>>>>>>>>>>>>> whether leveraging IQ for debugging purposes >> would >>>> be >>>>>> the >>>>>>>>> right >>>>>>>>>>>>>>>> solution >>>>>>>>>>>>>>>>>>> here. And adding Materialized object opens the >>> door >>>> to >>>>>> let >>>>>>>>>> users >>>>>>>>>>>>>>>> query >>>>>>>>>>>>>>>>>>> about it (unless we did something intentionally >> to >>>>> still >>>>>>>>>> forbids >>>>>>>>>>>>>> it), >>>>>>>>>>>>>>>>>> which >>>>>>>>>>>>>>>>>>> also restricts us in the future. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 2. About the coupling between >> Materialized.name() >>>> and >>>>>>>>>> queryable: >>>>>>>>>>>>>>>> again I >>>>>>>>>>>>>>>>>>> think this is a valid issue. But I'm not sure if >>> the >>>>>>>> current >>>>>>>>>>>>>>>>>>> "withQuerryingDisabled / Enabled" at >> Materialized >>> is >>>>> the >>>>>>>> best >>>>>>>>>>>>>>>> approach. >>>>>>>>>>>>>>>>>>> Here I think I agree with John, that generally >>>>> speaking >>>>>>>> it's >>>>>>>>>>>>>> better >>>>>>>>>>>>>>>> be a >>>>>>>>>>>>>>>>>>> control function on the `KTable` itself, rather >>> than >>>>> on >>>>>>>>>>>>>>>> `Materialized`, >>>>>>>>>>>>>>>>>> so >>>>>>>>>>>>>>>>>>> fixing it via adding functions through >>>> `Materialized` >>>>>>>> seems >>>>>>>>>> not a >>>>>>>>>>>>>>>> natural >>>>>>>>>>>>>>>>>>> approach either. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Overall, I'm thinking maybe we should still use >>> two >>>>>> stones >>>>>>>>>> rather >>>>>>>>>>>>>>>> than >>>>>>>>>>>>>>>>>> one >>>>>>>>>>>>>>>>>>> to kill these two birds, and probably for this >> KIP >>>> we >>>>>> just >>>>>>>>>> focus >>>>>>>>>>>>>> on >>>>>>>>>>>>>>>> 1) >>>>>>>>>>>>>>>>>>> above. And for that I'd like to not expose the >>>>>>>> Materialized >>>>>>>>>>>>>> either >>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>> rationales that I've listed above. Instead, we >>> just >>>>>>>> restrict >>>>>>>>>>>>>> KIP-307 >>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>> NOT >>>>>>>>>>>>>>>>>>> use the Joined.name for state store names and >>> always >>>>> use >>>>>>>>>> internal >>>>>>>>>>>>>>>> names >>>>>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>>>>> well, which admittedly indeed leaves a hole of >> not >>>>> being >>>>>>>> able >>>>>>>>>> to >>>>>>>>>>>>>>>> cover >>>>>>>>>>>>>>>>>> all >>>>>>>>>>>>>>>>>>> internal names here, but now I feel this `hole` >>> may >>>>>>>> better be >>>>>>>>>>>>>> filled >>>>>>>>>>>>>>>> by, >>>>>>>>>>>>>>>>>>> e.g. not creating changelog topics but just use >>> the >>>>>>>> upstream >>>>>>>>> to >>>>>>>>>>>>>>>>>>> re-bootstrap the materialized store, more >>>> concretely: >>>>>> when >>>>>>>>>>>>>>>> materializing >>>>>>>>>>>>>>>>>>> the store, try to piggy-back the changelog topic >>> on >>>> an >>>>>>>>> existing >>>>>>>>>>>>>>>> topic, >>>>>>>>>>>>>>>>>> e.g. >>>>>>>>>>>>>>>>>>> a) if the stream is coming directly from some >>> source >>>>>> topic >>>>>>>>>>>>>> (including >>>>>>>>>>>>>>>>>>> repartition topic), make that as changelog topic >>> and >>>>> if >>>>>>>> it is >>>>>>>>>>>>>>>> repartition >>>>>>>>>>>>>>>>>>> topic change the retention / data purging policy >>>>>>>> necessarily >>>>>>>>> as >>>>>>>>>>>>>>>> well; b) >>>>>>>>>>>>>>>>>> if >>>>>>>>>>>>>>>>>>> the stream is coming from some stateless >>> operators, >>>>>>>> delegate >>>>>>>>>> that >>>>>>>>>>>>>>>>>> stateless >>>>>>>>>>>>>>>>>>> operator to the parent stream similar as a); if >>> the >>>>>>>> stream is >>>>>>>>>>>>>> coming >>>>>>>>>>>>>>>>>> from a >>>>>>>>>>>>>>>>>>> stream-stream join which is the only stateful >>>> operator >>>>>>>> that >>>>>>>>> can >>>>>>>>>>>>>>>> result >>>>>>>>>>>>>>>>>> in a >>>>>>>>>>>>>>>>>>> stream, consider merging the join into multi-way >>>> joins >>>>>>>> (yes, >>>>>>>>>>>>>> this is >>>>>>>>>>>>>>>> a >>>>>>>>>>>>>>>>>> very >>>>>>>>>>>>>>>>>>> hand-wavy thought, but the point here is that we >>> do >>>>> not >>>>>>>> try >>>>>>>>> to >>>>>>>>>>>>>>>> tackle it >>>>>>>>>>>>>>>>>>> now but leave it for a better solution :). >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2019 at 11:41 AM John Roesler < >>>>>>>>>> j...@confluent.io >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hi Bill, >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Thanks for the KIP! Awesome job catching this >>>>>> unexpected >>>>>>>>>>>>>>>> consequence >>>>>>>>>>>>>>>>>>>> of the prior KIPs before it was released. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> The proposal looks good to me. On top of just >>>> fixing >>>>>> the >>>>>>>>>>>>>> problem, >>>>>>>>>>>>>>>> it >>>>>>>>>>>>>>>>>>>> seems to address two other pain points: >>>>>>>>>>>>>>>>>>>> * that naming a s >
signature.asc
Description: OpenPGP digital signature