Hello Murilo,

Thanks for your interests in KIP-150.

As we discussed in the KIP, the scope of this co-group is for stream
co-aggregation. For your case, the first joining table is not from the
aggregation but is a source table itself, in this case it cannot be
included in the co-group of KIP-150.

Although we discussed about extending it to KTable multi-joins as well it
is not included in the current release, one (awkward) walk-around I can
think of for now, is to transform your `customer` table as a
"builder.stream("customer").aggregate(/*a dummy reducer that just
materialize the stream records*/)" and then it can be included in the
co-group.

Guozhang


On Thu, Apr 16, 2020 at 1:23 PM Murilo Tavares <murilo...@gmail.com> wrote:

> Hi
> I'm really excited about the new release for KafkaStreams.
> I've been watching the new CoGroup feature, and now that this is out, I'm
> trying to play around with it.
> I wonder what would be the best way to do a
> KTable.leftJoin(otherTable).leftJoin(yetAnotherTable)...
> Taking the Customer example in the KIP (
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> ),
> how would one do the cogroup if we had a Customer topic as well?
>
> In the old API, we would have
> KTable<Long, Customer> customer = builder.table("customer");
> KTable<K, V1> carts =
> builder.table("carts").groupByKey().aggregate(initializer1,
> aggregator1, materialized1);
> KTable<K, V2> purchases =
> builder.stream("purchases").groupByKey().aggregate(initializer2,
> aggregator2, materialized2);
> KTable<K, CG> final = customer.leftJoin(carts,
> joinerOneAndTwo).leftJoin(purchases, joinerOneTwoAndThree);
>
> Thanks
> Murilo
>


-- 
-- Guozhang

Reply via email to