I just sent out a call for a vote. I think everyone has had a good discussion :). If there are any more thoughts I would love to hear them.
Walker

On Thu, Oct 31, 2019 at 10:36 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Walker / Matthias,
>
> Thanks for your explanation. I can tell you've put a lot of thought into this already, and it seems we cannot avoid adding new interfaces in any way, so I will rest my arguments trying to reduce the number of first-class citizens in the Streams DSL :)
>
> Guozhang
>
> On Thu, Oct 31, 2019 at 12:50 AM Matthias J. Sax <matth...@confluent.io> wrote:
>
> > Interesting discussion. My personal take is as follows:
> >
> > (1) Co-group is not a special case of a multi-way KTable join, because multiple records with the same key from a single input stream should be aggregated together and there are no update semantics. A co-group is rather a multi-stream aggregation operation.
> >
> > (2) Semantically, non-windowed co-group is the same as (as laid out in the KIP already):
> >
> > > KTable aggTable1 = stream1.groupByKey().aggregate(...);
> > > KTable aggTable2 = stream2.groupByKey().aggregate(...);
> > > ...
> > > KTable aggTableX = streamX.groupByKey().aggregate(...);
> > >
> > > KTable final = aggTable1.join(aggTable2)....join(aggTableX)
> >
> > However, I don't think that it would be possible for our optimizer to rewrite one into the other, given what `Initializer`, `Aggregator`, `ValueJoiner`, and `S a user would provide.
> >
> > Hence, `cogroup()` is a more efficient way to express the above using a single store, instead of the X stores required above.
> >
> > For windowed co-group, especially session-windowed, it seems not possible at all to rewrite co-group as independent aggregations followed by joins. Note that session boundaries would be determined _after_ the input streams are co-partitioned/merged in `cogroup()` and thus would be different compared to the aggregate-join pattern.
> >
> > (3) For the current KIP writeup, I agree that adding `Named` to `aggregate()` aligns best with the current API layout. I also don't think that the overloads are a big issue, because they are spread out over multiple helper interfaces.
> >
> > -Matthias
> >
> > On 10/29/19 10:38 AM, Walker Carlson wrote:
> > > Hi Guozhang,
> > >
> > > I am not sure what you mean by "Fields from different streams are never aggregated together"; this certainly can be the case, but it is not the general rule. If we want to take care of the special cases where the key-sets are disjoint for each stream then they can be given no-op operators. This would have the same effect as a stitching join, as the function to update the store would have to be defined either way, even to just place it in.
> > >
> > > Now if we look at it from the other way, if we only specify the multiway join then the user will need to aggregate each stream. Then they must do the join, which will involve either aggregators and value joiners or some questionable optimization that would rely on each aggregator defined for a grouped stream meshing together. And this would all have to happen inside KStream.
> > >
> > > I do agree that there are optimizations that can be done on joining multiple tables per your example, in both cases whether it be a "stitching join" or not. But I do not think the place to do it is in Streams. This could be relatively easy to accomplish. I think we save ourselves pain if we consider the tables and streams as separate cases, as aggregating multiple streams into one KTable can be done more efficiently than making multiple KTables and then joining them together. We may be able to get around this in the case of a stitching join but I am not sure how we could do it safely otherwise.
> > >
> > > Walker
> > >
> > > On Mon, Oct 28, 2019 at 6:26 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > >> Hi Walker,
> > >>
> > >> This is a good point about compatibility breakage while overloading the existing classes; while reading John's and your exchanges, I think I still need to clarify the motivations a bit more:
> > >>
> > >> 1) Multiple streams need to be aggregated together, inputs are always *KStreams* and the end result is a *KTable*.
> > >> 2) Fields from different streams are never aggregated together, i.e. on the higher level it is more like "stitching up" the fields and then doing a single aggregation.
> > >>
> > >> In this context, I agree with you that it is still a streams-aggregation operator that we are trying to optimize (though it's a multi-way one), not a multi-way table-table-join operator.
> > >>
> > >> -----------------
> > >>
> > >> But now taking a step back looking at it, I'm wondering whether, because of 2) (the input streams do not have overlapping fields), we can generalize this to a broader scope. Consider this case for example:
> > >>
> > >> table1 = builder.table("topic1");
> > >> table2 = builder.table("topic2");
> > >> table3 = builder.table("topic3");
> > >> table4 = table1.join(table2).join(table3);
> > >>
> > >> Suppose the join operations do not take out any fields or add any new fields, i.e. say table1 has fields A, table2 has fields B, and table3 has fields C besides the key K, and table4 has fields {A, B, C} --- the join is just "stitching up" the fields --- then the above topology can actually be optimized in a similar way:
> > >>
> > >> * we only keep one materialized store in the form of K -> {A, B, C} as the materialized store of the final join result of table4.
> > >> * when a record comes in from table1/2/3, just query the store on K, and then update the corresponding A/B/C field and then write back to the store.
> > >>
> > >> Then the above streams-aggregation operator can be treated as a special case of this: you first aggregate separately on stream1/2/3 and generate table1/2/3, and then do this "stitching join"; behind the scenes we can optimize the topology to do exactly the co-group logic by updating the second bullet point above as an aggregation operator:
> > >>
> > >> * when a record comes in from *stream1/2/3*, just query the store on K, and then update the corresponding A/B/C field *with an aggregator* and then write back to the store.
> > >>
> > >> -----------------
> > >>
> > >> Personally I think this is better because it 1) has a larger applicable scope, and 2) does not introduce new interfaces. But of course on the other side it requires us to do this optimization inside Streams with some syntax hint from users (for example, users need to specify it is a "stitching join" such that all fields are still preserved in the join result). WDYT?
> > >>
> > >> Guozhang
> > >>
> > >> On Mon, Oct 28, 2019 at 4:20 PM Walker Carlson <wcarl...@confluent.io> wrote:
> > >>
> > >>> Hi John,
> > >>>
> > >>> Thank you for the background information. I think I understand your point.
> > >>>
> > >>> I believe that this could be fixed by making the motivation a little clearer in the KIP. I think that the motivation is that when you have multiple streams that need to aggregate together to form a single object, the current, non-optimal way to do this is through a multiway table join. This is a little hacky. There is a slight but significant difference in these cases, as in the null value handling you pointed out.
> > >>>
> > >>> For the example in the motivation, these tables were grouped streams so they already dropped the null values. If we consider Cogroup sitting in the same grey area that KGroupedStream does, it should also behave this way. If you think about it that way it is more of an extension of KGroupedStream than KTable or KStream. Therefore I handle null values the same way KGroupedStream#aggregate does.
> > >>>
> > >>> Looking back I am not sure I understood your previous question fully at the time. I am sorry if my answer caused any confusion!
> > >>>
> > >>> Walker
> > >>>
> > >>> On Mon, Oct 28, 2019 at 2:49 PM John Roesler <j...@confluent.io> wrote:
> > >>>
> > >>>> Hi Walker,
> > >>>>
> > >>>> Sorry for the delay in responding. Thanks for your response earlier.
> > >>>>
> > >>>> I think there might be a subtlety getting overlooked in considering whether we're talking about streams versus tables in cogroup. As I'm sure you know, Kafka Streams treats "stream" records as independent, immutable, and opaque "facts", whereas we treat "table" records as a sequence of updates to an entity identified by the record key (where "update" means that each record's value represents the new state after applying the update). For the most part, this is a clean separation, but there is one special case where records with a "null" value are interpreted as a tombstone in the table context (i.e., the record indicates not that the new value of the entity is "null", but rather that the entity has been deleted). In contrast, a record with a null value in the stream context is _just_ a record with a null value; no special semantics.
> > >>>>
> > >>>> The difficulty is that these two semantics clash at the stream/table boundary.
> > >>>> So, operations that convert streams to tables (like KGroupedStream#aggregate) have to cope with ambiguity about whether to treat null values opaquely as null values, or as tombstones. I think I'll make a long story short and just say that this is a very, very complex issue. As a result (and as a bit of a punt), our KGroupedStream operations actually just discard null-valued records. This means that the following are _not_ equivalent programs:
> > >>>>
> > >>>> table1 =
> > >>>>   stream<Id,Record>("records")
> > >>>>     .filter(Record::isOk)
> > >>>>     .groupByKey()
> > >>>>     .aggregate(() -> new Record(), (key, value, agg) -> value)
> > >>>> table2 =
> > >>>>   table<Id,Record>("records")
> > >>>>     .filter(Record::isOk)
> > >>>>
> > >>>> They look about the same, in that they'll both produce a KTable<Id,Record> with the value being the latest state. But if a record is deleted in the upstream data (represented as a "null" value), that record would also be deleted in table2, but not in table1. Table1 would just perpetually contain the value immediately prior to the delete.
> > >>>>
> > >>>> This is why it makes me nervous to propose a new kind of _stream_ operation ostensibly in order to solve a problem that presents itself in the _table_ context.
> > >>>>
> > >>>> If the goal is to provide a more efficient and convenient multi-way KTable join, I think it would be a good idea to consider an extension to the KTable API, not the KStream API. On the other hand, if this is not the goal, then the motivation of the KIP shouldn't say that it is. Instead, the KIP could provide some other motivation specifically for augmenting the KStream API.
> > >>>>
> > >>>> There is a third alternative that comes to mind, if you wish to resolve the long-standing dilemma around this semantic problem and specify in the KIP how exactly nulls are handled in this operator. But (although this seems on the face to be a good option), I think it might be a briarpatch. Even if we are able to reach a suitable design, we'd have to contend with the fact that it looks like the KGroupedStream API, but behaves differently.
> > >>>>
> > >>>> What do you think about all this?
> > >>>>
> > >>>> Thanks again for the KIP and the discussion!
> > >>>> -John
> > >>>>
> > >>>> On Mon, Oct 28, 2019 at 3:32 PM Walker Carlson <wcarl...@confluent.io> wrote:
> > >>>>>
> > >>>>> Hi Guozhang,
> > >>>>>
> > >>>>> Matthias and I did talk about overloading a different type of aggregate method in the cogroup that would take in the windows and return a windowed KTable. We decided that it would break too much with the current pattern that was established in the normal KStream. We can revisit this if you have a different opinion on the tradeoff.
> > >>>>>
> > >>>>> Walker
> > >>>>>
> > >>>>> On Mon, Oct 28, 2019 at 12:14 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>>>
> > >>>>>> Hi Walker,
> > >>>>>>
> > >>>>>> On Fri, Oct 25, 2019 at 1:34 PM Walker Carlson <wcarl...@confluent.io> wrote:
> > >>>>>>
> > >>>>>>> Hi Guozhang,
> > >>>>>>>
> > >>>>>>> 1. I am familiar with the cogroup of Spark; it is very similar to their join operator but instead it makes the values iterable. I think that the use cases are different enough that it makes sense to specify the aggregator when we do.
> > >>>>>>>
> > >>>>>>> I like the idea of "absorb" and I think it could be useful.
> > >>>>>>> Although I do not think it is as intuitive.
> > >>>>>>>
> > >>>>>>> If we were to go that route we would either use more processors or do essentially the same thing but would have to store the information required to cogroup inside that KTable. I think this would violate some design principles. I would argue that we should consider adding absorb as well and auto re-write it to use cogroup.
> > >>>>>>>
> > >>>>>>
> > >>>>>> Yeah I think I agree with you about the internal design complexity with "absorb"; I was primarily thinking if we can save ourselves from introducing 3 more public classes with co-group. But it seems that without introducing new classes there's no easy way for us to bound the scope of co-grouping (like how many streams will be co-grouped together).
> > >>>>>>
> > >>>>>> LMK if you have some better ideas: generally speaking, the fewer new public interfaces we introduce to fulfill a new feature the better, so I'd push us to think twice and carefully before we go down the route.
> > >>>>>>
> > >>>>>>>
> > >>>>>>> 2. We have not considered this, though that would be a convenient operation.
> > >>>>>>>
> > >>>>>>> 3. There is only one processor made. We are actually having the naming conversation right now in the above thread.
> > >>>>>>>
> > >>>>>>> 4, 5. fair points
> > >>>>>>>
> > >>>>>>> Walker
> > >>>>>>>
> > >>>>>>> On Fri, Oct 25, 2019 at 11:58 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Walker, thanks for the KIP! I made a pass on the writeup and have some comments below:
> > >>>>>>>>
> > >>>>>>>> Meta:
> > >>>>>>>>
> > >>>>>>>> 1.
> > >>>>>>>> Syntax-wise, I'm wondering if we have compared our current proposal with Spark's co-group syntax (I know they are targeting different use cases, but wondering if their syntax is closer to the join operator); what are the syntax / semantics trade-offs here?
> > >>>>>>>>
> > >>>>>>>> Just playing devil's advocate here: if the main motivation is to provide a more convenient multi-way join syntax, and in order to only have one materialized store we need to specify the final joined format at the beginning, then what about the following alternative (with the given example in your wiki page):
> > >>>>>>>>
> > >>>>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> > >>>>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> > >>>>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> > >>>>>>>>
> > >>>>>>>> KTable<K, CG> aggregated = grouped1.aggregate(initializer, materialized, aggregator1);
> > >>>>>>>>
> > >>>>>>>> aggregated.absorb(grouped2, aggregator2) // I'm just using a random name off the top of my head here
> > >>>>>>>>           .absorb(grouped3, aggregator3);
> > >>>>>>>>
> > >>>>>>>> In this way, we just add a new API to the KTable to "absorb" new streams as aggregated results without needing to introduce new first-class citizen classes.
> > >>>>>>>>
> > >>>>>>>> 2. From the DSL optimization, have we considered if we can auto re-write the user-written old-fashioned multi-join into this new DSL operator?
> > >>>>>>>>
> > >>>>>>>> 3.
> > >>>>>>>> Although it is not needed for the wiki page itself, for the internal implementation how many processor nodes would we create for the new operator, and how can we allow users to name them?
> > >>>>>>>>
> > >>>>>>>> Minor:
> > >>>>>>>>
> > >>>>>>>> 4. In "Public Interfaces", better add the templated generics to "KGroupedStream" as "KGroupedStream<K, V>".
> > >>>>>>>>
> > >>>>>>>> 5. Naming wise, I'd suggest we keep the "K" together with Stream/Table, e.g. "TimeWindowed*CogroupedKStream*<K, V>".
> > >>>>>>>>
> > >>>>>>>> Guozhang
> > >>>>>>>>
> > >>>>>>>> On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>>>>>>
> > >>>>>>>>> Walker,
> > >>>>>>>>>
> > >>>>>>>>> I am not sure if I can follow your argument. What exactly do you mean by
> > >>>>>>>>>
> > >>>>>>>>>> I also think that in this case it would be better to separate the 2 option out into separate overloads.
> > >>>>>>>>>
> > >>>>>>>>> Maybe you can give an example of what method signature you have in mind?
> > >>>>>>>>>
> > >>>>>>>>>> We could take a named parameter from upstream or add an extra naming option however I don't really see the advantage that would give.
> > >>>>>>>>>
> > >>>>>>>>> Are you familiar with KIP-307? Before KIP-307, KS generated all names for all Processors. This makes it hard to reason about a Topology if it's getting complex. Adding `Named` to the new co-group operator would actually align with KIP-307.
> > >>>>>>>>>
> > >>>>>>>>>> It seems to go in the opposite direction from the cogroup configuration idea you proposed.
> > >>>>>>>>>
> > >>>>>>>>> Can you elaborate? Not sure if I can follow.
> > >>>>>>>>>
> > >>>>>>>>> -Matthias
> > >>>>>>>>>
> > >>>>>>>>> On 10/24/19 10:20 AM, Walker Carlson wrote:
> > >>>>>>>>>> While I like the idea Sophie I don't think that it is necessary. I also think that in this case it would be better to separate the 2 option out into separate overloads.
> > >>>>>>>>>> We could take a named parameter from upstream or add an extra naming option however I don't really see the advantage that would give. It seems to go in the opposite direction from the cogroup configuration idea you proposed.
> > >>>>>>>>>>
> > >>>>>>>>>> John, I think it could be both. It depends on when you aggregate and what kind of data you have. In the example it is aggregating before joining, there are probably some cases where you could join before aggregating. IMHO it would be easier to group all the streams together then perform the one operation that results in a single KTable.
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>>> I can personally not see any need to add other configuration
> > >>>>>>>>>>> Famous last words?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Just kidding, 95% confidence is more than enough to me (and better to optimize for current design than for hypothetical future changes).
> > >>>>>>>>>>>
> > >>>>>>>>>>> One last question I have then is about the operator/store/repartition naming -- seems like we can name the underlying store/changelog through the Materialized parameter, but do we also want to include an overload taking a Named parameter for the operator name (as in the KTable#join variations)?
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Interesting idea, Sophie.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> So far, we tried to reuse existing config objects and only add new ones when needed to avoid creating "redundant" classes. This is of course a reactive approach (with the drawback to deprecate stuff if we change it, as you described).
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I can personally not see any need to add other configuration parameters atm, so it's a 95% obvious "no" IMHO. The final `aggregate()` has only a single state store that we need to configure, and reusing `Materialized` seems to be appropriate.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Also note, that the `Initializer` is a mandatory parameter and not a configuration and should be passed directly, and not via a configuration object.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On 10/23/19 11:37 AM, Sophie Blee-Goldman wrote:
> > >>>>>>>>>>>>> Thanks for the explanation, makes sense to me! As for the API, one other thought I had is might we ever want or need to introduce any other configs or parameters in the future? Obviously that's difficult to say now (or maybe the answer seems obviously "no") but we seem to often end up needing to add new overloads and/or deprecate old ones as new features or requirements come into play.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> What do you (and others?) think about wrapping the config parameters (ie everything except the actual grouped streams) in a new config object? For example, the CogroupedStream#aggregate method could take a single Cogrouped object, which itself would have an initializer and a materialized. If we ever need to add a new parameter, we can just add it to the Cogrouped class.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Also, will the backing store be available for IQ if a Materialized is passed in?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Wed, Oct 23, 2019 at 10:49 AM Walker Carlson <wcarl...@confluent.io> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi Sophie,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thank you for your comments.
> > >>>>>>>>>>>>>> As for the different method signatures, I have not really considered any other options, but while I do agree it is confusing, I don't see any obvious solutions. The problem is that the cogroup essentially pairs a grouped stream with an aggregator, and when it is first made the method is called on a grouped stream already. However each subsequent stream-aggregator pair is added on to a cogrouped stream so it needs both arguments.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> For the second question you should not need a joiner. The idea is that you can collect many grouped streams with overlapping key spaces and any kind of value types. Once aggregated, the values will be reduced into one type. This is why you need only one initializer. Each aggregator will need to integrate the new value with the new object made in the initializer.
> > >>>>>>>>>>>>>> Does that make sense?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> This is a good question and I will include this explanation in the KIP as well.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>> Walker
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Tue, Oct 22, 2019 at 8:59 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hey Walker,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks for the KIP! I have just a couple of questions:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 1) It seems a little awkward to me that with the current API, we have a nearly identical "add stream to cogroup" method, except for the first which has a different signature (ie the first stream is joined as stream.cogroup(Aggregator) while the subsequent ones are joined as .cogroup(Stream, Aggregator) ). I'm not sure what it would look like exactly, but I was just wondering if you'd considered and/or rejected any alternative APIs?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 2) This might just be my lack of familiarity with "cogroup" as a concept, but with the current (non-optimal) API the user seems to have some control over how exactly the different streams are joined through the ValueJoiners.
> > >>>>>>>>>>>>>>> Would this new cogroup simply concatenate the values from the different cogroup streams, or could users potentially pass some kind of Joiner to the cogroup/aggregate methods? Or, is the whole point of cogroups that you no longer ever need to specify a Joiner? If so, you should add a short line to the KIP explaining that for those of us who aren't fluent in cogroup semantics :)
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>> Sophie
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Thu, Oct 17, 2019 at 3:06 PM Walker Carlson <wcarl...@confluent.io> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Good catch I updated that.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I have made a PR for this KIP
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I then am splitting it into 3 parts, first cogroup for a key-value store (here <https://github.com/apache/kafka/pull/7538>), then for a timeWindowedStore, and then a sessionWindowedStore + ensuring partitioning.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Walker
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Tue, Oct 15, 2019 at 12:47 PM Matthias J.
> > >>>>>>>>>>>>>>>> Sax <matth...@confluent.io> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Walker,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> thanks for picking up the KIP and reworking it for the changed API.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Overall, the updated API suggestions make sense to me. They seem to align quite nicely with our current API design.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> One nit: In `CogroupedKStream#aggregate(...)` the type parameter of `Materialized` should be `V`, not `VR`?
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On 10/14/19 2:57 PM, Walker Carlson wrote:
> > >>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> > >>>>>>>>>>>>>>>>>> here is a link
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On Mon, Oct 14, 2019 at 2:52 PM Walker Carlson <wcarl...@confluent.io> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Hello all,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I have picked up and updated KIP-150. Due to changes to the project since KIP #150 was written there are a few items that need to be updated.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> First item that changed is the adoption of the Materialized parameter.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> The second item is the WindowedBy. How the old KIP handles windowing is that it overloads the aggregate function to take in a Window object as well as the other parameters. The current practice to window grouped-streams is to call windowedBy and receive a windowed stream object. The existing interface for a windowed stream made from a grouped stream will not work for cogrouped streams. Hence, we have to make new interfaces for cogrouped windowed streams.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Please take a look, I would like to hear your feedback,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Walker
> > >>>>>>>>
> > >>>>>>>> --
> > >>>>>>>> -- Guozhang
> > >>>>>>
> > >>>>>> --
> > >>>>>> -- Guozhang
> > >>
> > >> --
> > >> -- Guozhang
>
> --
> -- Guozhang
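
For readers following the thread, here is a minimal sketch in plain Java (no Kafka dependency) of the semantics discussed above: several input streams, each with its own aggregator, update one shared store, and null-valued records are dropped the same way the thread says KGroupedStream#aggregate drops them. Every name in it (`CogroupSketch`, `Totals`, `Agg`, `process`, `get`) is hypothetical and chosen only for illustration; none of them is part of the Kafka Streams API or of KIP-150.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, Kafka-free model of the co-group semantics discussed in this thread. */
public class CogroupSketch {

    /** Folds one input value of type V into the shared aggregate of type A. */
    public interface Agg<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    /** Shared result type: one field per input stream, kept together under one key. */
    public static final class Totals {
        public final int clicks;
        public final int purchases;

        public Totals(int clicks, int purchases) {
            this.clicks = clicks;
            this.purchases = purchases;
        }
    }

    // One store backs the whole co-group, instead of one store per input stream.
    private final Map<String, Totals> store = new HashMap<>();

    /** Processes a record from any input stream using that stream's own aggregator. */
    public <V> void process(String key, V value, Agg<String, V, Totals> aggregator) {
        if (key == null || value == null) {
            return; // null-valued records are dropped, not treated as tombstones
        }
        // Plays the role of the single initializer: start from an empty aggregate.
        Totals current = store.getOrDefault(key, new Totals(0, 0));
        store.put(key, aggregator.apply(key, value, current));
    }

    public Totals get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        CogroupSketch cogroup = new CogroupSketch();
        Agg<String, Integer, Totals> clickAgg =
            (k, v, agg) -> new Totals(agg.clicks + v, agg.purchases);
        Agg<String, Integer, Totals> purchaseAgg =
            (k, v, agg) -> new Totals(agg.clicks, agg.purchases + v);

        cogroup.process("alice", 1, clickAgg);    // record from the "clicks" stream
        cogroup.process("alice", 2, clickAgg);
        cogroup.process("alice", 5, purchaseAgg); // record from the "purchases" stream
        cogroup.process("alice", null, clickAgg); // dropped: the store is unchanged

        Totals t = cogroup.get("alice");
        System.out.println("alice -> clicks=" + t.clicks + ", purchases=" + t.purchases);
        // prints "alice -> clicks=3, purchases=5"
    }
}
```

The sketch deliberately leaves out windowing, repartitioning, and serdes. It is only meant to show why one initializer and one store are enough when each stream brings its own aggregator, and why a null value leaves the previous aggregate in place rather than deleting it.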