Hi,
a while ago I hit KAFKA-4609 when running a simple pipeline. I have two
KTable joins followed by group by and aggregate on and KStream  and one
additional join. Now this KTable/KTable join followed by group by  and
aggregated genereates duplicates.



I wonder if a possible workaround would be to remove the KStream after
KTable/KTable join and make groupBy and aggregate  on the KTable?


 KTable<Integer,CustomerAndPolicy> customerAndPolicyGroupedKTable =
customerGrouped.leftJoin(policyGrouped,(customer, policy) -> new
CustomerAndPolicy(customer,policy));

       KTable<String,ClaimAndPayment> claimAndPaymentKTable =
claimStrGrouped.leftJoin(paymentGrouped,(claim,payment) -> new
ClaimAndPayment(claim,payment),claimAndPaymentSerde,CLAIM_AND_PAYMENT_JOIN_STORE);


 *     KStream<String,ClaimAndPayment> claimAndPaymentKStream =
claimAndPaymentKTable.toStream(); //Can we remove this and avoid
KAFKA-4609?*

       KTable<Integer,ClaimAndPayment2> claimAndPayment2IntGroupedTable =
claimAndPaymentKStream
               .groupBy((k,claimPay) ->
                   (claimPay.claimList != null ) ?

Integer.parseInt(claimPay.claimList.claimRecords.get(0).claimnumber.split("_")[0])
:  999,integerSerde,claimAndPaymentSerde )
               .aggregate(
                        ClaimAndPayment2::new,
                        (claimKey,claimPay,claimAndPay2) -> {


claimAndPay2.claimAndPaymentList.add(claimPay);

                                return claimAndPay2;

                        }
                        ,claimAndPayment2Serde
                        ,CLAIM_AND_PAYMENT_STORE
                );





Best regards
Artur Mrozowski

Reply via email to