Re: deduplication strategy for Kafka Streams DSL

2018-01-25 Thread Dmitry Minkovsky
Hi Guozhang, I am sorry to have bothered you, but I figured out the problem; it was related to logic in my topology. Please disregard the question. Thank you! Dmitry

Re: deduplication strategy for Kafka Streams DSL

2018-01-25 Thread Guozhang Wang
Hello Dmitry, What does your distributeMailboxOperation in the flatMap do? Could it possibly generate multiple records for the follow-up aggregation for each input? Guozhang

Re: deduplication strategy for Kafka Streams DSL

2018-01-25 Thread Dmitry Minkovsky
You may not be surprised that after further investigation it turns out this was related to some logic in my topology.

Re: deduplication strategy for Kafka Streams DSL

2018-01-24 Thread Dmitry Minkovsky
Hi Guozhang, here it is:

    topology.stream(MAILBOX_OPERATION_REQUESTS,
            Consumed.with(byteStringSerde, mailboxOperationRequestSerde))
        .flatMap(entityTopologyProcessor::distributeMailboxOperation)
        .groupByKey(Serialized.with(byteStringSerde, mailboxOperationRequestSerde))
        .reduce((a, b) -> b, M
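
A runnable sketch of the pattern in this snippet, completing the truncated reduce() call under the assumption that it materializes a last-write-wins store (the store name and the explicit type parameters are placeholders, not from the original post; the serde variables and constants are taken from the snippet above):

    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Serialized;

    StreamsBuilder topology = new StreamsBuilder();
    topology.stream(MAILBOX_OPERATION_REQUESTS,
            Consumed.with(byteStringSerde, mailboxOperationRequestSerde))
        // fan each request out to the entities it affects (may emit 0..n records per input)
        .flatMap(entityTopologyProcessor::distributeMailboxOperation)
        // group by the keys produced by flatMap
        .groupByKey(Serialized.with(byteStringSerde, mailboxOperationRequestSerde))
        // last-write-wins: keep only the newest value per key
        .reduce((oldValue, newValue) -> newValue,
            Materialized.as("mailbox-operation-requests-dedup"));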

Re: deduplication strategy for Kafka Streams DSL

2018-01-24 Thread Guozhang Wang
Dmitry, For your topology this is not expected to happen. Could you elaborate a bit more on your code snippet as well as the input data? Is there a good way to reproduce it? Guozhang

Re: deduplication strategy for Kafka Streams DSL

2018-01-24 Thread Dmitry Minkovsky
Oh, I'm sorry, my situation is even simpler: I have a KStream -> groupBy -> reduce, and it emits duplicate key/value/timestamps (i.e. total duplicates).
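
For reference, a minimal version of that shape, with a peek() added to make the duplicate emissions visible (the topic name, store name, and printout are illustrative only, not from Dmitry's code):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Materialized;

    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("input")   // hypothetical topic, default serdes
        .groupByKey()
        .reduce((a, b) -> b, Materialized.as("dedup-store"))
        .toStream()
        // with duplicate emissions, the same key/value line prints more than once
        .peek((k, v) -> System.out.println(k + " -> " + v));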

Re: deduplication strategy for Kafka Streams DSL

2018-01-24 Thread Dmitry Minkovsky
Can someone explain what is causing this? I am experiencing this too. My `buffered.records.per.partition` and `cache.max.bytes.buffering` are at their default values, so quite substantial. I tried raising them, but it had no effect.
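
For context, these are the two settings and their defaults (the application id and bootstrap servers below are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    // defaults: 1000 records buffered per partition,
    // 10 MB of record cache across all stream threads
    props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 1000);
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);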

Re: deduplication strategy for Kafka Streams DSL

2017-12-19 Thread Artur Mrozowski
Thanks a lot, Matthias. Adding the serde and state store as arguments in the left join solves the problem, as described in the JIRA.
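
A sketch of what that workaround looks like in the 1.0 API, where the join overload takes a Materialized carrying an explicit store name and serdes (the tables, value types, and names here are illustrative, not from Artur's code):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    KTable<String, String> joined = leftTable.leftJoin(rightTable,
        (leftValue, rightValue) -> leftValue + "|" + rightValue,
        // passing an explicit store name and serdes is the fix described in the JIRA
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("join-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String()));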

Re: deduplication strategy for Kafka Streams DSL

2017-12-18 Thread Matthias J. Sax
The JIRA's priority just increased thanks to your report! Of course, we are always happy about pull requests :D -Matthias

Re: deduplication strategy for Kafka Streams DSL

2017-12-18 Thread Artur Mrozowski
Yes, sounds like it. We ran into problems at exactly the same spot using Beam as well, although in that case it resulted in data loss. Thank you, Matthias. Doesn't sound like it's going to be resolved any time soon, does it? /Artur

Re: deduplication strategy for Kafka Streams DSL

2017-12-18 Thread Matthias J. Sax
I think you are hitting: https://issues.apache.org/jira/browse/KAFKA-4609 -Matthias

Re: deduplication strategy for Kafka Streams DSL

2017-12-18 Thread Artur Mrozowski
Hi Bill, I am actually referring to duplicates as completely identical records. I can observe it when I convert the result of a left join between KTables to a stream: the resulting stream will often contain identical messages. For example, we have the KTable left {"claimcounter": 0, "claimreporttime": 55948.
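
A sketch of the setup being described, two KTables left-joined and the result converted to a stream (the topic names, serdes, and join logic are stand-ins for the claim data above):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;

    StreamsBuilder builder = new StreamsBuilder();
    KTable<String, String> left = builder.table("claims",
        Consumed.with(Serdes.String(), Serdes.String()));
    KTable<String, String> right = builder.table("payments",
        Consumed.with(Serdes.String(), Serdes.String()));
    left.leftJoin(right, (l, r) -> l + "|" + r)
        .toStream()   // the step after which identical messages were observed
        .to("claims-joined", Produced.with(Serdes.String(), Serdes.String()));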

Re: deduplication strategy for Kafka Streams DSL

2017-12-13 Thread Bill Bejeck
Hi Artur, The most direct way to get deduplication (I'm using the term to mean records with the same key, but not necessarily the same value, where only the latest record per key is kept) is to set CACHE_MAX_BYTES_BUFFERING_CONFIG to a value greater than zero. Your other option is t
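
The record cache only collapses updates between flushes, so the commit interval matters too. A sketch of enabling it (the values are illustrative, not a recommendation):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // any value > 0 enables the record cache, which collapses multiple
    // updates to the same key between flushes
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
    // the cache is flushed downstream at most every commit interval
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000);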

deduplication strategy for Kafka Streams DSL

2017-12-13 Thread Artur Mrozowski
Hi, I run an app where I transform a KTable to a stream, then groupBy and aggregate, and capture the results in a KTable again. That generates many duplicates. I have played with exactly-once semantics, which seems to reduce duplicates for records that should be unique, but I still get duplicates on ke
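
A sketch of the pipeline being described, with exactly-once processing enabled (the topic, store name, re-keying, and aggregation are all placeholders):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Serialized;

    Properties props = new Properties();
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

    StreamsBuilder builder = new StreamsBuilder();
    KTable<String, String> source = builder.table("source");   // placeholder topic
    KTable<String, Long> counts = source.toStream()
        // re-key, then aggregate back into a KTable, as described above
        .groupBy((k, v) -> v, Serialized.with(Serdes.String(), Serdes.String()))
        .aggregate(() -> 0L, (key, value, agg) -> agg + 1L,
            Materialized.as("counts-store"));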