Thanks a lot, Matthias.
Adding the serde and state store name as arguments to the left join solves
the problem as described in the JIRA.
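
For anyone finding this thread later, a minimal sketch of that workaround
(assuming the 0.11.x-era DSL that ships with Confluent 3.3; the topic, store,
and serde choices below are illustrative, not taken from the actual app):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();

KTable<String, String> claims = builder.table("claims-topic", "claims-store");
KTable<String, String> payments = builder.table("payments-topic", "payments-store");

// Supplying the result serde and a queryable store name materializes the
// join result, which works around the duplicates from KAFKA-4609.
KTable<String, String> joined = claims.leftJoin(
    payments,
    (claim, payment) -> claim + "|" + payment,  // ValueJoiner
    Serdes.String(),                            // serde for the join result
    "claim-payment-join-store");                // state store for the result

joined.toStream().to("joined-topic");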

On Tue, Dec 19, 2017 at 12:18 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> The Jira priority just increased thanks to your report!
>
> Of course, we are always happy about pull request :D
>
>
> -Matthias
>
> On 12/18/17 1:27 PM, Artur Mrozowski wrote:
> > Yes, sounds like it. We ran into problems at exactly the same spot using
> > Beam as well, although in that case it resulted in data loss.
> >
> > Thank you Matthias. Doesn't sound like it's going to be resolved any time
> > soon, does it?
> >
> > /Artur
> >
> > On Mon, Dec 18, 2017 at 8:11 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> I think you are hitting: https://issues.apache.org/jira/browse/KAFKA-4609
> >>
> >>
> >> -Matthias
> >>
> >> On 12/18/17 1:52 AM, Artur Mrozowski wrote:
> >>> Hi Bill,
> >>>
> >>> I am actually referring to duplicates as completely identical records.
> >>> I can observe it when I convert the result of a left join between
> >>> KTables to a stream. The resulting stream will often contain identical
> >>> messages. For example, we have
> >>>
> >>> KTable<String, ArrayList> left:
> >>> {"claimcounter": 0, "claimreporttime": 55948.33110985625, "claimnumber": "3_0", "claimtime": 708.521153490306}
> >>>
> >>> and KTable<String, ArrayList> right:
> >>>
> >>> {"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0", "payment": 847015.1437781961}
> >>>
> >>> When I left join these two objects, the result in the state store will
> >>> be an object containing two ArrayLists, left and right, like this:
> >>>
> >>> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> >>>
> >>> But I want to continue processing the results by using groupBy and
> >>> aggregate, so I convert the result of the left join to a stream. Now the
> >>> resulting repartition and changelog topics will, most of the time,
> >>> contain two identical messages, like this:
> >>>
> >>> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> >>> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> >>>
> >>>
> >>> and hence passing duplicates to the next operation.
> >>>
> >>> My question is: what is the best practice to avoid that?
> >>>
> >>> https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L423
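> >>>
> >>> Condensed, the relevant part of the pipeline boils down to something
> >>> like this (a sketch; newKeyFor(), merge(), and wrapperSerde are
> >>> illustrative stand-ins, see the link above for the real code):
> >>>
> >>> KGroupedStream<String, ClaimPaymentWrapper> grouped = joined
> >>>     .toStream()  // every KTable update is forwarded downstream
> >>>     .groupBy((key, value) -> newKeyFor(value));  // newKeyFor is illustrative
> >>>
> >>> // The duplicated messages in the repartition topic feed the aggregate
> >>> // twice, so downstream sees the same record two times.
> >>> KTable<String, ClaimPaymentWrapper> aggregated = grouped.aggregate(
> >>>     ClaimPaymentWrapper::new,                // initializer
> >>>     (key, value, agg) -> merge(agg, value),  // aggregator
> >>>     wrapperSerde,                            // serde for the aggregate
> >>>     "claim-aggregate-store");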
> >>>
> >>> Best regards
> >>> Artur
> >>>
> >>>
> >>> On Wed, Dec 13, 2017 at 3:42 PM, Bill Bejeck <b...@confluent.io>
> wrote:
> >>>
> >>>> Hi Artur,
> >>>>
> >>>> The most direct way to deduplicate (I'm using the term to mean records
> >>>> with the same key, but not necessarily the same value, where only the
> >>>> latest record is forwarded) is to set CACHE_MAX_BYTES_BUFFERING_CONFIG
> >>>> to a value greater than zero.
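> >>>>
> >>>> For example (a sketch; the 10 MB figure is just an arbitrary non-zero
> >>>> choice):
> >>>>
> >>>> import org.apache.kafka.streams.StreamsConfig;
> >>>> import java.util.Properties;
> >>>>
> >>>> Properties props = new Properties();
> >>>> // A non-zero cache buffers updates per key and forwards only the
> >>>> // latest value downstream when the cache flushes.
> >>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
> >>>> // The cache is flushed at least every commit interval.
> >>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);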
> >>>>
> >>>> Your other option is to use the PAPI (Processor API) and write your own
> >>>> logic, in conjunction with a state store, to determine what constitutes
> >>>> a duplicate and when to emit a record. You could take the same approach
> >>>> in the DSL layer using a Transformer.
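> >>>>
> >>>> A sketch of what such a Transformer could look like (store name and
> >>>> String types are illustrative; "duplicate" here means same key and same
> >>>> value as the last record seen):
> >>>>
> >>>> import org.apache.kafka.streams.KeyValue;
> >>>> import org.apache.kafka.streams.kstream.Transformer;
> >>>> import org.apache.kafka.streams.processor.ProcessorContext;
> >>>> import org.apache.kafka.streams.state.KeyValueStore;
> >>>>
> >>>> public class DedupTransformer
> >>>>         implements Transformer<String, String, KeyValue<String, String>> {
> >>>>
> >>>>     private KeyValueStore<String, String> store;
> >>>>
> >>>>     @SuppressWarnings("unchecked")
> >>>>     @Override
> >>>>     public void init(ProcessorContext context) {
> >>>>         store = (KeyValueStore<String, String>) context.getStateStore("dedup-store");
> >>>>     }
> >>>>
> >>>>     @Override
> >>>>     public KeyValue<String, String> transform(String key, String value) {
> >>>>         String last = store.get(key);
> >>>>         if (value != null && value.equals(last)) {
> >>>>             return null;  // identical to the previous record: drop it
> >>>>         }
> >>>>         store.put(key, value);
> >>>>         return KeyValue.pair(key, value);  // new or changed: forward it
> >>>>     }
> >>>>
> >>>>     @Override
> >>>>     public KeyValue<String, String> punctuate(long timestamp) {
> >>>>         return null;  // no scheduled work
> >>>>     }
> >>>>
> >>>>     @Override
> >>>>     public void close() {}
> >>>> }
> >>>>
> >>>> The backing store has to be added to the topology and named in the
> >>>> transform() call, e.g. stream.transform(DedupTransformer::new, "dedup-store").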
> >>>>
> >>>> HTH.
> >>>>
> >>>> Bill
> >>>>
> >>>> On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <art...@gmail.com>
> >> wrote:
> >>>>
> >>>>> Hi,
> >>>>> I run an app where I transform a KTable to a stream, then groupBy and
> >>>>> aggregate, and capture the results in a KTable again. That generates
> >>>>> many duplicates.
> >>>>>
> >>>>> I have played with exactly-once semantics, which seems to reduce
> >>>>> duplicates for records that should be unique. But I still get
> >>>>> duplicates on keys that have two or more records.
> >>>>>
> >>>>> I could not reproduce it on a small number of records, so I disabled
> >>>>> caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Sure enough,
> >>>>> I got loads of duplicates, even those previously eliminated by
> >>>>> exactly-once semantics. Now I am having a hard time enabling it again
> >>>>> on Confluent 3.3.
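> >>>>>
> >>>>> (For reference, what I am trying to re-enable is just the processing
> >>>>> guarantee setting, roughly:
> >>>>>
> >>>>> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
> >>>>>     StreamsConfig.EXACTLY_ONCE);
> >>>>>
> >>>>> exactly-once requires 0.11.0 brokers, i.e. Confluent 3.3.)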
> >>>>>
> >>>>> But, generally, what is the best deduplication strategy for Kafka
> >>>>> Streams?
> >>>>>
> >>>>> Artur
> >>>>>
> >>>>
> >>>
> >>
> >>
> >
>
>
