Hi,

The first selectKey/groupBy/windowedBy/reduce groups messages by
key and drops duplicate messages based on the new key, so that for
each 1-hour time window, each key will only produce 1 message. I use
suppress() to make sure only the latest message per time window
is sent.
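
To make the first step concrete, here is a minimal plain-Java simulation (not Kafka Streams code) of what groupByKey().windowedBy(1-hour tumbling windows).reduce((value1, value2) -> value2) keeps: one record per (window start, key), with the later record winning. The Event class, its fields and the "windowStart|key" map key are hypothetical stand-ins for the real types, and the sketch ignores grace periods and late records.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the dedup step; NOT the Kafka Streams API.
class DedupSketch {
    // 1-hour tumbling windows, like TimeWindows.of(Duration.ofMinutes(60)).
    static final long WINDOW_MS = 60L * 60L * 1000L;

    // Hypothetical event: the selected key (e.g. A + B), its timestamp and a payload.
    static final class Event {
        final String key;
        final long timestampMs;
        final String payload;

        Event(String key, long timestampMs, String payload) {
            this.key = key;
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    // Keeps one event per (windowStart, key); a later event overwrites an earlier one,
    // mirroring reduce((value1, value2) -> value2).
    static Map<String, Event> dedup(List<Event> events) {
        Map<String, Event> latest = new LinkedHashMap<>();
        for (Event e : events) {
            // Tumbling windows are aligned to the epoch (non-negative timestamps assumed).
            long windowStart = e.timestampMs - (e.timestampMs % WINDOW_MS);
            latest.put(windowStart + "|" + e.key, e);
        }
        return latest;
    }
}
```

This only models what the reduce() keeps; when those results leave the windowed KTable is what suppress() controls.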

The second selectKey/groupBy/reduce does the counting: after deduping,
it counts how many different messages there are per window.
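
Assuming each deduplicated record enters the second reduce() with a count of 1 (the real mapValues() logic is elided in this thread), the counting step amounts to summing per (window start, A). A plain-Java sketch; the Deduped class and the "windowStart|A" key format are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of the counting step; NOT the Kafka Streams API.
class CountSketch {
    // Hypothetical output of the dedup step: one record per (windowStart, A, B).
    static final class Deduped {
        final long windowStart;
        final String a;
        final String b; // carried along, but not part of the new key

        Deduped(long windowStart, String a, String b) {
            this.windowStart = windowStart;
            this.a = a;
            this.b = b;
        }
    }

    // Re-keys each record by (windowStart, A) and sums a count of 1 per record,
    // which is what the second reduce() does if every record starts with count 1.
    static Map<String, Long> countPerWindowAndA(List<Deduped> deduped) {
        Map<String, Long> counts = new TreeMap<>();
        for (Deduped d : deduped) {
            counts.merge(d.windowStart + "|" + d.a, 1L, Long::sum);
        }
        return counts;
    }
}
```

The result is a count of distinct B values only if the first step really emitted each (window, A + B) exactly once.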

e.g. The first groupBy groups by key (A + B) and drops all
duplicates. The second groupBy groups by key (window start time +
A). If I comment out suppress(), I assume all updates made during
reduce() will be forwarded to the next step?
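
As a rough model of that question: without suppress(), each update to the windowed KTable can be forwarded downstream by toStream() (record caching, controlled by cache.max.bytes.buffering and commit.interval.ms, may merge some of them). Because the second step is a KStream groupByKey().reduce() that adds counts, every forwarded update would be added again. The sketch below is a simplified plain-Java model that assumes caching is disabled; the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.List;

// Simplified model of what the second reduce() sees; NOT the Kafka Streams API.
// Assumes record caching is disabled, so every upstream update is forwarded.
class NoSuppressSketch {
    // Without suppress(): every incoming record updates the windowed KTable, the update is
    // forwarded as a new stream record with count 1, and the summing reduce() adds each one.
    static long countWithoutSuppress(List<String> keysInOneGroup) {
        return keysInOneGroup.size();
    }

    // With suppress(untilWindowCloses(...)): only the final result per key and window
    // is forwarded, so each distinct key contributes exactly 1.
    static long countWithSuppress(List<String> keysInOneGroup) {
        return new HashSet<>(keysInOneGroup).size();
    }
}
```

For example, the keys A1|B1, A1|B1, A1|B2 in one window would give 3 without suppress() but 2 with it. If intermediate updates are wanted without over-counting, one alternative (a different design from the code in this thread) is to re-group the windowed KTable with KTable#groupBy(...) and use KGroupedTable#count(), which subtracts a key's old value when it is updated.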

Thanks!

On Wed, Sep 25, 2019 at 11:16 PM Alex Brekken <brek...@gmail.com> wrote:
>
> You might want to try temporarily commenting out the suppress() call just to
> see if that's the cause of the issue. That said, what is the goal of this
> topology? It looks like you're trying to produce a count at the end for a
> key.  Is the windowedBy() and suppress() there just to eliminate
> duplicates, or do you need the final results to be grouped by the 60 minute
> window?
>
> Alex
>
> On Wed, Sep 25, 2019 at 9:26 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
>
> > Hi Alex,
> > Thanks for the reply!
> >
> > Yes. After deploying with the same application ID, the source topic has new
> > messages and the application is consuming them, but there is no output at
> > the end.
> > The suppress call is:
> > .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> >
> > The topology looks like this:
> >
> > final KStream<String, byte[]> source = builder.stream(inputTopic);
> > KStream<String, Event> deserializedStream = source.mapValues( ... });
> >
> > KStream<Windowed<String>, Event> dedupedStream =
> > deserializedStream.selectKey( ... )
> > .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
> > .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > .reduce((value1, value2) -> value2)
> > .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > .toStream();
> >
> > dedupedStream.selectKey( ... )
> > .mapValues( ... )
> > .filter(...)
> > .groupByKey(Grouped.with(Serdes.String(), new MessagetSerde()))
> > .reduce((value1, value2) -> {
> >     long count1 = value1.getCount();
> >     long count2 = value2.getCount();
> >     value2.setCount(count1 + count2);
> >     return value2;
> > })
> > .toStream()
> > .selectKey( ... )
> > .to(outputTopic);
> >
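
With grace(Duration.ZERO), untilWindowCloses() can only emit a window once stream time (roughly, the largest record timestamp observed so far) reaches the window end plus the grace period. A small sketch of that arithmetic, assuming epoch-aligned tumbling windows and non-negative timestamps; WindowMath and its methods are hypothetical helpers, not Kafka classes.

```java
// Window arithmetic for epoch-aligned tumbling windows; hypothetical helper, not Kafka code.
class WindowMath {
    // Start of the tumbling window containing timestampMs (non-negative timestamps assumed).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Exclusive end of that window: a record at exactly this time belongs to the next window.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    // A window counts as closed once stream time has reached its end plus the grace period.
    static boolean isClosed(long windowEndMs, long graceMs, long streamTimeMs) {
        return streamTimeMs >= windowEndMs + graceMs;
    }
}
```

For example, a record at 01:30 falls in the [01:00, 02:00) window, which can only be emitted after some record with a timestamp of 02:00 or later has been processed.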
> > On Wed, Sep 25, 2019 at 10:14 PM Alex Brekken <brek...@gmail.com> wrote:
> > >
> > > Hi Xiyuan, just to clarify: after you restart the application (using the
> > > same application ID as previously) there ARE new messages in the source
> > > topic and your application IS consuming them, but you're not seeing any
> > > output at the end?  How are you configuring your suppress() call?  Is it
> > > possible that messages are being held there and not emitted further
> > > downstream? Does commenting out the suppress call cause data to flow all the
> > > way through?  In order to help further we might need to see your actual
> > > topology code if that's possible.
> > >
> > > Alex
> > >
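
The question about records being held in suppress() can be illustrated with a simplified model of an untilWindowCloses() buffer: results stay buffered until a newer record advances stream time past the window's close. This is a plain-Java sketch under stated assumptions (a single partition, stream time = max timestamp seen, late records dropped), not Kafka's implementation; SuppressBufferSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified single-partition model of suppress(untilWindowCloses(...)); NOT Kafka's code.
class SuppressBufferSketch {
    private static final class Buffered {
        final long windowEnd;
        String latestValue;

        Buffered(long windowEnd, String latestValue) {
            this.windowEnd = windowEnd;
            this.latestValue = latestValue;
        }
    }

    private final long windowSizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE; // max timestamp seen so far
    private final Map<String, Buffered> buffer = new LinkedHashMap<>();

    SuppressBufferSketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    // Processes one record and returns the "windowStart|key=value" results it causes to be emitted.
    List<String> process(String key, long timestampMs, String value) {
        streamTime = Math.max(streamTime, timestampMs);
        long start = timestampMs - (timestampMs % windowSizeMs);
        long end = start + windowSizeMs;
        if (end + graceMs > streamTime) {
            // Window still open: keep only the latest value, like reduce((v1, v2) -> v2).
            String windowKey = start + "|" + key;
            Buffered b = buffer.get(windowKey);
            if (b == null) {
                buffer.put(windowKey, new Buffered(end, value));
            } else {
                b.latestValue = value;
            }
        } // Otherwise the window is already closed and the late record is dropped.

        // Emit and evict every buffered window whose close time has been reached.
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, Buffered>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Buffered> e = it.next();
            if (streamTime >= e.getValue().windowEnd + graceMs) {
                emitted.add(e.getKey() + "=" + e.getValue().latestValue);
                it.remove();
            }
        }
        return emitted;
    }

    int bufferedCount() {
        return buffer.size();
    }
}
```

In this model, if no record with a timestamp at or after the current window's end arrives after a restart, the buffered results are never emitted, which would look like "consuming but no output". In Kafka Streams stream time is tracked per task (roughly, per input partition), so each partition needs newer data; whether this is the actual cause here is not established in the thread.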
> > > On Wed, Sep 25, 2019 at 2:17 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > If I change the application ID, I assume it will start processing new
> > > > messages? The old data will be dropped. But this solution will not work
> > > > during production deployment, since we can't change the application ID
> > > > for each release.
> > > >
> > > > My code looks like below:
> > > >
> > > > builder.stream(topicName)
> > > > .mapValues()
> > > > stream.selectKey(selectKey A)
> > > > .groupByKey(..)
> > > > .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > > > .reduce((value1,value2) -> value2)
> > > > .suppress
> > > > .toStream()
> > > > .selectKey(selectKey B)
> > > > .mapValues()
> > > > .filter()
> > > > .groupByKey()
> > > > .reduce
> > > > .toStream()
> > > > .to()
> > > >
> > > > It will create 5 internal topics:
> > > > 03-repartition, 03-changelog, 09-changelog, 14-repartition, 14-changelog.
> > > >
> > > > When I restart/redeploy the application, only 03-repartition has
> > > > traffic and messages, but no out-traffic. Other internal topics have
> > > > no traffic at all after the restart/redeployment.
> > > > It only works when I change the application ID. Should I call
> > > > streams.cleanUp() before starting the stream each time? Or is something
> > > > else going wrong?
> > > >
> > > > Thanks a lot!
> > > >
> > > > On Wed, Sep 25, 2019 at 2:35 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > >
> > > > > Hey Xiyuan,
> > > > >
> > > > > I would assume it's easier for us to help you if we can read your
> > > > > application with a full paste of the code (a prototype). The fact that
> > > > > changing the application ID works suggests that re-processing all the
> > > > > data again works; do I understand that correctly?
> > > > >
> > > > > Boyang
> > > > >
> > > > > On Wed, Sep 25, 2019 at 8:16 AM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I'm running a Kafka Streams app (v2.1.0) with windowed functions
> > > > > > (reduce and suppress). One thing I noticed is that every time I
> > > > > > redeploy or restart the application, I have to change the application
> > > > > > ID to a new one; otherwise, only the reduce-repartition internal topic
> > > > > > has input traffic (and it has no out-traffic), and all other internal
> > > > > > topics have no traffic at all. It looks like data just flows into the
> > > > > > first internal repartition topic (reduce-repartition); the
> > > > > > reduce-changelog has no traffic and there is no output traffic either.
> > > > > >
> > > > > > Does anyone know what's wrong? Changing the application ID and
> > > > > > creating new internal topics each time doesn't seem like the right
> > > > > > approach.
> > > > > >
> > > > > > I started the app like this:
> > > > > >
> > > > > > streams = new KafkaStreams(topology.getTopology(config),
> > > > > > properties.getProperties());
> > > > > > streams.start();
> > > > > >
> > > > > > Any help would be appreciated! Thanks!
> > > > > >
> > > >
> >
