Hi Guozhang
Thank you very much for your response.
Since KStream does not have an aggregate method, I guess you meant:
"builder.stream("customer").groupByKey()", which can then be used in the
CoGroup, and there I'd probably need an aggregation like this:
(Long key, Customer value, Customer agg) -> {
     value.cart = agg.cart;
     value.purchases = agg.purchases;
     return value;
}

I think this works. But if I need a leftJoin, i.e. if I only want to send
updates for Customers that are in the customers topic, I'd still have to
add a filter at the end to check whether the Customer base fields were set:
if id is null, then we never received a Customer from the customer stream,
and we can filter it out.
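
A minimal sketch of that filter idea, in plain Java rather than the Kafka
Streams API, so it runs without a broker. A map stands in for the co-group
state store. The Customer class, the String types for cart and purchases,
and the helper names (onCart, onPurchase, hasBaseFields, run) are
assumptions made for illustration; only the customer aggregator comes from
the snippet above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CoGroupLeftJoinSketch {

    // Hypothetical aggregate type; only id, cart and purchases are named in the email.
    static class Customer {
        Long id;          // base field, set only by records from the customer stream
        String cart;      // type assumed for illustration
        String purchases; // type assumed for illustration
    }

    // Aggregator for the customer stream: take the new base fields,
    // carry over whatever the other streams have aggregated so far.
    static Customer onCustomer(Long key, Customer value, Customer agg) {
        value.cart = agg.cart;
        value.purchases = agg.purchases;
        return value;
    }

    // Assumed aggregators for the other two streams.
    static Customer onCart(Long key, String cart, Customer agg) {
        agg.cart = cart;
        return agg;
    }

    static Customer onPurchase(Long key, String purchase, Customer agg) {
        agg.purchases = purchase;
        return agg;
    }

    // The "leftJoin" filter: keep an aggregate only if a customer record was seen.
    static boolean hasBaseFields(Customer agg) {
        return agg.id != null;
    }

    static Map<Long, Customer> run() {
        // Stand-in for the co-group state store; a fresh Customer plays the initializer.
        Map<Long, Customer> store = new LinkedHashMap<>();

        // Key 1: a cart arrives first, then the customer record.
        store.put(1L, onCart(1L, "book", store.getOrDefault(1L, new Customer())));
        Customer incoming = new Customer();
        incoming.id = 1L;
        store.put(1L, onCustomer(1L, incoming, store.getOrDefault(1L, new Customer())));

        // Key 2: only a purchase arrives, never a customer record.
        store.put(2L, onPurchase(2L, "pen", store.getOrDefault(2L, new Customer())));

        // Apply the filter at the end, as described above.
        Map<Long, Customer> leftJoined = new LinkedHashMap<>();
        store.forEach((key, agg) -> {
            if (hasBaseFields(agg)) {
                leftJoined.put(key, agg);
            }
        });
        return leftJoined;
    }

    public static void main(String[] args) {
        run().forEach((key, c) ->
            System.out.println(key + " cart=" + c.cart + " purchases=" + c.purchases));
    }
}
```

In this sketch key 2 is dropped because its id was never set, while key 1
keeps the cart that arrived before the customer record.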

Anyway, I was just trying to understand if I was missing something.
Also, is there a plan to extend this feature to KTables in the future? Or
is this considered not relevant or not feasible for KTables for some
reason? I couldn't find anything in the KIP about supporting or not
supporting KTables.
Thanks again
Murilo


On Thu, 16 Apr 2020 at 19:27, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Murilo,
>
> Thanks for your interest in KIP-150.
>
> As we discussed in the KIP, the scope of this co-group is for stream
> co-aggregation. For your case, the first joining table is not from the
> aggregation but is a source table itself, so it cannot be
> included in the co-group of KIP-150.
>
> Although we discussed extending it to KTable multi-joins as well, it
> is not included in the current release. One (awkward) workaround I can
> think of for now is to transform your `customer` table as a
> "builder.stream("customer").aggregate(/*a dummy reducer that just
> materializes the stream records*/)" and then it can be included in the
> co-group.
>
> Guozhang
>
>
> On Thu, Apr 16, 2020 at 1:23 PM Murilo Tavares <murilo...@gmail.com>
> wrote:
>
> > Hi
> > I'm really excited about the new release for KafkaStreams.
> > I've been watching the new CoGroup feature, and now that this is out, I'm
> > trying to play around with it.
> > I wonder what would be the best way to do a
> > KTable.leftJoin(otherTable).leftJoin(yetAnotherTable)...
> > Taking the Customer example in the KIP
> > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup),
> > how would one do the cogroup if we had a Customer topic as well?
> >
> > In the old API, we would have
> > KTable<Long, Customer> customer = builder.table("customer");
> > KTable<K, V1> carts =
> > builder.stream("carts").groupByKey().aggregate(initializer1,
> > aggregator1, materialized1);
> > KTable<K, V2> purchases =
> > builder.stream("purchases").groupByKey().aggregate(initializer2,
> > aggregator2, materialized2);
> > KTable<K, CG> result = customer.leftJoin(carts,
> > joinerOneAndTwo).leftJoin(purchases, joinerOneTwoAndThree);
> >
> > Thanks
> > Murilo
> >
>
>
> --
> -- Guozhang
>
