Hello Artur,

Apologies in advance if I say something incorrect, as I'm still a little new to this project.
If I followed your example, then I think the scenario is that you're joining "claims" and "payments", grouping by "claimNumber", and then building a list for each "claimNumber" of all the claim/payment pairs. Is that right?

It's not in your example, but the grouped stream or table for "claims" (claimStrGrouped) and "payments" (paymentGrouped) must be keyed with the same key, right? In that case, the result of their join will also be keyed by that same key.

It seems like the problem you're seeing is that this list contains the same claim/payment pair multiple times for a given claimNumber. Did I get it?

In that case, I don't know if what you're seeing is the same issue Damian reported in KAFKA-4609, since the problem he reported was that there was no deduping cache after the join, only before it, unless you register a state store representing the join itself. In your case, it looks like you do register a state store representing the join, the "CLAIM_AND_PAYMENT_JOIN_STORE", so you will have a cache that can dedup the join result.

Note that the join itself is what causes duplicates, not the subsequent "claimAndPaymentKTable.toStream()". For example, if I see input like this:

(left stream):
t1: k1 -> L1
t3: k1 -> L1

(right stream):
t2: k1 -> R1

then, without deduplication, the resulting join would be:

(left.join(right) stream):
t1: k1 -> (L1, null)
t2: k1 -> (L1, R1)
t3: k1 -> (L1, R1)

Note that we see apparently duplicate join results, but really the meaning of the join stream is "as of right now, this is the value for this key", so from the join's perspective it's not wrong to say "as of t2, k1's value is (L1, R1)" and then to say it again at t3. In Kafka Streams, there is a deduplication cache which can reduce such duplicate events, but without unbounded memory the cache can't guarantee to remove all duplicates, so it's important to deal with the join result in a semantically robust way.

I think this also contains the key to resolving your issue: inside your aggregator, instead of storing a list of *every event*, I think you'll want to store a map of the *latest event by key*. (This would be the key that's common to claimStrGrouped, paymentGrouped, and claimAndPaymentKTable.) This way, you'll automatically overwrite old, obsolete join results with new ones for the same key (whether or not the old result happens to be the same as the new one).
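To make that concrete, here's a rough, untested sketch of the shape I have in mind, reusing the names from your snippet below. Note the assumptions I'm making: "latestByClaimKey" is an imaginary Map<String, ClaimAndPayment> field on ClaimAndPayment2 (replacing the list), and claimKeyOf(...) is a made-up placeholder for however you recover the claimNumber-based key from the value, since the grouped key in the aggregator is already the Integer group id.

    // Rough sketch only (untested). Assumes ClaimAndPayment2 holds a
    // Map<String, ClaimAndPayment> named "latestByClaimKey" instead of a list,
    // and that the per-claim key can be derived from the value.
    KTable<Integer, ClaimAndPayment2> claimAndPayment2IntGroupedTable =
        claimAndPaymentKStream
            .groupBy((claimKey, claimPay) ->
                         (claimPay.claimList != null)
                             ? Integer.parseInt(claimPay.claimList.claimRecords.get(0)
                                                        .claimnumber.split("_")[0])
                             : 999,
                     integerSerde, claimAndPaymentSerde)
            .aggregate(
                ClaimAndPayment2::new,
                (groupId, claimPay, claimAndPay2) -> {
                    // claimKeyOf(...) stands in for however you derive the key that
                    // is common to claimStrGrouped, paymentGrouped, and
                    // claimAndPaymentKTable from the value.
                    String perClaimKey = claimKeyOf(claimPay);
                    // put() overwrites the previous join result for this claim, so an
                    // "apparent duplicate" simply replaces the old entry instead of
                    // being appended again.
                    claimAndPay2.latestByClaimKey.put(perClaimKey, claimPay);
                    return claimAndPay2;
                },
                claimAndPayment2Serde,
                CLAIM_AND_PAYMENT_STORE);

If something downstream really does need a list, you can always derive one from the map's values when you read the aggregate.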
Does this help?

-John

On Mon, Apr 30, 2018 at 1:19 AM, Artur Mrozowski <art...@gmail.com> wrote:
> Hi,
> a while ago I hit KAFKA-4609 when running a simple pipeline. I have two
> KTable joins followed by group by and aggregate on a KStream and one
> additional join. Now this KTable/KTable join followed by group by and
> aggregate generates duplicates.
>
> I wonder if a possible workaround would be to remove the KStream after
> the KTable/KTable join and make the groupBy and aggregate on the KTable?
>
> KTable<Integer,CustomerAndPolicy> customerAndPolicyGroupedKTable =
>     customerGrouped.leftJoin(policyGrouped,
>         (customer, policy) -> new CustomerAndPolicy(customer, policy));
>
> KTable<String,ClaimAndPayment> claimAndPaymentKTable =
>     claimStrGrouped.leftJoin(paymentGrouped,
>         (claim, payment) -> new ClaimAndPayment(claim, payment),
>         claimAndPaymentSerde, CLAIM_AND_PAYMENT_JOIN_STORE);
>
> *KStream<String,ClaimAndPayment> claimAndPaymentKStream =
>     claimAndPaymentKTable.toStream(); //Can we remove this and avoid KAFKA-4609?*
>
> KTable<Integer,ClaimAndPayment2> claimAndPayment2IntGroupedTable =
>     claimAndPaymentKStream
>         .groupBy((k, claimPay) ->
>                 (claimPay.claimList != null)
>                     ? Integer.parseInt(claimPay.claimList.claimRecords.get(0).claimnumber.split("_")[0])
>                     : 999,
>             integerSerde, claimAndPaymentSerde)
>         .aggregate(
>             ClaimAndPayment2::new,
>             (claimKey, claimPay, claimAndPay2) -> {
>                 claimAndPay2.claimAndPaymentList.add(claimPay);
>                 return claimAndPay2;
>             },
>             claimAndPayment2Serde,
>             CLAIM_AND_PAYMENT_STORE);
>
> Best regards
> Artur Mrozowski