The Jira priority just increased because of your report! Of course, we are always happy about pull requests :D
-Matthias

On 12/18/17 1:27 PM, Artur Mrozowski wrote:
> Yes, sounds like it. We ran into problems at exactly the same spot using Beam
> as well, although in that case it resulted in data loss.
>
> Thank you, Matthias. Doesn't sound like it's going to be resolved any time
> soon, does it?
>
> /Artur
>
> On Mon, Dec 18, 2017 at 8:11 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> I think you are hitting: https://issues.apache.org/jira/browse/KAFKA-4609
>>
>> -Matthias
>>
>> On 12/18/17 1:52 AM, Artur Mrozowski wrote:
>>> Hi Bill,
>>>
>>> I am actually referring to duplicates as completely identical records. I
>>> can observe it when I convert the result of a left join between KTables to
>>> a stream. The resulting stream will often contain identical messages.
>>> For example, we have
>>>
>>> KTable<String, ArrayList> left
>>> {"claimcounter": 0, "claimreporttime": 55948.33110985625, "claimnumber": "3_0", "claimtime": 708.521153490306}
>>>
>>> and KTable<String, ArrayList> right
>>>
>>> {"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0", "payment": 847015.1437781961}
>>>
>>> When I left-join these two objects, the result in the state store will be an
>>> object containing two ArrayLists, left and right, like this:
>>>
>>> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
>>>
>>> But I want to continue processing the results by using groupBy and
>>> aggregate, so I convert the result of the left join to a stream. Now the
>>> resulting repartition and changelog topics will, most of the time, contain
>>> two identical messages, like this:
>>>
>>> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
>>> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime":"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
>>>
>>> and hence duplicates are passed to the next operation.
>>>
>>> My question is: what is the best practice to avoid that?
>>>
>>> https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L423
>>>
>>> Best regards
>>> Artur
>>>
>>> On Wed, Dec 13, 2017 at 3:42 PM, Bill Bejeck <b...@confluent.io> wrote:
>>>
>>>> Hi Artur,
>>>>
>>>> The most direct way to deduplicate (I'm using the term deduplication to
>>>> mean records with the same key, but not necessarily the same value, where
>>>> later records are considered) is to set the CACHE_MAX_BYTES_BUFFERING_CONFIG
>>>> setting to a value greater than zero.
>>>>
>>>> Your other option is to use the PAPI (Processor API) and, by writing your
>>>> own logic in conjunction with a state store, determine what constitutes a
>>>> duplicate and when to emit a record. You could take the same approach in
>>>> the DSL layer using a Transformer.
>>>>
>>>> HTH.
>>>>
>>>> Bill
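As a rough illustration of the Transformer-plus-state-store approach Bill describes above, a minimal sketch could look like the following, written against the 0.11/1.0-era Streams API; the class name DeduplicationTransformer and the store name "dedup-store" are made up for the example. It forwards a record only when its value differs from the last value stored for the same key:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class DeduplicationTransformer<K, V> implements Transformer<K, V, KeyValue<K, V>> {

    private KeyValueStore<K, V> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        // "dedup-store" must be added to the topology and attached to the transform() call
        store = (KeyValueStore<K, V>) context.getStateStore("dedup-store");
    }

    @Override
    public KeyValue<K, V> transform(final K key, final V value) {
        final V previous = store.get(key);
        if (previous != null && previous.equals(value)) {
            return null;                    // identical to the last record for this key: drop it
        }
        store.put(key, value);              // remember the latest value per key
        return KeyValue.pair(key, value);
    }

    @Override
    public KeyValue<K, V> punctuate(final long timestamp) {
        return null;                        // required by the 0.11/1.0-era interface; unused here
    }

    @Override
    public void close() { }
}

Wiring would look roughly like builder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("dedup-store"), keySerde, valueSerde)) followed by joined.toStream().transform(DeduplicationTransformer::new, "dedup-store") before the groupBy/aggregate, assuming a 1.0-style StreamsBuilder; the serdes and the "joined" KTable name are placeholders. This only suppresses consecutive identical values per key, which matches the duplicates described above.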
>>>> On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <art...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I run an app where I transform a KTable to a stream and then I groupBy and
>>>>> aggregate and capture the results in a KTable again. That generates many
>>>>> duplicates.
>>>>>
>>>>> I have played with exactly-once semantics, which seems to reduce duplicates
>>>>> for records that should be unique. But I still get duplicates on keys that
>>>>> have two or more records.
>>>>>
>>>>> I could not reproduce it on a small number of records, so I disabled caching
>>>>> by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Sure enough, I got loads
>>>>> of duplicates, even those previously eliminated by exactly-once semantics.
>>>>> Now I am having a hard time enabling it again on Confluent 3.3.
>>>>>
>>>>> But, generally, what is the best deduplication strategy for Kafka Streams?
>>>>>
>>>>> Artur
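For the configuration side discussed in the thread, a minimal sketch might look like the fragment below; the application id and bootstrap servers are placeholders. Exactly-once processing only removes duplicates introduced by retries and failures, while a non-zero record cache lets Streams collapse successive updates for the same key before forwarding them downstream:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "claims-aggregator");      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder
// Exactly-once processing (requires 0.11+ brokers) guards against duplicates from retries/failures.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
// A non-zero cache lets successive updates per key be collapsed before they are emitted
// downstream; setting it to 0 disables caching and forwards every single update.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);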