Hi Guozhang, I am sorry to have bothered you, but I figured out the problem: it was related to logic in my topology. Please disregard the question.
Thank you!
Dmitry

On Thu, Jan 25, 2018 at 2:55 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Dmitry,
>
> What does your distributeMailboxOperation in the flatMap do? Would it
> possibly generate multiple records for the follow-up aggregation for
> each input?
>
> Guozhang
>
> On Thu, Jan 25, 2018 at 6:54 AM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:
>
>> You may not be surprised that after further investigation it turns out
>> this was related to some logic in my topology.
>>
>> On Wed, Jan 24, 2018 at 5:43 PM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:
>>
>>> Hi Guozhang,
>>>
>>> Here it is:
>>>
>>>     topology.stream(MAILBOX_OPERATION_REQUESTS,
>>>             Consumed.with(byteStringSerde, mailboxOperationRequestSerde))
>>>         .flatMap(entityTopologyProcessor::distributeMailboxOperation)
>>>         .groupByKey(Serialized.with(byteStringSerde, mailboxOperationRequestSerde))
>>>         .reduce((a, b) -> b, Materialized.with(byteStringSerde, mailboxOperationRequestSerde));
>>>
>>> On Wed, Jan 24, 2018 at 4:43 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> Dmitry,
>>>>
>>>> For your topology that is not expected to happen. Could you elaborate
>>>> a bit more on your code snippet as well as the input data? Is there a
>>>> good way to reproduce it?
>>>>
>>>> Guozhang
>>>>
>>>> On Wed, Jan 24, 2018 at 11:50 AM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:
>>>>
>>>>> Oh, I'm sorry: my situation is even simpler. I have a KStream ->
>>>>> group by -> reduce. It emits duplicate key/value/timestamps (i.e.,
>>>>> total duplicates). My `buffered.records.per.partition` and
>>>>> `cache.max.bytes.buffering` are at their default values, so quite
>>>>> substantial. I tried raising them, but it had no effect.
>>>>>
>>>>> On Wed, Jan 24, 2018 at 2:42 PM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:
>>>>>
>>>>>> Can someone explain what is causing this? I am experiencing this too.
>>>>>>
>>>>>> On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <art...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I run an app where I transform a KTable to a stream, then groupBy
>>>>>>> and aggregate, and capture the results in a KTable again. That
>>>>>>> generates many duplicates.
>>>>>>>
>>>>>>> I have played with exactly-once semantics, which seems to reduce
>>>>>>> duplicates for records that should be unique. But I still get
>>>>>>> duplicates on keys that have two or more records.
>>>>>>>
>>>>>>> I could not reproduce it on a small number of records, so I
>>>>>>> disabled caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0.
>>>>>>> Sure enough, I got loads of duplicates, even those previously
>>>>>>> eliminated by exactly-once semantics. Now I am having a hard time
>>>>>>> enabling it again on Confluent 3.3.
>>>>>>>
>>>>>>> But, generally, what is the best deduplication strategy for Kafka
>>>>>>> Streams?
>>>>>>>
>>>>>>> Artur
>
> --
> -- Guozhang
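For readers following the thread: the `reduce((a, b) -> b)` in the snippet above keeps only the latest value seen for each key, which is why it is often used as a simple last-write-wins deduplication step. Below is a minimal sketch of that semantics in plain Java (no Kafka dependency), purely to illustrate the idea; the class name `LastWinsReduce` and the record keys/values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: mirrors the "last write wins" behavior of
// Kafka Streams' reduce((a, b) -> b) using a keyed fold over a list.
public class LastWinsReduce {
    // Each record is a (key, value) pair; a later record for the same
    // key overwrites the earlier one, just as reduce((a, b) -> b)
    // always returns the newer value b.
    public static Map<String, String> reduceLastWins(List<String[]> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] r : records) {
            table.put(r[0], r[1]); // analogous to reduce((a, b) -> b)
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
            new String[]{"mailbox-1", "op-A"},
            new String[]{"mailbox-1", "op-B"}, // duplicate key: op-B wins
            new String[]{"mailbox-2", "op-C"}
        );
        System.out.println(reduceLastWins(records));
        // prints {mailbox-1=op-B, mailbox-2=op-C}
    }
}
```

Note that in Kafka Streams this does not suppress duplicate *emissions* downstream: with caching disabled (`cache.max.bytes.buffering = 0`), every update to the reduced KTable is forwarded, so consumers can still see repeated key/value pairs even though the table itself holds one value per key.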