Hi John, thanks. Yes, using hashes is what I already do. Thank you for the excellent explanation.
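For reference, the hashing is roughly along the lines John describes below. A minimal sketch (the real ClaimAndPayment of course has the claimList/paymentList fields; the fields here are simplified for illustration):

import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

// Illustrative stand-in for the real ClaimAndPayment class.
public class ClaimAndPayment {
    public final String claimNumber;
    public final double payment;

    public ClaimAndPayment(String claimNumber, double payment) {
        this.claimNumber = claimNumber;
        this.payment = payment;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ClaimAndPayment)) return false;
        ClaimAndPayment that = (ClaimAndPayment) o;
        return Double.compare(payment, that.payment) == 0
                && Objects.equals(claimNumber, that.claimNumber);
    }

    @Override
    public int hashCode() {
        return Objects.hash(claimNumber, payment);
    }

    public static void main(String[] args) {
        // LinkedHashSet keeps insertion order and drops duplicates via equals/hashCode.
        Set<ClaimAndPayment> results = new LinkedHashSet<>();
        results.add(new ClaimAndPayment("3_0", 847015.1437781961));
        results.add(new ClaimAndPayment("3_0", 847015.1437781961)); // duplicate join result
        System.out.println(results.size()); // prints 1: the duplicate collapsed
    }
}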
Best Regards
Artur

On Tue, May 1, 2018 at 9:29 PM, John Roesler <j...@confluent.io> wrote:

> Hi Artur,
>
> Thanks for the clarification.
>
> I don't think that ".toStream()" actually does anything besides change the
> context from KTable to KStream in the DSL. The Javadoc says:
>
> > Note that this is a logical operation and only changes the
> > "interpretation" of the stream, i.e., each record of this changelog
> > stream is no longer treated as an updated record (cf. {@link KStream}
> > vs {@code KTable}).
>
> Not to belabor the point, but I wouldn't want you to focus too much on
> getting rid of the "toStream" in favor of the same methods on KTable, as
> I think that would have exactly the same semantics.
>
> It's entirely possible that some additional tuning on the join could
> reduce the duplicates you're seeing. For example, what are your current
> settings for commit interval and dedup cache size?
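For anyone following along, those two knobs are ordinary StreamsConfig properties. The values below are only illustrative, not my actual configuration:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Longer commit interval: caches are flushed less often, so more duplicate
// updates can be absorbed before records are forwarded downstream.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);
// Bigger record cache: more room to dedup updates per key before flushing.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);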
> In any case, though, Kafka Streams's deduplication mechanism is only
> best-effort. So if your correctness depends on unique events (as yours
> does), I still think you're better off coding in anticipation of
> duplicates. For example, you could implement hashCode and equals on
> ClaimAndPayment and store the records in a LinkedHashSet (to preserve
> both uniqueness and order).
>
> Hope that helps,
> -John
>
> On Tue, May 1, 2018 at 12:40 PM, Artur Mrozowski <art...@gmail.com> wrote:
>
> > Hi John,
> > yes, the answer is very helpful and your understanding of the data flow
> > is correct. However, deduplication is not the issue, because no
> > duplicates are ever inserted into the flow. The duplicates are generated
> > from unique records, after the join between claim and payments and the
> > conversion of the result to a stream. But perhaps that stream is
> > entirely avoidable?
> >
> > So it would look something like this:
> >
> > KTable<String,ArrayList> left:
> >
> > {"claimcounter": 0, "claimreporttime": 55948.33110985625, "claimnumber": "3_0", "claimtime": 708.521153490306}
> >
> > and KTable<String,ArrayList> right:
> >
> > {"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0", "payment": 847015.1437781961}
> >
> > When I leftJoin these two objects, the result in the state store will be
> > an object containing two ArrayLists, left and right, like this:
> >
> > {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> >
> > But I want to continue processing the results with groupBy and
> > aggregate, so I convert the result of the leftJoin to a stream. Now the
> > resulting repartition and changelog topics will contain two identical
> > messages, like this:
> >
> > {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> > {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> >
> > Best regards
> > Artur
> >
> > On Mon, Apr 30, 2018 at 5:30 PM, John Roesler <j...@confluent.io> wrote:
> >
> > > Hello Artur,
> > >
> > > Apologies in advance if I say something incorrect, as I'm still a
> > > little new to this project.
> > >
> > > If I followed your example, then I think the scenario is that you're
> > > joining "claims" and "payments", grouping by "claimNumber", and then
> > > building a list for each "claimNumber" of all the claim/payment
> > > pairs. Is that right?
> > >
> > > It's not in your example, but the grouped stream or table for
> > > "claims" (claimStrGrouped) and "payments" (paymentGrouped) must be
> > > keyed with the same key, right? In that case, the result of their
> > > join will also be keyed by that same key.
> > >
> > > It seems like the problem you're seeing is that that list contains
> > > the same claim/payment pair multiple times for a given claimNumber.
> > > Did I get it?
> > >
> > > In that case, I don't know if what you're seeing is the same issue
> > > Damian reported in KAFKA-4609, since the problem he reported was
> > > that there was no deduping cache after the join, only before it,
> > > unless you register a state store representing the join itself. In
> > > your case, it looks like you do register a state store representing
> > > the join, the "CLAIM_AND_PAYMENT_JOIN_STORE", so you will have a
> > > cache that can dedup the join result.
> > >
> > > Note that the join itself is what causes duplicates, not the
> > > subsequent "claimAndPaymentKTable.toStream()". For example, if I see
> > > input like this:
> > >
> > > (left stream):
> > > t1: k1 -> L1
> > > t3: k1 -> L1
> > >
> > > (right stream):
> > > t2: k1 -> R1
> > >
> > > then, without deduplication, the resulting join would be:
> > >
> > > (left.join(right) stream):
> > > t1: k1 -> (L1, null)
> > > t2: k1 -> (L1, R1)
> > > t3: k1 -> (L1, R1)
> > >
> > > Note that we see apparently duplicate join results, but really the
> > > meaning of the join stream is "as of right now, this is the value
> > > for this key", so from the join's perspective it's not wrong to say
> > > "as of t2, k1's value is (L1, R1)" and then to say it again at t3.
> > >
> > > In Kafka Streams, there is a deduplication cache which can reduce
> > > such duplicate events, but without unbounded memory the cache can't
> > > guarantee to remove all duplicates, so it's important to deal with
> > > the join result in a semantically robust way.
> > >
> > > I think this also contains the key to resolving your issue: inside
> > > your aggregator, instead of storing a list of *every event*, I think
> > > you'll want to store a map of the *latest event by key*. (This would
> > > be the key that's common to claimStrGrouped, paymentGrouped, and
> > > claimAndPaymentKTable.) This way, you'll automatically overwrite
> > > old, obsolete join results with new ones for the same key (whether
> > > or not the old result happens to be the same as the new one).
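A sketch of what John suggests here, written in the same Serde-style overloads as my code at the bottom of the thread. "mapSerde" is a hypothetical Serde<Map<String, ClaimAndPayment>> that would have to be provided, and the store name is only illustrative:

KTable<Integer, Map<String, ClaimAndPayment>> latestByJoinKey =
    claimAndPaymentKStream
        .groupBy((k, claimPay) ->
                (claimPay.claimList != null)
                    ? Integer.parseInt(claimPay.claimList.claimRecords.get(0).claimnumber.split("_")[0])
                    : 999,
            integerSerde, claimAndPaymentSerde)
        .aggregate(
            HashMap::new,
            (groupKey, claimPay, agg) -> {
                // The map key is the original join key (the claimnumber), so a
                // re-emitted join result overwrites the previous entry instead
                // of being appended a second time.
                String joinKey = (claimPay.claimList != null)
                    ? claimPay.claimList.claimRecords.get(0).claimnumber
                    : "unknown";
                agg.put(joinKey, claimPay);
                return agg;
            },
            mapSerde,                // hypothetical Serde<Map<String, ClaimAndPayment>>
            "LATEST_BY_KEY_STORE");  // illustrative store name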
> > > Does this help?
> > > -John
> > >
> > > On Mon, Apr 30, 2018 at 1:19 AM, Artur Mrozowski <art...@gmail.com> wrote:
> > >
> > > > Hi,
> > > > a while ago I hit KAFKA-4609 when running a simple pipeline. I
> > > > have two KTable joins, followed by a groupBy and aggregate on a
> > > > KStream, and one additional join. Now this KTable/KTable join
> > > > followed by groupBy and aggregate generates duplicates.
> > > >
> > > > I wonder if a possible workaround would be to remove the KStream
> > > > after the KTable/KTable join and do the groupBy and aggregate on
> > > > the KTable?
> > > >
> > > > KTable<Integer,CustomerAndPolicy> customerAndPolicyGroupedKTable =
> > > >     customerGrouped.leftJoin(policyGrouped,
> > > >         (customer, policy) -> new CustomerAndPolicy(customer, policy));
> > > >
> > > > KTable<String,ClaimAndPayment> claimAndPaymentKTable =
> > > >     claimStrGrouped.leftJoin(paymentGrouped,
> > > >         (claim, payment) -> new ClaimAndPayment(claim, payment),
> > > >         claimAndPaymentSerde, CLAIM_AND_PAYMENT_JOIN_STORE);
> > > >
> > > > // Can we remove this and avoid KAFKA-4609?
> > > > KStream<String,ClaimAndPayment> claimAndPaymentKStream =
> > > >     claimAndPaymentKTable.toStream();
> > > >
> > > > KTable<Integer,ClaimAndPayment2> claimAndPayment2IntGroupedTable =
> > > >     claimAndPaymentKStream
> > > >         .groupBy((k, claimPay) ->
> > > >                 (claimPay.claimList != null)
> > > >                     ? Integer.parseInt(claimPay.claimList.claimRecords.get(0).claimnumber.split("_")[0])
> > > >                     : 999,
> > > >             integerSerde, claimAndPaymentSerde)
> > > >         .aggregate(
> > > >             ClaimAndPayment2::new,
> > > >             (claimKey, claimPay, claimAndPay2) -> {
> > > >                 claimAndPay2.claimAndPaymentList.add(claimPay);
> > > >                 return claimAndPay2;
> > > >             },
> > > >             claimAndPayment2Serde,
> > > >             CLAIM_AND_PAYMENT_STORE);
> > > >
> > > > Best regards
> > > > Artur Mrozowski
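For completeness, the KTable variant I was asking about would look roughly like the sketch below (per John's point above it should have the same semantics, so by itself it probably won't remove the duplicates). The key difference is that KGroupedTable.aggregate() takes a subtractor as well as an adder, because each table update retracts the old value before adding the new one; the subtractor here relies on ClaimAndPayment.equals() to find the element to drop. Same classes and serdes as above, older Serde-style overloads assumed:

KTable<Integer, ClaimAndPayment2> claimAndPayment2IntGroupedTable =
    claimAndPaymentKTable
        .groupBy((k, claimPay) -> KeyValue.pair(
                (claimPay != null && claimPay.claimList != null)
                    ? Integer.parseInt(claimPay.claimList.claimRecords.get(0).claimnumber.split("_")[0])
                    : 999,
                claimPay),
            integerSerde, claimAndPaymentSerde)
        .aggregate(
            ClaimAndPayment2::new,
            // adder: called for the new/updated row under this key
            (claimKey, claimPay, agg) -> {
                agg.claimAndPaymentList.add(claimPay);
                return agg;
            },
            // subtractor: called to retract the old row before the new one is added
            (claimKey, claimPay, agg) -> {
                agg.claimAndPaymentList.remove(claimPay);
                return agg;
            },
            claimAndPayment2Serde,
            CLAIM_AND_PAYMENT_STORE);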