Hello Dmitry,

What does your distributeMailboxOperation in the flatMap do? Would it possibly
generate multiple records for the follow-up aggregation for each input?
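For example, if it fans one request out to several mailboxes along these lines
(a hypothetical sketch only; the real method is not shown in this thread, and
mailboxKeysFor is an assumed helper), then every KeyValue it returns is a
separate input to the downstream reduce, and each one can surface as an output
record:

    // Hypothetical sketch; assumes java.util.List/ArrayList,
    // org.apache.kafka.streams.KeyValue, and the key/value types from your snippet.
    private Iterable<KeyValue<ByteString, MailboxOperationRequest>> distributeMailboxOperation(
            ByteString key, MailboxOperationRequest request) {
        List<KeyValue<ByteString, MailboxOperationRequest>> records = new ArrayList<>();
        for (ByteString mailboxKey : mailboxKeysFor(request)) { // assumed helper
            // Each pair is grouped and reduced independently downstream,
            // so one input can yield several outputs.
            records.add(KeyValue.pair(mailboxKey, request));
        }
        return records;
    }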
Guozhang

On Thu, Jan 25, 2018 at 6:54 AM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:

> You may not be surprised that after further investigation, it turns out this
> was related to some logic in my topology.
>
> On Wed, Jan 24, 2018 at 5:43 PM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > Here it is:
> >
> >     topology.stream(MAILBOX_OPERATION_REQUESTS,
> >             Consumed.with(byteStringSerde, mailboxOperationRequestSerde))
> >         .flatMap(entityTopologyProcessor::distributeMailboxOperation)
> >         .groupByKey(Serialized.with(byteStringSerde,
> >             mailboxOperationRequestSerde))
> >         .reduce((a, b) -> b, Materialized.with(byteStringSerde,
> >             mailboxOperationRequestSerde));
> >
> > On Wed, Jan 24, 2018 at 4:43 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Dmitry,
> >>
> >> For your topology it is not expected to happen. Could you elaborate a bit
> >> more on your code snippet as well as the input data? Is there a good way
> >> to reproduce it?
> >>
> >> Guozhang
> >>
> >> On Wed, Jan 24, 2018 at 11:50 AM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:
> >>
> >> > Oh I'm sorry—my situation is even simpler. I have a KStream -> group by ->
> >> > reduce. It emits duplicate key/value/timestamps (i.e. total duplicates).
> >> >
> >> > On Wed, Jan 24, 2018 at 2:42 PM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:
> >> >
> >> > > Can someone explain what is causing this? I am experiencing this too. My
> >> > > `buffered.records.per.partition` and `cache.max.bytes.buffering` are at
> >> > > their default values, so quite substantial. I tried raising them, but it
> >> > > had no effect.
> >> > >
> >> > > On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <art...@gmail.com> wrote:
> >> > >
> >> > >> Hi,
> >> > >> I run an app where I transform a KTable to a stream, then groupBy and
> >> > >> aggregate, and capture the results in a KTable again. That generates
> >> > >> many duplicates.
> >> > >>
> >> > >> I have played with exactly-once semantics, which seems to reduce
> >> > >> duplicates for records that should be unique. But I still get duplicates
> >> > >> on keys that have two or more records.
> >> > >>
> >> > >> I could not reproduce it on a small number of records, so I disabled
> >> > >> caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Sure enough,
> >> > >> I got loads of duplicates, even those previously eliminated by
> >> > >> exactly-once semantics. Now I am having a hard time enabling it again
> >> > >> on Confluent 3.3.
> >> > >>
> >> > >> But generally, what is the best deduplication strategy for Kafka Streams?
> >> > >>
> >> > >> Artur

--
-- Guozhang
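For reference, the two knobs discussed in this thread are plain streams
configuration settings. This is a minimal sketch assuming the Kafka 0.11 /
Confluent 3.3 era API; the application id and bootstrap servers are
placeholders:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dedup-test");        // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    // Disable record caching, as Artur did; every aggregation update is then
    // forwarded downstream instead of being coalesced in the cache:
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    // Re-enable exactly-once processing (available since Kafka 0.11):
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

With the cache disabled, an aggregation emits one update per input record, so
seeing several records per key is the expected KTable changelog behavior
rather than duplication introduced by Streams.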