Interesting discussion. My personal take is as follows:

(1) Co-group is not a special case of a multi-way KTable join, because multiple records with the same key from a single input stream should be aggregated together, and there are no update semantics. A co-group is rather a multi-stream aggregation operation.
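To make point (1) concrete, here is a minimal plain-Java sketch. It is not the Kafka Streams API: `Cogroup`, `Stats`, `process`, and `runExample` are hypothetical names, and the local `Aggregator` interface only mirrors the shape of Kafka Streams' `Aggregator#apply(key, value, aggregate)`. It models a co-group as one initializer, one aggregator per input stream, and a single key-value store updated by read-modify-write. Two records with the same key from the same stream are both folded into the aggregate (no update semantics), and null-valued records are skipped, assuming the `KGroupedStream#aggregate` null handling discussed in the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class CogroupSketch {

    /** Local stand-in with the same shape as Kafka Streams' Aggregator#apply(key, value, aggregate). */
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    /** Hypothetical aggregate type: one field fed by each input stream. */
    static final class Stats {
        final int clicks;
        final double spend;

        Stats(int clicks, double spend) {
            this.clicks = clicks;
            this.spend = spend;
        }

        @Override
        public String toString() {
            return "Stats{clicks=" + clicks + ", spend=" + spend + "}";
        }
    }

    /** Toy co-group: one initializer, one aggregator per input stream, one store. */
    static final class Cogroup<K, A> {
        private final Map<K, A> store = new HashMap<>(); // the single state store
        private final Supplier<A> initializer;

        Cogroup(Supplier<A> initializer) {
            this.initializer = initializer;
        }

        /** Read-modify-write: fold the record into the current aggregate for its key. */
        <V> void process(K key, V value, Aggregator<? super K, ? super V, A> aggregator) {
            if (value == null) {
                return; // skip nulls, as grouped-stream aggregations do
            }
            A current = store.containsKey(key) ? store.get(key) : initializer.get();
            store.put(key, aggregator.apply(key, value, current));
        }

        A get(K key) {
            return store.get(key);
        }
    }

    static Stats runExample() {
        // One aggregator per input stream; each updates only "its" field.
        Aggregator<String, Integer, Stats> clicksAgg =
            (key, clicks, agg) -> new Stats(agg.clicks + clicks, agg.spend);
        Aggregator<String, Double, Stats> spendAgg =
            (key, amount, agg) -> new Stats(agg.clicks, agg.spend + amount);

        Cogroup<String, Stats> cogroup = new Cogroup<>(() -> new Stats(0, 0.0));
        cogroup.process("alice", 1, clicksAgg);            // from stream1
        cogroup.process("alice", 1, clicksAgg);            // stream1 again: added, not overwritten
        cogroup.process("alice", 9.5, spendAgg);           // from stream2
        cogroup.process("alice", (Double) null, spendAgg); // dropped
        return cogroup.get("alice");
    }

    public static void main(String[] args) {
        System.out.println(runExample()); // prints Stats{clicks=2, spend=9.5}
    }
}
```

With the aggregate-then-join pattern, the same result would need one aggregation store per input stream before the join; here everything lives in one map, which is the efficiency argument for `cogroup()`.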
(2) Semantically, non-windowed co-group is the same as (as laid out in the KIP already):

> KTable aggTable1 = stream1.groupByKey().aggregate(...);
> KTable aggTable2 = stream2.groupByKey().aggregate(...);
> ...
> KTable aggTableX = streamX.groupByKey().aggregate(...);
>
> KTable final = aggTable1.join(aggTable2)....join(aggTableX)

However, I don't think that it would be possible for our optimizer to rewrite one into the other, given what `Initializer`, `Aggregator`, `ValueJoiner`, and `S a user would provide. Hence, `cogroup()` is a more efficient way to express the above: it uses a single store, instead of the X stores required above.

For windowed co-group, especially session-windowed, it does not seem possible at all to rewrite co-group as independent aggregations followed by joins. Note that session boundaries would be determined _after_ the input streams are co-partitioned/merged in `cogroup()`, and thus would differ from those of the aggregate-join pattern.

(3) For the current KIP writeup, I agree that adding `Named` to `aggregate()` aligns best with the current API layout. I also don't think that the overloads are a big issue, because they are spread out over multiple helper interfaces.

-Matthias

On 10/29/19 10:38 AM, Walker Carlson wrote:
> Hi Guozhang, > > I am not sure what you mean by "Fields from different streams are never > aggregated together"; this certainly can be the case, but it is not the general > rule. If we want to take care of the special cases where the key-sets are > disjoint for each stream, then they can be given no-op operators. This would > have the same effect as a stitching join, as the function to update the > store would have to be defined either way, even to just place it in. > > Now if we look at it from the other way, if we only specify the multiway > join then the user will need to aggregate each stream.
Then they must do > the join, which will involve either aggregators and value joiners or some > questionable optimization that would rely on each aggregator defined for a > grouped stream meshing together. And this would all have to happen inside > KStream. > > I do agree that there are optimizations that can be done on joining > multiple tables per your example, in both cases whether it be a "stitching > join" or not. But I do not think the place to do it is in Streams. This > could be relatively easy to accomplish. I think we save ourselves pain if > we consider the tables and streams as separate cases, as aggregating > multiple streams into one KTable can be done more efficiently than making > multiple KTables and then joining them together. We may be able to get > around this in the case of a stitching join but I am not sure how we could > do it safely otherwise. > > Walker > > > > > > On Mon, Oct 28, 2019 at 6:26 PM Guozhang Wang <wangg...@gmail.com> wrote: > >> Hi Walker, >> >> This is a good point about compatibility breakage while overloading the >> existing classes; while reading your exchanges with John, I think I still >> need to clarify the motivations a bit more: >> >> 1) Multiple streams need to be aggregated together, inputs are always >> *KStreams* and end result is a *KTable*. >> 2) Fields from different streams are never aggregated together, i.e. on the >> higher level it is more like a "stitching up" the fields and then doing a >> single aggregation. >> >> In this context, I agree with you that it is still a streams-aggregation >> operator that we are trying to optimize (though it's a multi-way one), not a >> multi-way table-table-join operator that we are trying to optimize here. >> >> >> ----------------- >> >> But now taking a step back looking at it, I'm wondering, because of 2) that >> all input streams do not have overlapping fields, we can generalize this to >> a broader scope.
Consider this case for example: >> >> table1 = builder.table("topic1"); >> table2 = builder.table("topic2"); >> table3 = builder.table("topic3"); >> table4 = table1.join(table2).join(table3); >> >> Suppose the join operations do not take out any fields or add any new >> fields, i.e. say table1 has fields A, table2 has fields B, and table3 has >> fields C besides the key K, so table4 has fields {A, B, C} --- the join is >> just "stitching up" the fields --- then the above topology can actually be >> optimized in a similar way: >> >> * we only keep one materialized store in the form of K -> {A, B, C} as the >> materialized store of the final join result of table4. >> * when a record comes in from table1/2/3, just query the store on K, and >> then update the corresponding A/B/C field and then write back to the >> store. >> >> >> Then the above streams-aggregation operator can be treated as a special >> case of this: you first aggregate separately on stream1/2/3 and generate >> table1/2/3, and then do this "stitching join"; behind the scenes we can >> optimize the topology to do exactly the co-group logic by updating the >> second bullet point above as an aggregation operator: >> >> * when a record comes in from *stream1/2/3*, just query the store on K, and >> then update the corresponding A/B/C field *with an aggregator* and then >> write back to the store. >> >> ----------------- >> >> Personally I think this is better because of 1) a larger applicable scope, >> and 2) not introducing new interfaces. But of course on the other side >> it requires us to do this optimization inside Streams with some syntax >> hint from users (for example, users need to specify it is a "stitching >> join" such that all fields are still preserved in the join result). WDYT? >> >> >> Guozhang >> >> >> On Mon, Oct 28, 2019 at 4:20 PM Walker Carlson <wcarl...@confluent.io> >> wrote: >> >>> Hi John, >>> >>> Thank you for the background information.
I think I understand your >> point. >>> >>> I believe that this could be fixed by making the motivation a little >>> clearer in the KIP. I think that the motivation is: when you have >> multiple >>> streams that need to aggregate together to form a single object, the >>> current, non-optimal way to do this is through a multiway table join. >> This >>> is a little hacky. There is a slight but significant difference in these >>> cases, as in the null value handling you pointed out. >>> >>> For the example in the motivation, these tables were grouped streams so >>> they already dropped the null values. If we consider Cogroup sitting in >> the >>> same grey area that KGroupedStream does, it should also behave this way. >> If >>> you think about it that way, it is more of an extension of KGroupedStream >>> than KTable or KStream. Therefore I handle null values the same way >>> KGroupedStream#aggregate does. >>> >>> Looking back I am not sure I understood your previous question fully at >> the >>> time. I am sorry if my answer caused any confusion! >>> >>> Walker >>> >>> On Mon, Oct 28, 2019 at 2:49 PM John Roesler <j...@confluent.io> wrote: >>> >>>> Hi Walker, >>>> >>>> Sorry for the delay in responding. Thanks for your response earlier. >>>> >>>> I think there might be a subtlety getting overlooked in considering >>>> whether we're talking about streams versus tables in cogroup. As I'm >>>> sure you know, Kafka Streams treats "stream" records as independent, >>>> immutable, and opaque "facts", whereas we treat "table" records as a >>>> sequence of updates to an entity identified by the record key (where >>>> "update" means that each record's value represents the new state after >>>> applying the update).
For the most part, this is a clean separation, >>>> but there is one special case where records with a "null" value are >>>> interpreted as a tombstone in the table context (i.e., the record >>>> indicates not that the new value of the entity is "null", but rather >>>> that the entity has been deleted). In contrast, a record with a null >>>> value in the stream context is _just_ a record with a null value; no >>>> special semantics. >>>> >>>> The difficulty is that these two semantics clash at the stream/table >>>> boundary. So, operations that convert streams to tables (like >>>> KGroupedStream#aggregate) have to cope with ambiguity about whether to >>>> treat null values opaquely as null values, or as tombstones. I think >>>> I'll make a long story short and just say that this is a very, very >>>> complex issue. As a result (and as a bit of a punt), our >>>> KGroupedStream operations actually just discard null-valued records. >>>> This means that the following are _not_ equivalent programs: >>>> >>>> table1 = >>>> stream<Id,Record>("records") >>>> .filter(Record::isOk) >>>> .groupByKey() >>>> .aggregate(() -> new Record(), (key, value, agg) -> value) >>>> table2 = >>>> table<Id,Record>("record") >>>> .filter(Record::isOk) >>>> >>>> They look about the same, in that they'll both produce a >>>> KTable<Id,Record> with the value being the latest state. But if a >>>> record is deleted in the upstream data (represented as a "null" >>>> value), that record would also be deleted in table2, but not in >>>> table1. Table1 would just perpetually contain the value immediately >>>> prior to the delete. >>>> >>>> This is why it makes me nervous to propose a new kind of _stream_ >>>> operation ostensibly in order to solve a problem that presents itself >>>> in the _table_ context. >>>> >>>> If the goal is to provide a more efficient and convenient multi-way >>>> KTable join, I think it would be a good idea to consider an extension >>>> to the KTable API, not the KStream API. 
On the other hand, if this is >>>> not the goal, then the motivation of the KIP shouldn't say that it is. >>>> Instead, the KIP could provide some other motivation specifically for >>>> augmenting the KStream API. >>>> >>>> There is a third alternative that comes to mind, if you wish to >>>> resolve the long-standing dilemma around this semantic problem and >>>> specify in the KIP how exactly nulls are handled in this operator. But >>>> (although this seems on its face to be a good option), I think it >>>> might be a briar patch. Even if we are able to reach a suitable design, >>>> we'd have to contend with the fact that it looks like the >>>> KGroupedStream API, but behaves differently. >>>> >>>> What do you think about all this? >>>> >>>> Thanks again for the KIP and the discussion! >>>> -John >>>> >>>> On Mon, Oct 28, 2019 at 3:32 PM Walker Carlson <wcarl...@confluent.io> >>>> wrote: >>>>> >>>>> Hi Guozhang, >>>>> >>>>> Matthias and I did talk about overloading a different type of >> aggregate >>>>> method in the cogroup that would take in the windows and return a >>>>> windowed KTable. We decided that it would break too much with the >>> current >>>>> pattern that was established in the normal KStream. We can revisit >> this >>>> if >>>>> you have a different opinion on the tradeoff. >>>>> >>>>> Walker >>>>> >>>>> On Mon, Oct 28, 2019 at 12:14 PM Guozhang Wang <wangg...@gmail.com> >>>> wrote: >>>>> >>>>>> Hi Walker, >>>>>> >>>>>> On Fri, Oct 25, 2019 at 1:34 PM Walker Carlson < >>> wcarl...@confluent.io> >>>>>> wrote: >>>>>> >>>>>>> Hi Guozhang, >>>>>>> >>>>>>> 1. I am familiar with the cogroup of Spark; it is very similar to >>>>>>> their join operator, but instead it makes the values iterable. I >>> think >>>>>> that >>>>>>> the use cases are different enough that it makes sense to specify >>> the >>>>>>> aggregator when we do. >>>>>>> >>>>>>> I like the idea of "absorb" and I think it could be useful.
>>> Although >>>> I do >>>>>>> not think it is as intuitive. >>>>>>> >>>>>>> If we were to go that route, we would either use more processors >> or >>> do >>>>>>> essentially the same thing but would have to store the >> information >>>>>>> required to cogroup inside that KTable. I think this would >> violate >>>> some >>>>>>> design principles. I would argue that we should consider adding >>>> absorb as >>>>>>> well and auto re-write it to use cogroup. >>>>>>> >>>>>> >>>>>> Yeah, I think I agree with you about the internal design complexity >>> with >>>>>> "absorb"; I was primarily wondering whether we can save ourselves from >>>>>> introducing 3 more public classes with co-group. But it seems that >>>> without >>>>>> introducing new classes there's no easy way for us to bound the >> scope >>>> of >>>>>> co-grouping (like how many streams will be co-grouped together). >>>>>> >>>>>> LMK if you have some better ideas: generally speaking, the fewer new >>>> public >>>>>> interfaces we are introducing to fulfill a new feature the better, >> so >>>> I'd >>>>>> push us to think twice and carefully before we go down the route. >>>>>> >>>>>> >>>>>>> >>>>>>> 2. We have not considered this, though that would be a convenient >>>>>>> operation. >>>>>>> >>>>>>> 3. There is only one processor made. We are actually having the >>>> naming >>>>>>> conversation right now in the above thread. >>>>>>> >>>>>>> 4, 5. Fair points. >>>>>>> >>>>>>> Walker >>>>>>> >>>>>>> On Fri, Oct 25, 2019 at 11:58 AM Guozhang Wang < >> wangg...@gmail.com >>>> >>>>>> wrote: >>>>>>> >>>>>>>> Hi Walker, thanks for the KIP! I made a pass on the writeup and >>>> have >>>>>> some >>>>>>>> comments below: >>>>>>>> >>>>>>>> Meta: >>>>>>>> >>>>>>>> 1.
Syntax-wise, I'm wondering if we have compared our current >>>> proposal >>>>>>> with >>>>>>>> Spark's co-group syntax (I know they are targeting >> different >>>> use >>>>>>> cases, >>>>>>>> but wondering if their syntax is closer to the join operator), >>>> what are >>>>>>> the >>>>>>>> syntax / semantics trade-offs here? >>>>>>>> >>>>>>>> Just playing devil's advocate here: if the main motivation is >>> to >>>>>>> provide >>>>>>>> a more convenient multi-way join syntax, and in order to only >>> have >>>> one >>>>>>>> materialized store we need to specify the final joined format >> at >>>> the >>>>>>>> beginning, then what about the following alternative (with the >>>> given >>>>>>>> example in your wiki page): >>>>>>>> >>>>>>>> >>>>>>>> KGroupedStream<K, V1> grouped1 = >>>> builder.stream("topic1").groupByKey(); >>>>>>>> KGroupedStream<K, V2> grouped2 = >>>> builder.stream("topic2").groupByKey(); >>>>>>>> KGroupedStream<K, V3> grouped3 = >>>> builder.stream("topic3").groupByKey(); >>>>>>>> >>>>>>>> KTable<K, CG> aggregated = grouped1.aggregate(initializer, >>>>>> materialized, >>>>>>>> aggregator1); >>>>>>>> >>>>>>>> aggregated.absorb(grouped2, aggregator2) // I'm just using a >>>> random >>>>>>> name >>>>>>>> off the top of my head here >>>>>>>> .absorb(grouped3, aggregator3); >>>>>>>> >>>>>>>> In this way, we just add a new API to the KTable to "absorb" >> new >>>>>> streams >>>>>>> as >>>>>>>> aggregated results without needing to introduce new first-class >> citizen >>>>>>> classes. >>>>>>>> >>>>>>>> 2. From the DSL optimization side, have we considered if we can auto >>>>>> re-write >>>>>>>> the user-written, old-fashioned multi-join into this new DSL >>>> operator? >>>>>>>> >>>>>>>> 3. Although it is not needed for the wiki page itself, for >>> internal >>>>>>>> implementation how many processor nodes would we create for the >>> new >>>>>>>> operator, and how can we allow users to name them? >>>>>>>> >>>>>>>> Minor: >>>>>>>> >>>>>>>> 4.
In "Public Interfaces", it would be better to add the templated generics to >>>>>>>> "KGroupedStream" as "KGroupedStream<K, V>". >>>>>>>> >>>>>>>> 5. Naming-wise, I'd suggest we keep the "K" together with >>>> Stream/Table, >>>>>>>> e.g. "TimeWindowed*CogroupedKStream*<K, V>". >>>>>>>> >>>>>>>> >>>>>>>> Guozhang >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax < >>>>>> matth...@confluent.io> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Walker, >>>>>>>>> >>>>>>>>> I am not sure if I can follow your argument. What exactly do >>> you >>>> mean >>>>>>> by >>>>>>>>> >>>>>>>>>> I also >>>>>>>>>>> think that in this case it would be better to separate >> the 2 >>>>>> options >>>>>>>> out >>>>>>>>>>> into separate overloads. >>>>>>>>> >>>>>>>>> Maybe you can give an example of what method signature you have >> in >>>> mind? >>>>>>>>> >>>>>>>>>>> We could take a named parameter from upstream or add an >>> extra >>>>>> naming >>>>>>>>> option >>>>>>>>>>> however I don't really see the advantage that would give. >>>>>>>>> >>>>>>>>> Are you familiar with KIP-307? Before KIP-307, KS generated >> all >>>> names >>>>>>>>> for all Processors. This made it hard to reason about a >>>> Topology if >>>>>>>>> it's getting complex. Adding `Named` to the new co-group >>> operator >>>>>> would >>>>>>>>> actually align with KIP-307. >>>>>>>>> >>>>>>>>>> It seems to go in >>>>>>>>>>> the opposite direction from the cogroup configuration idea >>> you >>>>>>>> proposed. >>>>>>>>> >>>>>>>>> Can you elaborate? Not sure if I can follow. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -Matthias >>>>>>>>> >>>>>>>>> >>>>>>>>> On 10/24/19 10:20 AM, Walker Carlson wrote: >>>>>>>>>> While I like the idea, Sophie, I don't think that it is >>>> necessary. I >>>>>>> also >>>>>>>>>> think that in this case it would be better to separate the >> 2 >>>> options >>>>>>> out >>>>>>>>>> into separate overloads.
>>>>>>>>>> We could take a named parameter from upstream or add an >> extra >>>>>> naming >>>>>>>>> option >>>>>>>>>> however I don't really see the advantage that would give. >> It >>>> seems >>>>>> to >>>>>>>> go >>>>>>>>> in >>>>>>>>>> the opposite direction from the cogroup configuration idea >>> you >>>>>>>> proposed. >>>>>>>>>> >>>>>>>>>> John, I think it could be both. It depends on when you >>>> aggregate >>>>>> and >>>>>>>> what >>>>>>>>>> kind of data you have. In the example it is aggregating >>> before >>>>>>> joining, >>>>>>>>>> there are probably some cases where you could join before >>>>>>> aggregating. >>>>>>>>> IMHO >>>>>>>>>> it would be easier to group all the streams together then >>>> perform >>>>>> the >>>>>>>> one >>>>>>>>>> operation that results in a single KTable. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman < >>>>>>>> sop...@confluent.io >>>>>>>>>> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>>> I can personally not see any need to add other >>> configuration >>>>>>>>>>> Famous last words? >>>>>>>>>>> >>>>>>>>>>> Just kidding, 95% confidence is more than enough to me >> (and >>>>>> better >>>>>>> to >>>>>>>>>>> optimize for current >>>>>>>>>>> design than for hypothetical future changes). >>>>>>>>>>> >>>>>>>>>>> One last question I have then is about the >>>>>>> operator/store/repartition >>>>>>>>>>> naming -- seems like >>>>>>>>>>> we can name the underlying store/changelog through the >>>>>> Materialized >>>>>>>>>>> parameter, but do we >>>>>>>>>>> also want to include an overload taking a Named parameter >>> for >>>> the >>>>>>>>> operator >>>>>>>>>>> name (as in the >>>>>>>>>>> KTable#join variations)? >>>>>>>>>>> >>>>>>>>>>> On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax < >>>>>>>> matth...@confluent.io> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Interesting idea, Sophie. 
>>>>>>>>>>>> >>>>>>>>>>>> So far, we tried to reuse existing config objects and >> only >>>> add >>>>>> new >>>>>>>> ones >>>>>>>>>>>> when needed to avoid creating "redundant" classes. This >> is >>> of >>>>>>> course >>>>>>>> a >>>>>>>>>>>> reactive approach (with the drawback to deprecate stuff >> if >>> we >>>>>>> change >>>>>>>>> it, >>>>>>>>>>>> as you described). >>>>>>>>>>>> >>>>>>>>>>>> I can personally not see any need to add other >>> configuration >>>>>>>> parameters >>>>>>>>>>>> atm, so it's a 95% obvious "no" IMHO. The final >>>> `aggregate()` has >>>>>>>> only >>>>>>>>> a >>>>>>>>>>>> single state store that we need to configure, and reusing >>>>>>>>> `Materialized` >>>>>>>>>>>> seems to be appropriate. >>>>>>>>>>>> >>>>>>>>>>>> Also note, that the `Initializer` is a mandatory >> parameter >>>> and >>>>>> not >>>>>>> a >>>>>>>>>>>> configuration and should be passed directly, and not via >> a >>>>>>>>> configuration >>>>>>>>>>>> object. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -Matthias >>>>>>>>>>>> >>>>>>>>>>>> On 10/23/19 11:37 AM, Sophie Blee-Goldman wrote: >>>>>>>>>>>>> Thanks for the explanation, makes sense to me! As for >> the >>>> API, >>>>>> one >>>>>>>>>>> other >>>>>>>>>>>>> thought I had is might we ever want or need to introduce >>> any >>>>>> other >>>>>>>>>>>> configs >>>>>>>>>>>>> or parameters in the future? Obviously that's difficult >> to >>>> say >>>>>> now >>>>>>>> (or >>>>>>>>>>>>> maybe the >>>>>>>>>>>>> answer seems obviously "no") but we seem to often end up >>>> needing >>>>>>> to >>>>>>>>> add >>>>>>>>>>>> new >>>>>>>>>>>>> overloads and/or deprecate old ones as new features or >>>>>>> requirements >>>>>>>>>>> come >>>>>>>>>>>>> into >>>>>>>>>>>>> play. >>>>>>>>>>>>> >>>>>>>>>>>>> What do you (and others?) think about wrapping the >> config >>>>>>> parameters >>>>>>>>>>> (ie >>>>>>>>>>>>> everything >>>>>>>>>>>>> except the actual grouped streams) in a new config >> object? 
>>>> For >>>>>>>>> example, >>>>>>>>>>>> the >>>>>>>>>>>>> CogroupedStream#aggregate field could take a single >>>> Cogrouped >>>>>>>> object, >>>>>>>>>>>>> which itself would have an initializer and a >> materialized. >>>> If we >>>>>>>> ever >>>>>>>>>>>> need >>>>>>>>>>>>> to add >>>>>>>>>>>>> a new parameter, we can just add it to the Cogrouped >>> class. >>>>>>>>>>>>> >>>>>>>>>>>>> Also, will the backing store be available for IQ if a >>>>>> Materialized >>>>>>>> is >>>>>>>>>>>>> passed in? >>>>>>>>>>>>> >>>>>>>>>>>>> On Wed, Oct 23, 2019 at 10:49 AM Walker Carlson < >>>>>>>>> wcarl...@confluent.io >>>>>>>>>>>> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Sophie, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thank you for your comments. As for the different >> methods >>>>>>>> signatures >>>>>>>>> I >>>>>>>>>>>> have >>>>>>>>>>>>>> not really considered any other options but while I do >>>> agree >>>>>> it >>>>>>> is >>>>>>>>>>>>>> confusing, I don't see any obvious solutions. The >> problem >>>> is >>>>>> that >>>>>>>> the >>>>>>>>>>>>>> cogroup essentially pairs a group stream with an >>>> aggregator and >>>>>>>> when >>>>>>>>>>> it >>>>>>>>>>>> is >>>>>>>>>>>>>> first made the method is called on a groupedStream >>> already. >>>>>>> However >>>>>>>>>>> each >>>>>>>>>>>>>> subsequent stream-aggregator pair is added on to a >>> cogroup >>>>>> stream >>>>>>>> so >>>>>>>>>>> it >>>>>>>>>>>>>> needs both arguments. >>>>>>>>>>>>>> >>>>>>>>>>>>>> For the second question you should not need a joiner. >> The >>>> idea >>>>>> is >>>>>>>>> that >>>>>>>>>>>> you >>>>>>>>>>>>>> can collect many grouped streams with overlapping key >>>> spaces >>>>>> and >>>>>>>> any >>>>>>>>>>>> kind >>>>>>>>>>>>>> of value types. Once aggregated its value will be >> reduced >>>> into >>>>>>> one >>>>>>>>>>> type. >>>>>>>>>>>>>> This is why you need only one initializer. 
Each >>> aggregator >>>> will >>>>>>>> need >>>>>>>>>>> to >>>>>>>>>>>>>> integrate the new value with the new object made in the >>>>>>>> initializer. >>>>>>>>>>>>>> Does that make sense? >>>>>>>>>>>>>> >>>>>>>>>>>>>> This is a good question and I will include this >>>> explanation in >>>>>>> the >>>>>>>>> kip >>>>>>>>>>>> as >>>>>>>>>>>>>> well. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>> Walker >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Tue, Oct 22, 2019 at 8:59 PM Sophie Blee-Goldman < >>>>>>>>>>>> sop...@confluent.io> >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hey Walker, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for the KIP! I have just a couple of questions: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 1) It seems a little awkward to me that with the >> current >>>> API, >>>>>> we >>>>>>>>>>> have a >>>>>>>>>>>>>>> nearly identical >>>>>>>>>>>>>>> "add stream to cogroup" method, except for the first >>> which >>>>>> has a >>>>>>>>>>>>>> different >>>>>>>>>>>>>>> signature >>>>>>>>>>>>>>> (ie the first stream is joined as >>>> stream.cogroup(Aggregator) >>>>>>> while >>>>>>>>>>> the >>>>>>>>>>>>>>> subsequent ones >>>>>>>>>>>>>>> are joined as .cogroup(Stream, Aggregator) ). I'm not >>> sure >>>>>> what >>>>>>> it >>>>>>>>>>>> would >>>>>>>>>>>>>>> look like exactly, >>>>>>>>>>>>>>> but I was just wondering if you'd considered and/or >>>> rejected >>>>>> any >>>>>>>>>>>>>>> alternative APIs? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2) This might just be my lack of familiarity with >>>> "cogroup" >>>>>> as a >>>>>>>>>>>> concept, >>>>>>>>>>>>>>> but with the >>>>>>>>>>>>>>> current (non-optimal) API the user seems to have some >>>> control >>>>>>> over >>>>>>>>>>> how >>>>>>>>>>>>>>> exactly >>>>>>>>>>>>>>> the different streams are joined through the >>> ValueJoiners. 
>>>>>> Would >>>>>>>>> this >>>>>>>>>>>> new >>>>>>>>>>>>>>> cogroup >>>>>>>>>>>>>>> simply concatenate the values from the different >> cogroup >>>>>>> streams, >>>>>>>> or >>>>>>>>>>>>>> could >>>>>>>>>>>>>>> users >>>>>>>>>>>>>>> potentially pass some kind of Joiner to the >>>> cogroup/aggregate >>>>>>>>>>> methods? >>>>>>>>>>>>>> Or, >>>>>>>>>>>>>>> is the >>>>>>>>>>>>>>> whole point of cogroups that you no longer ever need >> to >>>>>> specify >>>>>>> a >>>>>>>>>>>> Joiner? >>>>>>>>>>>>>>> If so, you >>>>>>>>>>>>>>> should add a short line to the KIP explaining that for >>>> those >>>>>> of >>>>>>> us >>>>>>>>>>> who >>>>>>>>>>>>>>> aren't fluent >>>>>>>>>>>>>>> in cogroup semantics :) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>> Sophie >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Thu, Oct 17, 2019 at 3:06 PM Walker Carlson < >>>>>>>>>>> wcarl...@confluent.io> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Good catch I updated that. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I have made a PR for this KIP >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I then am splitting it into 3 parts, first cogroup >> for >>> a >>>>>>>> key-value >>>>>>>>>>>>>> store >>>>>>>>>>>>>>> ( >>>>>>>>>>>>>>>> here <https://github.com/apache/kafka/pull/7538>), >>> then >>>> for >>>>>> a >>>>>>>>>>>>>>>> timeWindowedStore, and then a sessionWindowedStore + >>>> ensuring >>>>>>>>>>>>>>> partitioning. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Walker >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Tue, Oct 15, 2019 at 12:47 PM Matthias J. Sax < >>>>>>>>>>>>>> matth...@confluent.io> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Walker, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> thanks for picking up the KIP and reworking it for >> the >>>>>> changed >>>>>>>>> API. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Overall, the updated API suggestions make sense to >> me. >>>> The >>>>>>> seem >>>>>>>> to >>>>>>>>>>>>>>> align >>>>>>>>>>>>>>>>> quite nicely with our current API design. 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> One nit: In `CogroupedKStream#aggregate(...)` the >> type >>>>>>> parameter >>>>>>>>> of >>>>>>>>>>>>>>>>> `Materialized` should be `V`, not `VR`? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On 10/14/19 2:57 PM, Walker Carlson wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>> >>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup >>>>>>>>>>>>>>>>>> here >>>>>>>>>>>>>>>>>> is a link >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Mon, Oct 14, 2019 at 2:52 PM Walker Carlson < >>>>>>>>>>>>>>> wcarl...@confluent.io> >>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hello all, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I have picked up and updated KIP-150. Due to >> changes >>>> to >>>>>> the >>>>>>>>>>>>>> project >>>>>>>>>>>>>>>>> since >>>>>>>>>>>>>>>>>>> KIP #150 was written there are a few items that >> need >>>> to be >>>>>>>>>>>>>> updated. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> First item that changed is the adoption of the >>>>>> Materialized >>>>>>>>>>>>>>> parameter. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> The second item is the WindowedBy. How the old KIP >>>> handles >>>>>>>>>>>>>> windowing >>>>>>>>>>>>>>>> is >>>>>>>>>>>>>>>>>>> that it overloads the aggregate function to take >> in >>> a >>>>>> Window >>>>>>>>>>>>>> object >>>>>>>>>>>>>>> as >>>>>>>>>>>>>>>>> well >>>>>>>>>>>>>>>>>>> as the other parameters. The current practice to >>>> window >>>>>>>>>>>>>>>> grouped-streams >>>>>>>>>>>>>>>>> is >>>>>>>>>>>>>>>>>>> to call windowedBy and receive a windowed stream >>>> object. 
>>>>>> The >>>>>>>>>>>>>>> existing >>>>>>>>>>>>>>>>>>> interface for a windowed stream made from a >> grouped >>>> stream >>>>>>>> will >>>>>>>>>>>>>> not >>>>>>>>>>>>>>>> work >>>>>>>>>>>>>>>>>>> for cogrouped streams. Hence, we have to make new >>>>>> interfaces >>>>>>>> for >>>>>>>>>>>>>>>>> cogrouped >>>>>>>>>>>>>>>>>>> windowed streams. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Please take a look, I would like to hear your >>>> feedback, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Walker >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> -- Guozhang >>>>>>>> >>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> -- Guozhang >>>>>> >>>> >>> >> >> >> -- >> -- Guozhang >> >
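Matthias's remark at the top of this reply, that session boundaries under `cogroup()` differ from those of the aggregate-then-join pattern (and hence why cogrouped windowed streams need their own interfaces), can be checked with a small, self-contained sketch. It is a simplification, not Kafka Streams' session-window implementation: it assumes in-order timestamps for a single key and treats a record as extending a session when it is at most `gap` ms after that session's last record. `SessionBoundarySketch`, `Session`, and `sessionize` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionBoundarySketch {

    /** A simplified session window covering [start, end] in event time (ms). */
    static final class Session {
        final long start;
        long end;

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + "," + end + "]";
        }
    }

    /**
     * Groups one key's timestamps into sessions. Assumption: a record extends the
     * current session if it is at most `gap` ms after that session's last record;
     * out-of-order/late merging is not modelled.
     */
    static List<Session> sessionize(List<Long> timestamps, long gap) {
        List<Long> sorted = new ArrayList<>(timestamps);
        sorted.sort(null); // natural (ascending) order
        List<Session> sessions = new ArrayList<>();
        for (long ts : sorted) {
            Session last = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
            if (last != null && ts - last.end <= gap) {
                last.end = ts; // within the inactivity gap: extend the session
            } else {
                sessions.add(new Session(ts, ts)); // gap exceeded: start a new session
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        long gap = 5;
        List<Long> stream1 = List.of(0L, 8L);
        List<Long> stream2 = List.of(4L);

        // Aggregate-then-join: each input stream is sessionized on its own.
        System.out.println("stream1: " + sessionize(stream1, gap)); // [[0,0], [8,8]]
        System.out.println("stream2: " + sessionize(stream2, gap)); // [[4,4]]

        // Co-group: inputs are merged first, then sessionized once.
        List<Long> merged = new ArrayList<>(stream1);
        merged.addAll(stream2);
        System.out.println("cogroup: " + sessionize(merged, gap));  // [[0,8]]
    }
}
```

Per stream, stream1 yields two sessions and stream2 one, and none of them spans [0, 8]; joining those windowed results cannot reproduce the single session that the merged input produces.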