Hello Walker / Matthias,

Thanks for your explanation. I can tell you've put a lot of thought into this already, and it seems we cannot avoid adding new interfaces in any case, so I will stop arguing for reducing the number of first-class citizens in the Streams DSL :)
Guozhang

On Thu, Oct 31, 2019 at 12:50 AM Matthias J. Sax <matth...@confluent.io> wrote:

> Interesting discussion. My personal take is as follows:
>
> (1) Co-group is not a special case of a multi-way KTable join, because multiple records with the same key from a single input stream should be aggregated together, and there are no update semantics. A co-group is rather a multi-stream aggregation operation.
>
> (2) Semantically, non-windowed co-group is the same as (as laid out in the KIP already):
>
>     KTable aggTable1 = stream1.groupByKey().aggregate(...);
>     KTable aggTable2 = stream2.groupByKey().aggregate(...);
>     ...
>     KTable aggTableX = streamX.groupByKey().aggregate(...);
>
>     KTable finalTable = aggTable1.join(aggTable2, ...) ... .join(aggTableX, ...);
>
> However, I don't think that it would be possible for our optimizer to rewrite one into the other, given what `Initializer`, `Aggregator`, `ValueJoiner`, and `S a user would provide.
>
> Hence, `cogroup()` is a more efficient way to express the above, using a single store instead of the X stores required above.
>
> For windowed co-group, especially session-windowed, it seems not possible at all to rewrite co-group as independent aggregations followed by joins. Note that session boundaries would be determined _after_ the input streams are co-partitioned/merged in `cogroup()` and thus would be different compared to the aggregate-join pattern.
>
> (3) For the current KIP writeup, I agree that adding `Named` to `aggregate()` aligns best with the current API layout. I also don't think that the overloads are a big issue, because they are spread out over multiple helper interfaces.
>
> -Matthias
>
> On 10/29/19 10:38 AM, Walker Carlson wrote:
>> Hi Guozhang,
>>
>> I am not sure what you mean by "Fields from different streams are never aggregated together"; this certainly can be the case, but it is not the general rule.
>>
>> If we want to take care of the special cases where the key-sets are disjoint for each stream, then they can be given no-op operators. This would have the same effect as a stitching join, since the function to update the store would have to be defined either way, even if it only places the value into the store.
>>
>> Now, if we look at it from the other direction: if we only specify the multi-way join, then the user will need to aggregate each stream. Then they must do the join, which will involve either aggregators and value joiners or some questionable optimization that relies on the aggregators defined for each grouped stream meshing together. And this would all have to happen inside KStream.
>>
>> I do agree that there are optimizations that can be done on joining multiple tables per your example, in both cases, whether it is a "stitching join" or not. But I do not think the place to do it is in Streams. This could be relatively easy to accomplish. I think we save ourselves pain if we consider the tables and streams as separate cases, as aggregating multiple streams into one KTable can be done more efficiently than making multiple KTables and then joining them together. We may be able to get around this in the case of a stitching join, but I am not sure how we could do it safely otherwise.
>>
>> Walker
>>
>> On Mon, Oct 28, 2019 at 6:26 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hi Walker,
>>>
>>> This is a good point about compatibility breakage when overloading the existing classes; while reading your exchanges with John, I think I still need to clarify the motivations a bit more:
>>>
>>> 1) Multiple streams need to be aggregated together; inputs are always *KStreams* and the end result is a *KTable*.
>>> 2) Fields from different streams are never aggregated together, i.e. at a higher level it is more like "stitching up" the fields and then doing a single aggregation.
>>>
>>> In this context, I agree with you that it is still a streams-aggregation operator that we are trying to optimize (though a multi-way one), not a multi-way table-table-join operator.
>>>
>>> -----------------
>>>
>>> But now, taking a step back, I'm wondering whether, because of 2) (no two input streams have overlapping fields), we can generalize this to a broader scope. Consider this case, for example:
>>>
>>>     table1 = builder.table("topic1");
>>>     table2 = builder.table("topic2");
>>>     table3 = builder.table("topic3");
>>>     table4 = table1.join(table2).join(table3);
>>>
>>> Suppose the join operations neither remove any fields nor add any new ones: say table1 has fields A, table2 has fields B, and table3 has fields C besides the key K, so that table4 has fields {A, B, C}. The join then just "stitches up" the fields, and the above topology can actually be optimized in a similar way:
>>>
>>> * we only keep one materialized store in the form of K -> {A, B, C} as the materialized store of the final join result of table4.
>>> * when a record comes in from table1/2/3, just query the store on K, update the corresponding A/B/C field, and write the result back to the store.
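Guozhang's two bullets describe a single store keyed by K whose row is patched one field at a time. The following stdlib-only Java sketch models that update path under stated assumptions: it is not Kafka Streams code, the names `StitchingJoinSketch`, `Row`, and `onUpdate` are hypothetical, fields are plain strings, the output follows inner-join semantics (a joined row exists only once A, B, and C are all present), and deletes are not modeled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, stdlib-only model of the "stitching join" store: one map
// K -> {A, B, C} instead of one materialized store per input table.
// Assumptions: string-valued fields, inner-join output, deletes not modeled.
class StitchingJoinSketch {

    // One row of the single store; each field is owned by exactly one input table.
    static final class Row {
        String a; // from table1
        String b; // from table2
        String c; // from table3

        // table1.join(table2).join(table3) is an inner join, so a joined
        // result exists only once every field has been seen.
        boolean isComplete() {
            return a != null && b != null && c != null;
        }
    }

    private final Map<String, Row> store = new HashMap<>();

    // A record arrives from table 1, 2, or 3: query the store on K, patch only
    // that table's field, write the row back, and report whether a joined row exists.
    boolean onUpdate(int sourceTable, String key, String value) {
        Row row = store.get(key);
        if (row == null) {
            row = new Row();
        }
        switch (sourceTable) {
            case 1: row.a = value; break;
            case 2: row.b = value; break;
            case 3: row.c = value; break;
            default: throw new IllegalArgumentException("unknown table " + sourceTable);
        }
        store.put(key, row);
        return row.isComplete();
    }

    Row get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        StitchingJoinSketch join = new StitchingJoinSketch();
        System.out.println(join.onUpdate(1, "k1", "A1")); // false: only A so far
        System.out.println(join.onUpdate(3, "k1", "C1")); // false: B still missing
        System.out.println(join.onUpdate(2, "k1", "B1")); // true: row is {A1, B1, C1}
        System.out.println(join.onUpdate(1, "k1", "A2")); // true: A updated in place
    }
}
```

Swapping the per-source field assignment for a per-source aggregator call gives the co-group variant Guozhang describes next in the thread.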
>>>
>>> Then the above streams-aggregation operator can be treated as a special case of this: you first aggregate separately on stream1/2/3 to generate table1/2/3, and then do this "stitching join". Behind the scenes, we can optimize the topology to do exactly the co-group logic by turning the second bullet point above into an aggregation step:
>>>
>>> * when a record comes in from *stream1/2/3*, just query the store on K, update the corresponding A/B/C field *with an aggregator*, and write the result back to the store.
>>>
>>> -----------------
>>>
>>> Personally, I think this is better because it has 1) a larger applicable scope and 2) no new interfaces. But of course, on the other hand, it requires us to do this optimization inside Streams with some syntax hint from users (for example, users need to specify that it is a "stitching join", so that all fields are preserved in the join result). WDYT?
>>>
>>> Guozhang
>>>
>>> On Mon, Oct 28, 2019 at 4:20 PM Walker Carlson <wcarl...@confluent.io> wrote:
>>>
>>>> Hi John,
>>>>
>>>> Thank you for the background information. I think I understand your point.
>>>>
>>>> I believe that this could be fixed by making the motivation a little clearer in the KIP. I think the motivation is this: when you have multiple streams that need to be aggregated together to form a single object, the current, non-optimal way to do this is through a multi-way table join. This is a little hacky. There is a slight but significant difference between these cases, such as the null-value handling you pointed out.
>>>>
>>>> For the example in the motivation, these tables came from grouped streams, so they had already dropped the null values. If we consider Cogroup to sit in the same grey area as KGroupedStream, it should also behave this way.
>>>> If you think about it that way, it is more of an extension of KGroupedStream than of KTable or KStream. Therefore I handle null values the same way KGroupedStream#aggregate does.
>>>>
>>>> Looking back, I am not sure I fully understood your previous question at the time. I am sorry if my answer caused any confusion!
>>>>
>>>> Walker
>>>>
>>>> On Mon, Oct 28, 2019 at 2:49 PM John Roesler <j...@confluent.io> wrote:
>>>>
>>>>> Hi Walker,
>>>>>
>>>>> Sorry for the delay in responding. Thanks for your response earlier.
>>>>>
>>>>> I think there might be a subtlety getting overlooked in considering whether we're talking about streams versus tables in cogroup. As I'm sure you know, Kafka Streams treats "stream" records as independent, immutable, and opaque "facts", whereas we treat "table" records as a sequence of updates to an entity identified by the record key (where "update" means that each record's value represents the new state after applying the update). For the most part, this is a clean separation, but there is one special case: records with a "null" value are interpreted as tombstones in the table context (i.e., the record indicates not that the new value of the entity is "null", but rather that the entity has been deleted). In contrast, a record with a null value in the stream context is _just_ a record with a null value; no special semantics.
>>>>>
>>>>> The difficulty is that these two semantics clash at the stream/table boundary. So, operations that convert streams to tables (like KGroupedStream#aggregate) have to cope with ambiguity about whether to treat null values opaquely as null values, or as tombstones. I'll make a long story short and just say that this is a very, very complex issue.
>>>>> As a result (and as a bit of a punt), our KGroupedStream operations actually just discard null-valued records. This means that the following are _not_ equivalent programs:
>>>>>
>>>>>     table1 =
>>>>>       stream<Id,Record>("records")
>>>>>         .filter(Record::isOk)
>>>>>         .groupByKey()
>>>>>         .aggregate(() -> new Record(), (key, value, agg) -> value)
>>>>>     table2 =
>>>>>       table<Id,Record>("records")
>>>>>         .filter(Record::isOk)
>>>>>
>>>>> They look about the same, in that they'll both produce a KTable<Id,Record> with the value being the latest state. But if a record is deleted in the upstream data (represented as a "null" value), that record would also be deleted in table2, but not in table1. Table1 would just perpetually contain the value immediately prior to the delete.
>>>>>
>>>>> This is why it makes me nervous to propose a new kind of _stream_ operation ostensibly in order to solve a problem that presents itself in the _table_ context.
>>>>>
>>>>> If the goal is to provide a more efficient and convenient multi-way KTable join, I think it would be a good idea to consider an extension to the KTable API, not the KStream API. On the other hand, if this is not the goal, then the motivation of the KIP shouldn't say that it is. Instead, the KIP could provide some other motivation specifically for augmenting the KStream API.
>>>>>
>>>>> There is a third alternative that comes to mind, if you wish to resolve the long-standing dilemma around this semantic problem and specify in the KIP exactly how nulls are handled in this operator. But (although this seems on the face of it to be a good option), I think it might be a briar patch. Even if we are able to reach a suitable design, we'd have to contend with the fact that it looks like the KGroupedStream API but behaves differently.
>>>>>
>>>>> What do you think about all this?
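The difference John describes comes down to how a null value is treated on each side of the stream/table boundary. This stdlib-only Java sketch replays the same hand-made records through both interpretations; it is a model, not the Kafka Streams implementation, the names (`NullSemanticsSketch`, `rec`, and the two methods) are hypothetical, string values stand in for `Record`, and the `filter` step is omitted.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, stdlib-only model of the two programs above (no Kafka Streams;
// the filter step is omitted). A null value marks an upstream delete.
class NullSemanticsSketch {

    static Map.Entry<String, String> rec(String key, String value) {
        return new SimpleEntry<>(key, value); // SimpleEntry allows null values
    }

    // table1: stream -> groupByKey -> aggregate. Null-valued records are
    // discarded before the aggregator runs; the aggregator keeps the latest value.
    static Map<String, String> aggregateLikeGroupedStream(List<Map.Entry<String, String>> records) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : records) {
            if (r.getValue() == null) {
                continue; // no tombstone semantics: the delete is lost
            }
            result.put(r.getKey(), r.getValue());
        }
        return result;
    }

    // table2: a table source, where a null value is a tombstone that removes the key.
    static Map<String, String> materializeAsTable(List<Map.Entry<String, String>> records) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : records) {
            if (r.getValue() == null) {
                result.remove(r.getKey());
            } else {
                result.put(r.getKey(), r.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
            rec("id1", "v1"),
            rec("id2", "v2"),
            rec("id1", null)); // id1 is deleted upstream
        System.out.println(aggregateLikeGroupedStream(records)); // {id1=v1, id2=v2}
        System.out.println(materializeAsTable(records));         // {id2=v2}
    }
}
```

With the delete of `id1` as the last record, the grouped-stream path still reports `id1=v1` while the table path has removed it, which is the stale value John points out for table1.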
>>>>>
>>>>> Thanks again for the KIP and the discussion!
>>>>> -John
>>>>>
>>>>> On Mon, Oct 28, 2019 at 3:32 PM Walker Carlson <wcarl...@confluent.io> wrote:
>>>>>>
>>>>>> Hi Guozhang,
>>>>>>
>>>>>> Matthias and I did talk about overloading a different type of aggregate method in the cogroup that would take in the windows and return a windowed KTable. We decided that it would break too much with the current pattern established in the normal KStream. We can revisit this if you have a different opinion on the tradeoff.
>>>>>>
>>>>>> Walker
>>>>>>
>>>>>> On Mon, Oct 28, 2019 at 12:14 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Walker,
>>>>>>>
>>>>>>> On Fri, Oct 25, 2019 at 1:34 PM Walker Carlson <wcarl...@confluent.io> wrote:
>>>>>>>
>>>>>>>> Hi Guozhang,
>>>>>>>>
>>>>>>>> 1. I am familiar with Spark's cogroup; it is very similar to their join operator, but instead it makes the values iterable. I think the use cases are different enough that it makes sense to specify the aggregator when we do.
>>>>>>>>
>>>>>>>> I like the idea of "absorb" and I think it could be useful, although I do not think it is as intuitive.
>>>>>>>>
>>>>>>>> If we were to go that route, we would either use more processors or do essentially the same thing but have to store the information required to cogroup inside that KTable. I think this would violate some design principles. I would argue that we should consider adding absorb as well and auto-rewrite it to use cogroup.
>>>>>>>>
>>>>>>>
>>>>>>> Yeah, I think I agree with you about the internal design complexity of "absorb"; I was primarily thinking about whether we could save ourselves from introducing 3 more public classes with co-group. But it seems that without introducing new classes there's no easy way for us to bound the scope of co-grouping (like how many streams will be co-grouped together).
>>>>>>>
>>>>>>> LMK if you have some better ideas: generally speaking, the fewer new public interfaces we introduce to fulfill a new feature the better, so I'd push us to think twice and carefully before we go down this route.
>>>>>>>
>>>>>>>>
>>>>>>>> 2. We have not considered this, though it would be a convenient operation.
>>>>>>>>
>>>>>>>> 3. There is only one processor made. We are actually having the naming conversation right now in the above thread.
>>>>>>>>
>>>>>>>> 4, 5. Fair points.
>>>>>>>>
>>>>>>>> Walker
>>>>>>>>
>>>>>>>> On Fri, Oct 25, 2019 at 11:58 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Walker, thanks for the KIP! I made a pass on the writeup and have some comments below:
>>>>>>>>>
>>>>>>>>> Meta:
>>>>>>>>>
>>>>>>>>> 1. Syntax-wise, I'm wondering whether we have compared our current proposal with Spark's co-group syntax (I know they target different use cases, but I wonder whether their syntax is closer to the join operator). What are the syntax / semantics trade-offs here?
>>>>>>>>>
>>>>>>>>> Just playing devil's advocate here: if the main motivation is to provide a more convenient multi-way join syntax, and in order to have only one materialized store we need to specify the final joined format at the beginning, then what about the following alternative (using the example in your wiki page):
>>>>>>>>>
>>>>>>>>>     KGroupedStream<K, V1> grouped1 = builder.<K, V1>stream("topic1").groupByKey();
>>>>>>>>>     KGroupedStream<K, V2> grouped2 = builder.<K, V2>stream("topic2").groupByKey();
>>>>>>>>>     KGroupedStream<K, V3> grouped3 = builder.<K, V3>stream("topic3").groupByKey();
>>>>>>>>>
>>>>>>>>>     KTable<K, CG> aggregated = grouped1.aggregate(initializer, aggregator1, materialized);
>>>>>>>>>
>>>>>>>>>     // I'm just using a random name off the top of my head here
>>>>>>>>>     aggregated.absorb(grouped2, aggregator2)
>>>>>>>>>         .absorb(grouped3, aggregator3);
>>>>>>>>>
>>>>>>>>> In this way, we just add a new API to KTable to "absorb" new streams as aggregated results, without needing to introduce new first-class citizen classes.
>>>>>>>>>
>>>>>>>>> 2. From the DSL-optimization perspective, have we considered whether we can automatically rewrite a user-written, old-fashioned multi-join into this new DSL operator?
>>>>>>>>>
>>>>>>>>> 3. Although it is not needed for the wiki page itself, for the internal implementation, how many processor nodes would we create for the new operator, and how can we allow users to name them?
>>>>>>>>>
>>>>>>>>> Minor:
>>>>>>>>>
>>>>>>>>> 4. In "Public Interfaces", it would be better to add the generic type parameters to "KGroupedStream", i.e. "KGroupedStream<K, V>".
>>>>>>>>>
>>>>>>>>> 5. Naming-wise, I'd suggest we keep the "K" together with Stream/Table, e.g. "TimeWindowed*CogroupedKStream*<K, V>".
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> Walker,
>>>>>>>>>>
>>>>>>>>>> I am not sure I can follow your argument. What exactly do you mean by
>>>>>>>>>>
>>>>>>>>>>> I also think that in this case it would be better to separate the 2 options out into separate overloads.
>>>>>>>>>>
>>>>>>>>>> Maybe you can give an example of the method signature you have in mind?
>>>>>>>>>>
>>>>>>>>>>> We could take a named parameter from upstream or add an extra naming option; however, I don't really see the advantage that would give.
>>>>>>>>>>
>>>>>>>>>> Are you familiar with KIP-307? Before KIP-307, KS generated all names for all Processors. This makes it hard to reason about a Topology if it's getting complex. Adding `Named` to the new co-group operator would actually align with KIP-307.
>>>>>>>>>>
>>>>>>>>>>> It seems to go in the opposite direction from the cogroup configuration idea you proposed.
>>>>>>>>>>
>>>>>>>>>> Can you elaborate? I'm not sure I can follow.
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 10/24/19 10:20 AM, Walker Carlson wrote:
>>>>>>>>>>> While I like the idea, Sophie, I don't think that it is necessary. I also think that in this case it would be better to separate the 2 options out into separate overloads.
>>>>>>>>>>> We could take a named parameter from upstream or add an extra naming option; however, I don't really see the advantage that would give.
> >> It > >>>> seems > >>>>>> to > >>>>>>>> go > >>>>>>>>> in > >>>>>>>>>> the opposite direction from the cogroup configuration idea > >>> you > >>>>>>>> proposed. > >>>>>>>>>> > >>>>>>>>>> John, I think it could be both. It depends on when you > >>>> aggregate > >>>>>> and > >>>>>>>> what > >>>>>>>>>> kind of data you have. In the example it is aggregating > >>> before > >>>>>>> joining, > >>>>>>>>>> there are probably some cases where you could join before > >>>>>>> aggregating. > >>>>>>>>> IMHO > >>>>>>>>>> it would be easier to group all the streams together then > >>>> perform > >>>>>> the > >>>>>>>> one > >>>>>>>>>> operation that results in a single KTable. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman < > >>>>>>>> sop...@confluent.io > >>>>>>>>>> > >>>>>>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>>> I can personally not see any need to add other > >>> configuration > >>>>>>>>>>> Famous last words? > >>>>>>>>>>> > >>>>>>>>>>> Just kidding, 95% confidence is more than enough to me > >> (and > >>>>>> better > >>>>>>> to > >>>>>>>>>>> optimize for current > >>>>>>>>>>> design than for hypothetical future changes). > >>>>>>>>>>> > >>>>>>>>>>> One last question I have then is about the > >>>>>>> operator/store/repartition > >>>>>>>>>>> naming -- seems like > >>>>>>>>>>> we can name the underlying store/changelog through the > >>>>>> Materialized > >>>>>>>>>>> parameter, but do we > >>>>>>>>>>> also want to include an overload taking a Named parameter > >>> for > >>>> the > >>>>>>>>> operator > >>>>>>>>>>> name (as in the > >>>>>>>>>>> KTable#join variations)? > >>>>>>>>>>> > >>>>>>>>>>> On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax < > >>>>>>>> matth...@confluent.io> > >>>>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Interesting idea, Sophie. 
>>>>>>>>>>>>>
>>>>>>>>>>>>> So far, we have tried to reuse existing config objects and only add new ones when needed, to avoid creating "redundant" classes. This is of course a reactive approach (with the drawback that we have to deprecate things if we change it, as you described).
>>>>>>>>>>>>>
>>>>>>>>>>>>> I can personally not see any need to add other configuration parameters atm, so it's a 95% obvious "no" IMHO. The final `aggregate()` has only a single state store that we need to configure, and reusing `Materialized` seems to be appropriate.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also note that the `Initializer` is a mandatory parameter, not a configuration, and should be passed directly rather than via a configuration object.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 10/23/19 11:37 AM, Sophie Blee-Goldman wrote:
>>>>>>>>>>>>>> Thanks for the explanation, makes sense to me! As for the API, one other thought I had is: might we ever want or need to introduce any other configs or parameters in the future? Obviously that's difficult to say now (or maybe the answer seems obviously "no"), but we often seem to end up needing to add new overloads and/or deprecate old ones as new features or requirements come into play.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What do you (and others?) think about wrapping the config parameters (i.e. everything except the actual grouped streams) in a new config object? For example, the CogroupedStream#aggregate method could take a single Cogrouped object, which itself would hold an initializer and a materialized. If we ever need to add a new parameter, we can just add it to the Cogrouped class.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, will the backing store be available for IQ if a Materialized is passed in?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 23, 2019 at 10:49 AM Walker Carlson <wcarl...@confluent.io> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Sophie,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thank you for your comments. As for the different method signatures, I have not really considered any other options, and while I do agree it is confusing, I don't see any obvious solutions. The problem is that the cogroup essentially pairs a grouped stream with an aggregator, and when it is first created, the method is already called on a grouped stream, so only the aggregator is needed. However, each subsequent stream-aggregator pair is added on to a cogrouped stream, so it needs both arguments.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For the second question, you should not need a joiner.
>>>>>>>>>>>>>>> The idea is that you can collect many grouped streams with overlapping key spaces and any kind of value types. Once aggregated, their values are reduced into one type. This is why you need only one initializer. Each aggregator needs to integrate its new value into the aggregate object created by the initializer. Does that make sense?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is a good question, and I will include this explanation in the KIP as well.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Walker
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Oct 22, 2019 at 8:59 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hey Walker,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the KIP! I have just a couple of questions:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1) It seems a little awkward to me that with the current API, we have a nearly identical "add stream to cogroup" method for every stream except the first, which has a different signature (i.e. the first stream is joined as stream.cogroup(Aggregator) while the subsequent ones are joined as .cogroup(Stream, Aggregator)). I'm not sure what it would look like exactly, but I was just wondering if you'd considered and/or rejected any alternative APIs?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2) This might just be my lack of familiarity with "cogroup" as a concept, but with the current (non-optimal) API the user seems to have some control over exactly how the different streams are joined, through the ValueJoiners. Would this new cogroup simply concatenate the values from the different cogroup streams, or could users potentially pass some kind of Joiner to the cogroup/aggregate methods? Or is the whole point of cogroups that you no longer ever need to specify a Joiner? If so, you should add a short line to the KIP explaining that for those of us who aren't fluent in cogroup semantics :)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Sophie
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Oct 17, 2019 at 3:06 PM Walker Carlson <wcarl...@confluent.io> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Good catch, I updated that.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have made a PR for this KIP.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am then splitting it into 3 parts: first cogroup for a key-value store (here <https://github.com/apache/kafka/pull/7538>), then for a timeWindowedStore, and then for a sessionWindowedStore + ensuring partitioning.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Walker
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Oct 15, 2019 at 12:47 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Walker,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> thanks for picking up the KIP and reworking it for the changed API.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Overall, the updated API suggestions make sense to me. They seem to align quite nicely with our current API design.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> One nit: in `CogroupedKStream#aggregate(...)`, shouldn't the type parameter of `Materialized` be `V`, not `VR`?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 10/14/19 2:57 PM, Walker Carlson wrote:
>>>>>>>>>>>>>>>>>>> Here is a link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Mon, Oct 14, 2019 at 2:52 PM Walker Carlson <wcarl...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I have picked up and updated KIP-150. Due to changes in the project since KIP-150 was written, there are a few items that need to be updated.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The first item that changed is the adoption of the Materialized parameter.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The second item is windowedBy. The old KIP handles windowing by overloading the aggregate function to take in a Window object as well as the other parameters. The current practice for windowing grouped streams is to call windowedBy and receive a windowed stream object. The existing interface for a windowed stream made from a grouped stream will not work for cogrouped streams. Hence, we have to make new interfaces for cogrouped windowed streams.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Please take a look; I would like to hear your feedback.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Walker

--
-- Guozhang
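To make the shape debated in this thread concrete (one initializer, one aggregator per input stream, no `ValueJoiner`, a single store, and windowing applied to the co-grouped result rather than to each input), here is a stdlib-only Java mock. It is not the KIP-150 API: `CogroupSketch`, `Input`, `process`, and `UserSummary` are hypothetical names, the initializer is passed to the constructor instead of to an `aggregate()` call, only tumbling windows are modeled, and null-valued records are dropped to mirror the `KGroupedStream` behavior discussed in the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical aggregate type built from two differently-typed inputs.
class UserSummary {
    int clicks;
    long spentCents;

    @Override
    public String toString() {
        return "UserSummary{clicks=" + clicks + ", spentCents=" + spentCents + "}";
    }
}

// Hypothetical, stdlib-only mock of the cogroup shape; not the KIP-150 API.
// One initializer, one aggregator per input, one store, no ValueJoiner.
class CogroupSketch<A> {

    // Stand-in for an Aggregator: folds one input value into the shared aggregate.
    interface Aggregator<V, T> {
        T apply(String key, V value, T aggregate);
    }

    // Handle for one co-grouped input, pairing it with its own aggregator.
    static final class Input<V, T> {
        private final CogroupSketch<T> owner;
        private final Aggregator<V, T> aggregator;

        Input(CogroupSketch<T> owner, Aggregator<V, T> aggregator) {
            this.owner = owner;
            this.aggregator = aggregator;
        }

        void process(String key, long timestampMs, V value) {
            owner.update(key, timestampMs, value, aggregator);
        }
    }

    private final Supplier<A> initializer;
    private final long windowSizeMs; // <= 0 means not windowed; otherwise tumbling windows
    private final Map<String, A> store = new HashMap<>(); // the single store

    CogroupSketch(Supplier<A> initializer, long windowSizeMs) {
        this.initializer = initializer;
        this.windowSizeMs = windowSizeMs;
    }

    <V> Input<V, A> cogroup(Aggregator<V, A> aggregator) {
        return new Input<>(this, aggregator);
    }

    A get(String storeKey) {
        return store.get(storeKey);
    }

    // Windows are assigned after the inputs are merged, so all inputs share one row per key and window.
    private String storeKey(String key, long timestampMs) {
        if (windowSizeMs <= 0) {
            return key;
        }
        long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
        return key + "@" + windowStart;
    }

    private <V> void update(String key, long timestampMs, V value, Aggregator<V, A> aggregator) {
        if (value == null) {
            return; // mirror KGroupedStream: null-valued records are dropped
        }
        String storeKey = storeKey(key, timestampMs);
        A current = store.containsKey(storeKey) ? store.get(storeKey) : initializer.get();
        store.put(storeKey, aggregator.apply(key, value, current));
    }

    public static void main(String[] args) {
        CogroupSketch<UserSummary> windowed = new CogroupSketch<>(UserSummary::new, 5_000L);
        CogroupSketch.Input<String, UserSummary> clicks =
            windowed.cogroup((String key, String page, UserSummary s) -> { s.clicks++; return s; });
        CogroupSketch.Input<Long, UserSummary> purchases =
            windowed.cogroup((String key, Long cents, UserSummary s) -> { s.spentCents += cents; return s; });

        clicks.process("alice", 1_000L, "/home");  // window [0, 5000)
        purchases.process("alice", 4_999L, 500L);  // same window, same row
        purchases.process("alice", 4_500L, null);  // dropped
        clicks.process("alice", 5_000L, "/cart");  // window [5000, 10000)

        System.out.println(windowed.get("alice@0"));    // UserSummary{clicks=1, spentCents=500}
        System.out.println(windowed.get("alice@5000")); // UserSummary{clicks=1, spentCents=0}
    }
}
```

Session windows are left out of the sketch; as Matthias points out, their boundaries would be determined after the inputs are merged, which is why he argues they cannot be reproduced by aggregating each input separately and joining afterward.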