1.  Yeah I'm not sure why restarting is causing you problems.  You
shouldn't be changing your application ID just to get data flowing ,so
something is wrong there I'm just not sure what.

2.  Lag on the source topic?  I guess that depends on how long your
application is down and how quickly it can catch up once it's running.

" Your suggestion will populate the duplicate messages to downstreams,
just the count number will be accurate. Is that correct?"

Yes that's correct. The final count() function will actually subtract 1
when you a get a duplicate in the upstream KTable, and then add 1 back
again resulting in no change to the final number.

Alex


On Thu, Sep 26, 2019 at 2:38 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:

> Thanks for the reply!
>
> I have two questions:
> 1) No output issue only happens when I restart/redeployment the
> application with the same application Id. But when I run the
> application first time, it works fine. Thus, I assume suppress() is
> working fine, at least fine for the first run. The thing I can't
> understand is how Kafka streams works with restart/redeployment.
> 2) Another thought is, my application has a huge lag (50M messages).
> Will that be a problem during restart/redeployment?
>
> Your suggestion will populate the duplicate messages to downstreams,
> just the count number will be accurate. Is that correct?
>
> Thanks a lot!!
>
> On Thu, Sep 26, 2019 at 8:22 AM Alex Brekken <brek...@gmail.com> wrote:
> >
> > So I'm not exactly sure why supress() isn't working for you, because it
> > should send out a final message once the window closes - assuming you're
> > still getting new messages flowing through the topology.  Have you tried
> > using the count function in KGroupedTable? It should handle duplicates
> > correctly.   So a slightly modified version or your topology might look
> > like this:
> >
> > KStream<Windowed<String>, Event> dedupedStream =
> > deserializedStream.selectKey( ... )
> > .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
> > .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > .reduce((value1, value2) -> value2)
> > .filter(...)
> > .groupBy() //change your key here as needed
> > .count() //this count function should handle duplicates and still come
> out
> > with the right answer
> > .toStream()
> > .selectKey( ... )
> > .to(outputTopic);
> >
> > The idea here is you're not preventing duplicate messages from flowing
> > through, instead you're tolerating them and not allowing them to
> > incorrectly change your counts.  Also, you had a mapValues() call in
> there
> > too which creates another KTable.  It might not matter, but could you do
> > that as part of the reduce() step maybe?  (or replace the reduce with
> > aggregate which lets you have a different type than your input type).
> Then
> > your topology would only have 2 KTables instead of 3.  Good luck, if this
> > doesn't work then I'm out of ideas.  :).
> >
> > Alex
> >
> > On Thu, Sep 26, 2019 at 12:06 AM Xiyuan Hu <xiyuan.h...@gmail.com>
> wrote:
> >
> > > Hi,
> > >
> > > The first selectKey/groupBy/windowedBy/reduce is to group messages by
> > > key and drop duplicated messages based on the new key, so that for
> > > each 1hr time window, each key will only populate 1 message. I use
> > > suppress() is to make sure only the latest message per time window
> > > will be sent.
> > >
> > > The second selectKey/groupBy/reduce is to do counting. After deduping,
> > > count how many different messages per window.
> > >
> > > e.g. The first groupBy is grouped by key (A + B), and drop all
> > > duplicates. The second groupBy is grouped by key (window start time +
> > > A). If I comment out suppress(), I assume all updates during reduce()
> > > will be populated to next step?
> > >
> > > Thanks!
> > >
> > > On Wed, Sep 25, 2019 at 11:16 PM Alex Brekken <brek...@gmail.com>
> wrote:
> > > >
> > > > You might want to try temporarily commenting the suppress() call
> just to
> > > > see if that's the cause of the issue. That said, what is the goal of
> this
> > > > topology? It looks like you're trying to produce a count at the end
> for a
> > > > key.  Is the windowedBy() and suppress() there just to eliminate
> > > > duplicates, or do you need the final results to be grouped by the 60
> > > minute
> > > > window?
> > > >
> > > > Alex
> > > >
> > > > On Wed, Sep 25, 2019 at 9:26 PM Xiyuan Hu <xiyuan.h...@gmail.com>
> wrote:
> > > >
> > > > > Hi Alex,
> > > > > Thanks for the reply!
> > > > >
> > > > > Yes. After deploy with same application ID, source topic has new
> > > > > messages and the application is consuming them but no output at the
> > > > > end.
> > > > > suppress call is:
> > > > > .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > >
> > > > > Topology is like below:
> > > > >
> > > > > final KStream<String, byte[]> source = builder.stream(inputTopic);
> > > > > KStream<String, Event> deserializedStream = source.mapValues( ...
> });
> > > > >
> > > > > KStream<Windowed<String>, Event> dedupedStream =
> > > > > deserializedStream.selectKey( ... )
> > > > > .groupByKey(Grouped.with(Serdes.String(), new
> > > JsonSerde<>(Event.class)))
> > > > >
> > >
> .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > > > > .reduce((value1, value2) -> value2)
> > > > > .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > > .toStream();
> > > > >
> > > > > dedupedStream.selectKey( ... )
> > > > > .mapValues( ... )
> > > > > .filter(...)
> > > > > .groupByKey(Grouped.with(Serdes.String(), new MessagetSerde()))
> > > > > .reduce((value1, value2) -> {
> > > > >     long count1 = value1.getCount();
> > > > >     long count2 = value2.getCount();
> > > > >     value2.setCount(count1 + count2);
> > > > >     return value2;
> > > > > }
> > > > > )
> > > > > .toStream()
> > > > > .selectKey( ... )
> > > > > .to(outputTopic);
> > > > >
> > > > > On Wed, Sep 25, 2019 at 10:14 PM Alex Brekken <brek...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > Hi Xiyuan, just to clarify: after you restart the application
> (using
> > > the
> > > > > > same application ID as previously) there ARE new messages in the
> > > source
> > > > > > topic and your application IS consuming them, but you're not
> seeing
> > > any
> > > > > > output at the end?  How are you configuring your suppress() call?
> > > Is it
> > > > > > possible that messages are being held there and not emitted
> further
> > > > > > downstream? Does commenting the suppress call cause data to flow
> all
> > > the
> > > > > > way through?  In order to help further we might need to see your
> > > actual
> > > > > > topology code if that's possible.
> > > > > >
> > > > > > Alex
> > > > > >
> > > > > > On Wed, Sep 25, 2019 at 2:17 PM Xiyuan Hu <xiyuan.h...@gmail.com
> >
> > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > If I change application id, it will start process new messages
> I
> > > > > > > assume? The old data will be dropped. But this solution will
> not
> > > work
> > > > > > > during production deployment, since we can't change
> application id
> > > for
> > > > > > > each release.
> > > > > > >
> > > > > > > My code looks like below:
> > > > > > >
> > > > > > > builder.stream(topicName)
> > > > > > > .mapValues()
> > > > > > > stream.selectKey(selectKey A)
> > > > > > > .groupByKey(..)
> > > > > > >
> > > > >
> > >
> .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > > > > > > .reduce((value1,value2) -> value2)
> > > > > > > .suppress
> > > > > > > .toStreams()
> > > > > > > .selectKey(selectKey B)
> > > > > > > .mapValues()
> > > > > > > .filter()
> > > > > > > .groupByKey()
> > > > > > > .reduce
> > > > > > > .toStream()
> > > > > > > .to()
> > > > > > >
> > > > > > > It will create 5 internal topics:
> > > > > > > 03-repartition, 03-changelog, 09-changelog, 14-repartition,
> > > > > 14-changelog.
> > > > > > >
> > > > > > > When I restart/redeployment the application, only
> 03-repartition
> > > has
> > > > > > > traffic and messages, but no out-traffic. Other internal topics
> > > have
> > > > > > > no traffic at all after restart/redeployment.
> > > > > > > It only works when I change the application ID. Should I
> include
> > > > > > > streams.cleanUp() before start the stream each time? Or
> anything
> > > else
> > > > > > > goes wrong?
> > > > > > >
> > > > > > > Thanks a lot!
> > > > > > >
> > > > > > > On Wed, Sep 25, 2019 at 2:35 PM Boyang Chen <
> > > > > reluctanthero...@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Hey Xiyuan,
> > > > > > > >
> > > > > > > > I would assume it's easier for us to help you by reading your
> > > > > application
> > > > > > > > with a full paste of code (a prototype). Changing
> application id
> > > > > would
> > > > > > > work
> > > > > > > > suggests that re-process all the data again shall work, do I
> > > > > understand
> > > > > > > > that correctly?
> > > > > > > >
> > > > > > > > Boyang
> > > > > > > >
> > > > > > > > On Wed, Sep 25, 2019 at 8:16 AM Xiyuan Hu <
> xiyuan.h...@gmail.com
> > > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > I'm running a Kafka streams app(v2.1.0) with windowed
> > > > > function(reduce
> > > > > > > > > and suppress). One thing I noticed is, every time when I
> > > > > redeployment
> > > > > > > > > or restart the application, I have to change the
> application
> > > ID to
> > > > > a
> > > > > > > > > new one, otherwise, only the reduce-repartition internal
> topic
> > > has
> > > > > > > > > input traffic(and it has no out-traffic), all other
> internal
> > > topics
> > > > > > > > > has no traffic as all. Looks like it just flows into the
> first
> > > > > > > > > internal repartition topic(reduce-repartiton), the
> > > reduce-changelog
> > > > > > > > > has no traffic and no output traffic as well.
> > > > > > > > >
> > > > > > > > > Could anyone know what's wrong with it? Changing
> application
> > > Id and
> > > > > > > > > create new internal topics each time seems not the right
> thing
> > > to
> > > > > go
> > > > > > > > > with.
> > > > > > > > >
> > > > > > > > > I started the app like below:
> > > > > > > > >
> > > > > > > > > streams = new KafkaStreams(topology.getTopology(config),
> > > > > > > > > properties.getProperties());
> > > > > > > > > streams.start();
> > > > > > > > >
> > > > > > > > > Any help would be appreciated! Thanks!
> > > > > > > > >
> > > > > > >
> > > > >
> > >
>

Reply via email to