Interesting discussion. My personal take is as follows:

(1) Co-group is not a special case of a multi-way KTable join, because
multiple record wit the same key from a single input stream should be
aggregated together and there are not update semantics. A co-group is
rather a muti-stream aggregation operation.

(2) Semantically, non-windowed co-group is the same as (as laid out in
the KIP already):

> KTable aggTable1 = stream1.groupByKey().aggregate(...);
> KTable aggTable2 = stream2.groupByKey().aggregate(...);
> ...
> KTable aggTableX = streamX.groupByKey().aggregate(...);
> 
> KTable final = aggTable1.join(aggTable(2)....join(aggTAbleX)

However, I don't think that it would be possible for our optimizer to
rewrite one into the other, given what `Initializer`, `Aggregator`,
`ValueJoiner`, and `S a user would provide.

Hence, `cogroup()` is a more efficient way to express the above using a
single store, instead of X stores are required above.

For windowed co-group, especially session-windowed, it seems not
possible at all to rewrite co-group as independent aggregations followed
by joins. Note that sessions boundaries would be determined _after_ the
input streams are co-partitioned/merged in `cogroup()` and thus would be
different compare the the aggregate-join pattern.

(3) For the current KIP writeup, I agree that adding `Named` to
`aggregate()` aligns best with the current API layout. I also don't
think that the overloads are a big issue, because they are spread out
over multiple helper interfaces.



-Matthias



On 10/29/19 10:38 AM, Walker Carlson wrote:
> Hi Gouzhang,
> 
> I am not sure what you mean by "Fields from different streams are never
> aggregated together", this certainly can be the case but not the general
> rule. If we want to take care of the special cases where the key-sets are
> disjoint for each stream then they can be given no-op operators. This would
> have the same effect as a stitching join as the function to update the
> store would have to be defined either way, even to just place it in.
> 
> Now if we look at it from the other way, if we only specify the multiway
> join then the user will need to aggregate each stream. Then they must do
> the join which either will involve aggregators and value joiners or some
> questionable optimization that would rely on each aggregator defined for a
> grouped stream meshing together. And this would all have to happen inside
> KStream.
> 
> I do agree that there are optimizations that can be done on joining
> multiple tables per your example, in both cases whether it be a "stitching
> join" or not. But I do not think the place to do it is in Streams. This
> could be relatively easy to accomplish. I think we save ourselves pain if
> we consider the tables and streams as separate cases, as aggregating
> multiple streams into one KTable can be done more efficiently than making
> multiple KTables and then joining them together. We may be able to get
> around this in the case of a stitching join but I am not sure how we could
> do it safely otherwise.
> 
> Walker
> 
> 
> 
> 
> 
> On Mon, Oct 28, 2019 at 6:26 PM Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> Hi Walker,
>>
>> This is a good point about compatibility breakage while overloading the
>> existing classes; while reading John and your exchanges, I think I still
>> need to clarify the motivations a bit more:
>>
>> 1) Multiple streams need to be aggregated together, inputs are always
>> *KStreams* and end result is a *KTable*.
>> 2) Fields from different streams are never aggregated together, i.e. on the
>> higher level it is more like a "stitching up" the fields and then doing a
>> single aggregation.
>>
>> In this context, I agree with you that it is still a streams-aggregation
>> operator that we are trying to optimize (though its a multi-way), not a
>> multi-way table-table-join operator that we are tying to optimize here.
>>
>>
>> -----------------
>>
>> But now taking a step back looking at it, I'm wondering, because of 2) that
>> all input streams do not have overlapping fields, we can generalize this to
>> a broader scope. Consider this case for example:
>>
>> table1 = builder.table("topic1");
>> table2 = builder.table("topic2");
>> table3 = builder.table("topic3");
>> table4 = table1.join(table2).join(table3);
>>
>> Suppose the join operations do not take out any fields or add any new
>> fields, i.e. say table1 has fields A, table2 has fields B, and table2 has
>> fields C besides the key K, the table 4 has field {A, B, C} --- the join is
>> just "stitching up" the fields --- then the above topology can actually be
>> optimized in a similar way:
>>
>> * we only keep one materialized store in the form of K -> {A, B, C} as the
>> materialized store of the final join result of table4.
>> * when a record comes in from table1/2/3, just query the store on K, and
>> then update the corresponding A/B/C field and then writes back to the
>> store.
>>
>>
>> Then the above streams-aggregation operator can be treated as a special
>> case of this: you first aggregate separately on stream1/2/3 and generate
>> table1/2/3, and then do this "stitching join", behind the scene we can
>> optimize the topology to do exactly the co-group logic by updating the
>> second bullet point above as an aggregation operator:
>>
>> * when a record comes in from *stream1/2/3*, just query the store on K, and
>> then update the corresponding A/B/C field *with an aggregator *and then
>> writes back to the store.
>>
>> -----------------
>>
>> Personally I think this is better because with 1) larger applicable scope,
>> and 2) without introducing new interfaces. But of course on the other side
>> it requires us to do this optimization inside the Streams with some syntax
>> hint from users (for example, users need to specify it is a "stitching
>> join" such that all fields are still preserved in the join result). WDYT?
>>
>>
>> Guozhang
>>
>>
>> On Mon, Oct 28, 2019 at 4:20 PM Walker Carlson <wcarl...@confluent.io>
>> wrote:
>>
>>> Hi John,
>>>
>>> Thank you for the background information. I think I understand your
>> point.
>>>
>>> I believe that this could be fixed by making the motivation a little
>>> clearer in the KIP.  I think that the motivation is when you have
>> multiple
>>> streams that need to aggregate together to form a single object the
>>> current, non optimal, way to do this is through a multiway table join.
>> This
>>> is a little hacky. There is a slight but significant difference in these
>>> cases, as in the null value handling you pointed out.
>>>
>>> For the example in the motivation, these tables were grouped streams so
>>> they already dropped the null values. If we consider Cogroup sitting in
>> the
>>> same grey area that KGroupedStream does it should also behave this way.
>> If
>>> you think about it that way it is more of an extension of KGroupedStream
>>> than KTable or KStream. Therefore I handle null values the same way
>>> KGroupedStream#aggregate does.
>>>
>>> Looking back I am not sure I understood you previous question fully at
>> the
>>> time. I am sorry if my answer caused any confusion!
>>>
>>> Walker
>>>
>>> On Mon, Oct 28, 2019 at 2:49 PM John Roesler <j...@confluent.io> wrote:
>>>
>>>> Hi Walker,
>>>>
>>>> Sorry for the delay in responding. Thanks for your response earlier.
>>>>
>>>> I think there might be a subtlety getting overlooked in considering
>>>> whether we're talking about streams versus tables in cogroup. As I'm
>>>> sure you know, Kafka Streams treats "stream" records as independent,
>>>> immutable, and opaque "facts", whereas we treat "table" records as a
>>>> sequence of updates to an entity identified by the record key (where
>>>> "update" means that each record's value represents the new state after
>>>> applying the update). For the most part, this is a clean separation,
>>>> but there is one special case where records with a "null" value are
>>>> interpreted as a tombstone in the table context (i.e., the record
>>>> indicates not that the new value of the entity is "null", but rather
>>>> that the entity has been deleted). In contrast, a record with a null
>>>> value in the stream context is _just_ a record with a null value; no
>>>> special semantics.
>>>>
>>>> The difficulty is that these two semantics clash at the stream/table
>>>> boundary. So, operations that convert streams to tables (like
>>>> KGroupedStream#aggregate) have to cope with ambiguity about whether to
>>>> treat null values opaquely as null values, or as tombstones. I think
>>>> I'll make a long story short and just say that this is a very, very
>>>> complex issue. As a result (and as a bit of a punt), our
>>>> KGroupedStream operations actually just discard null-valued records.
>>>> This means that the following are _not_ equivalent programs:
>>>>
>>>> table1 =
>>>>   stream<Id,Record>("records")
>>>>     .filter(Record::isOk)
>>>>     .groupByKey()
>>>>     .aggregate(() -> new Record(), (key, value, agg) -> value)
>>>> table2 =
>>>>   table<Id,Record>("record")
>>>>     .filter(Record::isOk)
>>>>
>>>> They look about the same, in that they'll both produce a
>>>> KTable<Id,Record> with the value being the latest state. But if a
>>>> record is deleted in the upstream data (represented as a "null"
>>>> value), that record would also be deleted in table2, but not in
>>>> table1. Table1 would just perpetually contain the value immediately
>>>> prior to the delete.
>>>>
>>>> This is why it makes me nervous to propose a new kind of _stream_
>>>> operation ostensibly in order to solve a problem that presents itself
>>>> in the _table_ context.
>>>>
>>>> If the goal is to provide a more efficient and convenient multi-way
>>>> KTable join, I think it would be a good idea to consider an extension
>>>> to the KTable API, not the KStream API. On the other hand, if this is
>>>> not the goal, then the motivation of the KIP shouldn't say that it is.
>>>> Instead, the KIP could provide some other motivation specifically for
>>>> augmenting the KStream API.
>>>>
>>>> There is a third alternative that comes to mind, if you wish to
>>>> resolve the long-standing dilemma around this semantic problem and
>>>> specify in the KIP how exactly nulls are handled in this operator. But
>>>> (although this seems on the face to be a good option), I think it
>>>> might be a briarpatch. Even if we are able to reach a suitable design,
>>>> we'd have to contend with the fact that it looks like the
>>>> KGroupedStream API, but behaves differently.
>>>>
>>>> What do you think about all this?
>>>>
>>>> Thanks again for the KIP and the discussion!
>>>> -John
>>>>
>>>> On Mon, Oct 28, 2019 at 3:32 PM Walker Carlson <wcarl...@confluent.io>
>>>> wrote:
>>>>>
>>>>> Hi Gouzhang,
>>>>>
>>>>> Matthias and I did talk about overloading different a type of
>> aggregate
>>>>> methods in the cogroup that would take in the windows and returns a
>>>>> windowed KTable. We decided that it would break too much with the
>>> current
>>>>> pattern that was established in the normal KStream. We can revisit
>> this
>>>> if
>>>>> you have a different opinion on the tradeoff.
>>>>>
>>>>> Walker
>>>>>
>>>>> On Mon, Oct 28, 2019 at 12:14 PM Guozhang Wang <wangg...@gmail.com>
>>>> wrote:
>>>>>
>>>>>> Hi Walker,
>>>>>>
>>>>>> On Fri, Oct 25, 2019 at 1:34 PM Walker Carlson <
>>> wcarl...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Guozhang,
>>>>>>>
>>>>>>> 1. I am familiar with the cogroup of spark, it is very similar to
>>>>>>> their join operator but instead it makes the values iterable. I
>>> think
>>>>>> that
>>>>>>> the use cases are different enough that it makes sense to specify
>>> the
>>>>>>> aggregator when we do.
>>>>>>>
>>>>>>> I like the idea of "absorb" and I think it could be useful.
>>> Although
>>>> I do
>>>>>>> not think it is as intuitive.
>>>>>>>
>>>>>>> If we were to go that route we would either use more processors
>> or
>>> do
>>>>>>> essentially the same thing but would have to store the
>> information
>>>>>>> required to cogroup inside that KTable. I think this would
>> violate
>>>> some
>>>>>>> design principles. I would argue that we should consider adding
>>>> absorb as
>>>>>>> well and auto re-write it to use cogroup.
>>>>>>>
>>>>>>
>>>>>> Yeah I think I agree with you about the internal design complexity
>>> with
>>>>>> "absorb"; I was primarily thinking if we can save ourselves from
>>>>>> introducing 3 more public classes with co-group. But it seems that
>>>> without
>>>>>> introducing new classes there's no easy way for us to bound the
>> scope
>>>> of
>>>>>> co-grouping (like how many streams will be co-grouped together).
>>>>>>
>>>>>> LMK if you have some better ideas: generally speaking the less new
>>>> public
>>>>>> interfaces we are introducing to fulfill a new feature the better,
>> so
>>>> I'd
>>>>>> push us to think twice and carefully before we go down the route.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> 2. We have not considered this thought that would be a convenient
>>>>>>> operation.
>>>>>>>
>>>>>>> 3. There is only one processor made. We are actually having the
>>>> naming
>>>>>>> conversation right now in the above thread
>>>>>>>
>>>>>>> 4, 5. fair points
>>>>>>>
>>>>>>> Walker
>>>>>>>
>>>>>>> On Fri, Oct 25, 2019 at 11:58 AM Guozhang Wang <
>> wangg...@gmail.com
>>>>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Walker, thanks for the KIP! I made a pass on the writeup and
>>>> have
>>>>>> some
>>>>>>>> comments below:
>>>>>>>>
>>>>>>>> Meta:
>>>>>>>>
>>>>>>>> 1. Syntax-wise, I'm wondering if we have compared our current
>>>> proposal
>>>>>>> with
>>>>>>>> Spark's co-group syntax (I know they are targeting for
>> different
>>>> use
>>>>>>> cases,
>>>>>>>> but wondering if their syntax is closer to the join operator),
>>>> what are
>>>>>>> the
>>>>>>>> syntax / semantics trade-off here?
>>>>>>>>
>>>>>>>> Just playing a devil's advocate here, if the main motivation is
>>> to
>>>>>>> provide
>>>>>>>> a more convienent multi-way join syntax, and in order to only
>>> have
>>>> one
>>>>>>>> materialized store we need to specify the final joined format
>> at
>>>> the
>>>>>>>> beginning, then what about the following alternative (with the
>>>> given
>>>>>>>> example in your wiki page):
>>>>>>>>
>>>>>>>>
>>>>>>>> KGroupedStream<K, V1> grouped1 =
>>>> builder.stream("topic1").groupByKey();
>>>>>>>> KGroupedStream<K, V2> grouped2 =
>>>> builder.stream("topic2").groupByKey();
>>>>>>>> KGroupedStream<K, V3> grouped3 =
>>>> builder.stream("topic3").groupByKey();
>>>>>>>>
>>>>>>>> KTable<K, CG> aggregated = grouped1.aggregate(initializer,
>>>>>> materialized,
>>>>>>>> aggregator1);
>>>>>>>>
>>>>>>>> aggregated.absorb(grouped2, aggregator2);  // I'm just using a
>>>> random
>>>>>>> name
>>>>>>>> on top of my head here
>>>>>>>>                   .absorb(grouped3, aggregator3);
>>>>>>>>
>>>>>>>> In this way, we just add a new API to the KTable to "absorb"
>> new
>>>>>> streams
>>>>>>> as
>>>>>>>> aggregated results without needing to introduce new first
>> citizen
>>>>>>> classes.
>>>>>>>>
>>>>>>>> 2. From the DSL optimization, have we considered if we can auto
>>>>>> re-write
>>>>>>>> the user written old fashioned multi-join into this new DSL
>>>> operator?
>>>>>>>>
>>>>>>>> 3. Although it is not needed for the wiki page itself, for
>>> internal
>>>>>>>> implementation how many processor nodes would we create for the
>>> new
>>>>>>>> operator, and how we can allow users to name them?
>>>>>>>>
>>>>>>>> Minor:
>>>>>>>>
>>>>>>>> 4. In "Public Interfaces", better add the templated generics to
>>>>>>>> "KGroupedStream" as "KGroupedStream<K, V>".
>>>>>>>>
>>>>>>>> 5. Naming wise, I'd suggest we keep the "K" together with
>>>> Stream/Table,
>>>>>>>> e.g. "TimeWindowed*CogroupedKStream*<K, V>".
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax <
>>>>>> matth...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Walker,
>>>>>>>>>
>>>>>>>>> I am not sure if I can follow your argument. What do you
>>> exactly
>>>> mean
>>>>>>> by
>>>>>>>>>
>>>>>>>>>> I also
>>>>>>>>>>> think that in this case it would be better to separate
>> the 2
>>>>>> option
>>>>>>>> out
>>>>>>>>>>> into separate overloads.
>>>>>>>>>
>>>>>>>>> Maybe you can give an example what method signature you have
>> in
>>>> mind?
>>>>>>>>>
>>>>>>>>>>> We could take a named parameter from upstream or add an
>>> extra
>>>>>> naming
>>>>>>>>> option
>>>>>>>>>>> however I don't really see the advantage that would give.
>>>>>>>>>
>>>>>>>>> Are you familiar with KIP-307? Before KIP-307, KS generated
>> all
>>>> names
>>>>>>>>> for all Processors. This makes it hard to reason about a
>>>> Topology if
>>>>>>>>> it's getting complex. Adding `Named` to the new co-group
>>> operator
>>>>>> would
>>>>>>>>> actually align with KIP-307.
>>>>>>>>>
>>>>>>>>>> It seems to go in
>>>>>>>>>>> the opposite direction from the cogroup configuration idea
>>> you
>>>>>>>> proposed.
>>>>>>>>>
>>>>>>>>> Can you elaborate? Not sure if I can follow.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 10/24/19 10:20 AM, Walker Carlson wrote:
>>>>>>>>>> While I like the idea Sophie I don't think that it is
>>>> necessary. I
>>>>>>> also
>>>>>>>>>> think that in this case it would be better to separate the
>> 2
>>>> option
>>>>>>> out
>>>>>>>>>> into separate overloads.
>>>>>>>>>> We could take a named parameter from upstream or add an
>> extra
>>>>>> naming
>>>>>>>>> option
>>>>>>>>>> however I don't really see the advantage that would give.
>> It
>>>> seems
>>>>>> to
>>>>>>>> go
>>>>>>>>> in
>>>>>>>>>> the opposite direction from the cogroup configuration idea
>>> you
>>>>>>>> proposed.
>>>>>>>>>>
>>>>>>>>>> John, I think it could be both. It depends on when you
>>>> aggregate
>>>>>> and
>>>>>>>> what
>>>>>>>>>> kind of data you have. In the example it is aggregating
>>> before
>>>>>>> joining,
>>>>>>>>>> there are probably some cases where you could join before
>>>>>>> aggregating.
>>>>>>>>> IMHO
>>>>>>>>>> it would be easier to group all the streams together then
>>>> perform
>>>>>> the
>>>>>>>> one
>>>>>>>>>> operation that results in a single KTable.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman <
>>>>>>>> sop...@confluent.io
>>>>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>> I can personally not see any need to add other
>>> configuration
>>>>>>>>>>> Famous last words?
>>>>>>>>>>>
>>>>>>>>>>> Just kidding, 95% confidence is more than enough to  me
>> (and
>>>>>> better
>>>>>>> to
>>>>>>>>>>> optimize for current
>>>>>>>>>>> design than for hypothetical future changes).
>>>>>>>>>>>
>>>>>>>>>>> One last question I have then is about the
>>>>>>> operator/store/repartition
>>>>>>>>>>> naming -- seems like
>>>>>>>>>>> we can name the underlying store/changelog through the
>>>>>> Materialized
>>>>>>>>>>> parameter, but do we
>>>>>>>>>>> also want to include an overload taking a Named parameter
>>> for
>>>> the
>>>>>>>>> operator
>>>>>>>>>>> name (as in the
>>>>>>>>>>> KTable#join variations)?
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax <
>>>>>>>> matth...@confluent.io>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Interesting idea, Sophie.
>>>>>>>>>>>>
>>>>>>>>>>>> So far, we tried to reuse existing config objects and
>> only
>>>> add
>>>>>> new
>>>>>>>> ones
>>>>>>>>>>>> when needed to avoid creating "redundant" classes. This
>> is
>>> of
>>>>>>> course
>>>>>>>> a
>>>>>>>>>>>> reactive approach (with the drawback to deprecate stuff
>> if
>>> we
>>>>>>> change
>>>>>>>>> it,
>>>>>>>>>>>> as you described).
>>>>>>>>>>>>
>>>>>>>>>>>> I can personally not see any need to add other
>>> configuration
>>>>>>>> parameters
>>>>>>>>>>>> atm, so it's a 95% obvious "no" IMHO. The final
>>>> `aggregate()` has
>>>>>>>> only
>>>>>>>>> a
>>>>>>>>>>>> single state store that we need to configure, and reusing
>>>>>>>>> `Materialized`
>>>>>>>>>>>> seems to be appropriate.
>>>>>>>>>>>>
>>>>>>>>>>>> Also note, that the `Initializer` is a mandatory
>> parameter
>>>> and
>>>>>> not
>>>>>>> a
>>>>>>>>>>>> configuration and should be passed directly, and not via
>> a
>>>>>>>>> configuration
>>>>>>>>>>>> object.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>> On 10/23/19 11:37 AM, Sophie Blee-Goldman wrote:
>>>>>>>>>>>>> Thanks for the explanation, makes sense to me! As for
>> the
>>>> API,
>>>>>> one
>>>>>>>>>>> other
>>>>>>>>>>>>> thought I had is might we ever want or need to introduce
>>> any
>>>>>> other
>>>>>>>>>>>> configs
>>>>>>>>>>>>> or parameters in the future? Obviously that's difficult
>> to
>>>> say
>>>>>> now
>>>>>>>> (or
>>>>>>>>>>>>> maybe the
>>>>>>>>>>>>> answer seems obviously "no") but we seem to often end up
>>>> needing
>>>>>>> to
>>>>>>>>> add
>>>>>>>>>>>> new
>>>>>>>>>>>>> overloads and/or deprecate old ones as new features or
>>>>>>> requirements
>>>>>>>>>>> come
>>>>>>>>>>>>> into
>>>>>>>>>>>>> play.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What do you (and others?) think about wrapping the
>> config
>>>>>>> parameters
>>>>>>>>>>> (ie
>>>>>>>>>>>>> everything
>>>>>>>>>>>>> except the actual grouped streams) in a new config
>> object?
>>>> For
>>>>>>>>> example,
>>>>>>>>>>>> the
>>>>>>>>>>>>> CogroupedStream#aggregate field could take a single
>>>> Cogrouped
>>>>>>>> object,
>>>>>>>>>>>>> which itself would have an initializer and a
>> materialized.
>>>> If we
>>>>>>>> ever
>>>>>>>>>>>> need
>>>>>>>>>>>>> to add
>>>>>>>>>>>>> a new parameter, we can just add it to the Cogrouped
>>> class.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also, will the backing store be available for IQ if a
>>>>>> Materialized
>>>>>>>> is
>>>>>>>>>>>>> passed in?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 23, 2019 at 10:49 AM Walker Carlson <
>>>>>>>>> wcarl...@confluent.io
>>>>>>>>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Sophie,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you for your comments. As for the different
>> methods
>>>>>>>> signatures
>>>>>>>>> I
>>>>>>>>>>>> have
>>>>>>>>>>>>>> not really considered any other options but  while I do
>>>> agree
>>>>>> it
>>>>>>> is
>>>>>>>>>>>>>> confusing, I don't see any obvious solutions. The
>> problem
>>>> is
>>>>>> that
>>>>>>>> the
>>>>>>>>>>>>>> cogroup essentially pairs a group stream with an
>>>> aggregator and
>>>>>>>> when
>>>>>>>>>>> it
>>>>>>>>>>>> is
>>>>>>>>>>>>>> first made the method is called on a groupedStream
>>> already.
>>>>>>> However
>>>>>>>>>>> each
>>>>>>>>>>>>>> subsequent stream-aggregator pair is added on to a
>>> cogroup
>>>>>> stream
>>>>>>>> so
>>>>>>>>>>> it
>>>>>>>>>>>>>> needs both arguments.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For the second question you should not need a joiner.
>> The
>>>> idea
>>>>>> is
>>>>>>>>> that
>>>>>>>>>>>> you
>>>>>>>>>>>>>> can collect many grouped streams with overlapping key
>>>> spaces
>>>>>> and
>>>>>>>> any
>>>>>>>>>>>> kind
>>>>>>>>>>>>>> of value types. Once aggregated its value will be
>> reduced
>>>> into
>>>>>>> one
>>>>>>>>>>> type.
>>>>>>>>>>>>>> This is why you need only one initializer. Each
>>> aggregator
>>>> will
>>>>>>>> need
>>>>>>>>>>> to
>>>>>>>>>>>>>> integrate the new value with the new object made in the
>>>>>>>> initializer.
>>>>>>>>>>>>>> Does that make sense?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is a good question and I will include this
>>>> explanation in
>>>>>>> the
>>>>>>>>> kip
>>>>>>>>>>>> as
>>>>>>>>>>>>>> well.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Walker
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Oct 22, 2019 at 8:59 PM Sophie Blee-Goldman <
>>>>>>>>>>>> sop...@confluent.io>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hey Walker,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the KIP! I have just a couple of questions:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1) It seems a little awkward to me that with the
>> current
>>>> API,
>>>>>> we
>>>>>>>>>>> have a
>>>>>>>>>>>>>>> nearly identical
>>>>>>>>>>>>>>> "add stream to cogroup" method, except for the first
>>> which
>>>>>> has a
>>>>>>>>>>>>>> different
>>>>>>>>>>>>>>> signature
>>>>>>>>>>>>>>> (ie the first stream is joined as
>>>> stream.cogroup(Aggregator)
>>>>>>> while
>>>>>>>>>>> the
>>>>>>>>>>>>>>> subsequent ones
>>>>>>>>>>>>>>> are joined as .cogroup(Stream, Aggregator) ). I'm not
>>> sure
>>>>>> what
>>>>>>> it
>>>>>>>>>>>> would
>>>>>>>>>>>>>>> look like exactly,
>>>>>>>>>>>>>>> but I was just wondering if you'd considered and/or
>>>> rejected
>>>>>> any
>>>>>>>>>>>>>>> alternative APIs?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2) This might just be my lack of familiarity with
>>>> "cogroup"
>>>>>> as a
>>>>>>>>>>>> concept,
>>>>>>>>>>>>>>> but with the
>>>>>>>>>>>>>>> current (non-optimal) API the user seems to have some
>>>> control
>>>>>>> over
>>>>>>>>>>> how
>>>>>>>>>>>>>>> exactly
>>>>>>>>>>>>>>> the different streams are joined through the
>>> ValueJoiners.
>>>>>> Would
>>>>>>>>> this
>>>>>>>>>>>> new
>>>>>>>>>>>>>>> cogroup
>>>>>>>>>>>>>>> simply concatenate the values from the different
>> cogroup
>>>>>>> streams,
>>>>>>>> or
>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>> users
>>>>>>>>>>>>>>> potentially pass some kind of Joiner to the
>>>> cogroup/aggregate
>>>>>>>>>>> methods?
>>>>>>>>>>>>>> Or,
>>>>>>>>>>>>>>> is the
>>>>>>>>>>>>>>> whole point of cogroups that you no longer ever need
>> to
>>>>>> specify
>>>>>>> a
>>>>>>>>>>>> Joiner?
>>>>>>>>>>>>>>> If so, you
>>>>>>>>>>>>>>> should add a short line to the KIP explaining that for
>>>> those
>>>>>> of
>>>>>>> us
>>>>>>>>>>> who
>>>>>>>>>>>>>>> aren't fluent
>>>>>>>>>>>>>>> in cogroup semantics :)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Sophie
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Oct 17, 2019 at 3:06 PM Walker Carlson <
>>>>>>>>>>> wcarl...@confluent.io>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Good catch I updated that.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have made a PR for this KIP
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I then am splitting it into 3 parts, first cogroup
>> for
>>> a
>>>>>>>> key-value
>>>>>>>>>>>>>> store
>>>>>>>>>>>>>>> (
>>>>>>>>>>>>>>>> here <https://github.com/apache/kafka/pull/7538>),
>>> then
>>>> for
>>>>>> a
>>>>>>>>>>>>>>>> timeWindowedStore, and then a sessionWindowedStore +
>>>> ensuring
>>>>>>>>>>>>>>> partitioning.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Walker
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Oct 15, 2019 at 12:47 PM Matthias J. Sax <
>>>>>>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Walker,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> thanks for picking up the KIP and reworking it for
>> the
>>>>>> changed
>>>>>>>>> API.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Overall, the updated API suggestions make sense to
>> me.
>>>> The
>>>>>>> seem
>>>>>>>> to
>>>>>>>>>>>>>>> align
>>>>>>>>>>>>>>>>> quite nicely with our current API design.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> One nit: In `CogroupedKStream#aggregate(...)` the
>> type
>>>>>>> parameter
>>>>>>>>> of
>>>>>>>>>>>>>>>>> `Materialized` should be `V`, not `VR`?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 10/14/19 2:57 PM, Walker Carlson wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
>>>>>>>>>>>>>>>>>> here
>>>>>>>>>>>>>>>>>> is a link
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mon, Oct 14, 2019 at 2:52 PM Walker Carlson <
>>>>>>>>>>>>>>> wcarl...@confluent.io>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I have picked up and updated KIP-150. Due to
>> changes
>>>> to
>>>>>> the
>>>>>>>>>>>>>> project
>>>>>>>>>>>>>>>>> since
>>>>>>>>>>>>>>>>>>> KIP #150 was written there are a few items that
>> need
>>>> to be
>>>>>>>>>>>>>> updated.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> First item that changed is the adoption of the
>>>>>> Materialized
>>>>>>>>>>>>>>> parameter.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The second item is the WindowedBy. How the old KIP
>>>> handles
>>>>>>>>>>>>>> windowing
>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>> that it overloads the aggregate function to take
>> in
>>> a
>>>>>> Window
>>>>>>>>>>>>>> object
>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>> well
>>>>>>>>>>>>>>>>>>> as the other parameters. The current practice to
>>>> window
>>>>>>>>>>>>>>>> grouped-streams
>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>> to call windowedBy and receive a windowed stream
>>>> object.
>>>>>> The
>>>>>>>>>>>>>>> existing
>>>>>>>>>>>>>>>>>>> interface for a windowed stream made from a
>> grouped
>>>> stream
>>>>>>>> will
>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>> work
>>>>>>>>>>>>>>>>>>> for cogrouped streams. Hence, we have to make new
>>>>>> interfaces
>>>>>>>> for
>>>>>>>>>>>>>>>>> cogrouped
>>>>>>>>>>>>>>>>>>> windowed streams.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Please take a look, I would like to hear your
>>>> feedback,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Walker
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>
>>>
>>
>>
>> --
>> -- Guozhang
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to