Hi,

A while ago I hit KAFKA-4609 when running a simple pipeline. I have two KTable/KTable joins, followed by a groupBy and aggregate on a KStream, and one additional join. The KTable/KTable join followed by groupBy and aggregate generates duplicates.

I wonder if a possible workaround would be to remove the KStream after the KTable/KTable join and do the groupBy and aggregate on the KTable directly?

    KTable<Integer, CustomerAndPolicy> customerAndPolicyGroupedKTable =
        customerGrouped.leftJoin(policyGrouped,
            (customer, policy) -> new CustomerAndPolicy(customer, policy));

    KTable<String, ClaimAndPayment> claimAndPaymentKTable =
        claimStrGrouped.leftJoin(paymentGrouped,
            (claim, payment) -> new ClaimAndPayment(claim, payment),
            claimAndPaymentSerde, CLAIM_AND_PAYMENT_JOIN_STORE);

    // Can we remove this and avoid KAFKA-4609?
    KStream<String, ClaimAndPayment> claimAndPaymentKStream = claimAndPaymentKTable.toStream();

    KTable<Integer, ClaimAndPayment2> claimAndPayment2IntGroupedTable = claimAndPaymentKStream
        .groupBy((k, claimPay) -> (claimPay.claimList != null)
                ? Integer.parseInt(claimPay.claimList.claimRecords.get(0).claimnumber.split("_")[0])
                : 999,
            integerSerde, claimAndPaymentSerde)
        .aggregate(
            ClaimAndPayment2::new,
            (claimKey, claimPay, claimAndPay2) -> {
                claimAndPay2.claimAndPaymentList.add(claimPay);
                return claimAndPay2;
            },
            claimAndPayment2Serde,
            CLAIM_AND_PAYMENT_STORE);

Best regards
Artur Mrozowski