Hello Dmitry,

What does your distributeMailboxOperation in the flatMap do? Would it
possibly generate multiple records for the follow-up aggregation for each
input?


Guozhang
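
A note for readers of this thread: flatMap is one-to-many, so this question
is the crux. Below is a minimal sketch of what a fan-out
distributeMailboxOperation could look like; the ByteString key type, the
MailboxOperationRequest value type, and the getMailboxIds() accessor are
illustrative assumptions, not taken from Dmitry's actual code. Each KeyValue
it returns becomes a separate input record to the downstream
groupByKey/reduce, so the resulting KTable is updated once per emitted
record.

    import java.util.stream.Collectors;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.KeyValueMapper;

    // Hypothetical fan-out: one request is distributed to several mailbox keys.
    // Every KeyValue emitted here drives its own update of the downstream reduce.
    KeyValueMapper<ByteString, MailboxOperationRequest,
            Iterable<KeyValue<ByteString, MailboxOperationRequest>>>
        distributeMailboxOperation = (key, request) ->
            request.getMailboxIds().stream()                  // assumed accessor
                    .map(mailboxId -> KeyValue.pair(mailboxId, request))
                    .collect(Collectors.toList());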


On Thu, Jan 25, 2018 at 6:54 AM, Dmitry Minkovsky <dminkov...@gmail.com>
wrote:

> You may not be surprised that after further investigation it turns out this
> was related to some logic in my topology.
>
> On Wed, Jan 24, 2018 at 5:43 PM, Dmitry Minkovsky <dminkov...@gmail.com>
> wrote:
>
> > Hi Guozhang,
> >
> > Here it is:
> >
> > topology.stream(MAILBOX_OPERATION_REQUESTS,
> >     Consumed.with(byteStringSerde, mailboxOperationRequestSerde))
> >   .flatMap(entityTopologyProcessor::distributeMailboxOperation)
> >   .groupByKey(Serialized.with(byteStringSerde, mailboxOperationRequestSerde))
> >   .reduce((a, b) -> b,
> >     Materialized.with(byteStringSerde, mailboxOperationRequestSerde));
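
A side note on the snippet above: reduce((a, b) -> b) keeps only the latest
value per key, and with record caching enabled several consecutive updates to
the same key may be folded into one downstream record. To make every
intermediate update visible while debugging (and so separate topology fan-out
from caching effects), caching can be disabled on just this store. A sketch,
assuming the same serdes as above plus imports for
org.apache.kafka.common.utils.Bytes and
org.apache.kafka.streams.state.KeyValueStore:

    // Same topology, but with caching disabled on the reduce store so every
    // intermediate update is forwarded downstream (useful when debugging).
    topology.stream(MAILBOX_OPERATION_REQUESTS,
            Consumed.with(byteStringSerde, mailboxOperationRequestSerde))
        .flatMap(entityTopologyProcessor::distributeMailboxOperation)
        .groupByKey(Serialized.with(byteStringSerde, mailboxOperationRequestSerde))
        .reduce((a, b) -> b,
            Materialized.<ByteString, MailboxOperationRequest,
                          KeyValueStore<Bytes, byte[]>>with(
                    byteStringSerde, mailboxOperationRequestSerde)
                .withCachingDisabled());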
> >
> >
> >
> >
> > On Wed, Jan 24, 2018 at 4:43 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> >> Dmitry,
> >>
> >> For your topology this is not expected to happen. Could you elaborate a
> >> bit more on your code snippet as well as the input data? Is there a good
> >> way to reproduce it?
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Wed, Jan 24, 2018 at 11:50 AM, Dmitry Minkovsky <dminkov...@gmail.com>
> >> wrote:
> >>
> >> > Oh I'm sorry, my situation is even simpler. I have a KStream -> group by
> >> > -> reduce. It emits duplicate key/value/timestamps (i.e. total
> >> > duplicates).
> >> >
> >> > On Wed, Jan 24, 2018 at 2:42 PM, Dmitry Minkovsky <dminkov...@gmail.com>
> >> > wrote:
> >> >
> >> > > Can someone explain what is causing this? I am experiencing this too.
> >> > > My `buffered.records.per.partition` and `cache.max.bytes.buffering` are
> >> > > at their default values, so quite substantial. I tried raising them but
> >> > > it had no effect.
> >> > >
> >> > > On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <art...@gmail.com>
> >> > > wrote:
> >> > >
> >> > >> Hi,
> >> > >> I run an app where I transform a KTable to a stream, then groupBy and
> >> > >> aggregate, and capture the results in a KTable again. That generates
> >> > >> many duplicates.
> >> > >>
> >> > >> I have played with exactly-once semantics, which seems to reduce
> >> > >> duplicates for records that should be unique. But I still get
> >> > >> duplicates on keys that have two or more records.
> >> > >>
> >> > >> I could not reproduce it on a small number of records, so I disabled
> >> > >> caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Sure enough,
> >> > >> I got loads of duplicates, even those previously eliminated by
> >> > >> exactly-once semantics. Now I am having a hard time enabling it again
> >> > >> on Confluent 3.3.
> >> > >>
> >> > >> But generally, what is the best deduplication strategy for Kafka
> >> > >> Streams?
> >> > >>
> >> > >> Artur
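
The thread does not settle on a canonical answer, but one common pattern is
to remember the last value seen per key in a state store and drop records
whose value has not changed. Below is a minimal sketch under stated
assumptions: String keys and values, made-up topic and store names, and the
Kafka 1.0-era API (where Transformer still carries the deprecated
punctuate() method). It is an illustration, not an official recipe from this
thread.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    StreamsBuilder builder = new StreamsBuilder();

    // Local store holding the last forwarded value for each key.
    builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("dedup-store"),
        Serdes.String(), Serdes.String()));

    builder.<String, String>stream("input-topic")
        .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
            private KeyValueStore<String, String> store;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                store = (KeyValueStore<String, String>) context.getStateStore("dedup-store");
            }

            @Override
            public KeyValue<String, String> transform(String key, String value) {
                if (value != null && value.equals(store.get(key))) {
                    return null;           // same value as last time: drop it
                }
                store.put(key, value);     // remember and forward the change
                return KeyValue.pair(key, value);
            }

            @Override
            @Deprecated
            public KeyValue<String, String> punctuate(long timestamp) {
                return null;               // no scheduled work
            }

            @Override
            public void close() {}
        }, "dedup-store")
        .to("output-topic");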
> >> > >>
> >> > >
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
-- Guozhang
