Hello John,

My main concern is exactly the first point at the bottom of your analysis here: "* configure the bytes store". I'm not sure a window bytes store is the ideal structure for a stream-stream windowed join; e.g. we could consider a two-dimensional list sorted by timestamp first and then by key to do the join, whereas a windowed bytes store is basically sorted by key first, then by timestamp. If we expose Materialized to let users pass in a windowed bytes store, then we would need to change that API again if we later want to replace the store with a different implementation interface.
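To make the two orderings concrete, here is a minimal sketch using only the JDK. It is an illustration under stated assumptions, not how Kafka Streams lays out its stores: `JoinOrderSketch`, `Entry`, `fetch` and `expired` are hypothetical names, and a `TreeMap` stands in for the sorted structure. A key-first order makes "one key within a time range" a contiguous range, while a timestamp-first order makes "everything older than a cutoff" a contiguous prefix.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

final class JoinOrderSketch {
    /** Hypothetical composite index entry: a record key plus its timestamp. */
    static final class Entry {
        final String key;
        final long timestamp;

        Entry(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Order attributed above to a windowed bytes store: key first, then timestamp.
    static final Comparator<Entry> KEY_THEN_TIME =
        Comparator.comparing((Entry e) -> e.key).thenComparingLong(e -> e.timestamp);

    // Alternative order floated above: timestamp first, then key.
    static final Comparator<Entry> TIME_THEN_KEY =
        Comparator.comparingLong((Entry e) -> e.timestamp).thenComparing(e -> e.key);

    /** Builds the same four sample records under the given ordering. */
    static TreeMap<Entry, String> sample(Comparator<Entry> order) {
        TreeMap<Entry, String> store = new TreeMap<>(order);
        store.put(new Entry("a", 10L), "a@10");
        store.put(new Entry("b", 5L), "b@5");
        store.put(new Entry("a", 30L), "a@30");
        store.put(new Entry("b", 20L), "b@20");
        return store;
    }

    /** Key-first store: values for one key in [from, to] form one contiguous range. */
    static List<String> fetch(TreeMap<Entry, String> keyFirst, String key, long from, long to) {
        return new ArrayList<>(
            keyFirst.subMap(new Entry(key, from), true, new Entry(key, to), true).values());
    }

    /** Time-first store: everything strictly older than the cutoff is a prefix of the map. */
    static List<String> expired(TreeMap<Entry, String> timeFirst, long cutoff) {
        // "" sorts before every other key, so entries at exactly `cutoff` are excluded.
        return new ArrayList<>(timeFirst.headMap(new Entry("", cutoff), false).values());
    }

    public static void main(String[] args) {
        System.out.println(fetch(sample(KEY_THEN_TIME), "a", 0L, 15L));
        System.out.println(expired(sample(TIME_THEN_KEY), 20L));
    }
}
```

Which order serves the join better depends on the access pattern of the join algorithm, which is exactly the implementation freedom at stake in this discussion.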
Guozhang

On Mon, Jun 24, 2019 at 8:59 AM John Roesler <j...@confluent.io> wrote:

> Hey Guozhang and Bill,
>
> For what it's worth, I agree with you both!
>
> I think it might help the discussion to look concretely at what
> Materialized does:
> * set a WindowBytesStoreSupplier
> * set a name
> * set the key/value serdes
> * disable/enable/configure change-logging
> * disable/enable caching
> * configure retention
>
> Further, looking into the WindowBytesStoreSupplier, the interface lets you:
> * get the segment interval
> * get the window size
> * get whether "duplicates" are enabled
> * get the retention period
> * (obviously) get a WindowStore<Bytes, byte[]>
>
> We know that Materialized isn't exactly what we need for stream joins,
> but we can see how close Materialized is to what we need. If it is
> close, maybe we can use it and document the gaps, and if it is not
> close, then maybe we should just add what we need to Joined.
>
> Stream Join's requirements for its stores:
> * a multimap store (i.e., it keeps duplicates) for storing general
> (not windowed) keyed records associated with their insertion time, and
> allows efficient time-bounded lookups and also efficient purges of old
> data.
> ** Note, a properly configured WindowBytesStoreSupplier satisfies this
> requirement, and the interface supports the queries we need to verify
> the configuration at run-time
> * set a name for the store
> * do _not_ set the serdes (they are already set in Joined)
> * logging could be configurable (set to enabled now)
> * caching could be configurable (set to enabled now)
> * do _not_ configure retention (determined by JoinWindows)
>
> So, out of six capabilities for Materialized, there are two we don't
> want (serdes and retention). These would become run-time checks if we
> use it.
>
> A third questionable capability is to provide a
> WindowBytesStoreSupplier. Looking at whether the
> WindowBytesStoreSupplier is the right interface for Stream Join:
> * configuring segment interval is fine
> * should _not_ configure window size (it's determined by JoinWindows)
> * duplicates _must_ be enabled
> * retention should be _at least_ windowSize + gracePeriod, but note
> that (unlike for Table window stores) there is no utility in having a
> longer retention time.
> * the WindowStore<Bytes, byte[]> itself is fine, if overly broad,
> since the only two methods we need are `window.put(key, value,
> context().timestamp())` and `WindowStoreIterator<V2> iter =
> window.fetch(key, timeFrom, timeTo)`.
>
> Thus, flattening out the overlap for WindowBytesStoreSupplier onto the
> overlap for Materialized, we have 9 capabilities total (note retention
> is duplicated), we have 4 that we don't want:
> * do _not_ set the serdes (they are already set in Joined)
> * do _not_ configure retention (determined by JoinWindows)
> * should _not_ configure window size (it's determined by JoinWindows)
> * duplicates _must_ be enabled
>
> These gaps would have to be covered with run-time checks if we re-use
> Materialized and WindowBytesStoreSupplier both. Maybe this sounds
> bad, but consider the other side, that we get 5 new capabilities we
> don't require, but are still pretty nice:
> * configure the bytes store
> * set a name for the store
> * configure caching
> * configure logging
> * configure segment interval
>
> Not sure where this nets us out, but it's food for thought.
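For illustration only, the run-time checks described in the quoted analysis above might look roughly like the following. This is a sketch under stated assumptions rather than anything from the KIP: `JoinStoreChecks`, `SupplierView` and `validate` are hypothetical names, and `SupplierView` is a JDK-only stand-in for the window size, retention and duplicates accessors listed above so that the example runs without Kafka on the classpath.

```java
final class JoinStoreChecks {
    /** Hypothetical stand-in for the accessors a window store supplier exposes. */
    interface SupplierView {
        long windowSizeMs();
        long retentionPeriodMs();
        boolean retainDuplicates();
    }

    /** Convenience factory for the sketch; not part of any real API. */
    static SupplierView view(long windowSizeMs, long retentionPeriodMs, boolean retainDuplicates) {
        return new SupplierView() {
            public long windowSizeMs() { return windowSizeMs; }
            public long retentionPeriodMs() { return retentionPeriodMs; }
            public boolean retainDuplicates() { return retainDuplicates; }
        };
    }

    /**
     * Rejects a user-supplied store configuration that conflicts with the join.
     * joinWindowSizeMs and gracePeriodMs are assumed to come from the join's window definition.
     */
    static void validate(SupplierView supplier, long joinWindowSizeMs, long gracePeriodMs) {
        // The join store is a multimap, so duplicates must be kept.
        if (!supplier.retainDuplicates()) {
            throw new IllegalArgumentException("join stores must retain duplicates");
        }
        // The window size is dictated by the join, not by the supplier.
        if (supplier.windowSizeMs() != joinWindowSizeMs) {
            throw new IllegalArgumentException("window size must match the join window size");
        }
        // Retention must cover at least windowSize + gracePeriod.
        if (supplier.retentionPeriodMs() < joinWindowSizeMs + gracePeriodMs) {
            throw new IllegalArgumentException("retention must be at least window size + grace period");
        }
    }

    public static void main(String[] args) {
        validate(view(10_000L, 15_000L, true), 10_000L, 5_000L);
        System.out.println("supplier accepted");
    }
}
```

The serdes gap is not covered here because it concerns Materialized rather than the supplier.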
> -John
>
> On Sun, Jun 23, 2019 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > Hi Bill,
> >
> > I think by giving a Materialized param into stream-stream join, it's okay
> > (though still ideal) to say "we still would not expose the store for
> > queries", but it would sound a bit awkward to say "we would also ignore
> > whatever the passed in store supplier but just use our default ones" --
> > again the concern is that, if in the future we'd want to change the default
> > implementation of the join algorithm so that it no longer relies on a window
> > store with deduping enabled, then we need to change this API again by
> > changing the store supplier type.
> >
> > If we do want to fill this hole for stream-stream join, I feel just adding
> > a String typed store-name would even be less future-intrusive if we expect
> > this parameter to be modified later.
> >
> > Does that make sense?
> >
> >
> > Guozhang
> >
> > On Sat, Jun 22, 2019 at 12:51 PM Bill Bejeck <bbej...@gmail.com> wrote:
> >
> > > Thanks for the comments John and Guozhang, I'll address each one of your
> > > comments in turn.
> > >
> > > John,
> > >
> > > > I'm wondering about a missing quadrant from the truth table involving
> > > > whether a Materialized is stored or not and querying is
> > > > enabled/disabled... What should be the behavior if there is no store
> > > > configured (e.g., if Materialized with only serdes) and querying is
> > > > enabled?
> > > >
> > > > It seems we have two choices:
> > > > 1. we can force creation of a state store in this case, so the store
> > > > can be used to serve the queries
> > > > 2. we can provide just a queriable view, basically letting IQ query
> > > > into the "KTableValueGetter", which would transparently construct the
> > > > query response by applying the operator logic to the upstream state if
> > > > the operator state isn't already stored.
> > >
> > >
> > > I agree with your assertion about a missing quadrant from the truth table.
> > > Additionally, I too like the concept of a queriable view. But I think > that > > > goes a bit beyond the scope of this KIP and would like to pursue that > > > feature as follow-on work. Also thinking about this KIP some more, I'm > > > thinking of the changes to Materialized might be a reach as well. > > > Separating the naming from a store and its queryable state seems like a > > > complex issue in and of itself and should be treated accordingly. > > > > > > So here's what I'm thinking now. We add Materialzied to Join, but for > now, > > > we internally disable querying. I know this breaks our current > semantic > > > approach, but I think it's crucial that we do two things in this KIP > > > > > > 1. Break the naming of the state stores from Joined to > Materialized, so > > > the naming of state stores follows our current pattern and enables > > > upgrades > > > from 2.3 to 2.4 > > > 2. Offer the ability to configure the state stores of the join, even > > > providing a different implementation (i.e. in-memory) if desired. > > > > > > With that in mind I'm considering changing the KIP to remove the > changes to > > > Materialized, and we document very clearly that by providing a > Materialized > > > object with a name is only for naming the state store, hence the > changelog > > > topics and any possible configurations of the store, but this store > *will > > > not be available for IQ.* > > > > > > WDYT? > > > > > > Guozhang, > > > > > > > 1. About not breaking compatibility of stream-stream join > materialized > > > > stores: I think this is a valid issue to tackle, but after thinking > about > > > > it once more I'm not sure if exposing Materialized would be a good > > > solution > > > > here. My rationles: > > > > > > > > 1.a For stream-stream join, our current usage of window-store is not > > > ideal, > > > > and we want to modify it in the near future to be more efficient. 
Not > > > > allowing users to override such state store backend gives us such > freedom > > > > (which was also considered in the original DSL design), whereas > getting a > > > > Materialized<WindowStore> basically kicks out that freedom out of the > > > > window. > > > > 1.b For strema-stream join, in our original design we intend to > "never" > > > > want users to query the state, since it is just for buffering the > > > upcoming > > > > records from the stream. Now I know that some users may indeed want > to > > > > query it from the debugging perspective, but still I concerned about > > > > whether leveraging IQ for debugging purposes would be the right > solution > > > > here. And adding Materialized object opens the door to let users > query > > > > about it (unless we did something intentionally to still forbids it), > > > which > > > > also restricts us in the future. > > > > > > > > 2. About the coupling between Materialized.name() and queryable: > again I > > > > think this is a valid issue. But I'm not sure if the current > > > > "withQuerryingDisabled / Enabled" at Materialized is the best > approach. > > > > Here I think I agree with John, that generally speaking it's better > be a > > > > control function on the `KTable` itself, rather than on > `Materialized`, > > > so > > > > fixing it via adding functions through `Materialized` seems not a > natural > > > approach either. > > > > > > I understand your thoughts here, and up to a point, I agree with you. > > > But concerning not providing Materialized as it may restrict us in the > > > future for delivering different implementations, I'm wondering if we > are > > > doing some premature optimization here. > > > My rationale for saying so > > > > > > 1. I think the cost of not allowing the naming of state stores for > joins > > > is too big of a gap to leave. 
IMHO for joins to follow the current > > > pattern of using Materialized for naming state stores would be what > most > > > users would expect to use. As I said in my comments above, I think > we > > > should *not include* the changes to Materialized and enforce named > > > stores for joins as unavailable for IQ. > > > 2. We'll still have the join methods available without a > Materialized > > > allowing us to do something different internally if a Materialized > is > > > not > > > provided. > > > > > > > > > > Overall, I'm thinking maybe we should still use two stones rather > than > > > one > > > > to kill these two birds, and probably for this KIP we just focus on > 1) > > > > above. And for that I'd like to not expose the Materialized either > for > > > > rationales that I've listed above. Instead, we just restrict KIP-307 > to > > > NOT > > > > use the Joined.name for state store names and always use internal > names > > > as > > > > well, which admittedly indeed leaves a hole of not being able to > cover > > > all > > > > internal names here, but now I feel this `hole` may better be filled > by, > > > > e.g. not creating changelog topics but just use the upstream to > > > > re-bootstrap the materialized store, more concretely: when > materializing > > > > the store, try to piggy-back the changelog topic on an existing > topic, > > > e.g. 
> > > > a) if the stream is coming directly from some source topic (including > > > > repartition topic), make that as changelog topic and if it is > repartition > > > > topic change the retention / data purging policy necessarily as > well; b) > > > if > > > > the stream is coming from some stateless operators, delegate that > > > stateless > > > > operator to the parent stream similar as a); if the stream is coming > from > > > a > > > > stream-stream join which is the only stateful operator that can > result in > > > a > > > > stream, consider merging the join into multi-way joins (yes, this is > a > > > very > > > > hand-wavy thought, but the point here is that we do not try to > tackle it > > > > now but leave it for a better solution :). > > > > > > I really like this idea! I agree with you in that this approach to too > > > much for adding in this KIP, but we could pick it up later and > leverage the > > > Optimization framework to accomplish this re-use. > > > Again, while I agree we should break the naming of join state stores > from > > > KIP-307, IMHO it's something we should fix now as it will be the last > piece > > > we can provide to give users the ability to completely make their > > > topologies "upgrade proof" when adding additional operations. > > > > > > Thanks again to both of you for comments and I look forward to hearing > back > > > from you. > > > > > > Regards, > > > Bill > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Jun 20, 2019 at 2:33 PM Guozhang Wang <wangg...@gmail.com> > wrote: > > > > > > > Hello Bill, > > > > > > > > Thanks for the KIP. Glad to see that we can likely shooting two birds > > > with > > > > one stone. I have some concerns though about those "two birds" > > > themselves: > > > > > > > > 1. 
About not breaking compatibility of stream-stream join > materialized > > > > stores: I think this is a valid issue to tackle, but after thinking > about > > > > it once more I'm not sure if exposing Materialized would be a good > > > solution > > > > here. My rationles: > > > > > > > > 1.a For stream-stream join, our current usage of window-store is not > > > ideal, > > > > and we want to modify it in the near future to be more efficient. Not > > > > allowing users to override such state store backend gives us such > freedom > > > > (which was also considered in the original DSL design), whereas > getting a > > > > Materialized<WindowStore> basically kicks out that freedom out of the > > > > window. > > > > 1.b For strema-stream join, in our original design we intend to > "never" > > > > want users to query the state, since it is just for buffering the > > > upcoming > > > > records from the stream. Now I know that some users may indeed want > to > > > > query it from the debugging perspective, but still I concerned about > > > > whether leveraging IQ for debugging purposes would be the right > solution > > > > here. And adding Materialized object opens the door to let users > query > > > > about it (unless we did something intentionally to still forbids it), > > > which > > > > also restricts us in the future. > > > > > > > > 2. About the coupling between Materialized.name() and queryable: > again I > > > > think this is a valid issue. But I'm not sure if the current > > > > "withQuerryingDisabled / Enabled" at Materialized is the best > approach. > > > > Here I think I agree with John, that generally speaking it's better > be a > > > > control function on the `KTable` itself, rather than on > `Materialized`, > > > so > > > > fixing it via adding functions through `Materialized` seems not a > natural > > > > approach either. 
> > > > > > > > > > > > Overall, I'm thinking maybe we should still use two stones rather > than > > > one > > > > to kill these two birds, and probably for this KIP we just focus on > 1) > > > > above. And for that I'd like to not expose the Materialized either > for > > > > rationales that I've listed above. Instead, we just restrict KIP-307 > to > > > NOT > > > > use the Joined.name for state store names and always use internal > names > > > as > > > > well, which admittedly indeed leaves a hole of not being able to > cover > > > all > > > > internal names here, but now I feel this `hole` may better be filled > by, > > > > e.g. not creating changelog topics but just use the upstream to > > > > re-bootstrap the materialized store, more concretely: when > materializing > > > > the store, try to piggy-back the changelog topic on an existing > topic, > > > e.g. > > > > a) if the stream is coming directly from some source topic (including > > > > repartition topic), make that as changelog topic and if it is > repartition > > > > topic change the retention / data purging policy necessarily as > well; b) > > > if > > > > the stream is coming from some stateless operators, delegate that > > > stateless > > > > operator to the parent stream similar as a); if the stream is coming > > > from a > > > > stream-stream join which is the only stateful operator that can > result > > > in a > > > > stream, consider merging the join into multi-way joins (yes, this is > a > > > very > > > > hand-wavy thought, but the point here is that we do not try to > tackle it > > > > now but leave it for a better solution :). > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > On Wed, Jun 19, 2019 at 11:41 AM John Roesler <j...@confluent.io> > wrote: > > > > > > > > > Hi Bill, > > > > > > > > > > Thanks for the KIP! Awesome job catching this unexpected > consequence > > > > > of the prior KIPs before it was released. > > > > > > > > > > The proposal looks good to me. 
On top of just fixing the problem, > it > > > > > seems to address two other pain points: > > > > > * that naming a state store automatically causes it to become > > > queriable. > > > > > * that there's currently no way to configure the bytes store for > join > > > > > windows. > > > > > > > > > > It's awesome that we can fix this issue and two others with one > > > feature. > > > > > > > > > > I'm wondering about a missing quadrant from the truth table > involving > > > > > whether a Materialized is stored or not and querying is > > > > > enabled/disabled... What should be the behavior if there is no > store > > > > > configured (e.g., if Materialized with only serdes) and querying is > > > > > enabled? > > > > > > > > > > It seems we have two choices: > > > > > 1. we can force creation of a state store in this case, so the > store > > > > > can be used to serve the queries > > > > > 2. we can provide just a queriable view, basically letting IQ query > > > > > into the "KTableValueGetter", which would transparently construct > the > > > > > query response by applying the operator logic to the upstream > state if > > > > > the operator state isn't already stored. > > > > > > > > > > Offhand, it seems like the second is actually a pretty awesome > > > > > capability. But it might have an awkward interaction with the > current > > > > > semantics. Presently, if I provide a Materialized.withName, it > implies > > > > > that querying should be enabled AND that the view should actually > be > > > > > stored in a state store. Under option 2 above, this behavior would > > > > > change to NOT provision a state store and instead just consult the > > > > > ValueGetter. To get back to the current behavior, users would have > to > > > > > add a "bytes store supplier" to the Materialized to indicate that, > > > > > yes, they really want a state store there. 
> > > > > > > > > > Behavior changes are always kind of scary, but I think in this > case, > > > > > it might actually be preferable. In the event where only the name > is > > > > > provided, it means that people just wanted to make the operation > > > > > result queriable. If we automatically convert this to a non-stored > > > > > view, then simply upgrading results in the same observable behavior > > > > > and semantics, but a linear reduction in local storage requirements > > > > > and disk i/o, as well as a corresponding linear reduction in memory > > > > > usage both on and off heap. > > > > > > > > > > What do you think? > > > > > -John > > > > > > > > > > On Tue, Jun 18, 2019 at 9:21 PM Bill Bejeck <bbej...@gmail.com> > wrote: > > > > > > > > > > > > All, > > > > > > > > > > > > I'd like to start a discussion for adding a Materialized > > > configuration > > > > > > object to KStream.join for naming state stores involved in joins. > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join > > > > > > > > > > > > Your comments and suggestions are welcome. > > > > > > > > > > > > Thanks, > > > > > > Bill > > > > > > > > > > > > > > > > > -- > > > > -- Guozhang > > > > > > > > > > > > > -- > > -- Guozhang > -- -- Guozhang