Hi Guozhang,

Here it is:
    topology.stream(MAILBOX_OPERATION_REQUESTS,
                    Consumed.with(byteStringSerde, mailboxOperationRequestSerde))
            .flatMap(entityTopologyProcessor::distributeMailboxOperation)
            .groupByKey(Serialized.with(byteStringSerde, mailboxOperationRequestSerde))
            .reduce((a, b) -> b,
                    Materialized.with(byteStringSerde, mailboxOperationRequestSerde));

On Wed, Jan 24, 2018 at 4:43 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Dmitry,
>
> For your topology it is not expected to happen. Could you elaborate a bit
> more on your code snippet as well as the input data? Is there a good way
> to reproduce it?
>
> Guozhang
>
> On Wed, Jan 24, 2018 at 11:50 AM, Dmitry Minkovsky <dminkov...@gmail.com>
> wrote:
>
> > Oh, I'm sorry -- my situation is even simpler. I have a KStream ->
> > group by -> reduce. It emits duplicate key/value/timestamps (i.e.,
> > total duplicates).
> >
> > On Wed, Jan 24, 2018 at 2:42 PM, Dmitry Minkovsky <dminkov...@gmail.com>
> > wrote:
> >
> > > Can someone explain what is causing this? I am experiencing this too.
> > > My `buffered.records.per.partition` and `cache.max.bytes.buffering`
> > > are at their default values, so quite substantial. I tried raising
> > > them, but it had no effect.
> > >
> > > On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <art...@gmail.com>
> > > wrote:
> > >
> > >> Hi,
> > >> I run an app where I transform a KTable to a stream, then groupBy
> > >> and aggregate, and capture the results in a KTable again. That
> > >> generates many duplicates.
> > >>
> > >> I have played with exactly-once semantics, which seems to reduce
> > >> duplicates for records that should be unique. But I still get
> > >> duplicates on keys that have two or more records.
> > >>
> > >> I could not reproduce it on a small number of records, so I disabled
> > >> caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Sure
> > >> enough, I got loads of duplicates, even those previously eliminated
> > >> by exactly-once semantics. Now I am having a hard time enabling it
> > >> again on Confluent 3.3.
> > >>
> > >> But, generally, what is the best deduplication strategy for Kafka
> > >> Streams?
> > >>
> > >> Artur
>
> --
> -- Guozhang
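Much of what looks like duplication in the KTable case above is the normal
update-stream behavior: a groupBy/aggregate (or groupByKey/reduce) result is
a KTable, and when the record cache is small (or
CACHE_MAX_BYTES_BUFFERING_CONFIG is 0), every intermediate per-key update is
forwarded downstream, which reads as duplicates. A minimal sketch of the
configuration knobs involved -- the specific values here are illustrative
assumptions, not taken from this thread:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // A larger record cache coalesces more per-key updates in memory
    // before forwarding them downstream (illustrative size: 10 MB).
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
    // A longer commit interval flushes the cache less often, so fewer
    // intermediate updates reach downstream topics (illustrative value).
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);
    // Exactly-once removes duplicates caused by reprocessing after a
    // failure; it does not suppress legitimate per-key update records.
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

Note that caching only reduces how many updates are forwarded; it does not
guarantee one record per key. A hard guarantee needs a stateful
deduplication step, e.g. a Transformer backed by a state store that drops
keys it has already seen.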