You may not be surprised that, after further investigation, this turned out to be related to some logic in my topology.
On Wed, Jan 24, 2018 at 5:43 PM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:

> Hi Guozhang,
>
> Here it is:
>
>     topology.stream(MAILBOX_OPERATION_REQUESTS,
>             Consumed.with(byteStringSerde, mailboxOperationRequestSerde))
>         .flatMap(entityTopologyProcessor::distributeMailboxOperation)
>         .groupByKey(Serialized.with(byteStringSerde, mailboxOperationRequestSerde))
>         .reduce((a, b) -> b, Materialized.with(byteStringSerde, mailboxOperationRequestSerde));
>
> On Wed, Jan 24, 2018 at 4:43 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Dmitry,
>>
>> For your topology this is not expected to happen. Could you elaborate a bit
>> more on your code snippet as well as the input data? Is there a good way to
>> reproduce it?
>>
>> Guozhang
>>
>> On Wed, Jan 24, 2018 at 11:50 AM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:
>>
>>> Oh I'm sorry—my situation is even simpler. I have a KStream -> group by ->
>>> reduce. It emits duplicate key/value/timestamps (i.e. total duplicates).
>>>
>>> On Wed, Jan 24, 2018 at 2:42 PM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:
>>>
>>>> Can someone explain what is causing this? I am experiencing this too. My
>>>> `buffered.records.per.partition` and `cache.max.bytes.buffering` are at
>>>> their default values, so quite substantial. I tried raising them, but it
>>>> had no effect.
>>>>
>>>> On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <art...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I run an app where I transform a KTable to a stream, then groupBy and
>>>>> aggregate, and capture the results in a KTable again. That generates many
>>>>> duplicates.
>>>>>
>>>>> I have played with exactly-once semantics, which seems to reduce duplicates
>>>>> for records that should be unique. But I still get duplicates on keys that
>>>>> have two or more records.
>>>>>
>>>>> I could not reproduce it with a small number of records, so I disabled
>>>>> caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Sure enough, I
>>>>> got loads of duplicates, even those previously eliminated by exactly-once
>>>>> semantics. Now I am having a hard time enabling it again on Confluent 3.3.
>>>>>
>>>>> But generally, what is the best deduplication strategy for Kafka Streams?
>>>>>
>>>>> Artur
>>
>> --
>> -- Guozhang
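
For context on the two settings discussed in the thread, here is a minimal configuration sketch; the application id and bootstrap servers are placeholders, and the comments summarize how each setting relates to the duplicates being described:

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsConfigSketch {

        static Properties streamsProperties() {
            Properties props = new Properties();
            // Placeholder application id and broker list.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mailbox-operations");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // 0 disables the record cache, so every intermediate aggregation
            // result is forwarded downstream instead of being compacted away,
            // which is why disabling caching produces many more "duplicates".
            props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

            // Exactly-once processing removes duplicates that come from
            // reprocessing after a failure; it does not suppress the
            // intermediate KTable updates above.
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

            return props;
        }
    }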
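As for the deduplication question, one common approach (a sketch, not something prescribed in this thread) is to attach a key-value state store and drop any record whose value matches the last one forwarded for the same key. The store name, class names, and String serdes below are illustrative; the punctuate method is included only because pre-2.0 versions of the Transformer interface still declare it:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class DeduplicationSketch {

        static final String DEDUP_STORE = "dedup-store"; // illustrative store name

        // Forwards a record only if its value differs from the last value
        // seen for the same key; otherwise it is dropped as a duplicate.
        static class DedupTransformer implements Transformer<String, String, KeyValue<String, String>> {

            private KeyValueStore<String, String> store;

            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                store = (KeyValueStore<String, String>) context.getStateStore(DEDUP_STORE);
            }

            public KeyValue<String, String> transform(String key, String value) {
                String previous = store.get(key);
                if (value != null && value.equals(previous)) {
                    return null; // same key and value as last time: drop it
                }
                store.put(key, value);
                return KeyValue.pair(key, value);
            }

            // Only needed on pre-2.0 versions, where Transformer still declares it.
            public KeyValue<String, String> punctuate(long timestamp) {
                return null;
            }

            public void close() {}
        }

        static KStream<String, String> deduplicate(StreamsBuilder builder, KStream<String, String> input) {
            StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                    Stores.keyValueStoreBuilder(
                            Stores.persistentKeyValueStore(DEDUP_STORE),
                            Serdes.String(),
                            Serdes.String());
            builder.addStateStore(storeBuilder);
            return input.transform(() -> new DedupTransformer(), DEDUP_STORE);
        }
    }

Note that this drops only consecutive duplicates per key; how long deduplication state should be retained is a design choice that depends on the application.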