Walker,

I am not sure if I can follow your argument. What do you exactly mean by

> I also
> think that in this case it would be better to separate the 2 options out
> into separate overloads.

Maybe you can give an example what method signature you have in mind?

> We could take a named parameter from upstream or add an extra naming option
> however I don't really see the advantage that would give.

Are you familiar with KIP-307? Before KIP-307, KS generated all names for
all Processors. This makes it hard to reason about a Topology if it's
getting complex. Adding `Named` to the new co-group operator would
actually align with KIP-307.

> It seems to go in
> the opposite direction from the cogroup configuration idea you proposed.

Can you elaborate? Not sure if I can follow.


-Matthias

On 10/24/19 10:20 AM, Walker Carlson wrote:
> While I like the idea Sophie I don't think that it is necessary. I also
> think that in this case it would be better to separate the 2 options out
> into separate overloads.
> We could take a named parameter from upstream or add an extra naming option
> however I don't really see the advantage that would give. It seems to go in
> the opposite direction from the cogroup configuration idea you proposed.
>
> John, I think it could be both. It depends on when you aggregate and what
> kind of data you have. In the example it is aggregating before joining,
> there are probably some cases where you could join before aggregating. IMHO
> it would be easier to group all the streams together then perform the one
> operation that results in a single KTable.
>
>
>
> On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
>
>>> I can personally not see any need to add other configuration
>> Famous last words?
>>
>> Just kidding, 95% confidence is more than enough to me (and better to
>> optimize for current design than for hypothetical future changes).
>>
>> One last question I have then is about the operator/store/repartition
>> naming -- seems like we can name the underlying store/changelog through
>> the Materialized parameter, but do we also want to include an overload
>> taking a Named parameter for the operator name (as in the KTable#join
>> variations)?
>>
>> On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Interesting idea, Sophie.
>>>
>>> So far, we tried to reuse existing config objects and only add new ones
>>> when needed to avoid creating "redundant" classes. This is of course a
>>> reactive approach (with the drawback to deprecate stuff if we change it,
>>> as you described).
>>>
>>> I can personally not see any need to add other configuration parameters
>>> atm, so it's a 95% obvious "no" IMHO. The final `aggregate()` has only a
>>> single state store that we need to configure, and reusing `Materialized`
>>> seems to be appropriate.
>>>
>>> Also note, that the `Initializer` is a mandatory parameter and not a
>>> configuration and should be passed directly, and not via a configuration
>>> object.
>>>
>>>
>>> -Matthias
>>>
>>> On 10/23/19 11:37 AM, Sophie Blee-Goldman wrote:
>>>> Thanks for the explanation, makes sense to me! As for the API, one other
>>>> thought I had is might we ever want or need to introduce any other
>>>> configs or parameters in the future? Obviously that's difficult to say
>>>> now (or maybe the answer seems obviously "no") but we seem to often end
>>>> up needing to add new overloads and/or deprecate old ones as new
>>>> features or requirements come into play.
>>>>
>>>> What do you (and others?) think about wrapping the config parameters (ie
>>>> everything except the actual grouped streams) in a new config object?
>>>> For example, the CogroupedStream#aggregate field could take a single
>>>> Cogrouped object, which itself would have an initializer and a
>>>> materialized.
>>>> If we ever need to add a new parameter, we can just add it to the
>>>> Cogrouped class.
>>>>
>>>> Also, will the backing store be available for IQ if a Materialized is
>>>> passed in?
>>>>
>>>> On Wed, Oct 23, 2019 at 10:49 AM Walker Carlson <wcarl...@confluent.io>
>>>> wrote:
>>>>
>>>>> Hi Sophie,
>>>>>
>>>>> Thank you for your comments. As for the different method signatures I
>>>>> have not really considered any other options but while I do agree it is
>>>>> confusing, I don't see any obvious solutions. The problem is that the
>>>>> cogroup essentially pairs a group stream with an aggregator and when it
>>>>> is first made the method is called on a groupedStream already. However
>>>>> each subsequent stream-aggregator pair is added on to a cogroup stream
>>>>> so it needs both arguments.
>>>>>
>>>>> For the second question you should not need a joiner. The idea is that
>>>>> you can collect many grouped streams with overlapping key spaces and
>>>>> any kind of value types. Once aggregated its value will be reduced into
>>>>> one type. This is why you need only one initializer. Each aggregator
>>>>> will need to integrate the new value with the new object made in the
>>>>> initializer.
>>>>> Does that make sense?
>>>>>
>>>>> This is a good question and I will include this explanation in the kip
>>>>> as well.
>>>>>
>>>>> Thanks,
>>>>> Walker
>>>>>
>>>>> On Tue, Oct 22, 2019 at 8:59 PM Sophie Blee-Goldman
>>>>> <sop...@confluent.io> wrote:
>>>>>
>>>>>> Hey Walker,
>>>>>>
>>>>>> Thanks for the KIP! I have just a couple of questions:
>>>>>>
>>>>>> 1) It seems a little awkward to me that with the current API, we have a
>>>>>> nearly identical "add stream to cogroup" method, except for the first
>>>>>> which has a different signature (ie the first stream is joined as
>>>>>> stream.cogroup(Aggregator) while the subsequent ones are joined as
>>>>>> .cogroup(Stream, Aggregator) ).
>>>>>> I'm not sure what it would look like exactly, but I was just wondering
>>>>>> if you'd considered and/or rejected any alternative APIs?
>>>>>>
>>>>>> 2) This might just be my lack of familiarity with "cogroup" as a
>>>>>> concept, but with the current (non-optimal) API the user seems to have
>>>>>> some control over how exactly the different streams are joined through
>>>>>> the ValueJoiners. Would this new cogroup simply concatenate the values
>>>>>> from the different cogroup streams, or could users potentially pass
>>>>>> some kind of Joiner to the cogroup/aggregate methods? Or, is the whole
>>>>>> point of cogroups that you no longer ever need to specify a Joiner? If
>>>>>> so, you should add a short line to the KIP explaining that for those of
>>>>>> us who aren't fluent in cogroup semantics :)
>>>>>>
>>>>>> Cheers,
>>>>>> Sophie
>>>>>>
>>>>>> On Thu, Oct 17, 2019 at 3:06 PM Walker Carlson <wcarl...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Good catch I updated that.
>>>>>>>
>>>>>>> I have made a PR for this KIP
>>>>>>>
>>>>>>> I then am splitting it into 3 parts, first cogroup for a key-value
>>>>>>> store (here <https://github.com/apache/kafka/pull/7538>), then for a
>>>>>>> timeWindowedStore, and then a sessionWindowedStore + ensuring
>>>>>>> partitioning.
>>>>>>>
>>>>>>> Walker
>>>>>>>
>>>>>>> On Tue, Oct 15, 2019 at 12:47 PM Matthias J. Sax
>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>
>>>>>>>> Walker,
>>>>>>>>
>>>>>>>> thanks for picking up the KIP and reworking it for the changed API.
>>>>>>>>
>>>>>>>> Overall, the updated API suggestions make sense to me. They seem to
>>>>>>>> align quite nicely with our current API design.
>>>>>>>>
>>>>>>>> One nit: In `CogroupedKStream#aggregate(...)` the type parameter of
>>>>>>>> `Materialized` should be `V`, not `VR`?
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 10/14/19 2:57 PM, Walker Carlson wrote:
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
>>>>>>>>> here is a link
>>>>>>>>>
>>>>>>>>> On Mon, Oct 14, 2019 at 2:52 PM Walker Carlson
>>>>>>>>> <wcarl...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> Hello all,
>>>>>>>>>>
>>>>>>>>>> I have picked up and updated KIP-150. Due to changes to the project
>>>>>>>>>> since KIP #150 was written there are a few items that need to be
>>>>>>>>>> updated.
>>>>>>>>>>
>>>>>>>>>> First item that changed is the adoption of the Materialized
>>>>>>>>>> parameter.
>>>>>>>>>>
>>>>>>>>>> The second item is the WindowedBy. How the old KIP handles windowing
>>>>>>>>>> is that it overloads the aggregate function to take in a Window
>>>>>>>>>> object as well as the other parameters. The current practice to
>>>>>>>>>> window grouped-streams is to call windowedBy and receive a windowed
>>>>>>>>>> stream object. The existing interface for a windowed stream made
>>>>>>>>>> from a grouped stream will not work for cogrouped streams. Hence, we
>>>>>>>>>> have to make new interfaces for cogrouped windowed streams.
>>>>>>>>>>
>>>>>>>>>> Please take a look, I would like to hear your feedback,
>>>>>>>>>>
>>>>>>>>>> Walker
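
As a footnote to Walker's explanation quoted above (many grouped streams
with overlapping key spaces and different value types, one initializer, one
aggregator per stream, no Joiner), here is a minimal plain-Java sketch of
those semantics. It does not use the Kafka Streams API: `CogroupSketch`, its
`Aggregator` interface, and the `cogroup`/`aggregate`/`demo` methods are
hypothetical names made up for illustration. It also replays each input in
registration order, which is an assumption of the sketch; a real topology
would process records as they arrive.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Plain-Java model of the cogroup idea. NOT the Kafka Streams API;
// every name in this class is hypothetical.
final class CogroupSketch<K, VOut> {

    /** Folds one input value of type V into the running aggregate for its key. */
    interface Aggregator<K, V, VOut> {
        VOut apply(K key, V value, VOut aggregate);
    }

    // One step per registered stream. Each step replays that stream's
    // records into the shared per-key state.
    private final List<BiConsumer<Map<K, VOut>, Supplier<VOut>>> steps = new ArrayList<>();

    /** Pairs a keyed input (any value type V) with the aggregator that handles it. */
    <V> CogroupSketch<K, VOut> cogroup(List<Map.Entry<K, V>> stream,
                                       Aggregator<K, V, VOut> aggregator) {
        steps.add((state, initializer) -> {
            for (Map.Entry<K, V> record : stream) {
                VOut current = state.get(record.getKey());
                if (current == null) {
                    // First record for this key: the single shared initializer
                    // creates the starting aggregate.
                    current = initializer.get();
                }
                state.put(record.getKey(),
                          aggregator.apply(record.getKey(), record.getValue(), current));
            }
        });
        return this;
    }

    /** Runs all registered stream/aggregator pairs into one result table. */
    Map<K, VOut> aggregate(Supplier<VOut> initializer) {
        Map<K, VOut> state = new LinkedHashMap<>();
        for (BiConsumer<Map<K, VOut>, Supplier<VOut>> step : steps) {
            step.accept(state, initializer);
        }
        return state;
    }

    /** Two inputs with different value types reduced into one double[]{views, spend}. */
    static Map<String, double[]> demo() {
        List<Map.Entry<String, Integer>> views = List.of(
                Map.entry("alice", 2), Map.entry("bob", 1), Map.entry("alice", 1));
        List<Map.Entry<String, Double>> purchases = List.of(
                Map.entry("alice", 9.5));

        return new CogroupSketch<String, double[]>()
                .cogroup(views, (key, value, agg) -> { agg[0] += value; return agg; })
                .cogroup(purchases, (key, value, agg) -> { agg[1] += value; return agg; })
                .aggregate(() -> new double[2]);
    }
}
```

With the sample data, `demo()` yields alice -> [3.0, 9.5] and bob -> [1.0, 0.0].
Each aggregator only knows how to merge its own value type into the shared
result type, which is why a single initializer is enough.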