Walker,

I am not sure if I can follow your argument. What do you exactly mean by

> I also
>> think that in this case it would be better to separate the 2 option out
>> into separate overloads.

Maybe you can give an example what method signature you have in mind?

>> We could take a named parameter from upstream or add an extra naming option
>> however I don't really see the advantage that would give.

Are you familiar with KIP-307? Before KIP-307, KS generated all names
for all Processors. This makes it hard to reason about a Topology if
it's getting complex. Adding `Named` to the new co-group operator would
actually align with KIP-307.

> It seems to go in
>> the opposite direction from the cogroup configuration idea you proposed.

Can you elaborate? Not sure if I can follow.



-Matthias


On 10/24/19 10:20 AM, Walker Carlson wrote:
> While I like the idea Sophie I don't think that it is necessary. I also
> think that in this case it would be better to separate the 2 option out
> into separate overloads.
> We could take a named parameter from upstream or add an extra naming option
> however I don't really see the advantage that would give. It seems to go in
> the opposite direction from the cogroup configuration idea you proposed.
> 
> John, I think it could be both. It depends on when you aggregate and what
> kind of data you have. In the example it is aggregating before joining,
> there are probably some cases where you could join before aggregating. IMHO
> it would be easier to group all the streams together then perform the one
> operation that results in a single KTable.
> 
> 
> 
> On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
> 
>>> I can personally not see any need to add other configuration
>> Famous last words?
>>
>> Just kidding, 95% confidence is more than enough to  me (and better to
>> optimize for current
>> design than for hypothetical future changes).
>>
>> One last question I have then is about the operator/store/repartition
>> naming -- seems like
>> we can name the underlying store/changelog through the Materialized
>> parameter, but do we
>> also want to include an overload taking a Named parameter for the operator
>> name (as in the
>> KTable#join variations)?
>>
>> On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Interesting idea, Sophie.
>>>
>>> So far, we tried to reuse existing config objects and only add new ones
>>> when needed to avoid creating "redundant" classes. This is of course a
>>> reactive approach (with the drawback to deprecate stuff if we change it,
>>> as you described).
>>>
>>> I can personally not see any need to add other configuration parameters
>>> atm, so it's a 95% obvious "no" IMHO. The final `aggregate()` has only a
>>> single state store that we need to configure, and reusing `Materialized`
>>> seems to be appropriate.
>>>
>>> Also note, that the `Initializer` is a mandatory parameter and not a
>>> configuration and should be passed directly, and not via a configuration
>>> object.
>>>
>>>
>>> -Matthias
>>>
>>> On 10/23/19 11:37 AM, Sophie Blee-Goldman wrote:
>>>> Thanks for the explanation, makes sense to me! As for the API, one
>> other
>>>> thought I had is might we ever want or need to introduce any other
>>> configs
>>>> or parameters in the future? Obviously that's difficult to say now (or
>>>> maybe the
>>>> answer seems obviously "no") but we seem to often end up needing to add
>>> new
>>>> overloads and/or deprecate old ones as new features or requirements
>> come
>>>> into
>>>> play.
>>>>
>>>> What do you (and others?) think about wrapping the config parameters
>> (ie
>>>> everything
>>>> except the actual grouped streams) in a new config object? For example,
>>> the
>>>> CogroupedStream#aggregate field could take a single Cogrouped object,
>>>> which itself would have an initializer and a materialized. If we ever
>>> need
>>>> to add
>>>> a new parameter, we can just add it to the Cogrouped class.
>>>>
>>>> Also, will the backing store be available for IQ if a Materialized is
>>>> passed in?
>>>>
>>>> On Wed, Oct 23, 2019 at 10:49 AM Walker Carlson <wcarl...@confluent.io
>>>
>>>> wrote:
>>>>
>>>>> Hi Sophie,
>>>>>
>>>>> Thank you for your comments. As for the different methods signatures I
>>> have
>>>>> not really considered any other options but  while I do agree it is
>>>>> confusing, I don't see any obvious solutions. The problem is that the
>>>>> cogroup essentially pairs a group stream with an aggregator and when
>> it
>>> is
>>>>> first made the method is called on a groupedStream already. However
>> each
>>>>> subsequent stream-aggregator pair is added on to a cogroup stream so
>> it
>>>>> needs both arguments.
>>>>>
>>>>> For the second question you should not need a joiner. The idea is that
>>> you
>>>>> can collect many grouped streams with overlapping key spaces and any
>>> kind
>>>>> of value types. Once aggregated its value will be reduced into one
>> type.
>>>>> This is why you need only one initializer. Each aggregator will need
>> to
>>>>> integrate the new value with the new object made in the initializer.
>>>>> Does that make sense?
>>>>>
>>>>> This is a good question and I will include this explanation in the kip
>>> as
>>>>> well.
>>>>>
>>>>> Thanks,
>>>>> Walker
>>>>>
>>>>> On Tue, Oct 22, 2019 at 8:59 PM Sophie Blee-Goldman <
>>> sop...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Hey Walker,
>>>>>>
>>>>>> Thanks for the KIP! I have just a couple of questions:
>>>>>>
>>>>>> 1) It seems a little awkward to me that with the current API, we
>> have a
>>>>>> nearly identical
>>>>>> "add stream to cogroup" method, except for the first which has a
>>>>> different
>>>>>> signature
>>>>>> (ie the first stream is joined as stream.cogroup(Aggregator) while
>> the
>>>>>> subsequent ones
>>>>>> are joined as .cogroup(Stream, Aggregator) ). I'm not sure what it
>>> would
>>>>>> look like exactly,
>>>>>> but I was just wondering if you'd considered and/or rejected any
>>>>>> alternative APIs?
>>>>>>
>>>>>> 2) This might just be my lack of familiarity with "cogroup" as a
>>> concept,
>>>>>> but with the
>>>>>> current (non-optimal) API the user seems to have some control over
>> how
>>>>>> exactly
>>>>>> the different streams are joined through the ValueJoiners. Would this
>>> new
>>>>>> cogroup
>>>>>> simply concatenate the values from the different cogroup streams, or
>>>>> could
>>>>>> users
>>>>>> potentially pass some kind of Joiner to the cogroup/aggregate
>> methods?
>>>>> Or,
>>>>>> is the
>>>>>> whole point of cogroups that you no longer ever need to specify a
>>> Joiner?
>>>>>> If so, you
>>>>>> should add a short line to the KIP explaining that for those of us
>> who
>>>>>> aren't fluent
>>>>>> in cogroup semantics :)
>>>>>>
>>>>>> Cheers,
>>>>>> Sophie
>>>>>>
>>>>>> On Thu, Oct 17, 2019 at 3:06 PM Walker Carlson <
>> wcarl...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Good catch I updated that.
>>>>>>>
>>>>>>> I have made a PR for this KIP
>>>>>>>
>>>>>>> I then am splitting it into 3 parts, first cogroup for a key-value
>>>>> store
>>>>>> (
>>>>>>> here <https://github.com/apache/kafka/pull/7538>), then for a
>>>>>>> timeWindowedStore, and then a sessionWindowedStore + ensuring
>>>>>> partitioning.
>>>>>>>
>>>>>>> Walker
>>>>>>>
>>>>>>> On Tue, Oct 15, 2019 at 12:47 PM Matthias J. Sax <
>>>>> matth...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Walker,
>>>>>>>>
>>>>>>>> thanks for picking up the KIP and reworking it for the changed API.
>>>>>>>>
>>>>>>>> Overall, the updated API suggestions make sense to me. The seem to
>>>>>> align
>>>>>>>> quite nicely with our current API design.
>>>>>>>>
>>>>>>>> One nit: In `CogroupedKStream#aggregate(...)` the type parameter of
>>>>>>>> `Materialized` should be `V`, not `VR`?
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 10/14/19 2:57 PM, Walker Carlson wrote:
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
>>>>>>>>> here
>>>>>>>>> is a link
>>>>>>>>>
>>>>>>>>> On Mon, Oct 14, 2019 at 2:52 PM Walker Carlson <
>>>>>> wcarl...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello all,
>>>>>>>>>>
>>>>>>>>>> I have picked up and updated KIP-150. Due to changes to the
>>>>> project
>>>>>>>> since
>>>>>>>>>> KIP #150 was written there are a few items that need to be
>>>>> updated.
>>>>>>>>>>
>>>>>>>>>> First item that changed is the adoption of the Materialized
>>>>>> parameter.
>>>>>>>>>>
>>>>>>>>>> The second item is the WindowedBy. How the old KIP handles
>>>>> windowing
>>>>>>> is
>>>>>>>>>> that it overloads the aggregate function to take in a Window
>>>>> object
>>>>>> as
>>>>>>>> well
>>>>>>>>>> as the other parameters. The current practice to window
>>>>>>> grouped-streams
>>>>>>>> is
>>>>>>>>>> to call windowedBy and receive a windowed stream object. The
>>>>>> existing
>>>>>>>>>> interface for a windowed stream made from a grouped stream will
>>>>> not
>>>>>>> work
>>>>>>>>>> for cogrouped streams. Hence, we have to make new interfaces for
>>>>>>>> cogrouped
>>>>>>>>>> windowed streams.
>>>>>>>>>>
>>>>>>>>>> Please take a look, I would like to hear your feedback,
>>>>>>>>>>
>>>>>>>>>> Walker
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to