Thanks for the reply! I have two questions:

1) The no-output issue only happens when I restart/redeploy the application
with the same application id. When I run the application for the first time,
it works fine, so I assume suppress() is working, at least for the first run.
What I can't understand is how Kafka Streams behaves across a
restart/redeployment.

2) Another thought: my application has a huge lag (about 50M messages). Will
that be a problem during a restart/redeployment?
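For context, here is my mental model of what the application id controls.
This is just a sketch with placeholder values, not my real config - please
correct me if I have it wrong:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// The application id doubles as the consumer group id and as the prefix of
// the internal repartition/changelog topics. Reusing the same id on restart
// therefore means "resume from committed offsets and restore existing local
// state from the changelogs"; a new id starts over as a brand-new group.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");           // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder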
Your suggestion will still propagate the duplicate messages downstream; only
the count will be accurate. Is that correct? Thanks a lot!!

On Thu, Sep 26, 2019 at 8:22 AM Alex Brekken <brek...@gmail.com> wrote:
>
> So I'm not exactly sure why suppress() isn't working for you, because it
> should send out a final message once the window closes - assuming you're
> still getting new messages flowing through the topology. Have you tried
> using the count function in KGroupedTable? It should handle duplicates
> correctly. So a slightly modified version of your topology might look
> like this:
>
> deserializedStream.selectKey( ... )
>     .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
>     .reduce((value1, value2) -> value2)
>     .filter( ... )
>     .groupBy( ... ) // change your key here as needed
>     .count()        // this count function should handle duplicates and
>                     // still come out with the right answer
>     .toStream()
>     .selectKey( ... )
>     .to(outputTopic);
>
> The idea here is that you're not preventing duplicate messages from flowing
> through; instead you're tolerating them and not allowing them to incorrectly
> change your counts. Also, you had a mapValues() call in there too, which
> creates another KTable. It might not matter, but could you do that as part
> of the reduce() step maybe? (Or replace the reduce with an aggregate, which
> lets you have a different output type than your input type.) Then your
> topology would only have 2 KTables instead of 3. Good luck - if this doesn't
> work then I'm out of ideas. :)
>
> Alex
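To make sure I understand the suggestion, here is how I read it as a
complete topology. This is only a sketch: Event, its getDedupeKey() and
getGroupKey() accessors, and eventSerde are placeholders for my real classes,
and I left out the filter() since its predicate is app-specific.

import java.time.Duration;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;

class CountTopologySketch {
    static void build(KStream<String, Event> deserializedStream,
                      Serde<Event> eventSerde, String outputTopic) {
        // Step 1: per 60-minute window, reduce keeps one "winner" per key,
        // so duplicates collapse into a single row of this KTable.
        KTable<Windowed<String>, Event> deduped = deserializedStream
            .selectKey((k, v) -> v.getDedupeKey())
            .groupByKey(Grouped.with(Serdes.String(), eventSerde))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
            .reduce((v1, v2) -> v2);

        // Step 2: re-key and count. KGroupedTable.count() maintains both an
        // adder and a subtractor, so when a duplicate replaces an existing
        // row, the old value is retracted before the new one is added - the
        // duplicates flow through, but the count stays correct.
        deduped
            .groupBy((wk, v) -> KeyValue.pair(
                         wk.window().start() + "|" + v.getGroupKey(), v),
                     Grouped.with(Serdes.String(), eventSerde))
            .count()
            .toStream()
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
    }
}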
> On Thu, Sep 26, 2019 at 12:06 AM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> >
> > Hi,
> >
> > The first selectKey/groupBy/windowedBy/reduce is to group messages by
> > key and drop duplicated messages based on the new key, so that for
> > each 1-hour time window, each key will only emit one message. I use
> > suppress() to make sure only the latest message per time window will
> > be sent.
> >
> > The second selectKey/groupBy/reduce does the counting. After deduping,
> > count how many distinct messages there are per window.
> >
> > E.g. the first groupBy is grouped by key (A + B) and drops all
> > duplicates. The second groupBy is grouped by key (window start time +
> > A). If I comment out suppress(), I assume all updates during reduce()
> > will be propagated to the next step?
> >
> > Thanks!
> >
> > On Wed, Sep 25, 2019 at 11:16 PM Alex Brekken <brek...@gmail.com> wrote:
> > >
> > > You might want to try temporarily commenting out the suppress() call
> > > just to see if that's the cause of the issue. That said, what is the
> > > goal of this topology? It looks like you're trying to produce a count
> > > at the end for a key. Are the windowedBy() and suppress() there just
> > > to eliminate duplicates, or do you need the final results to be
> > > grouped by the 60-minute window?
> > >
> > > Alex
> > >
> > > On Wed, Sep 25, 2019 at 9:26 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> > > >
> > > > Hi Alex,
> > > > Thanks for the reply!
> > > >
> > > > Yes. After a deploy with the same application ID, the source topic
> > > > has new messages and the application is consuming them, but there
> > > > is no output at the end.
> > > >
> > > > The suppress call is:
> > > > .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > >
> > > > The topology is like below:
> > > >
> > > > final KStream<String, byte[]> source = builder.stream(inputTopic);
> > > > KStream<String, Event> deserializedStream = source.mapValues( ... );
> > > >
> > > > KStream<Windowed<String>, Event> dedupedStream =
> > > >     deserializedStream.selectKey( ... )
> > > >     .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
> > > >     .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > > >     .reduce((value1, value2) -> value2)
> > > >     .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > >     .toStream();
> > > >
> > > > dedupedStream.selectKey( ... )
> > > >     .mapValues( ... )
> > > >     .filter( ... )
> > > >     .groupByKey(Grouped.with(Serdes.String(), new MessageSerde()))
> > > >     .reduce((value1, value2) -> {
> > > >         long count1 = value1.getCount();
> > > >         long count2 = value2.getCount();
> > > >         value2.setCount(count1 + count2);
> > > >         return value2;
> > > >     })
> > > >     .toStream()
> > > >     .selectKey( ... )
> > > >     .to(outputTopic);
> > > >
> > > > On Wed, Sep 25, 2019 at 10:14 PM Alex Brekken <brek...@gmail.com> wrote:
> > > > >
> > > > > Hi Xiyuan, just to clarify: after you restart the application
> > > > > (using the same application ID as previously) there ARE new
> > > > > messages in the source topic and your application IS consuming
> > > > > them, but you're not seeing any output at the end? How are you
> > > > > configuring your suppress() call? Is it possible that messages
> > > > > are being held there and not emitted further downstream? Does
> > > > > commenting out the suppress call cause data to flow all the way
> > > > > through? In order to help further we might need to see your
> > > > > actual topology code, if that's possible.
> > > > >
> > > > > Alex
> > > > >
> > > > > On Wed, Sep 25, 2019 at 2:17 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> > > > > >
> > > > > > Hi,
> > > > > >
> > > > > > If I change the application id, it will start processing new
> > > > > > messages, I assume? The old data will be dropped. But this
> > > > > > solution will not work for production deployments, since we
> > > > > > can't change the application id for each release.
> > > > > >
> > > > > > My code looks like below:
> > > > > >
> > > > > > builder.stream(topicName)
> > > > > >     .mapValues( ... )
> > > > > >     .selectKey(selectKey A)
> > > > > >     .groupByKey( ... )
> > > > > >     .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > > > > >     .reduce((value1, value2) -> value2)
> > > > > >     .suppress( ... )
> > > > > >     .toStream()
> > > > > >     .selectKey(selectKey B)
> > > > > >     .mapValues( ... )
> > > > > >     .filter( ... )
> > > > > >     .groupByKey( ... )
> > > > > >     .reduce( ... )
> > > > > >     .toStream()
> > > > > >     .to( ... )
> > > > > >
> > > > > > It creates 5 internal topics: 03-repartition, 03-changelog,
> > > > > > 09-changelog, 14-repartition, 14-changelog.
> > > > > >
> > > > > > When I restart/redeploy the application, only 03-repartition has
> > > > > > traffic and messages, but no out-traffic. The other internal
> > > > > > topics have no traffic at all after a restart/redeployment.
> > > > > > It only works when I change the application ID. Should I call
> > > > > > streams.cleanUp() before starting the streams each time? Or is
> > > > > > something else going wrong?
> > > > > >
> > > > > > Thanks a lot!
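On my cleanUp() question above: as far as I can tell, cleanUp() only deletes
the local state directory and does not reset committed offsets, so by itself
it would not make the app reprocess old data. A minimal sketch of how I
believe it is meant to be used (topology and props are placeholders):

import org.apache.kafka.streams.KafkaStreams;

// cleanUp() may only run while the instance is not running, i.e. before
// start() or after close(). It wipes the local state stores, which are then
// rebuilt from the changelog topics on the next start; committed consumer
// offsets are untouched, so processing still resumes where it left off.
KafkaStreams streams = new KafkaStreams(topology, props);
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));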
> > > > > >
> > > > > > On Wed, Sep 25, 2019 at 2:35 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hey Xiyuan,
> > > > > > >
> > > > > > > I think it would be easier for us to help if we could read
> > > > > > > your application as a full paste of code (a prototype). The
> > > > > > > fact that changing the application id works suggests that
> > > > > > > re-processing all the data from scratch works - do I
> > > > > > > understand that correctly?
> > > > > > >
> > > > > > > Boyang
> > > > > > >
> > > > > > > On Wed, Sep 25, 2019 at 8:16 AM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I'm running a Kafka Streams app (v2.1.0) with a windowed
> > > > > > > > function (reduce and suppress). One thing I noticed is that
> > > > > > > > every time I redeploy or restart the application, I have to
> > > > > > > > change the application ID to a new one; otherwise, only the
> > > > > > > > reduce-repartition internal topic has input traffic (and it
> > > > > > > > has no out-traffic), and all other internal topics have no
> > > > > > > > traffic at all. It looks like data just flows into the
> > > > > > > > first internal repartition topic (reduce-repartition); the
> > > > > > > > reduce-changelog has no traffic, and there is no output
> > > > > > > > traffic either.
> > > > > > > >
> > > > > > > > Does anyone know what's wrong? Changing the application id
> > > > > > > > and creating new internal topics each time doesn't seem
> > > > > > > > like the right way to go.
> > > > > > > >
> > > > > > > > I start the app like below:
> > > > > > > >
> > > > > > > > streams = new KafkaStreams(topology.getTopology(config),
> > > > > > > >     properties.getProperties());
> > > > > > > > streams.start();
> > > > > > > >
> > > > > > > > Any help would be appreciated! Thanks!
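One more thing I will try on my side: with ~50M messages of lag, I suspect
the app may spend a long time rebuilding state from the changelog topics
after a restart, which would look exactly like "consuming but no output". A
state listener should make that phase visible; this is a sketch reusing the
start code above:

import org.apache.kafka.streams.KafkaStreams;

// A long stay in REBALANCING after a restart usually means changelog topics
// are being replayed to restore local state. On top of that, suppress()
// holds window results until new input advances stream time past the window
// end, so output can lag well behind consumption.
KafkaStreams streams = new KafkaStreams(topology.getTopology(config),
        properties.getProperties());
streams.setStateListener((newState, oldState) ->
        System.out.println("Streams state: " + oldState + " -> " + newState));
streams.start();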