The Jira priority just increase by your report!

Of course, we are always happy about pull request :D


-Matthias

On 12/18/17 1:27 PM, Artur Mrozowski wrote:
> Yes, sounds like it. We run into problems at exactly same spot using BEAM
> as well, although in that case it resulted in data loss.
> 
> Thank you Matthias. Doesn't sound like it's going to be resolved any time
> soon, does it?
> 
> /Artur
> 
> On Mon, Dec 18, 2017 at 8:11 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> I think you are hitting: https://issues.apache.org/jira/browse/KAFKA-4609
>>
>>
>> -Matthias
>>
>> On 12/18/17 1:52 AM, Artur Mrozowski wrote:
>>> Hi Bill,
>>>
>>>  I am actually referring to duplicates as completely identical records. I
>>> can observe it when I convert result of left join between KTables to
>>> stream. The resulting stream will often contain identical messages.
>>> For example we have
>>>
>>> KTable<String,ArrayList> left
>>> {"claimcounter": 0, "claimreporttime": 55948.33110985625, "claimnumber":
>>> "3_0", "claimtime": 708.521153490306}
>>>
>>> and KTable <String,ArrayList> right
>>>
>>> {"claimcounter": 0, "paytime": 55960.226718985265, "claimnumber": "3_0",
>>> "payment": 847015.1437781961}
>>>
>>> When I leftjoin theses two objects the result in the state store will be
>> an
>>> object containing two  ArrayLists left and right, like this
>>>
>>> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime"
>> :"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"
>> 0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"
>> paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
>>>
>>> But I want to continue processing the results by using groupBy and
>>> aggregate so I convert reuslt of the leftjoin to stream. Now the
>> resulting
>>> reparation and changelog topics,most of the time, will contain two
>>> identical messages, like this
>>>
>>> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime"
>> :"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"
>> 0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"
>> paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
>>> {"claimList":{"lst":[{"claimnumber":"3_0","claimtime"
>> :"708.521153490306","claimreporttime":"55948.33110985625","claimcounter":"
>> 0"}]},"paymentList":{"lst":[{"payment":847015.1437781961,"
>> paytime":55960.226718985265,"claimcounter":0,"claimnumber":"3_0"}]}}
>>>
>>>
>>> and hence passing duplicates to the next operation.
>>>
>>> My question is what is the best practice to avoid that?
>>>
>>> https://github.com/afuyo/KStreamsDemo/blob/master/src/
>> main/java/kstream.demo/CustomerStreamPipelineHDI.java#L423
>>>
>>> Best regards
>>> Artur
>>>
>>>
>>> On Wed, Dec 13, 2017 at 3:42 PM, Bill Bejeck <b...@confluent.io> wrote:
>>>
>>>> Hi Artur,
>>>>
>>>> The most direct way for deduplication (I'm using the term deduplication
>> to
>>>> mean records with the same key, but not necessarily the same value,
>> where
>>>> later records are considered) is to set the  CACHE_MAX_BYTES_BUFFERING_
>>>> CONFIG
>>>> setting to a value greater than zero.
>>>>
>>>> Your other option is to use the PAPI and by writing your own logic in
>>>> conjunction with a state store determine what constitutes a duplicate
>> and
>>>> when to emit a record.  You could take the same approach in the DSL
>> layer
>>>> using a Transformer.
>>>>
>>>> HTH.
>>>>
>>>> Bill
>>>>
>>>> On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <art...@gmail.com>
>> wrote:
>>>>
>>>>> Hi
>>>>> I run an app where I transform KTable to stream and then I groupBy and
>>>>> aggregate and capture the results in KTable again. That generates many
>>>>> duplicates.
>>>>>
>>>>> I have played with exactly once semantics that seems to reduce
>> duplicates
>>>>> for records that should be unique. But I still get duplicates on keys
>>>> that
>>>>> have two or more records.
>>>>>
>>>>> I could not reproduce it on small number of records so I disable
>> caching
>>>> by
>>>>> setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Surely enough, I got
>>>> loads
>>>>> of duplicates, even these previously eliminated by exactly once
>>>> semantics.
>>>>> Now I have hard time to enable it again on Confluent 3.3.
>>>>>
>>>>> But, generally what it the best deduplication strategy for Kafka
>> Streams?
>>>>>
>>>>> Artur
>>>>>
>>>>
>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to