Thanks a lot, Matthias. Adding the serde and the state store as arguments to the left join solves the problem, as described in the JIRA ticket.
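For reference, a minimal sketch of what that change can look like, assuming the pre-1.0 KTable#leftJoin overload that takes a result serde and a queryable store name (the ClaimList, PaymentList and ClaimPayment types, the joiner, and the store name are illustrative placeholders, not code from this thread; on Kafka Streams 1.0+ the same thing is expressed with the Materialized parameter):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.KTable;

class JoinExample {

    // ClaimList, PaymentList and ClaimPayment stand in for the application's own types.
    static KTable<String, ClaimPayment> joinClaimsAndPayments(
            final KTable<String, ClaimList> claims,
            final KTable<String, PaymentList> payments,
            final Serde<ClaimPayment> claimPaymentSerde) {

        return claims.leftJoin(
                payments,
                (claimList, paymentList) -> new ClaimPayment(claimList, paymentList), // assumed constructor
                claimPaymentSerde,            // explicit serde for the join result
                "claim-payment-join-store");  // explicit, named state store for the join
    }
}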
On Tue, Dec 19, 2017 at 12:18 AM, Matthias J. Sax <matth...@confluent.io> wrote:

> The Jira priority just increased by your report!
>
> Of course, we are always happy about pull requests :D
>
>
> -Matthias
>
> On 12/18/17 1:27 PM, Artur Mrozowski wrote:
> > Yes, sounds like it. We ran into problems at exactly the same spot using BEAM
> > as well, although in that case it resulted in data loss.
> >
> > Thank you Matthias. Doesn't sound like it's going to be resolved any time
> > soon, does it?
> >
> > /Artur
> >
> > On Mon, Dec 18, 2017 at 8:11 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> I think you are hitting: https://issues.apache.org/jira/browse/KAFKA-4609
> >>
> >>
> >> -Matthias
> >>
> >> On 12/18/17 1:52 AM, Artur Mrozowski wrote:
> >>> Hi Bill,
> >>>
> >>> I am actually referring to duplicates as completely identical records. I
> >>> can observe it when I convert the result of a left join between KTables to
> >>> a stream. The resulting stream will often contain identical messages.
> >>> For example, we have
> >>>
> >>> KTable<String, ArrayList> left
> >>> {"claimcounter": 0, "claimreporttime": 55948.33110985625, "claimnumber": "3_0", "claimtime": 708.521153490306}
> >>>
> >>> and KTable<String, ArrayList> right
> >>>
> >>> {"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0", "payment": 847015.1437781961}
> >>>
> >>> When I left-join these two objects, the result in the state store will be an
> >>> object containing two ArrayLists, left and right, like this:
> >>>
> >>> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> >>>
> >>> But I want to continue processing the results with groupBy and
> >>> aggregate, so I convert the result of the left join to a stream. Now the
> >>> resulting repartition and changelog topics will, most of the time, contain
> >>> two identical messages, like this:
> >>>
> >>> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> >>> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
> >>>
> >>> and hence it passes duplicates to the next operation.
> >>>
> >>> My question is: what is the best practice to avoid that?
> >>>
> >>> https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L423
> >>>
> >>> Best regards
> >>> Artur
> >>>
> >>>
> >>> On Wed, Dec 13, 2017 at 3:42 PM, Bill Bejeck <b...@confluent.io> wrote:
> >>>
> >>>> Hi Artur,
> >>>>
> >>>> The most direct way for deduplication (I'm using the term deduplication
> >>>> to mean records with the same key, but not necessarily the same value,
> >>>> where later records are considered) is to set the
> >>>> CACHE_MAX_BYTES_BUFFERING_CONFIG setting to a value greater than zero.
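A minimal sketch of the configuration Bill refers to here, assuming a Java Kafka Streams application; the application id and bootstrap servers are placeholders, and the 10 MB cache size and 1 s commit interval are only illustrative values:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class DedupFriendlyConfig {

    static Properties buildConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dedup-demo");        // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // A non-zero record cache lets the DSL coalesce successive updates for the
        // same key before forwarding them downstream; 0 forwards every update.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        // The commit interval bounds how long updates can sit in the cache before flushing.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
        return props;
    }
}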
> >>>>
> >>>> Your other option is to use the PAPI and, by writing your own logic in
> >>>> conjunction with a state store, determine what constitutes a duplicate and
> >>>> when to emit a record. You could take the same approach in the DSL layer
> >>>> using a Transformer.
> >>>>
> >>>> HTH.
> >>>>
> >>>> Bill
> >>>>
> >>>> On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <art...@gmail.com> wrote:
> >>>>
> >>>>> Hi,
> >>>>> I run an app where I transform a KTable to a stream and then groupBy and
> >>>>> aggregate and capture the results in a KTable again. That generates many
> >>>>> duplicates.
> >>>>>
> >>>>> I have played with exactly-once semantics, which seems to reduce duplicates
> >>>>> for records that should be unique. But I still get duplicates on keys that
> >>>>> have two or more records.
> >>>>>
> >>>>> I could not reproduce it on a small number of records, so I disabled caching
> >>>>> by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Sure enough, I got loads
> >>>>> of duplicates, even those previously eliminated by exactly-once semantics.
> >>>>> Now I am having a hard time enabling it again on Confluent 3.3.
> >>>>>
> >>>>> But, generally, what is the best deduplication strategy for Kafka Streams?
> >>>>>
> >>>>> Artur
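To make the Transformer-plus-state-store approach mentioned above concrete, here is a minimal sketch. It treats a record as a duplicate when its value is identical to the last value forwarded for the same key; the store name, topic names, and String key/value types are illustrative (in the scenario above the value would be the joined claim/payment object), and it assumes a Kafka Streams version in which Transformer only has init/transform/close:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Forwards a record only if its value differs from the last value seen for that key.
class DedupTransformer implements Transformer<String, String, KeyValue<String, String>> {
    private final String storeName;
    private KeyValueStore<String, String> store;

    DedupTransformer(final String storeName) {
        this.storeName = storeName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        store = (KeyValueStore<String, String>) context.getStateStore(storeName);
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        final String previous = store.get(key);
        if (value != null && value.equals(previous)) {
            return null;                      // identical to what was already emitted: drop it
        }
        store.put(key, value);                // remember the value we are about to forward
        return KeyValue.pair(key, value);
    }

    @Override
    public void close() {
    }
}

// Wiring it into a topology; "joined-topic" and "deduped-topic" are placeholder topic names.
class DedupTopology {
    static StreamsBuilder build() {
        final StreamsBuilder builder = new StreamsBuilder();
        final StoreBuilder<KeyValueStore<String, String>> dedupStore =
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("dedup-store"),
                        Serdes.String(), Serdes.String());
        builder.addStateStore(dedupStore);

        final KStream<String, String> joined = builder.stream("joined-topic");
        joined.transform(() -> new DedupTransformer("dedup-store"), "dedup-store")
              .to("deduped-topic");
        return builder;
    }
}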