Hi John,
thanks. Yes, using hashes is what I already do.
Thank you for the excellent explanation.

Best Regards
Artur

On Tue, May 1, 2018 at 9:29 PM, John Roesler <j...@confluent.io> wrote:

> Hi Artur,
>
> Thanks for the clarification.
>
> I don't think that ".toStream()" actually does anything besides change the
> context from KTable to KStream in the DSL. The Javadoc says:
>
> > * Note that this is a logical operation and only changes the "interpretation"
> > * of the stream, i.e., each record of this changelog stream is no longer
> > * treated as an updated record (cf. {@link KStream} vs {@code KTable}).
>
>
> Not to belabor the point, but I wouldn't want you to focus too much on
> getting rid of the "toStream" in favor of the same methods on KTable, as
> I think that would have the exact same semantics.
>
> It's entirely possible that some additional tuning of the join could reduce
> the duplicates you're seeing. For example, what are your current settings
> for the commit interval and dedup cache size?
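>
> For reference, these are the two StreamsConfig settings I have in mind; the
> values below are only illustrative, not recommendations:
>
>     Properties props = new Properties();
>     // How often Streams flushes its caches and commits offsets. A longer
>     // interval gives the cache more time to collapse repeated updates for
>     // the same key before they are emitted downstream.
>     props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);
>     // Total memory for record caches across all threads. Setting this to 0
>     // disables caching, and with it the best-effort deduplication.
>     props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);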
>
> In any case, though, Kafka Streams's deduplication mechanism is only
> best-effort. So if your correctness depends on unique events (as yours
> does), I still think you're better off coding in anticipation of
> duplicates. For example, you could implement hashCode and equals on
> ClaimAndPayment and store them in a LinkedHashSet (to preserve both
> uniqueness and order).
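>
> A rough sketch of that idea (ClaimList/PaymentList are my guesses at your
> field types; whatever they are, they need equals/hashCode as well):
>
>     import java.util.LinkedHashSet;
>     import java.util.Objects;
>     import java.util.Set;
>
>     public class ClaimAndPayment {
>         ClaimList claimList;
>         PaymentList paymentList;
>
>         @Override
>         public boolean equals(Object o) {
>             if (this == o) return true;
>             if (!(o instanceof ClaimAndPayment)) return false;
>             ClaimAndPayment other = (ClaimAndPayment) o;
>             return Objects.equals(claimList, other.claimList)
>                     && Objects.equals(paymentList, other.paymentList);
>         }
>
>         @Override
>         public int hashCode() {
>             return Objects.hash(claimList, paymentList);
>         }
>     }
>
>     // Downstream, a LinkedHashSet then drops duplicates while preserving
>     // arrival order:
>     Set<ClaimAndPayment> unique = new LinkedHashSet<>();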
>
> Hope that helps,
> -John
>
>
> On Tue, May 1, 2018 at 12:40 PM, Artur Mrozowski <art...@gmail.com> wrote:
>
> > Hi John,
> > yes, the answer is very helpful and your understanding of the data flow is
> > correct. Deduplication is not the issue, though, because no duplicates are
> > inserted into the flow. Rather, the duplicates are generated from unique
> > records after the join between claims and payments, when the result is
> > converted to a stream. But perhaps that stream is entirely avoidable?
> >
> > So it would look something like this:
> >
> > KTable<String,ArrayList> left:
> >
> > {"claimcounter": 0, "claimreporttime": 55948.33110985625,
> >  "claimnumber": "3_0", "claimtime": 708.521153490306}
> >
> > and KTable<String,ArrayList> right:
> >
> > {"claimcounter": 0, "paytime": 55960.226718985265,
> >  "claimnumber": "3_0", "payment": 847015.1437781961}
> >
> > When I leftJoin these two objects, the result in the state store will be
> > an object containing two ArrayLists, left and right, like this:
> >
> > {"claimList": {"lst": [{"claimnumber": "3_0", "claimtime": "708.521153490306",
> >                         "claimreporttime": "55948.33110985625", "claimcounter": "0"}]},
> >  "paymentList": {"lst": [{"payment": 847015.1437781961, "paytime": 55960.226718985265,
> >                           "claimcounter": 0, "claimnumber": "3_0"}]}}
> >
> > But I want to continue processing the results using groupBy and
> > aggregate, so I convert the result of the leftJoin to a stream. Now the
> > resulting repartition and changelog topics contain two identical
> > messages, like this:
> >
> > {"claimList": {"lst": [{"claimnumber": "3_0", "claimtime": "708.521153490306",
> >                         "claimreporttime": "55948.33110985625", "claimcounter": "0"}]},
> >  "paymentList": {"lst": [{"payment": 847015.1437781961, "paytime": 55960.226718985265,
> >                           "claimcounter": 0, "claimnumber": "3_0"}]}}
> > {"claimList": {"lst": [{"claimnumber": "3_0", "claimtime": "708.521153490306",
> >                         "claimreporttime": "55948.33110985625", "claimcounter": "0"}]},
> >  "paymentList": {"lst": [{"payment": 847015.1437781961, "paytime": 55960.226718985265,
> >                           "claimcounter": 0, "claimnumber": "3_0"}]}}
> >
> > Best regards
> > Artur
> >
> >
> > On Mon, Apr 30, 2018 at 5:30 PM, John Roesler <j...@confluent.io> wrote:
> >
> > > Hello Artur,
> > >
> > > Apologies in advance if I say something incorrect, as I'm still a little
> > > new to this project.
> > >
> > > If I followed your example, then I think the scenario is that you're
> > > joining "claims" and "payments", grouping by "claimNumber", and then
> > > building a list for each "claimNumber" of all the claim/payment pairs.
> > > Is that right?
> > >
> > > It's not in your example, but the grouped stream or table for "claims"
> > > (claimStrGrouped) and "payments" (paymentGrouped) must be keyed with the
> > > same key, right? In that case, the result of their join will also be
> > > keyed by that same key.
> > >
> > > It seems like the problem you're seeing is that that list contains the
> > > same claim/payment pair multiple times for a given claimNumber. Did I
> > > get it?
> > >
> > > In that case, I don't know if what you're seeing is the same issue
> > > Damian reported in KAFKA-4609, since the problem he reported was that
> > > there was no deduping cache after the join, only before it, unless you
> > > register a state store representing the join itself. In your case, it
> > > looks like you do register a state store representing the join, the
> > > "CLAIM_AND_PAYMENT_JOIN_STORE". So you will have a cache that can dedup
> > > the join result.
> > >
> > > Note that the join itself is what causes duplicates, not the subsequent
> > > "claimAndPaymentKTable.toStream()". For example, if I see input like
> > > this:
> > >
> > > (left stream):
> > > t1: k1 -> L1
> > > t3: k1 -> L1
> > >
> > > (right stream):
> > > t2: k1 -> R1
> > >
> > > Then, without deduplication, the resulting join would be:
> > > (left.join(right) stream):
> > > t1: k1 -> (L1, null)
> > > t2: k1 -> (L1, R1)
> > > t3: k1 -> (L1, R1)
> > >
> > > Note that we see apparently duplicate join results, but really the
> > > meaning of the join stream is that "as of right now, this is the value
> > > for this key", so from the join's perspective it's not wrong to say "as
> > > of t2, k1's value is (L1, R1)" and then to say it at t3 again.
> > >
> > > In Kafka Streams, there is a deduplication cache which can reduce such
> > > duplicate events, but without unbounded memory, the cache can't
> > > guarantee to remove all duplicates, so it's important to deal with the
> > > join result in a semantically robust way.
> > >
> > > I think this also contains the key to resolving your issue: inside your
> > > aggregator, instead of storing a list of *every event*, I think you'll
> > > want to store a map of the *latest event by key*. (This would be the
> > > key that's common to claimStrGrouped, paymentGrouped, and
> > > claimAndPaymentKTable.) This way, you'll automatically overwrite old,
> > > obsolete join results with new ones for the same key (whether or not
> > > the old result happens to be the same as the new one).
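> > >
> > > As an untested sketch against your snippet (joinKeyOf is a hypothetical
> > > helper for extracting that common key, and you'd need a Serde for the
> > > map, called mapSerde here):
> > >
> > >     claimAndPaymentKStream
> > >             .groupBy((k, claimPay) -> /* same claim-number key as before */,
> > >                     integerSerde, claimAndPaymentSerde)
> > >             .aggregate(
> > >                     HashMap<String, ClaimAndPayment>::new,
> > >                     (claimKey, claimPay, latestByKey) -> {
> > >                         // keyed by the common join key, so a re-emitted
> > >                         // join result overwrites the previous entry
> > >                         // instead of accumulating as a duplicate
> > >                         latestByKey.put(joinKeyOf(claimPay), claimPay);
> > >                         return latestByKey;
> > >                     },
> > >                     mapSerde,
> > >                     CLAIM_AND_PAYMENT_STORE);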
> > >
> > > Does this help?
> > > -John
> > >
> > > On Mon, Apr 30, 2018 at 1:19 AM, Artur Mrozowski <art...@gmail.com> wrote:
> > >
> > > > Hi,
> > > > a while ago I hit KAFKA-4609 when running a simple pipeline. I have two
> > > > KTable joins, followed by a groupBy and aggregate on a KStream, and one
> > > > additional join. Now this KTable/KTable join followed by groupBy and
> > > > aggregate generates duplicates.
> > > >
> > > >
> > > >
> > > > I wonder if a possible workaround would be to remove the KStream after
> > > > the KTable/KTable join and do the groupBy and aggregate directly on the
> > > > KTable? A sketch of what I mean follows the code below.
> > > >
> > > >
> > > >        KTable<Integer,CustomerAndPolicy> customerAndPolicyGroupedKTable =
> > > >                customerGrouped.leftJoin(policyGrouped, (customer, policy) ->
> > > >                        new CustomerAndPolicy(customer, policy));
> > > >
> > > >        KTable<String,ClaimAndPayment> claimAndPaymentKTable =
> > > >                claimStrGrouped.leftJoin(paymentGrouped, (claim, payment) ->
> > > >                        new ClaimAndPayment(claim, payment),
> > > >                        claimAndPaymentSerde, CLAIM_AND_PAYMENT_JOIN_STORE);
> > > >
> > > >        // Can we remove this and avoid KAFKA-4609?
> > > >        KStream<String,ClaimAndPayment> claimAndPaymentKStream =
> > > >                claimAndPaymentKTable.toStream();
> > > >
> > > >        KTable<Integer,ClaimAndPayment2> claimAndPayment2IntGroupedTable =
> > > >                claimAndPaymentKStream
> > > >                        .groupBy((k, claimPay) ->
> > > >                                (claimPay.claimList != null)
> > > >                                        ? Integer.parseInt(claimPay.claimList.claimRecords.get(0).claimnumber.split("_")[0])
> > > >                                        : 999,
> > > >                                integerSerde, claimAndPaymentSerde)
> > > >                        .aggregate(
> > > >                                ClaimAndPayment2::new,
> > > >                                (claimKey, claimPay, claimAndPay2) -> {
> > > >                                        claimAndPay2.claimAndPaymentList.add(claimPay);
> > > >                                        return claimAndPay2;
> > > >                                },
> > > >                                claimAndPayment2Serde,
> > > >                                CLAIM_AND_PAYMENT_STORE);
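> > > >
> > > > For reference, here is an untested sketch of the KTable variant I mean.
> > > > Note that KTable.groupBy takes a mapper returning a KeyValue, and
> > > > KGroupedTable.aggregate also requires a subtractor, since table updates
> > > > can retract previous values:
> > > >
> > > >        KTable<Integer,ClaimAndPayment2> claimAndPayment2IntGroupedTable =
> > > >                claimAndPaymentKTable
> > > >                        .groupBy((k, claimPay) -> KeyValue.pair(
> > > >                                        (claimPay.claimList != null)
> > > >                                                ? Integer.parseInt(claimPay.claimList.claimRecords.get(0).claimnumber.split("_")[0])
> > > >                                                : 999,
> > > >                                        claimPay),
> > > >                                integerSerde, claimAndPaymentSerde)
> > > >                        .aggregate(
> > > >                                ClaimAndPayment2::new,
> > > >                                // adder: a new join result arrives for this key
> > > >                                (claimKey, claimPay, agg) -> {
> > > >                                        agg.claimAndPaymentList.add(claimPay);
> > > >                                        return agg;
> > > >                                },
> > > >                                // subtractor: the old value is retracted on update
> > > >                                (claimKey, claimPay, agg) -> {
> > > >                                        agg.claimAndPaymentList.remove(claimPay);
> > > >                                        return agg;
> > > >                                },
> > > >                                claimAndPayment2Serde,
> > > >                                CLAIM_AND_PAYMENT_STORE);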
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > Best regards
> > > > Artur Mrozowski
> > > >
> > >
> >
>
