Thanks for the reply!

I have two questions:
1) No output issue only happens when I restart/redeployment the
application with the same application Id. But when I run the
application first time, it works fine. Thus, I assume suppress() is
working fine, at least fine for the first run. The thing I can't
understand is how Kafka streams works with restart/redeployment.
2) Another thought is, my application has a huge lag (50M messages).
Will that be a problem during restart/redeployment?

Your suggestion will populate the duplicate messages to downstreams,
just the count number will be accurate. Is that correct?

Thanks a lot!!

On Thu, Sep 26, 2019 at 8:22 AM Alex Brekken <brek...@gmail.com> wrote:
>
> So I'm not exactly sure why supress() isn't working for you, because it
> should send out a final message once the window closes - assuming you're
> still getting new messages flowing through the topology.  Have you tried
> using the count function in KGroupedTable? It should handle duplicates
> correctly.   So a slightly modified version or your topology might look
> like this:
>
> KStream<Windowed<String>, Event> dedupedStream =
> deserializedStream.selectKey( ... )
> .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
> .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> .reduce((value1, value2) -> value2)
> .filter(...)
> .groupBy() //change your key here as needed
> .count() //this count function should handle duplicates and still come out
> with the right answer
> .toStream()
> .selectKey( ... )
> .to(outputTopic);
>
> The idea here is you're not preventing duplicate messages from flowing
> through, instead you're tolerating them and not allowing them to
> incorrectly change your counts.  Also, you had a mapValues() call in there
> too which creates another KTable.  It might not matter, but could you do
> that as part of the reduce() step maybe?  (or replace the reduce with
> aggregate which lets you have a different type than your input type). Then
> your topology would only have 2 KTables instead of 3.  Good luck, if this
> doesn't work then I'm out of ideas.  :).
>
> Alex
>
> On Thu, Sep 26, 2019 at 12:06 AM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
>
> > Hi,
> >
> > The first selectKey/groupBy/windowedBy/reduce is to group messages by
> > key and drop duplicated messages based on the new key, so that for
> > each 1hr time window, each key will only populate 1 message. I use
> > suppress() is to make sure only the latest message per time window
> > will be sent.
> >
> > The second selectKey/groupBy/reduce is to do counting. After deduping,
> > count how many different messages per window.
> >
> > e.g. The first groupBy is grouped by key (A + B), and drop all
> > duplicates. The second groupBy is grouped by key (window start time +
> > A). If I comment out suppress(), I assume all updates during reduce()
> > will be populated to next step?
> >
> > Thanks!
> >
> > On Wed, Sep 25, 2019 at 11:16 PM Alex Brekken <brek...@gmail.com> wrote:
> > >
> > > You might want to try temporarily commenting the suppress() call just to
> > > see if that's the cause of the issue. That said, what is the goal of this
> > > topology? It looks like you're trying to produce a count at the end for a
> > > key.  Is the windowedBy() and suppress() there just to eliminate
> > > duplicates, or do you need the final results to be grouped by the 60
> > minute
> > > window?
> > >
> > > Alex
> > >
> > > On Wed, Sep 25, 2019 at 9:26 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> > >
> > > > Hi Alex,
> > > > Thanks for the reply!
> > > >
> > > > Yes. After deploy with same application ID, source topic has new
> > > > messages and the application is consuming them but no output at the
> > > > end.
> > > > suppress call is:
> > > > .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > >
> > > > Topology is like below:
> > > >
> > > > final KStream<String, byte[]> source = builder.stream(inputTopic);
> > > > KStream<String, Event> deserializedStream = source.mapValues( ... });
> > > >
> > > > KStream<Windowed<String>, Event> dedupedStream =
> > > > deserializedStream.selectKey( ... )
> > > > .groupByKey(Grouped.with(Serdes.String(), new
> > JsonSerde<>(Event.class)))
> > > >
> > .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > > > .reduce((value1, value2) -> value2)
> > > > .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > .toStream();
> > > >
> > > > dedupedStream.selectKey( ... )
> > > > .mapValues( ... )
> > > > .filter(...)
> > > > .groupByKey(Grouped.with(Serdes.String(), new MessagetSerde()))
> > > > .reduce((value1, value2) -> {
> > > >     long count1 = value1.getCount();
> > > >     long count2 = value2.getCount();
> > > >     value2.setCount(count1 + count2);
> > > >     return value2;
> > > > }
> > > > )
> > > > .toStream()
> > > > .selectKey( ... )
> > > > .to(outputTopic);
> > > >
> > > > On Wed, Sep 25, 2019 at 10:14 PM Alex Brekken <brek...@gmail.com>
> > wrote:
> > > > >
> > > > > Hi Xiyuan, just to clarify: after you restart the application (using
> > the
> > > > > same application ID as previously) there ARE new messages in the
> > source
> > > > > topic and your application IS consuming them, but you're not seeing
> > any
> > > > > output at the end?  How are you configuring your suppress() call?
> > Is it
> > > > > possible that messages are being held there and not emitted further
> > > > > downstream? Does commenting the suppress call cause data to flow all
> > the
> > > > > way through?  In order to help further we might need to see your
> > actual
> > > > > topology code if that's possible.
> > > > >
> > > > > Alex
> > > > >
> > > > > On Wed, Sep 25, 2019 at 2:17 PM Xiyuan Hu <xiyuan.h...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > If I change application id, it will start process new messages I
> > > > > > assume? The old data will be dropped. But this solution will not
> > work
> > > > > > during production deployment, since we can't change application id
> > for
> > > > > > each release.
> > > > > >
> > > > > > My code looks like below:
> > > > > >
> > > > > > builder.stream(topicName)
> > > > > > .mapValues()
> > > > > > stream.selectKey(selectKey A)
> > > > > > .groupByKey(..)
> > > > > >
> > > >
> > .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > > > > > .reduce((value1,value2) -> value2)
> > > > > > .suppress
> > > > > > .toStreams()
> > > > > > .selectKey(selectKey B)
> > > > > > .mapValues()
> > > > > > .filter()
> > > > > > .groupByKey()
> > > > > > .reduce
> > > > > > .toStream()
> > > > > > .to()
> > > > > >
> > > > > > It will create 5 internal topics:
> > > > > > 03-repartition, 03-changelog, 09-changelog, 14-repartition,
> > > > 14-changelog.
> > > > > >
> > > > > > When I restart/redeployment the application, only 03-repartition
> > has
> > > > > > traffic and messages, but no out-traffic. Other internal topics
> > have
> > > > > > no traffic at all after restart/redeployment.
> > > > > > It only works when I change the application ID. Should I include
> > > > > > streams.cleanUp() before start the stream each time? Or anything
> > else
> > > > > > goes wrong?
> > > > > >
> > > > > > Thanks a lot!
> > > > > >
> > > > > > On Wed, Sep 25, 2019 at 2:35 PM Boyang Chen <
> > > > reluctanthero...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > Hey Xiyuan,
> > > > > > >
> > > > > > > I would assume it's easier for us to help you by reading your
> > > > application
> > > > > > > with a full paste of code (a prototype). Changing application id
> > > > would
> > > > > > work
> > > > > > > suggests that re-process all the data again shall work, do I
> > > > understand
> > > > > > > that correctly?
> > > > > > >
> > > > > > > Boyang
> > > > > > >
> > > > > > > On Wed, Sep 25, 2019 at 8:16 AM Xiyuan Hu <xiyuan.h...@gmail.com
> > >
> > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I'm running a Kafka streams app(v2.1.0) with windowed
> > > > function(reduce
> > > > > > > > and suppress). One thing I noticed is, every time when I
> > > > redeployment
> > > > > > > > or restart the application, I have to change the application
> > ID to
> > > > a
> > > > > > > > new one, otherwise, only the reduce-repartition internal topic
> > has
> > > > > > > > input traffic(and it has no out-traffic), all other internal
> > topics
> > > > > > > > has no traffic as all. Looks like it just flows into the first
> > > > > > > > internal repartition topic(reduce-repartiton), the
> > reduce-changelog
> > > > > > > > has no traffic and no output traffic as well.
> > > > > > > >
> > > > > > > > Could anyone know what's wrong with it? Changing application
> > Id and
> > > > > > > > create new internal topics each time seems not the right thing
> > to
> > > > go
> > > > > > > > with.
> > > > > > > >
> > > > > > > > I started the app like below:
> > > > > > > >
> > > > > > > > streams = new KafkaStreams(topology.getTopology(config),
> > > > > > > > properties.getProperties());
> > > > > > > > streams.start();
> > > > > > > >
> > > > > > > > Any help would be appreciated! Thanks!
> > > > > > > >
> > > > > >
> > > >
> >

Reply via email to