Hi Guozhang,

I am sorry to have bothered you, but I figured out the problem and it was
related to logic in my topology. Please disregard the question.

Thank you!
Dmitry

On Thu, Jan 25, 2018 at 2:55 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Dmitry,
>
> What does your distributeMailboxOperation in the flatMap do? Could it
> generate multiple records for the follow-up aggregation for each
> input?
>
>
> Guozhang
>
>
> On Thu, Jan 25, 2018 at 6:54 AM, Dmitry Minkovsky <dminkov...@gmail.com>
> wrote:
>
> > You may not be surprised that, after further investigation, it turns out
> > this was related to some logic in my topology.
> >
> > On Wed, Jan 24, 2018 at 5:43 PM, Dmitry Minkovsky <dminkov...@gmail.com>
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > Here it is:
> > >
> > > topology.stream(MAILBOX_OPERATION_REQUESTS,
> > >     Consumed.with(byteStringSerde, mailboxOperationRequestSerde))
> > >   .flatMap(entityTopologyProcessor::distributeMailboxOperation)
> > >   .groupByKey(Serialized.with(byteStringSerde,
> > >     mailboxOperationRequestSerde))
> > >   .reduce((a, b) -> b, Materialized.with(byteStringSerde,
> > >     mailboxOperationRequestSerde));
> > >
> > >
> > >
> > >
> > > On Wed, Jan 24, 2018 at 4:43 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > >> Dmitry,
> > >>
> > >> For your topology this is not expected to happen. Could you elaborate a
> > >> bit more on your code snippet as well as the input data? Is there a good
> > >> way to reproduce it?
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Wed, Jan 24, 2018 at 11:50 AM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:
> > >>
> > >> > Oh, I'm sorry, my situation is even simpler. I have a KStream -> group by
> > >> > -> reduce. It emits duplicate key/value/timestamps (i.e. total
> > >> > duplicates).
> > >> >
> > >> > On Wed, Jan 24, 2018 at 2:42 PM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:
> > >> >
> > >> > > Can someone explain what is causing this? I am experiencing this too.
> > >> > > My `buffered.records.per.partition` and `cache.max.bytes.buffering`
> > >> > > are at their default values, so quite substantial. I tried raising
> > >> > > them but it had no effect.
> > >> > >
> > >> > > On Wed, Dec 13, 2017 at 7:00 AM, Artur Mrozowski <art...@gmail.com> wrote:
> > >> > >
> > >> > >> Hi,
> > >> > >> I run an app where I transform a KTable to a stream, then groupBy and
> > >> > >> aggregate, and capture the results in a KTable again. That generates
> > >> > >> many duplicates.
> > >> > >>
> > >> > >> I have played with exactly-once semantics, which seems to reduce
> > >> > >> duplicates for records that should be unique. But I still get
> > >> > >> duplicates on keys that have two or more records.
> > >> > >>
> > >> > >> I could not reproduce it on a small number of records, so I disabled
> > >> > >> caching by setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0. Sure enough,
> > >> > >> I got loads of duplicates, even those previously eliminated by
> > >> > >> exactly-once semantics. Now I have a hard time enabling it again on
> > >> > >> Confluent 3.3.
> > >> > >>
> > >> > >> But, generally, what is the best deduplication strategy for Kafka
> > >> > >> Streams?
> > >> > >>
> > >> > >> Artur
> > >> > >>
> > >> > >
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

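Guozhang's question in the thread, whether the flatMap step emits more than one record per input, can be illustrated without Kafka. The following is a minimal plain-Java sketch, not the Kafka Streams API: `ReduceSimulation`, `Rec`, and `run` are hypothetical names, and the sketch assumes every reduced record is forwarded downstream (roughly what one would observe with caching disabled). It only shows that a fan-out step emitting the same key/value twice makes a `(a, b) -> b` reduce emit identical updates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java simulation; none of these names are Kafka Streams APIs.
class ReduceSimulation {

    // Hypothetical record type standing in for a key/value pair.
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Applies fanOut to every input record, reduces the results per key,
    // and records one downstream update for every reduced record
    // (assumption: no caching or compaction of consecutive updates).
    static List<String> run(List<Rec> inputs,
                            Function<Rec, List<Rec>> fanOut,
                            BinaryOperator<String> reducer) {
        Map<String, String> store = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (Rec in : inputs) {
            for (Rec r : fanOut.apply(in)) {
                // merge() stores the value on first sight, otherwise applies the reducer.
                String current = store.merge(r.key, r.value, reducer);
                emitted.add(r.key + "=" + current);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Rec> inputs = Arrays.asList(new Rec("mailbox-1", "op-A"));
        // Fan-out that emits the same record twice for a single input.
        Function<Rec, List<Rec>> duplicating = r -> Arrays.asList(r, r);
        // Prints [mailbox-1=op-A, mailbox-1=op-A]
        System.out.println(run(inputs, duplicating, (a, b) -> b));
    }
}
```

With a fan-out that returns each record once, the same call emits a single update per input, which is consistent with the cause lying in the topology logic rather than in the reduce itself.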