Hi Guozhang,

Thank you very much for your response. Since KStream does not have an aggregate method, I guess you meant "builder.stream("customer").groupByKey()", which can then be used in the CoGroup. There I'd probably need an aggregator like this:

    (Long key, Customer value, Customer agg) -> {
        value.cart = agg.cart;
        value.purchases = agg.purchases;
        return value;
    }
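To sanity-check this aggregator without a broker, here is a rough in-memory model of what the cogroup computes. It is plain Java and NOT the Kafka Streams API. In Kafka Streams 2.5 the real topology would use `groupByKey()`, `KGroupedStream.cogroup(...)`, `CogroupedKStream.cogroup(...)` and `aggregate(...)`, followed by a `filter` on the resulting KTable. The `Customer` fields, the cart and purchase aggregators and the `leftJoinView` helper are all hypothetical. The customer aggregator is copied from the message, and the final filter drops keys whose `id` was never set, which is the leftJoin emulation described in the next paragraph.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// In-memory model of the cogroup semantics; NOT the Kafka Streams API.
public class CogroupSketch {

    // Same shape as Kafka Streams' Aggregator: (key, value, aggregate) -> new aggregate.
    @FunctionalInterface
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    // Hypothetical value type; field names are illustrative only.
    static class Customer {
        Long id;        // only set by records from the "customer" topic
        String name;
        String cart;    // hypothetical: latest cart item
        int purchases;  // hypothetical: running purchase total
    }

    static Customer customer(long id, String name) {
        Customer c = new Customer();
        c.id = id;
        c.name = name;
        return c;
    }

    // Aggregator from the message: take the new base fields and carry over
    // whatever the other streams have accumulated so far.
    static final Aggregator<Long, Customer, Customer> CUSTOMER_AGG = (key, value, agg) -> {
        value.cart = agg.cart;
        value.purchases = agg.purchases;
        return value;
    };
    static final Aggregator<Long, String, Customer> CART_AGG = (key, item, agg) -> {
        agg.cart = item;
        return agg;
    };
    static final Aggregator<Long, Integer, Customer> PURCHASE_AGG = (key, amount, agg) -> {
        agg.purchases += amount;
        return agg;
    };

    // One shared aggregate per key, like the single KTable a cogroup produces.
    final Map<Long, Customer> store = new HashMap<>();

    <V> void process(long key, V value, Aggregator<Long, V, Customer> aggregator) {
        Customer current = store.computeIfAbsent(key, k -> new Customer()); // initializer
        store.put(key, aggregator.apply(key, value, current));
    }

    // leftJoin emulation: keep only keys that received a customer record (id != null).
    Map<Long, Customer> leftJoinView() {
        Map<Long, Customer> out = new TreeMap<>();
        store.forEach((k, v) -> {
            if (v.id != null) out.put(k, v);
        });
        return out;
    }

    public static void main(String[] args) {
        CogroupSketch s = new CogroupSketch();
        s.process(1L, "book", CART_AGG);                  // cart arrives before the customer
        s.process(1L, customer(1L, "Ana"), CUSTOMER_AGG);
        s.process(1L, 30, PURCHASE_AGG);
        s.process(2L, 10, PURCHASE_AGG);                  // key 2 never appears in "customer"
        Map<Long, Customer> view = s.leftJoinView();
        Customer c = view.get(1L);
        // prints "[1] Ana book 30"
        System.out.println(view.keySet() + " " + c.name + " " + c.cart + " " + c.purchases);
    }
}
```

Because every input stream updates the same per-key aggregate, a cart that arrives before its customer is not lost: the customer aggregator copies it over. The price is that keys seen only on the cart or purchase streams still exist in the store, which is why the null-id filter is needed.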
I think this works. But if I need a leftJoin, i.e., if I only want to send updates for Customers that are in the customer topic, I'd still have to add a filter at the end to check whether the Customer base fields were set: if id is null, we never received a Customer from the customer stream, and we can filter it out.

Anyway, I was just trying to understand whether I was missing something. Also, is there a plan to extend this feature to KTables in the future? Or is it considered not relevant, or not feasible, for KTables for some reason? I couldn't find anything in the KIP about supporting or not supporting KTables.

Thanks again
Murilo

On Thu, 16 Apr 2020 at 19:27, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Murilo,
>
> Thanks for your interest in KIP-150.
>
> As we discussed in the KIP, the scope of this co-group is stream
> co-aggregation. In your case, the first joining table does not come from an
> aggregation but is a source table itself, so it cannot be
> included in the co-group of KIP-150.
>
> Although we discussed extending it to KTable multi-joins as well, that
> is not included in the current release. One (awkward) workaround I can
> think of for now is to transform your `customer` table as
> "builder.stream("customer").aggregate(/*a dummy reducer that just
> materializes the stream records*/)", and then it can be included in the
> co-group.
>
> Guozhang
>
>
> On Thu, Apr 16, 2020 at 1:23 PM Murilo Tavares <murilo...@gmail.com> wrote:
>
> > Hi
> > I'm really excited about the new release for KafkaStreams.
> > I've been watching the new CoGroup feature, and now that this is out, I'm
> > trying to play around with it.
> > I wonder what would be the best way to do a
> > KTable.leftJoin(otherTable).leftJoin(yetAnotherTable)...
> > Taking the Customer example in the KIP
> > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup),
> > how would one do the cogroup if we had a Customer topic as well?
> >
> > In the old API, we would have
> > KTable<Long, Customer> customer = builder.table("customer");
> > KTable<Long, V1> carts =
> >     builder.stream("carts").groupByKey().aggregate(initializer1,
> >         aggregator1, materialized1);
> > KTable<Long, V2> purchases =
> >     builder.stream("purchases").groupByKey().aggregate(initializer2,
> >         aggregator2, materialized2);
> > KTable<Long, CG> result = customer.leftJoin(carts,
> >     joinerOneAndTwo).leftJoin(purchases, joinerOneTwoAndThree);
> >
> > Thanks
> > Murilo
>
> --
> -- Guozhang