I just sent out a call for a vote. I think everyone has had a good
discussion :). If there are any more thoughts I would love to hear them.

Walker

On Thu, Oct 31, 2019 at 10:36 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Walker / Matthias,
>
> Thanks for your explanation. I can tell you've put a lot of thoughts into
> this already and it seems we cannot avoid adding new interfaces in any
> ways, so I will rest my arguments trying to reduce the number of
> first-class citizens in the Streams DSL :)
>
>
> Guozhang
>
>
> On Thu, Oct 31, 2019 at 12:50 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Interesting discussion. My personal take is as follows:
> >
> > (1) Co-group is not a special case of a multi-way KTable join, because
> > multiple record wit the same key from a single input stream should be
> > aggregated together and there are not update semantics. A co-group is
> > rather a muti-stream aggregation operation.
> >
> > (2) Semantically, non-windowed co-group is the same as (as laid out in
> > the KIP already):
> >
> > > KTable aggTable1 = stream1.groupByKey().aggregate(...);
> > > KTable aggTable2 = stream2.groupByKey().aggregate(...);
> > > ...
> > > KTable aggTableX = streamX.groupByKey().aggregate(...);
> > >
> > > KTable final = aggTable1.join(aggTable(2)....join(aggTAbleX)
> >
> > However, I don't think that it would be possible for our optimizer to
> > rewrite one into the other, given what `Initializer`, `Aggregator`,
> > `ValueJoiner`, and `S a user would provide.
> >
> > Hence, `cogroup()` is a more efficient way to express the above using a
> > single store, instead of X stores are required above.
> >
> > For windowed co-group, especially session-windowed, it seems not
> > possible at all to rewrite co-group as independent aggregations followed
> > by joins. Note that sessions boundaries would be determined _after_ the
> > input streams are co-partitioned/merged in `cogroup()` and thus would be
> > different compare the the aggregate-join pattern.
> >
> > (3) For the current KIP writeup, I agree that adding `Named` to
> > `aggregate()` aligns best with the current API layout. I also don't
> > think that the overloads are a big issue, because they are spread out
> > over multiple helper interfaces.
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 10/29/19 10:38 AM, Walker Carlson wrote:
> > > Hi Gouzhang,
> > >
> > > I am not sure what you mean by "Fields from different streams are never
> > > aggregated together", this certainly can be the case but not the
> general
> > > rule. If we want to take care of the special cases where the key-sets
> are
> > > disjoint for each stream then they can be given no-op operators. This
> > would
> > > have the same effect as a stitching join as the function to update the
> > > store would have to be defined either way, even to just place it in.
> > >
> > > Now if we look at it from the other way, if we only specify the
> multiway
> > > join then the user will need to aggregate each stream. Then they must
> do
> > > the join which either will involve aggregators and value joiners or
> some
> > > questionable optimization that would rely on each aggregator defined
> for
> > a
> > > grouped stream meshing together. And this would all have to happen
> inside
> > > KStream.
> > >
> > > I do agree that there are optimizations that can be done on joining
> > > multiple tables per your example, in both cases whether it be a
> > "stitching
> > > join" or not. But I do not think the place to do it is in Streams. This
> > > could be relatively easy to accomplish. I think we save ourselves pain
> if
> > > we consider the tables and streams as separate cases, as aggregating
> > > multiple streams into one KTable can be done more efficiently than
> making
> > > multiple KTables and then joining them together. We may be able to get
> > > around this in the case of a stitching join but I am not sure how we
> > could
> > > do it safely otherwise.
> > >
> > > Walker
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Oct 28, 2019 at 6:26 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > >> Hi Walker,
> > >>
> > >> This is a good point about compatibility breakage while overloading
> the
> > >> existing classes; while reading John and your exchanges, I think I
> still
> > >> need to clarify the motivations a bit more:
> > >>
> > >> 1) Multiple streams need to be aggregated together, inputs are always
> > >> *KStreams* and end result is a *KTable*.
> > >> 2) Fields from different streams are never aggregated together, i.e.
> on
> > the
> > >> higher level it is more like a "stitching up" the fields and then
> doing
> > a
> > >> single aggregation.
> > >>
> > >> In this context, I agree with you that it is still a
> streams-aggregation
> > >> operator that we are trying to optimize (though its a multi-way), not
> a
> > >> multi-way table-table-join operator that we are tying to optimize
> here.
> > >>
> > >>
> > >> -----------------
> > >>
> > >> But now taking a step back looking at it, I'm wondering, because of 2)
> > that
> > >> all input streams do not have overlapping fields, we can generalize
> > this to
> > >> a broader scope. Consider this case for example:
> > >>
> > >> table1 = builder.table("topic1");
> > >> table2 = builder.table("topic2");
> > >> table3 = builder.table("topic3");
> > >> table4 = table1.join(table2).join(table3);
> > >>
> > >> Suppose the join operations do not take out any fields or add any new
> > >> fields, i.e. say table1 has fields A, table2 has fields B, and table2
> > has
> > >> fields C besides the key K, the table 4 has field {A, B, C} --- the
> > join is
> > >> just "stitching up" the fields --- then the above topology can
> actually
> > be
> > >> optimized in a similar way:
> > >>
> > >> * we only keep one materialized store in the form of K -> {A, B, C} as
> > the
> > >> materialized store of the final join result of table4.
> > >> * when a record comes in from table1/2/3, just query the store on K,
> and
> > >> then update the corresponding A/B/C field and then writes back to the
> > >> store.
> > >>
> > >>
> > >> Then the above streams-aggregation operator can be treated as a
> special
> > >> case of this: you first aggregate separately on stream1/2/3 and
> generate
> > >> table1/2/3, and then do this "stitching join", behind the scene we can
> > >> optimize the topology to do exactly the co-group logic by updating the
> > >> second bullet point above as an aggregation operator:
> > >>
> > >> * when a record comes in from *stream1/2/3*, just query the store on
> K,
> > and
> > >> then update the corresponding A/B/C field *with an aggregator *and
> then
> > >> writes back to the store.
> > >>
> > >> -----------------
> > >>
> > >> Personally I think this is better because with 1) larger applicable
> > scope,
> > >> and 2) without introducing new interfaces. But of course on the other
> > side
> > >> it requires us to do this optimization inside the Streams with some
> > syntax
> > >> hint from users (for example, users need to specify it is a "stitching
> > >> join" such that all fields are still preserved in the join result).
> > WDYT?
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Mon, Oct 28, 2019 at 4:20 PM Walker Carlson <wcarl...@confluent.io
> >
> > >> wrote:
> > >>
> > >>> Hi John,
> > >>>
> > >>> Thank you for the background information. I think I understand your
> > >> point.
> > >>>
> > >>> I believe that this could be fixed by making the motivation a little
> > >>> clearer in the KIP.  I think that the motivation is when you have
> > >> multiple
> > >>> streams that need to aggregate together to form a single object the
> > >>> current, non optimal, way to do this is through a multiway table
> join.
> > >> This
> > >>> is a little hacky. There is a slight but significant difference in
> > these
> > >>> cases, as in the null value handling you pointed out.
> > >>>
> > >>> For the example in the motivation, these tables were grouped streams
> so
> > >>> they already dropped the null values. If we consider Cogroup sitting
> in
> > >> the
> > >>> same grey area that KGroupedStream does it should also behave this
> way.
> > >> If
> > >>> you think about it that way it is more of an extension of
> > KGroupedStream
> > >>> than KTable or KStream. Therefore I handle null values the same way
> > >>> KGroupedStream#aggregate does.
> > >>>
> > >>> Looking back I am not sure I understood you previous question fully
> at
> > >> the
> > >>> time. I am sorry if my answer caused any confusion!
> > >>>
> > >>> Walker
> > >>>
> > >>> On Mon, Oct 28, 2019 at 2:49 PM John Roesler <j...@confluent.io>
> > wrote:
> > >>>
> > >>>> Hi Walker,
> > >>>>
> > >>>> Sorry for the delay in responding. Thanks for your response earlier.
> > >>>>
> > >>>> I think there might be a subtlety getting overlooked in considering
> > >>>> whether we're talking about streams versus tables in cogroup. As I'm
> > >>>> sure you know, Kafka Streams treats "stream" records as independent,
> > >>>> immutable, and opaque "facts", whereas we treat "table" records as a
> > >>>> sequence of updates to an entity identified by the record key (where
> > >>>> "update" means that each record's value represents the new state
> after
> > >>>> applying the update). For the most part, this is a clean separation,
> > >>>> but there is one special case where records with a "null" value are
> > >>>> interpreted as a tombstone in the table context (i.e., the record
> > >>>> indicates not that the new value of the entity is "null", but rather
> > >>>> that the entity has been deleted). In contrast, a record with a null
> > >>>> value in the stream context is _just_ a record with a null value; no
> > >>>> special semantics.
> > >>>>
> > >>>> The difficulty is that these two semantics clash at the stream/table
> > >>>> boundary. So, operations that convert streams to tables (like
> > >>>> KGroupedStream#aggregate) have to cope with ambiguity about whether
> to
> > >>>> treat null values opaquely as null values, or as tombstones. I think
> > >>>> I'll make a long story short and just say that this is a very, very
> > >>>> complex issue. As a result (and as a bit of a punt), our
> > >>>> KGroupedStream operations actually just discard null-valued records.
> > >>>> This means that the following are _not_ equivalent programs:
> > >>>>
> > >>>> table1 =
> > >>>>   stream<Id,Record>("records")
> > >>>>     .filter(Record::isOk)
> > >>>>     .groupByKey()
> > >>>>     .aggregate(() -> new Record(), (key, value, agg) -> value)
> > >>>> table2 =
> > >>>>   table<Id,Record>("record")
> > >>>>     .filter(Record::isOk)
> > >>>>
> > >>>> They look about the same, in that they'll both produce a
> > >>>> KTable<Id,Record> with the value being the latest state. But if a
> > >>>> record is deleted in the upstream data (represented as a "null"
> > >>>> value), that record would also be deleted in table2, but not in
> > >>>> table1. Table1 would just perpetually contain the value immediately
> > >>>> prior to the delete.
> > >>>>
> > >>>> This is why it makes me nervous to propose a new kind of _stream_
> > >>>> operation ostensibly in order to solve a problem that presents
> itself
> > >>>> in the _table_ context.
> > >>>>
> > >>>> If the goal is to provide a more efficient and convenient multi-way
> > >>>> KTable join, I think it would be a good idea to consider an
> extension
> > >>>> to the KTable API, not the KStream API. On the other hand, if this
> is
> > >>>> not the goal, then the motivation of the KIP shouldn't say that it
> is.
> > >>>> Instead, the KIP could provide some other motivation specifically
> for
> > >>>> augmenting the KStream API.
> > >>>>
> > >>>> There is a third alternative that comes to mind, if you wish to
> > >>>> resolve the long-standing dilemma around this semantic problem and
> > >>>> specify in the KIP how exactly nulls are handled in this operator.
> But
> > >>>> (although this seems on the face to be a good option), I think it
> > >>>> might be a briarpatch. Even if we are able to reach a suitable
> design,
> > >>>> we'd have to contend with the fact that it looks like the
> > >>>> KGroupedStream API, but behaves differently.
> > >>>>
> > >>>> What do you think about all this?
> > >>>>
> > >>>> Thanks again for the KIP and the discussion!
> > >>>> -John
> > >>>>
> > >>>> On Mon, Oct 28, 2019 at 3:32 PM Walker Carlson <
> wcarl...@confluent.io
> > >
> > >>>> wrote:
> > >>>>>
> > >>>>> Hi Gouzhang,
> > >>>>>
> > >>>>> Matthias and I did talk about overloading different a type of
> > >> aggregate
> > >>>>> methods in the cogroup that would take in the windows and returns a
> > >>>>> windowed KTable. We decided that it would break too much with the
> > >>> current
> > >>>>> pattern that was established in the normal KStream. We can revisit
> > >> this
> > >>>> if
> > >>>>> you have a different opinion on the tradeoff.
> > >>>>>
> > >>>>> Walker
> > >>>>>
> > >>>>> On Mon, Oct 28, 2019 at 12:14 PM Guozhang Wang <wangg...@gmail.com
> >
> > >>>> wrote:
> > >>>>>
> > >>>>>> Hi Walker,
> > >>>>>>
> > >>>>>> On Fri, Oct 25, 2019 at 1:34 PM Walker Carlson <
> > >>> wcarl...@confluent.io>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi Guozhang,
> > >>>>>>>
> > >>>>>>> 1. I am familiar with the cogroup of spark, it is very similar to
> > >>>>>>> their join operator but instead it makes the values iterable. I
> > >>> think
> > >>>>>> that
> > >>>>>>> the use cases are different enough that it makes sense to specify
> > >>> the
> > >>>>>>> aggregator when we do.
> > >>>>>>>
> > >>>>>>> I like the idea of "absorb" and I think it could be useful.
> > >>> Although
> > >>>> I do
> > >>>>>>> not think it is as intuitive.
> > >>>>>>>
> > >>>>>>> If we were to go that route we would either use more processors
> > >> or
> > >>> do
> > >>>>>>> essentially the same thing but would have to store the
> > >> information
> > >>>>>>> required to cogroup inside that KTable. I think this would
> > >> violate
> > >>>> some
> > >>>>>>> design principles. I would argue that we should consider adding
> > >>>> absorb as
> > >>>>>>> well and auto re-write it to use cogroup.
> > >>>>>>>
> > >>>>>>
> > >>>>>> Yeah I think I agree with you about the internal design complexity
> > >>> with
> > >>>>>> "absorb"; I was primarily thinking if we can save ourselves from
> > >>>>>> introducing 3 more public classes with co-group. But it seems that
> > >>>> without
> > >>>>>> introducing new classes there's no easy way for us to bound the
> > >> scope
> > >>>> of
> > >>>>>> co-grouping (like how many streams will be co-grouped together).
> > >>>>>>
> > >>>>>> LMK if you have some better ideas: generally speaking the less new
> > >>>> public
> > >>>>>> interfaces we are introducing to fulfill a new feature the better,
> > >> so
> > >>>> I'd
> > >>>>>> push us to think twice and carefully before we go down the route.
> > >>>>>>
> > >>>>>>
> > >>>>>>>
> > >>>>>>> 2. We have not considered this thought that would be a convenient
> > >>>>>>> operation.
> > >>>>>>>
> > >>>>>>> 3. There is only one processor made. We are actually having the
> > >>>> naming
> > >>>>>>> conversation right now in the above thread
> > >>>>>>>
> > >>>>>>> 4, 5. fair points
> > >>>>>>>
> > >>>>>>> Walker
> > >>>>>>>
> > >>>>>>> On Fri, Oct 25, 2019 at 11:58 AM Guozhang Wang <
> > >> wangg...@gmail.com
> > >>>>
> > >>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hi Walker, thanks for the KIP! I made a pass on the writeup and
> > >>>> have
> > >>>>>> some
> > >>>>>>>> comments below:
> > >>>>>>>>
> > >>>>>>>> Meta:
> > >>>>>>>>
> > >>>>>>>> 1. Syntax-wise, I'm wondering if we have compared our current
> > >>>> proposal
> > >>>>>>> with
> > >>>>>>>> Spark's co-group syntax (I know they are targeting for
> > >> different
> > >>>> use
> > >>>>>>> cases,
> > >>>>>>>> but wondering if their syntax is closer to the join operator),
> > >>>> what are
> > >>>>>>> the
> > >>>>>>>> syntax / semantics trade-off here?
> > >>>>>>>>
> > >>>>>>>> Just playing a devil's advocate here, if the main motivation is
> > >>> to
> > >>>>>>> provide
> > >>>>>>>> a more convienent multi-way join syntax, and in order to only
> > >>> have
> > >>>> one
> > >>>>>>>> materialized store we need to specify the final joined format
> > >> at
> > >>>> the
> > >>>>>>>> beginning, then what about the following alternative (with the
> > >>>> given
> > >>>>>>>> example in your wiki page):
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> KGroupedStream<K, V1> grouped1 =
> > >>>> builder.stream("topic1").groupByKey();
> > >>>>>>>> KGroupedStream<K, V2> grouped2 =
> > >>>> builder.stream("topic2").groupByKey();
> > >>>>>>>> KGroupedStream<K, V3> grouped3 =
> > >>>> builder.stream("topic3").groupByKey();
> > >>>>>>>>
> > >>>>>>>> KTable<K, CG> aggregated = grouped1.aggregate(initializer,
> > >>>>>> materialized,
> > >>>>>>>> aggregator1);
> > >>>>>>>>
> > >>>>>>>> aggregated.absorb(grouped2, aggregator2);  // I'm just using a
> > >>>> random
> > >>>>>>> name
> > >>>>>>>> on top of my head here
> > >>>>>>>>                   .absorb(grouped3, aggregator3);
> > >>>>>>>>
> > >>>>>>>> In this way, we just add a new API to the KTable to "absorb"
> > >> new
> > >>>>>> streams
> > >>>>>>> as
> > >>>>>>>> aggregated results without needing to introduce new first
> > >> citizen
> > >>>>>>> classes.
> > >>>>>>>>
> > >>>>>>>> 2. From the DSL optimization, have we considered if we can auto
> > >>>>>> re-write
> > >>>>>>>> the user written old fashioned multi-join into this new DSL
> > >>>> operator?
> > >>>>>>>>
> > >>>>>>>> 3. Although it is not needed for the wiki page itself, for
> > >>> internal
> > >>>>>>>> implementation how many processor nodes would we create for the
> > >>> new
> > >>>>>>>> operator, and how we can allow users to name them?
> > >>>>>>>>
> > >>>>>>>> Minor:
> > >>>>>>>>
> > >>>>>>>> 4. In "Public Interfaces", better add the templated generics to
> > >>>>>>>> "KGroupedStream" as "KGroupedStream<K, V>".
> > >>>>>>>>
> > >>>>>>>> 5. Naming wise, I'd suggest we keep the "K" together with
> > >>>> Stream/Table,
> > >>>>>>>> e.g. "TimeWindowed*CogroupedKStream*<K, V>".
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Guozhang
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Thu, Oct 24, 2019 at 11:43 PM Matthias J. Sax <
> > >>>>>> matth...@confluent.io>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Walker,
> > >>>>>>>>>
> > >>>>>>>>> I am not sure if I can follow your argument. What do you
> > >>> exactly
> > >>>> mean
> > >>>>>>> by
> > >>>>>>>>>
> > >>>>>>>>>> I also
> > >>>>>>>>>>> think that in this case it would be better to separate
> > >> the 2
> > >>>>>> option
> > >>>>>>>> out
> > >>>>>>>>>>> into separate overloads.
> > >>>>>>>>>
> > >>>>>>>>> Maybe you can give an example what method signature you have
> > >> in
> > >>>> mind?
> > >>>>>>>>>
> > >>>>>>>>>>> We could take a named parameter from upstream or add an
> > >>> extra
> > >>>>>> naming
> > >>>>>>>>> option
> > >>>>>>>>>>> however I don't really see the advantage that would give.
> > >>>>>>>>>
> > >>>>>>>>> Are you familiar with KIP-307? Before KIP-307, KS generated
> > >> all
> > >>>> names
> > >>>>>>>>> for all Processors. This makes it hard to reason about a
> > >>>> Topology if
> > >>>>>>>>> it's getting complex. Adding `Named` to the new co-group
> > >>> operator
> > >>>>>> would
> > >>>>>>>>> actually align with KIP-307.
> > >>>>>>>>>
> > >>>>>>>>>> It seems to go in
> > >>>>>>>>>>> the opposite direction from the cogroup configuration idea
> > >>> you
> > >>>>>>>> proposed.
> > >>>>>>>>>
> > >>>>>>>>> Can you elaborate? Not sure if I can follow.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> -Matthias
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On 10/24/19 10:20 AM, Walker Carlson wrote:
> > >>>>>>>>>> While I like the idea Sophie I don't think that it is
> > >>>> necessary. I
> > >>>>>>> also
> > >>>>>>>>>> think that in this case it would be better to separate the
> > >> 2
> > >>>> option
> > >>>>>>> out
> > >>>>>>>>>> into separate overloads.
> > >>>>>>>>>> We could take a named parameter from upstream or add an
> > >> extra
> > >>>>>> naming
> > >>>>>>>>> option
> > >>>>>>>>>> however I don't really see the advantage that would give.
> > >> It
> > >>>> seems
> > >>>>>> to
> > >>>>>>>> go
> > >>>>>>>>> in
> > >>>>>>>>>> the opposite direction from the cogroup configuration idea
> > >>> you
> > >>>>>>>> proposed.
> > >>>>>>>>>>
> > >>>>>>>>>> John, I think it could be both. It depends on when you
> > >>>> aggregate
> > >>>>>> and
> > >>>>>>>> what
> > >>>>>>>>>> kind of data you have. In the example it is aggregating
> > >>> before
> > >>>>>>> joining,
> > >>>>>>>>>> there are probably some cases where you could join before
> > >>>>>>> aggregating.
> > >>>>>>>>> IMHO
> > >>>>>>>>>> it would be easier to group all the streams together then
> > >>>> perform
> > >>>>>> the
> > >>>>>>>> one
> > >>>>>>>>>> operation that results in a single KTable.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Wed, Oct 23, 2019 at 9:58 PM Sophie Blee-Goldman <
> > >>>>>>>> sop...@confluent.io
> > >>>>>>>>>>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>>> I can personally not see any need to add other
> > >>> configuration
> > >>>>>>>>>>> Famous last words?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Just kidding, 95% confidence is more than enough to  me
> > >> (and
> > >>>>>> better
> > >>>>>>> to
> > >>>>>>>>>>> optimize for current
> > >>>>>>>>>>> design than for hypothetical future changes).
> > >>>>>>>>>>>
> > >>>>>>>>>>> One last question I have then is about the
> > >>>>>>> operator/store/repartition
> > >>>>>>>>>>> naming -- seems like
> > >>>>>>>>>>> we can name the underlying store/changelog through the
> > >>>>>> Materialized
> > >>>>>>>>>>> parameter, but do we
> > >>>>>>>>>>> also want to include an overload taking a Named parameter
> > >>> for
> > >>>> the
> > >>>>>>>>> operator
> > >>>>>>>>>>> name (as in the
> > >>>>>>>>>>> KTable#join variations)?
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Wed, Oct 23, 2019 at 5:14 PM Matthias J. Sax <
> > >>>>>>>> matth...@confluent.io>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Interesting idea, Sophie.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> So far, we tried to reuse existing config objects and
> > >> only
> > >>>> add
> > >>>>>> new
> > >>>>>>>> ones
> > >>>>>>>>>>>> when needed to avoid creating "redundant" classes. This
> > >> is
> > >>> of
> > >>>>>>> course
> > >>>>>>>> a
> > >>>>>>>>>>>> reactive approach (with the drawback to deprecate stuff
> > >> if
> > >>> we
> > >>>>>>> change
> > >>>>>>>>> it,
> > >>>>>>>>>>>> as you described).
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I can personally not see any need to add other
> > >>> configuration
> > >>>>>>>> parameters
> > >>>>>>>>>>>> atm, so it's a 95% obvious "no" IMHO. The final
> > >>>> `aggregate()` has
> > >>>>>>>> only
> > >>>>>>>>> a
> > >>>>>>>>>>>> single state store that we need to configure, and reusing
> > >>>>>>>>> `Materialized`
> > >>>>>>>>>>>> seems to be appropriate.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Also note, that the `Initializer` is a mandatory
> > >> parameter
> > >>>> and
> > >>>>>> not
> > >>>>>>> a
> > >>>>>>>>>>>> configuration and should be passed directly, and not via
> > >> a
> > >>>>>>>>> configuration
> > >>>>>>>>>>>> object.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On 10/23/19 11:37 AM, Sophie Blee-Goldman wrote:
> > >>>>>>>>>>>>> Thanks for the explanation, makes sense to me! As for
> > >> the
> > >>>> API,
> > >>>>>> one
> > >>>>>>>>>>> other
> > >>>>>>>>>>>>> thought I had is might we ever want or need to introduce
> > >>> any
> > >>>>>> other
> > >>>>>>>>>>>> configs
> > >>>>>>>>>>>>> or parameters in the future? Obviously that's difficult
> > >> to
> > >>>> say
> > >>>>>> now
> > >>>>>>>> (or
> > >>>>>>>>>>>>> maybe the
> > >>>>>>>>>>>>> answer seems obviously "no") but we seem to often end up
> > >>>> needing
> > >>>>>>> to
> > >>>>>>>>> add
> > >>>>>>>>>>>> new
> > >>>>>>>>>>>>> overloads and/or deprecate old ones as new features or
> > >>>>>>> requirements
> > >>>>>>>>>>> come
> > >>>>>>>>>>>>> into
> > >>>>>>>>>>>>> play.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> What do you (and others?) think about wrapping the
> > >> config
> > >>>>>>> parameters
> > >>>>>>>>>>> (ie
> > >>>>>>>>>>>>> everything
> > >>>>>>>>>>>>> except the actual grouped streams) in a new config
> > >> object?
> > >>>> For
> > >>>>>>>>> example,
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>> CogroupedStream#aggregate field could take a single
> > >>>> Cogrouped
> > >>>>>>>> object,
> > >>>>>>>>>>>>> which itself would have an initializer and a
> > >> materialized.
> > >>>> If we
> > >>>>>>>> ever
> > >>>>>>>>>>>> need
> > >>>>>>>>>>>>> to add
> > >>>>>>>>>>>>> a new parameter, we can just add it to the Cogrouped
> > >>> class.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Also, will the backing store be available for IQ if a
> > >>>>>> Materialized
> > >>>>>>>> is
> > >>>>>>>>>>>>> passed in?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Wed, Oct 23, 2019 at 10:49 AM Walker Carlson <
> > >>>>>>>>> wcarl...@confluent.io
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi Sophie,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thank you for your comments. As for the different
> > >> methods
> > >>>>>>>> signatures
> > >>>>>>>>> I
> > >>>>>>>>>>>> have
> > >>>>>>>>>>>>>> not really considered any other options but  while I do
> > >>>> agree
> > >>>>>> it
> > >>>>>>> is
> > >>>>>>>>>>>>>> confusing, I don't see any obvious solutions. The
> > >> problem
> > >>>> is
> > >>>>>> that
> > >>>>>>>> the
> > >>>>>>>>>>>>>> cogroup essentially pairs a group stream with an
> > >>>> aggregator and
> > >>>>>>>> when
> > >>>>>>>>>>> it
> > >>>>>>>>>>>> is
> > >>>>>>>>>>>>>> first made the method is called on a groupedStream
> > >>> already.
> > >>>>>>> However
> > >>>>>>>>>>> each
> > >>>>>>>>>>>>>> subsequent stream-aggregator pair is added on to a
> > >>> cogroup
> > >>>>>> stream
> > >>>>>>>> so
> > >>>>>>>>>>> it
> > >>>>>>>>>>>>>> needs both arguments.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> For the second question you should not need a joiner.
> > >> The
> > >>>> idea
> > >>>>>> is
> > >>>>>>>>> that
> > >>>>>>>>>>>> you
> > >>>>>>>>>>>>>> can collect many grouped streams with overlapping key
> > >>>> spaces
> > >>>>>> and
> > >>>>>>>> any
> > >>>>>>>>>>>> kind
> > >>>>>>>>>>>>>> of value types. Once aggregated its value will be
> > >> reduced
> > >>>> into
> > >>>>>>> one
> > >>>>>>>>>>> type.
> > >>>>>>>>>>>>>> This is why you need only one initializer. Each
> > >>> aggregator
> > >>>> will
> > >>>>>>>> need
> > >>>>>>>>>>> to
> > >>>>>>>>>>>>>> integrate the new value with the new object made in the
> > >>>>>>>> initializer.
> > >>>>>>>>>>>>>> Does that make sense?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> This is a good question and I will include this
> > >>>> explanation in
> > >>>>>>> the
> > >>>>>>>>> kip
> > >>>>>>>>>>>> as
> > >>>>>>>>>>>>>> well.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>> Walker
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Tue, Oct 22, 2019 at 8:59 PM Sophie Blee-Goldman <
> > >>>>>>>>>>>> sop...@confluent.io>
> > >>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hey Walker,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks for the KIP! I have just a couple of questions:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 1) It seems a little awkward to me that with the
> > >> current
> > >>>> API,
> > >>>>>> we
> > >>>>>>>>>>> have a
> > >>>>>>>>>>>>>>> nearly identical
> > >>>>>>>>>>>>>>> "add stream to cogroup" method, except for the first
> > >>> which
> > >>>>>> has a
> > >>>>>>>>>>>>>> different
> > >>>>>>>>>>>>>>> signature
> > >>>>>>>>>>>>>>> (ie the first stream is joined as
> > >>>> stream.cogroup(Aggregator)
> > >>>>>>> while
> > >>>>>>>>>>> the
> > >>>>>>>>>>>>>>> subsequent ones
> > >>>>>>>>>>>>>>> are joined as .cogroup(Stream, Aggregator) ). I'm not
> > >>> sure
> > >>>>>> what
> > >>>>>>> it
> > >>>>>>>>>>>> would
> > >>>>>>>>>>>>>>> look like exactly,
> > >>>>>>>>>>>>>>> but I was just wondering if you'd considered and/or
> > >>>> rejected
> > >>>>>> any
> > >>>>>>>>>>>>>>> alternative APIs?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 2) This might just be my lack of familiarity with
> > >>>> "cogroup"
> > >>>>>> as a
> > >>>>>>>>>>>> concept,
> > >>>>>>>>>>>>>>> but with the
> > >>>>>>>>>>>>>>> current (non-optimal) API the user seems to have some
> > >>>> control
> > >>>>>>> over
> > >>>>>>>>>>> how
> > >>>>>>>>>>>>>>> exactly
> > >>>>>>>>>>>>>>> the different streams are joined through the
> > >>> ValueJoiners.
> > >>>>>> Would
> > >>>>>>>>> this
> > >>>>>>>>>>>> new
> > >>>>>>>>>>>>>>> cogroup
> > >>>>>>>>>>>>>>> simply concatenate the values from the different
> > >> cogroup
> > >>>>>>> streams,
> > >>>>>>>> or
> > >>>>>>>>>>>>>> could
> > >>>>>>>>>>>>>>> users
> > >>>>>>>>>>>>>>> potentially pass some kind of Joiner to the
> > >>>> cogroup/aggregate
> > >>>>>>>>>>> methods?
> > >>>>>>>>>>>>>> Or,
> > >>>>>>>>>>>>>>> is the
> > >>>>>>>>>>>>>>> whole point of cogroups that you no longer ever need
> > >> to
> > >>>>>> specify
> > >>>>>>> a
> > >>>>>>>>>>>> Joiner?
> > >>>>>>>>>>>>>>> If so, you
> > >>>>>>>>>>>>>>> should add a short line to the KIP explaining that for
> > >>>> those
> > >>>>>> of
> > >>>>>>> us
> > >>>>>>>>>>> who
> > >>>>>>>>>>>>>>> aren't fluent
> > >>>>>>>>>>>>>>> in cogroup semantics :)
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>> Sophie
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Thu, Oct 17, 2019 at 3:06 PM Walker Carlson <
> > >>>>>>>>>>> wcarl...@confluent.io>
> > >>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Good catch I updated that.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I have made a PR for this KIP
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I then am splitting it into 3 parts, first cogroup
> > >> for
> > >>> a
> > >>>>>>>> key-value
> > >>>>>>>>>>>>>> store
> > >>>>>>>>>>>>>>> (
> > >>>>>>>>>>>>>>>> here <https://github.com/apache/kafka/pull/7538>),
> > >>> then
> > >>>> for
> > >>>>>> a
> > >>>>>>>>>>>>>>>> timeWindowedStore, and then a sessionWindowedStore +
> > >>>> ensuring
> > >>>>>>>>>>>>>>> partitioning.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Walker
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Tue, Oct 15, 2019 at 12:47 PM Matthias J. Sax <
> > >>>>>>>>>>>>>> matth...@confluent.io>
> > >>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Walker,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> thanks for picking up the KIP and reworking it for
> > >> the
> > >>>>>> changed
> > >>>>>>>>> API.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Overall, the updated API suggestions make sense to
> > >> me.
> > >>>> The
> > >>>>>>> seem
> > >>>>>>>> to
> > >>>>>>>>>>>>>>> align
> > >>>>>>>>>>>>>>>>> quite nicely with our current API design.
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> One nit: In `CogroupedKStream#aggregate(...)` the
> > >> type
> > >>>>>>> parameter
> > >>>>>>>>> of
> > >>>>>>>>>>>>>>>>> `Materialized` should be `V`, not `VR`?
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On 10/14/19 2:57 PM, Walker Carlson wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> > >>>>>>>>>>>>>>>>>> here
> > >>>>>>>>>>>>>>>>>> is a link
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On Mon, Oct 14, 2019 at 2:52 PM Walker Carlson <
> > >>>>>>>>>>>>>>> wcarl...@confluent.io>
> > >>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Hello all,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I have picked up and updated KIP-150. Due to
> > >> changes
> > >>>> to
> > >>>>>> the
> > >>>>>>>>>>>>>> project
> > >>>>>>>>>>>>>>>>> since
> > >>>>>>>>>>>>>>>>>>> KIP #150 was written there are a few items that
> > >> need
> > >>>> to be
> > >>>>>>>>>>>>>> updated.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> First item that changed is the adoption of the
> > >>>>>> Materialized
> > >>>>>>>>>>>>>>> parameter.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> The second item is the WindowedBy. How the old KIP
> > >>>> handles
> > >>>>>>>>>>>>>> windowing
> > >>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>> that it overloads the aggregate function to take
> > >> in
> > >>> a
> > >>>>>> Window
> > >>>>>>>>>>>>>> object
> > >>>>>>>>>>>>>>> as
> > >>>>>>>>>>>>>>>>> well
> > >>>>>>>>>>>>>>>>>>> as the other parameters. The current practice to
> > >>>> window
> > >>>>>>>>>>>>>>>> grouped-streams
> > >>>>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>>> to call windowedBy and receive a windowed stream
> > >>>> object.
> > >>>>>> The
> > >>>>>>>>>>>>>>> existing
> > >>>>>>>>>>>>>>>>>>> interface for a windowed stream made from a
> > >> grouped
> > >>>> stream
> > >>>>>>>> will
> > >>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>>> work
> > >>>>>>>>>>>>>>>>>>> for cogrouped streams. Hence, we have to make new
> > >>>>>> interfaces
> > >>>>>>>> for
> > >>>>>>>>>>>>>>>>> cogrouped
> > >>>>>>>>>>>>>>>>>>> windowed streams.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Please take a look, I would like to hear your
> > >>>> feedback,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Walker
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>> --
> > >>>>>>>> -- Guozhang
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> --
> > >>>>>> -- Guozhang
> > >>>>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> >
> >
>
> --
> -- Guozhang
>

Reply via email to