While I like the idea Sophie I don't think that it is necessary. I also think that in this case it would be better to separate the 2 option out into separate overloads. We could take a named parameter from upstream or add an extra naming option however I don't really see the advantage that would give. It seems to go in the opposite direction from the cogroup configuration idea you proposed.
John, I think it could be both. It depends on when you aggregate and what kind of data you have. In the example it is aggregating before joining, there are probably some cases where you could join before aggregating. IMHO it would be easier to group all the streams together then perform the one operation that results in a single KTable. On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman <sop...@confluent.io> wrote: > > I can personally not see any need to add other configuration > Famous last words? > > Just kidding, 95% confidence is more than enough to me (and better to > optimize for current > design than for hypothetical future changes). > > One last question I have then is about the operator/store/repartition > naming -- seems like > we can name the underlying store/changelog through the Materialized > parameter, but do we > also want to include an overload taking a Named parameter for the operator > name (as in the > KTable#join variations)? > > On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax <matth...@confluent.io> > wrote: > > > Interesting idea, Sophie. > > > > So far, we tried to reuse existing config objects and only add new ones > > when needed to avoid creating "redundant" classes. This is of course a > > reactive approach (with the drawback to deprecate stuff if we change it, > > as you described). > > > > I can personally not see any need to add other configuration parameters > > atm, so it's a 95% obvious "no" IMHO. The final `aggregate()` has only a > > single state store that we need to configure, and reusing `Materialized` > > seems to be appropriate. > > > > Also note, that the `Initializer` is a mandatory parameter and not a > > configuration and should be passed directly, and not via a configuration > > object. > > > > > > -Matthias > > > > On 10/23/19 11:37 AM, Sophie Blee-Goldman wrote: > > > Thanks for the explanation, makes sense to me! As for the API, one > other > > > thought I had is might we ever want or need to introduce any other > > configs > > > or parameters in the future? Obviously that's difficult to say now (or > > > maybe the > > > answer seems obviously "no") but we seem to often end up needing to add > > new > > > overloads and/or deprecate old ones as new features or requirements > come > > > into > > > play. > > > > > > What do you (and others?) think about wrapping the config parameters > (ie > > > everything > > > except the actual grouped streams) in a new config object? For example, > > the > > > CogroupedStream#aggregate field could take a single Cogrouped object, > > > which itself would have an initializer and a materialized. If we ever > > need > > > to add > > > a new parameter, we can just add it to the Cogrouped class. > > > > > > Also, will the backing store be available for IQ if a Materialized is > > > passed in? > > > > > > On Wed, Oct 23, 2019 at 10:49 AM Walker Carlson <wcarl...@confluent.io > > > > > wrote: > > > > > >> Hi Sophie, > > >> > > >> Thank you for your comments. As for the different methods signatures I > > have > > >> not really considered any other options but while I do agree it is > > >> confusing, I don't see any obvious solutions. The problem is that the > > >> cogroup essentially pairs a group stream with an aggregator and when > it > > is > > >> first made the method is called on a groupedStream already. However > each > > >> subsequent stream-aggregator pair is added on to a cogroup stream so > it > > >> needs both arguments. > > >> > > >> For the second question you should not need a joiner. The idea is that > > you > > >> can collect many grouped streams with overlapping key spaces and any > > kind > > >> of value types. Once aggregated its value will be reduced into one > type. > > >> This is why you need only one initializer. Each aggregator will need > to > > >> integrate the new value with the new object made in the initializer. > > >> Does that make sense? > > >> > > >> This is a good question and I will include this explanation in the kip > > as > > >> well. > > >> > > >> Thanks, > > >> Walker > > >> > > >> On Tue, Oct 22, 2019 at 8:59 PM Sophie Blee-Goldman < > > sop...@confluent.io> > > >> wrote: > > >> > > >>> Hey Walker, > > >>> > > >>> Thanks for the KIP! I have just a couple of questions: > > >>> > > >>> 1) It seems a little awkward to me that with the current API, we > have a > > >>> nearly identical > > >>> "add stream to cogroup" method, except for the first which has a > > >> different > > >>> signature > > >>> (ie the first stream is joined as stream.cogroup(Aggregator) while > the > > >>> subsequent ones > > >>> are joined as .cogroup(Stream, Aggregator) ). I'm not sure what it > > would > > >>> look like exactly, > > >>> but I was just wondering if you'd considered and/or rejected any > > >>> alternative APIs? > > >>> > > >>> 2) This might just be my lack of familiarity with "cogroup" as a > > concept, > > >>> but with the > > >>> current (non-optimal) API the user seems to have some control over > how > > >>> exactly > > >>> the different streams are joined through the ValueJoiners. Would this > > new > > >>> cogroup > > >>> simply concatenate the values from the different cogroup streams, or > > >> could > > >>> users > > >>> potentially pass some kind of Joiner to the cogroup/aggregate > methods? > > >> Or, > > >>> is the > > >>> whole point of cogroups that you no longer ever need to specify a > > Joiner? > > >>> If so, you > > >>> should add a short line to the KIP explaining that for those of us > who > > >>> aren't fluent > > >>> in cogroup semantics :) > > >>> > > >>> Cheers, > > >>> Sophie > > >>> > > >>> On Thu, Oct 17, 2019 at 3:06 PM Walker Carlson < > wcarl...@confluent.io> > > >>> wrote: > > >>> > > >>>> Good catch I updated that. > > >>>> > > >>>> I have made a PR for this KIP > > >>>> > > >>>> I then am splitting it into 3 parts, first cogroup for a key-value > > >> store > > >>> ( > > >>>> here <https://github.com/apache/kafka/pull/7538>), then for a > > >>>> timeWindowedStore, and then a sessionWindowedStore + ensuring > > >>> partitioning. > > >>>> > > >>>> Walker > > >>>> > > >>>> On Tue, Oct 15, 2019 at 12:47 PM Matthias J. Sax < > > >> matth...@confluent.io> > > >>>> wrote: > > >>>> > > >>>>> Walker, > > >>>>> > > >>>>> thanks for picking up the KIP and reworking it for the changed API. > > >>>>> > > >>>>> Overall, the updated API suggestions make sense to me. The seem to > > >>> align > > >>>>> quite nicely with our current API design. > > >>>>> > > >>>>> One nit: In `CogroupedKStream#aggregate(...)` the type parameter of > > >>>>> `Materialized` should be `V`, not `VR`? > > >>>>> > > >>>>> > > >>>>> -Matthias > > >>>>> > > >>>>> > > >>>>> > > >>>>> On 10/14/19 2:57 PM, Walker Carlson wrote: > > >>>>>> > > >>>>> > > >>>> > > >>> > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup > > >>>>>> here > > >>>>>> is a link > > >>>>>> > > >>>>>> On Mon, Oct 14, 2019 at 2:52 PM Walker Carlson < > > >>> wcarl...@confluent.io> > > >>>>>> wrote: > > >>>>>> > > >>>>>>> Hello all, > > >>>>>>> > > >>>>>>> I have picked up and updated KIP-150. Due to changes to the > > >> project > > >>>>> since > > >>>>>>> KIP #150 was written there are a few items that need to be > > >> updated. > > >>>>>>> > > >>>>>>> First item that changed is the adoption of the Materialized > > >>> parameter. > > >>>>>>> > > >>>>>>> The second item is the WindowedBy. How the old KIP handles > > >> windowing > > >>>> is > > >>>>>>> that it overloads the aggregate function to take in a Window > > >> object > > >>> as > > >>>>> well > > >>>>>>> as the other parameters. The current practice to window > > >>>> grouped-streams > > >>>>> is > > >>>>>>> to call windowedBy and receive a windowed stream object. The > > >>> existing > > >>>>>>> interface for a windowed stream made from a grouped stream will > > >> not > > >>>> work > > >>>>>>> for cogrouped streams. Hence, we have to make new interfaces for > > >>>>> cogrouped > > >>>>>>> windowed streams. > > >>>>>>> > > >>>>>>> Please take a look, I would like to hear your feedback, > > >>>>>>> > > >>>>>>> Walker > > >>>>>>> > > >>>>>> > > >>>>> > > >>>>> > > >>>> > > >>> > > >> > > > > > > > >