So I'm not exactly sure why supress() isn't working for you, because it
should send out a final message once the window closes - assuming you're
still getting new messages flowing through the topology.  Have you tried
using the count function in KGroupedTable? It should handle duplicates
correctly.   So a slightly modified version or your topology might look
like this:

KStream<Windowed<String>, Event> dedupedStream =
deserializedStream.selectKey( ... )
.groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
.windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
.reduce((value1, value2) -> value2)
.filter(...)
.groupBy() //change your key here as needed
.count() //this count function should handle duplicates and still come out
with the right answer
.toStream()
.selectKey( ... )
.to(outputTopic);

The idea here is you're not preventing duplicate messages from flowing
through, instead you're tolerating them and not allowing them to
incorrectly change your counts.  Also, you had a mapValues() call in there
too which creates another KTable.  It might not matter, but could you do
that as part of the reduce() step maybe?  (or replace the reduce with
aggregate which lets you have a different type than your input type). Then
your topology would only have 2 KTables instead of 3.  Good luck, if this
doesn't work then I'm out of ideas.  :).

Alex

On Thu, Sep 26, 2019 at 12:06 AM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:

> Hi,
>
> The first selectKey/groupBy/windowedBy/reduce is to group messages by
> key and drop duplicated messages based on the new key, so that for
> each 1hr time window, each key will only populate 1 message. I use
> suppress() is to make sure only the latest message per time window
> will be sent.
>
> The second selectKey/groupBy/reduce is to do counting. After deduping,
> count how many different messages per window.
>
> e.g. The first groupBy is grouped by key (A + B), and drop all
> duplicates. The second groupBy is grouped by key (window start time +
> A). If I comment out suppress(), I assume all updates during reduce()
> will be populated to next step?
>
> Thanks!
>
> On Wed, Sep 25, 2019 at 11:16 PM Alex Brekken <brek...@gmail.com> wrote:
> >
> > You might want to try temporarily commenting the suppress() call just to
> > see if that's the cause of the issue. That said, what is the goal of this
> > topology? It looks like you're trying to produce a count at the end for a
> > key.  Is the windowedBy() and suppress() there just to eliminate
> > duplicates, or do you need the final results to be grouped by the 60
> minute
> > window?
> >
> > Alex
> >
> > On Wed, Sep 25, 2019 at 9:26 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> >
> > > Hi Alex,
> > > Thanks for the reply!
> > >
> > > Yes. After deploy with same application ID, source topic has new
> > > messages and the application is consuming them but no output at the
> > > end.
> > > suppress call is:
> > > .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > >
> > > Topology is like below:
> > >
> > > final KStream<String, byte[]> source = builder.stream(inputTopic);
> > > KStream<String, Event> deserializedStream = source.mapValues( ... });
> > >
> > > KStream<Windowed<String>, Event> dedupedStream =
> > > deserializedStream.selectKey( ... )
> > > .groupByKey(Grouped.with(Serdes.String(), new
> JsonSerde<>(Event.class)))
> > >
> .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > > .reduce((value1, value2) -> value2)
> > > .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > .toStream();
> > >
> > > dedupedStream.selectKey( ... )
> > > .mapValues( ... )
> > > .filter(...)
> > > .groupByKey(Grouped.with(Serdes.String(), new MessagetSerde()))
> > > .reduce((value1, value2) -> {
> > >     long count1 = value1.getCount();
> > >     long count2 = value2.getCount();
> > >     value2.setCount(count1 + count2);
> > >     return value2;
> > > }
> > > )
> > > .toStream()
> > > .selectKey( ... )
> > > .to(outputTopic);
> > >
> > > On Wed, Sep 25, 2019 at 10:14 PM Alex Brekken <brek...@gmail.com>
> wrote:
> > > >
> > > > Hi Xiyuan, just to clarify: after you restart the application (using
> the
> > > > same application ID as previously) there ARE new messages in the
> source
> > > > topic and your application IS consuming them, but you're not seeing
> any
> > > > output at the end?  How are you configuring your suppress() call?
> Is it
> > > > possible that messages are being held there and not emitted further
> > > > downstream? Does commenting the suppress call cause data to flow all
> the
> > > > way through?  In order to help further we might need to see your
> actual
> > > > topology code if that's possible.
> > > >
> > > > Alex
> > > >
> > > > On Wed, Sep 25, 2019 at 2:17 PM Xiyuan Hu <xiyuan.h...@gmail.com>
> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > If I change application id, it will start process new messages I
> > > > > assume? The old data will be dropped. But this solution will not
> work
> > > > > during production deployment, since we can't change application id
> for
> > > > > each release.
> > > > >
> > > > > My code looks like below:
> > > > >
> > > > > builder.stream(topicName)
> > > > > .mapValues()
> > > > > stream.selectKey(selectKey A)
> > > > > .groupByKey(..)
> > > > >
> > >
> .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > > > > .reduce((value1,value2) -> value2)
> > > > > .suppress
> > > > > .toStreams()
> > > > > .selectKey(selectKey B)
> > > > > .mapValues()
> > > > > .filter()
> > > > > .groupByKey()
> > > > > .reduce
> > > > > .toStream()
> > > > > .to()
> > > > >
> > > > > It will create 5 internal topics:
> > > > > 03-repartition, 03-changelog, 09-changelog, 14-repartition,
> > > 14-changelog.
> > > > >
> > > > > When I restart/redeployment the application, only 03-repartition
> has
> > > > > traffic and messages, but no out-traffic. Other internal topics
> have
> > > > > no traffic at all after restart/redeployment.
> > > > > It only works when I change the application ID. Should I include
> > > > > streams.cleanUp() before start the stream each time? Or anything
> else
> > > > > goes wrong?
> > > > >
> > > > > Thanks a lot!
> > > > >
> > > > > On Wed, Sep 25, 2019 at 2:35 PM Boyang Chen <
> > > reluctanthero...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > Hey Xiyuan,
> > > > > >
> > > > > > I would assume it's easier for us to help you by reading your
> > > application
> > > > > > with a full paste of code (a prototype). Changing application id
> > > would
> > > > > work
> > > > > > suggests that re-process all the data again shall work, do I
> > > understand
> > > > > > that correctly?
> > > > > >
> > > > > > Boyang
> > > > > >
> > > > > > On Wed, Sep 25, 2019 at 8:16 AM Xiyuan Hu <xiyuan.h...@gmail.com
> >
> > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I'm running a Kafka streams app(v2.1.0) with windowed
> > > function(reduce
> > > > > > > and suppress). One thing I noticed is, every time when I
> > > redeployment
> > > > > > > or restart the application, I have to change the application
> ID to
> > > a
> > > > > > > new one, otherwise, only the reduce-repartition internal topic
> has
> > > > > > > input traffic(and it has no out-traffic), all other internal
> topics
> > > > > > > has no traffic as all. Looks like it just flows into the first
> > > > > > > internal repartition topic(reduce-repartiton), the
> reduce-changelog
> > > > > > > has no traffic and no output traffic as well.
> > > > > > >
> > > > > > > Could anyone know what's wrong with it? Changing application
> Id and
> > > > > > > create new internal topics each time seems not the right thing
> to
> > > go
> > > > > > > with.
> > > > > > >
> > > > > > > I started the app like below:
> > > > > > >
> > > > > > > streams = new KafkaStreams(topology.getTopology(config),
> > > > > > > properties.getProperties());
> > > > > > > streams.start();
> > > > > > >
> > > > > > > Any help would be appreciated! Thanks!
> > > > > > >
> > > > >
> > >
>

Reply via email to