Hello Walker / Matthias,

Thanks for your explanation. I can tell you've put a lot of thoughts into
this already and it seems we cannot avoid adding new interfaces in any
ways, so I will rest my arguments trying to reduce the number of
first-class citizens in the Streams DSL :)


Guozhang


On Thu, Oct 31, 2019 at 12:50 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> Interesting discussion. My personal take is as follows:
>
> (1) Co-group is not a special case of a multi-way KTable join, because
> multiple record wit the same key from a single input stream should be
> aggregated together and there are not update semantics. A co-group is
> rather a muti-stream aggregation operation.
>
> (2) Semantically, non-windowed co-group is the same as (as laid out in
> the KIP already):
>
> > KTable aggTable1 = stream1.groupByKey().aggregate(...);
> > KTable aggTable2 = stream2.groupByKey().aggregate(...);
> > ...
> > KTable aggTableX = streamX.groupByKey().aggregate(...);
> >
> > KTable final = aggTable1.join(aggTable(2)....join(aggTAbleX)
>
> However, I don't think that it would be possible for our optimizer to
> rewrite one into the other, given what `Initializer`, `Aggregator`,
> `ValueJoiner`, and `S a user would provide.
>
> Hence, `cogroup()` is a more efficient way to express the above using a
> single store, instead of X stores are required above.
>
> For windowed co-group, especially session-windowed, it seems not
> possible at all to rewrite co-group as independent aggregations followed
> by joins. Note that sessions boundaries would be determined _after_ the
> input streams are co-partitioned/merged in `cogroup()` and thus would be
> different compare the the aggregate-join pattern.
>
> (3) For the current KIP writeup, I agree that adding `Named` to
> `aggregate()` aligns best with the current API layout. I also don't
> think that the overloads are a big issue, because they are spread out
> over multiple helper interfaces.
>
>
>
> -Matthias
>
>
>
> On 10/29/19 10:38 AM, Walker Carlson wrote:
> > Hi Gouzhang,
> >
> > I am not sure what you mean by "Fields from different streams are never
> > aggregated together", this certainly can be the case but not the general
> > rule. If we want to take care of the special cases where the key-sets are
> > disjoint for each stream then they can be given no-op operators. This
> would
> > have the same effect as a stitching join as the function to update the
> > store would have to be defined either way, even to just place it in.
> >
> > Now if we look at it from the other way, if we only specify the multiway
> > join then the user will need to aggregate each stream. Then they must do
> > the join which either will involve aggregators and value joiners or some
> > questionable optimization that would rely on each aggregator defined for
> a
> > grouped stream meshing together. And this would all have to happen inside
> > KStream.
> >
> > I do agree that there are optimizations that can be done on joining
> > multiple tables per your example, in both cases whether it be a
> "stitching
> > join" or not. But I do not think the place to do it is in Streams. This
> > could be relatively easy to accomplish. I think we save ourselves pain if
> > we consider the tables and streams as separate cases, as aggregating
> > multiple streams into one KTable can be done more efficiently than making
> > multiple KTables and then joining them together. We may be able to get
> > around this in the case of a stitching join but I am not sure how we
> could
> > do it safely otherwise.
> >
> > Walker
> >
> >
> >
> >
> >
> > On Mon, Oct 28, 2019 at 6:26 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> >> Hi Walker,
> >>
> >> This is a good point about compatibility breakage while overloading the
> >> existing classes; while reading John and your exchanges, I think I still
> >> need to clarify the motivations a bit more:
> >>
> >> 1) Multiple streams need to be aggregated together, inputs are always
> >> *KStreams* and end result is a *KTable*.
> >> 2) Fields from different streams are never aggregated together, i.e. on
> the
> >> higher level it is more like a "stitching up" the fields and then doing
> a
> >> single aggregation.
> >>
> >> In this context, I agree with you that it is still a streams-aggregation
> >> operator that we are trying to optimize (though its a multi-way), not a
> >> multi-way table-table-join operator that we are tying to optimize here.
> >>
> >>
> >> -----------------
> >>
> >> But now taking a step back looking at it, I'm wondering, because of 2)
> that
> >> all input streams do not have overlapping fields, we can generalize
> this to
> >> a broader scope. Consider this case for example:
> >>
> >> table1 = builder.table("topic1");
> >> table2 = builder.table("topic2");
> >> table3 = builder.table("topic3");
> >> table4 = table1.join(table2).join(table3);
> >>
> >> Suppose the join operations do not take out any fields or add any new
> >> fields, i.e. say table1 has fields A, table2 has fields B, and table2
> has
> >> fields C besides the key K, the table 4 has field {A, B, C} --- the
> join is
> >> just "stitching up" the fields --- then the above topology can actually
> be
> >> optimized in a similar way:
> >>
> >> * we only keep one materialized store in the form of K -> {A, B, C} as
> the
> >> materialized store of the final join result of table4.
> >> * when a record comes in from table1/2/3, just query the store on K, and
> >> then update the corresponding A/B/C field and then writes back to the
> >> store.
> >>
> >>
> >> Then the above streams-aggregation operator can be treated as a special
> >> case of this: you first aggregate separately on stream1/2/3 and generate
> >> table1/2/3, and then do this "stitching join", behind the scene we can
> >> optimize the topology to do exactly the co-group logic by updating the
> >> second bullet point above as an aggregation operator:
> >>
> >> * when a record comes in from *stream1/2/3*, just query the store on K,
> and
> >> then update the corresponding A/B/C field *with an aggregator *and then
> >> writes back to the store.
> >>
> >> -----------------
> >>
> >> Personally I think this is better because with 1) larger applicable
> scope,
> >> and 2) without introducing new interfaces. But of course on the other
> side
> >> it requires us to do this optimization inside the Streams with some
> syntax
> >> hint from users (for example, users need to specify it is a "stitching
> >> join" such that all fields are still preserved in the join result).
> WDYT?
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Oct 28, 2019 at 4:20 PM Walker Carlson <wcarl...@confluent.io>
> >> wrote:
> >>
> >>> Hi John,
> >>>
> >>> Thank you for the background information. I think I understand your
> >> point.
> >>>
> >>> I believe that this could be fixed by making the motivation a little
> >>> clearer in the KIP.  I think that the motivation is when you have
> >> multiple
> >>> streams that need to aggregate together to form a single object the
> >>> current, non optimal, way to do this is through a multiway table join.
> >> This
> >>> is a little hacky. There is a slight but significant difference in
> these
> >>> cases, as in the null value handling you pointed out.
> >>>
> >>> For the example in the motivation, these tables were grouped streams so
> >>> they already dropped the null values. If we consider Cogroup sitting in
> >> the
> >>> same grey area that KGroupedStream does it should also behave this way.
> >> If
> >>> you think about it that way it is more of an extension of
> KGroupedStream
> >>> than KTable or KStream. Therefore I handle null values the same way
> >>> KGroupedStream#aggregate does.
> >>>
> >>> Looking back I am not sure I understood you previous question fully at
> >> the
> >>> time. I am sorry if my answer caused any confusion!
> >>>
> >>> Walker
> >>>
> >>> On Mon, Oct 28, 2019 at 2:49 PM John Roesler <j...@confluent.io>
> wrote:
> >>>
> >>>> Hi Walker,
> >>>>
> >>>> Sorry for the delay in responding. Thanks for your response earlier.
> >>>>
> >>>> I think there might be a subtlety getting overlooked in considering
> >>>> whether we're talking about streams versus tables in cogroup. As I'm
> >>>> sure you know, Kafka Streams treats "stream" records as independent,
> >>>> immutable, and opaque "facts", whereas we treat "table" records as a
> >>>> sequence of updates to an entity identified by the record key (where
> >>>> "update" means that each record's value represents the new state after
> >>>> applying the update). For the most part, this is a clean separation,
> >>>> but there is one special case where records with a "null" value are
> >>>> interpreted as a tombstone in the table context (i.e., the record
> >>>> indicates not that the new value of the entity is "null", but rather
> >>>> that the entity has been deleted). In contrast, a record with a null
> >>>> value in the stream context is _just_ a record with a null value; no
> >>>> special semantics.
> >>>>
> >>>> The difficulty is that these two semantics clash at the stream/table
> >>>> boundary. So, operations that convert streams to tables (like
> >>>> KGroupedStream#aggregate) have to cope with ambiguity about whether to
> >>>> treat null values opaquely as null values, or as tombstones. I think
> >>>> I'll make a long story short and just say that this is a very, very
> >>>> complex issue. As a result (and as a bit of a punt), our
> >>>> KGroupedStream operations actually just discard null-valued records.
> >>>> This means that the following are _not_ equivalent programs:
> >>>>
> >>>> table1 =
> >>>>   stream<Id,Record>("records")
> >>>>     .filter(Record::isOk)
> >>>>     .groupByKey()
> >>>>     .aggregate(() -> new Record(), (key, value, agg) -> value)
> >>>> table2 =
> >>>>   table<Id,Record>("record")
> >>>>     .filter(Record::isOk)
> >>>>
> >>>> They look about the same, in that they'll both produce a
> >>>> KTable<Id,Record> with the value being the latest state. But if a
> >>>> record is deleted in the upstream data (represented as a "null"
> >>>> value), that record would also be deleted in table2, but not in
> >>>> table1. Table1 would just perpetually contain the value immediately
> >>>> prior to the delete.
> >>>>
> >>>> This is why it makes me nervous to propose a new kind of _stream_
> >>>> operation ostensibly in order to solve a problem that presents itself
> >>>> in the _table_ context.
> >>>>
> >>>> If the goal is to provide a more efficient and convenient multi-way
> >>>> KTable join, I think it would be a good idea to consider an extension
> >>>> to the KTable API, not the KStream API. On the other hand, if this is
> >>>> not the goal, then the motivation of the KIP shouldn't say that it is.
> >>>> Instead, the KIP could provide some other motivation specifically for
> >>>> augmenting the KStream API.
> >>>>
> >>>> There is a third alternative that comes to mind, if you wish to
> >>>> resolve the long-standing dilemma around this semantic problem and
> >>>> specify in the KIP how exactly nulls are handled in this operator. But
> >>>> (although this seems on the face to be a good option), I think it
> >>>> might be a briarpatch. Even if we are able to reach a suitable design,
> >>>> we'd have to contend with the fact that it looks like the
> >>>> KGroupedStream API, but behaves differently.
> >>>>
> >>>> What do you think about all this?
> >>>>
> >>>> Thanks again for the KIP and the discussion!
> >>>> -John
> >>>>
> >>>> On Mon, Oct 28, 2019 at 3:32 PM Walker Carlson <wcarl...@confluent.io
> >
> >>>> wrote:
> >>>>>
> >>>>> Hi Gouzhang,
> >>>>>
> >>>>> Matthias and I did talk about overloading different a type of
> >> aggregate
> >>>>> methods in the cogroup that would take in the windows and returns a
> >>>>> windowed KTable. We decided that it would break too much with the
> >>> current
> >>>>> pattern that was established in the normal KStream. We can revisit
> >> this
> >>>> if
> >>>>> you have a different opinion on the tradeoff.
> >>>>>
> >>>>> Walker
> >>>>>
> >>>>> On Mon, Oct 28, 2019 at 12:14 PM Guozhang Wang <wangg...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>>> Hi Walker,
> >>>>>>
> >>>>>> On Fri, Oct 25, 2019 at 1:34 PM Walker Carlson <
> >>> wcarl...@confluent.io>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Guozhang,
> >>>>>>>
> >>>>>>> 1. I am familiar with the cogroup of spark, it is very similar to
> >>>>>>> their join operator but instead it makes the values iterable. I
> >>> think
> >>>>>> that
> >>>>>>> the use cases are different enough that it makes sense to specify
> >>> the
> >>>>>>> aggregator when we do.
> >>>>>>>
> >>>>>>> I like the idea of "absorb" and I think it could be useful.
> >>> Although
> >>>> I do
> >>>>>>> not think it is as intuitive.
> >>>>>>>
> >>>>>>> If we were to go that route we would either use more processors
> >> or
> >>> do
> >>>>>>> essentially the same thing but would have to store the
> >> information
> >>>>>>> required to cogroup inside that KTable. I think this would
> >> violate
> >>>> some
> >>>>>>> design principles. I would argue that we should consider adding
> >>>> absorb as
> >>>>>>> well and auto re-write it to use cogroup.
> >>>>>>>
> >>>>>>
> >>>>>> Yeah I think I agree with you about the internal design complexity
> >>> with
> >>>>>> "absorb"; I was primarily thinking if we can save ourselves from
> >>>>>> introducing 3 more public classes with co-group. But it seems that
> >>>> without
> >>>>>> introducing new classes there's no easy way for us to bound the
> >> scope
> >>>> of
> >>>>>> co-grouping (like how many streams will be co-grouped together).
> >>>>>>
> >>>>>> LMK if you have some better ideas: generally speaking the less new
> >>>> public
> >>>>>> interfaces we are introducing to fulfill a new feature the better,
> >> so
> >>>> I'd
> >>>>>> push us to think twice and carefully before we go down the route.
> >>>>>>
> >>>>>>
> >>>>>>>
> >>>>>>> 2. We have not considered this thought that would be a convenient
> >>>>>>> operation.
> >>>>>>>
> >>>>>>> 3. There is only one processor made. We are actually having the
> >>>> naming
> >>>>>>> conversation right now in the above thread
> >>>>>>>
> >>>>>>> 4, 5. fair points
> >>>>>>>
> >>>>>>> Walker
> >>>>>>>
> >>>>>>> On Fri, Oct 25, 2019 at 11:58 AM Guozhang Wang <
> >> wangg...@gmail.com
> >>>>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Walker, thanks for the KIP! I made a pass on the writeup and
> >>>> have
> >>>>>> some
> >>>>>>>> comments below:
> >>>>>>>>
> >>>>>>>> Meta:
> >>>>>>>>
> >>>>>>>> 1. Syntax-wise, I'm wondering if we have compared our current
> >>>> proposal
> >>>>>>> with
> >>>>>>>> Spark's co-group syntax (I know they are targeting for
> >> different
> >>>> use
> >>>>>>> cases,
> >>>>>>>> but wondering if their syntax is closer to the join operator),
> >>>> what are
> >>>>>>> the
> >>>>>>>> syntax / semantics trade-off here?
> >>>>>>>>
> >>>>>>>> Just playing a devil's advocate here, if the main motivation is
> >>> to
> >>>>>>> provide
> >>>>>>>> a more convienent multi-way join syntax, and in order to only
> >>> have
> >>>> one
> >>>>>>>> materialized store we need to specify the final joined format
> >> at
> >>>> the
> >>>>>>>> beginning, then what about the following alternative (with the
> >>>> given
> >>>>>>>> example in your wiki page):
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> KGroupedStream<K, V1> grouped1 =
> >>>> builder.stream("topic1").groupByKey();
> >>>>>>>> KGroupedStream<K, V2> grouped2 =
> >>>> builder.stream("topic2").groupByKey();
> >>>>>>>> KGroupedStream<K, V3> grouped3 =
> >>>> builder.stream("topic3").groupByKey();
> >>>>>>>>
> >>>>>>>> KTable<K, CG> aggregated = grouped1.aggregate(initializer,
> >>>>>> materialized,
> >>>>>>>> aggregator1);
> >>>>>>>>
> >>>>>>>> aggregated.absorb(grouped2, aggregator2);  // I'm just using a
> >>>> random
> >>>>>>> name
> >>>>>>>> on top of my head here
> >>>>>>>>                   .absorb(grouped3, aggregator3);
> >>>>>>>>
> >>>>>>>> In this way, we just add a new API to the KTable to "absorb"
> >> new
> >>>>>> streams
> >>>>>>> as
> >>>>>>>> aggregated results without needing to introduce new first
> >> citizen
> >>>>>>> classes.
> >>>>>>>>
> >>>>>>>> 2. From the DSL optimization, have we considered if we can auto
> >>>>>> re-write
> >>>>>>>> the user written old fashioned multi-join into this new DSL
> >>>> operator?
> >>>>>>>>
> >>>>>>>> 3. Although it is not needed for the wiki page itself, for
> >>> internal
> >>>>>>>> implementation how many processor nodes would we create for the
> >>> new
> >>>>>>>> operator, and how we can allow users to name them?
> >>>>>>>>
> >>>>>>>> Minor:
> >>>>>>>>
> >>>>>>>> 4. In "Public Interfaces", better add the templated generics to
> >>>>>>>> "KGroupedStream" as "KGroupedStream<K, V>".
> >>>>>>>>
> >>>>>>>> 5. Naming wise, I'd suggest we keep the "K" together with
> >>>> Stream/Table,
> >>>>>>>> e.g. "TimeWindowed*CogroupedKStream*<K, V>".
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax <
> >>>>>> matth...@confluent.io>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Walker,
> >>>>>>>>>
> >>>>>>>>> I am not sure if I can follow your argument. What do you
> >>> exactly
> >>>> mean
> >>>>>>> by
> >>>>>>>>>
> >>>>>>>>>> I also
> >>>>>>>>>>> think that in this case it would be better to separate
> >> the 2
> >>>>>> option
> >>>>>>>> out
> >>>>>>>>>>> into separate overloads.
> >>>>>>>>>
> >>>>>>>>> Maybe you can give an example what method signature you have
> >> in
> >>>> mind?
> >>>>>>>>>
> >>>>>>>>>>> We could take a named parameter from upstream or add an
> >>> extra
> >>>>>> naming
> >>>>>>>>> option
> >>>>>>>>>>> however I don't really see the advantage that would give.
> >>>>>>>>>
> >>>>>>>>> Are you familiar with KIP-307? Before KIP-307, KS generated
> >> all
> >>>> names
> >>>>>>>>> for all Processors. This makes it hard to reason about a
> >>>> Topology if
> >>>>>>>>> it's getting complex. Adding `Named` to the new co-group
> >>> operator
> >>>>>> would
> >>>>>>>>> actually align with KIP-307.
> >>>>>>>>>
> >>>>>>>>>> It seems to go in
> >>>>>>>>>>> the opposite direction from the cogroup configuration idea
> >>> you
> >>>>>>>> proposed.
> >>>>>>>>>
> >>>>>>>>> Can you elaborate? Not sure if I can follow.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 10/24/19 10:20 AM, Walker Carlson wrote:
> >>>>>>>>>> While I like the idea Sophie I don't think that it is
> >>>> necessary. I
> >>>>>>> also
> >>>>>>>>>> think that in this case it would be better to separate the
> >> 2
> >>>> option
> >>>>>>> out
> >>>>>>>>>> into separate overloads.
> >>>>>>>>>> We could take a named parameter from upstream or add an
> >> extra
> >>>>>> naming
> >>>>>>>>> option
> >>>>>>>>>> however I don't really see the advantage that would give.
> >> It
> >>>> seems
> >>>>>> to
> >>>>>>>> go
> >>>>>>>>> in
> >>>>>>>>>> the opposite direction from the cogroup configuration idea
> >>> you
> >>>>>>>> proposed.
> >>>>>>>>>>
> >>>>>>>>>> John, I think it could be both. It depends on when you
> >>>> aggregate
> >>>>>> and
> >>>>>>>> what
> >>>>>>>>>> kind of data you have. In the example it is aggregating
> >>> before
> >>>>>>> joining,
> >>>>>>>>>> there are probably some cases where you could join before
> >>>>>>> aggregating.
> >>>>>>>>> IMHO
> >>>>>>>>>> it would be easier to group all the streams together then
> >>>> perform
> >>>>>> the
> >>>>>>>> one
> >>>>>>>>>> operation that results in a single KTable.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman <
> >>>>>>>> sop...@confluent.io
> >>>>>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>>> I can personally not see any need to add other
> >>> configuration
> >>>>>>>>>>> Famous last words?
> >>>>>>>>>>>
> >>>>>>>>>>> Just kidding, 95% confidence is more than enough to  me
> >> (and
> >>>>>> better
> >>>>>>> to
> >>>>>>>>>>> optimize for current
> >>>>>>>>>>> design than for hypothetical future changes).
> >>>>>>>>>>>
> >>>>>>>>>>> One last question I have then is about the
> >>>>>>> operator/store/repartition
> >>>>>>>>>>> naming -- seems like
> >>>>>>>>>>> we can name the underlying store/changelog through the
> >>>>>> Materialized
> >>>>>>>>>>> parameter, but do we
> >>>>>>>>>>> also want to include an overload taking a Named parameter
> >>> for
> >>>> the
> >>>>>>>>> operator
> >>>>>>>>>>> name (as in the
> >>>>>>>>>>> KTable#join variations)?
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax <
> >>>>>>>> matth...@confluent.io>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Interesting idea, Sophie.
> >>>>>>>>>>>>
> >>>>>>>>>>>> So far, we tried to reuse existing config objects and
> >> only
> >>>> add
> >>>>>> new
> >>>>>>>> ones
> >>>>>>>>>>>> when needed to avoid creating "redundant" classes. This
> >> is
> >>> of
> >>>>>>> course
> >>>>>>>> a
> >>>>>>>>>>>> reactive approach (with the drawback to deprecate stuff
> >> if
> >>> we
> >>>>>>> change
> >>>>>>>>> it,
> >>>>>>>>>>>> as you described).
> >>>>>>>>>>>>
> >>>>>>>>>>>> I can personally not see any need to add other
> >>> configuration
> >>>>>>>> parameters
> >>>>>>>>>>>> atm, so it's a 95% obvious "no" IMHO. The final
> >>>> `aggregate()` has
> >>>>>>>> only
> >>>>>>>>> a
> >>>>>>>>>>>> single state store that we need to configure, and reusing
> >>>>>>>>> `Materialized`
> >>>>>>>>>>>> seems to be appropriate.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Also note, that the `Initializer` is a mandatory
> >> parameter
> >>>> and
> >>>>>> not
> >>>>>>> a
> >>>>>>>>>>>> configuration and should be passed directly, and not via
> >> a
> >>>>>>>>> configuration
> >>>>>>>>>>>> object.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 10/23/19 11:37 AM, Sophie Blee-Goldman wrote:
> >>>>>>>>>>>>> Thanks for the explanation, makes sense to me! As for
> >> the
> >>>> API,
> >>>>>> one
> >>>>>>>>>>> other
> >>>>>>>>>>>>> thought I had is might we ever want or need to introduce
> >>> any
> >>>>>> other
> >>>>>>>>>>>> configs
> >>>>>>>>>>>>> or parameters in the future? Obviously that's difficult
> >> to
> >>>> say
> >>>>>> now
> >>>>>>>> (or
> >>>>>>>>>>>>> maybe the
> >>>>>>>>>>>>> answer seems obviously "no") but we seem to often end up
> >>>> needing
> >>>>>>> to
> >>>>>>>>> add
> >>>>>>>>>>>> new
> >>>>>>>>>>>>> overloads and/or deprecate old ones as new features or
> >>>>>>> requirements
> >>>>>>>>>>> come
> >>>>>>>>>>>>> into
> >>>>>>>>>>>>> play.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> What do you (and others?) think about wrapping the
> >> config
> >>>>>>> parameters
> >>>>>>>>>>> (ie
> >>>>>>>>>>>>> everything
> >>>>>>>>>>>>> except the actual grouped streams) in a new config
> >> object?
> >>>> For
> >>>>>>>>> example,
> >>>>>>>>>>>> the
> >>>>>>>>>>>>> CogroupedStream#aggregate field could take a single
> >>>> Cogrouped
> >>>>>>>> object,
> >>>>>>>>>>>>> which itself would have an initializer and a
> >> materialized.
> >>>> If we
> >>>>>>>> ever
> >>>>>>>>>>>> need
> >>>>>>>>>>>>> to add
> >>>>>>>>>>>>> a new parameter, we can just add it to the Cogrouped
> >>> class.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Also, will the backing store be available for IQ if a
> >>>>>> Materialized
> >>>>>>>> is
> >>>>>>>>>>>>> passed in?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, Oct 23, 2019 at 10:49 AM Walker Carlson <
> >>>>>>>>> wcarl...@confluent.io
> >>>>>>>>>>>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Sophie,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thank you for your comments. As for the different
> >> methods
> >>>>>>>> signatures
> >>>>>>>>> I
> >>>>>>>>>>>> have
> >>>>>>>>>>>>>> not really considered any other options but  while I do
> >>>> agree
> >>>>>> it
> >>>>>>> is
> >>>>>>>>>>>>>> confusing, I don't see any obvious solutions. The
> >> problem
> >>>> is
> >>>>>> that
> >>>>>>>> the
> >>>>>>>>>>>>>> cogroup essentially pairs a group stream with an
> >>>> aggregator and
> >>>>>>>> when
> >>>>>>>>>>> it
> >>>>>>>>>>>> is
> >>>>>>>>>>>>>> first made the method is called on a groupedStream
> >>> already.
> >>>>>>> However
> >>>>>>>>>>> each
> >>>>>>>>>>>>>> subsequent stream-aggregator pair is added on to a
> >>> cogroup
> >>>>>> stream
> >>>>>>>> so
> >>>>>>>>>>> it
> >>>>>>>>>>>>>> needs both arguments.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> For the second question you should not need a joiner.
> >> The
> >>>> idea
> >>>>>> is
> >>>>>>>>> that
> >>>>>>>>>>>> you
> >>>>>>>>>>>>>> can collect many grouped streams with overlapping key
> >>>> spaces
> >>>>>> and
> >>>>>>>> any
> >>>>>>>>>>>> kind
> >>>>>>>>>>>>>> of value types. Once aggregated its value will be
> >> reduced
> >>>> into
> >>>>>>> one
> >>>>>>>>>>> type.
> >>>>>>>>>>>>>> This is why you need only one initializer. Each
> >>> aggregator
> >>>> will
> >>>>>>>> need
> >>>>>>>>>>> to
> >>>>>>>>>>>>>> integrate the new value with the new object made in the
> >>>>>>>> initializer.
> >>>>>>>>>>>>>> Does that make sense?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> This is a good question and I will include this
> >>>> explanation in
> >>>>>>> the
> >>>>>>>>> kip
> >>>>>>>>>>>> as
> >>>>>>>>>>>>>> well.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>> Walker
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Tue, Oct 22, 2019 at 8:59 PM Sophie Blee-Goldman <
> >>>>>>>>>>>> sop...@confluent.io>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hey Walker,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the KIP! I have just a couple of questions:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1) It seems a little awkward to me that with the
> >> current
> >>>> API,
> >>>>>> we
> >>>>>>>>>>> have a
> >>>>>>>>>>>>>>> nearly identical
> >>>>>>>>>>>>>>> "add stream to cogroup" method, except for the first
> >>> which
> >>>>>> has a
> >>>>>>>>>>>>>> different
> >>>>>>>>>>>>>>> signature
> >>>>>>>>>>>>>>> (ie the first stream is joined as
> >>>> stream.cogroup(Aggregator)
> >>>>>>> while
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>> subsequent ones
> >>>>>>>>>>>>>>> are joined as .cogroup(Stream, Aggregator) ). I'm not
> >>> sure
> >>>>>> what
> >>>>>>> it
> >>>>>>>>>>>> would
> >>>>>>>>>>>>>>> look like exactly,
> >>>>>>>>>>>>>>> but I was just wondering if you'd considered and/or
> >>>> rejected
> >>>>>> any
> >>>>>>>>>>>>>>> alternative APIs?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2) This might just be my lack of familiarity with
> >>>> "cogroup"
> >>>>>> as a
> >>>>>>>>>>>> concept,
> >>>>>>>>>>>>>>> but with the
> >>>>>>>>>>>>>>> current (non-optimal) API the user seems to have some
> >>>> control
> >>>>>>> over
> >>>>>>>>>>> how
> >>>>>>>>>>>>>>> exactly
> >>>>>>>>>>>>>>> the different streams are joined through the
> >>> ValueJoiners.
> >>>>>> Would
> >>>>>>>>> this
> >>>>>>>>>>>> new
> >>>>>>>>>>>>>>> cogroup
> >>>>>>>>>>>>>>> simply concatenate the values from the different
> >> cogroup
> >>>>>>> streams,
> >>>>>>>> or
> >>>>>>>>>>>>>> could
> >>>>>>>>>>>>>>> users
> >>>>>>>>>>>>>>> potentially pass some kind of Joiner to the
> >>>> cogroup/aggregate
> >>>>>>>>>>> methods?
> >>>>>>>>>>>>>> Or,
> >>>>>>>>>>>>>>> is the
> >>>>>>>>>>>>>>> whole point of cogroups that you no longer ever need
> >> to
> >>>>>> specify
> >>>>>>> a
> >>>>>>>>>>>> Joiner?
> >>>>>>>>>>>>>>> If so, you
> >>>>>>>>>>>>>>> should add a short line to the KIP explaining that for
> >>>> those
> >>>>>> of
> >>>>>>> us
> >>>>>>>>>>> who
> >>>>>>>>>>>>>>> aren't fluent
> >>>>>>>>>>>>>>> in cogroup semantics :)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Sophie
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Thu, Oct 17, 2019 at 3:06 PM Walker Carlson <
> >>>>>>>>>>> wcarl...@confluent.io>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Good catch I updated that.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I have made a PR for this KIP
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I then am splitting it into 3 parts, first cogroup
> >> for
> >>> a
> >>>>>>>> key-value
> >>>>>>>>>>>>>> store
> >>>>>>>>>>>>>>> (
> >>>>>>>>>>>>>>>> here <https://github.com/apache/kafka/pull/7538>),
> >>> then
> >>>> for
> >>>>>> a
> >>>>>>>>>>>>>>>> timeWindowedStore, and then a sessionWindowedStore +
> >>>> ensuring
> >>>>>>>>>>>>>>> partitioning.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Walker
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Tue, Oct 15, 2019 at 12:47 PM Matthias J. Sax <
> >>>>>>>>>>>>>> matth...@confluent.io>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Walker,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> thanks for picking up the KIP and reworking it for
> >> the
> >>>>>> changed
> >>>>>>>>> API.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Overall, the updated API suggestions make sense to
> >> me.
> >>>> The
> >>>>>>> seem
> >>>>>>>> to
> >>>>>>>>>>>>>>> align
> >>>>>>>>>>>>>>>>> quite nicely with our current API design.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> One nit: In `CogroupedKStream#aggregate(...)` the
> >> type
> >>>>>>> parameter
> >>>>>>>>> of
> >>>>>>>>>>>>>>>>> `Materialized` should be `V`, not `VR`?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 10/14/19 2:57 PM, Walker Carlson wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> >>>>>>>>>>>>>>>>>> here
> >>>>>>>>>>>>>>>>>> is a link
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Mon, Oct 14, 2019 at 2:52 PM Walker Carlson <
> >>>>>>>>>>>>>>> wcarl...@confluent.io>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hello all,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I have picked up and updated KIP-150. Due to
> >> changes
> >>>> to
> >>>>>> the
> >>>>>>>>>>>>>> project
> >>>>>>>>>>>>>>>>> since
> >>>>>>>>>>>>>>>>>>> KIP #150 was written there are a few items that
> >> need
> >>>> to be
> >>>>>>>>>>>>>> updated.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> First item that changed is the adoption of the
> >>>>>> Materialized
> >>>>>>>>>>>>>>> parameter.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> The second item is the WindowedBy. How the old KIP
> >>>> handles
> >>>>>>>>>>>>>> windowing
> >>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>> that it overloads the aggregate function to take
> >> in
> >>> a
> >>>>>> Window
> >>>>>>>>>>>>>> object
> >>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>> well
> >>>>>>>>>>>>>>>>>>> as the other parameters. The current practice to
> >>>> window
> >>>>>>>>>>>>>>>> grouped-streams
> >>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>> to call windowedBy and receive a windowed stream
> >>>> object.
> >>>>>> The
> >>>>>>>>>>>>>>> existing
> >>>>>>>>>>>>>>>>>>> interface for a windowed stream made from a
> >> grouped
> >>>> stream
> >>>>>>>> will
> >>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>> work
> >>>>>>>>>>>>>>>>>>> for cogrouped streams. Hence, we have to make new
> >>>>>> interfaces
> >>>>>>>> for
> >>>>>>>>>>>>>>>>> cogrouped
> >>>>>>>>>>>>>>>>>>> windowed streams.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Please take a look, I would like to hear your
> >>>> feedback,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Walker
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>
>

-- 
-- Guozhang

Reply via email to