Hi, The first selectKey/groupBy/windowedBy/reduce groups messages by the new key and drops duplicates, so that each key produces only one message per 1-hour time window. I use suppress() to make sure only the latest message per time window is sent.
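To make the intent of that first stage concrete, here is a minimal plain-Java sketch (not the Streams API; the class, method, and variable names are hypothetical) of the semantics described above: reduce((value1, value2) -> value2) keeps only the latest value per (key, 1-hour window), and suppress(untilWindowCloses(...)) means only that final value is emitted once the window closes.

```java
import java.util.HashMap;
import java.util.Map;

public class WindowedDedupSketch {
    static final long WINDOW_MS = 60 * 60 * 1000L; // 1-hour tumbling windows

    // Keeps only the latest value per (key, windowStart), mimicking
    // reduce((value1, value2) -> value2): a later record overwrites earlier ones.
    static Map<String, String> dedupe(long[] timestamps, String[] keys, String[] values) {
        Map<String, String> latestPerWindow = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            long windowStart = (timestamps[i] / WINDOW_MS) * WINDOW_MS;
            latestPerWindow.put(keys[i] + "@" + windowStart, values[i]);
        }
        // With suppress(untilWindowCloses(...)), only this final per-window
        // state is sent downstream: one record per key per closed window.
        return latestPerWindow;
    }

    public static void main(String[] args) {
        long[] ts = {1000L, 2000L, 3_600_001_000L};   // first two in window 0, third in the next window
        String[] keys = {"A+B", "A+B", "A+B"};
        String[] vals = {"v1", "v2", "v3"};
        Map<String, String> out = dedupe(ts, keys, vals);
        System.out.println(out.size());        // 2: one entry per window
        System.out.println(out.get("A+B@0"));  // v2: latest value in the window wins
    }
}
```

The real deduplication is of course done by Kafka Streams' windowed state store; this sketch only illustrates why each key emits at most one record per closed window.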
The second selectKey/groupBy/reduce does the counting: after deduping, it counts how many distinct messages there are per window. E.g. the first groupBy groups by key (A + B) and drops all duplicates; the second groupBy groups by key (window start time + A). If I comment out suppress(), I assume every intermediate update from reduce() will be forwarded to the next step? Thanks! On Wed, Sep 25, 2019 at 11:16 PM Alex Brekken <brek...@gmail.com> wrote: > > You might want to try temporarily commenting the suppress() call just to > see if that's the cause of the issue. That said, what is the goal of this > topology? It looks like you're trying to produce a count at the end for a > key. Are the windowedBy() and suppress() there just to eliminate > duplicates, or do you need the final results to be grouped by the 60-minute > window? > > Alex > > On Wed, Sep 25, 2019 at 9:26 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote: > > > Hi Alex, > > Thanks for the reply! > > > > Yes. After deploying with the same application ID, the source topic has new > > messages and the application is consuming them, but there is no output at the > > end. > > The suppress call is: > > .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded())) > > > > The topology is like below: > > > > final KStream<String, byte[]> source = builder.stream(inputTopic); > > KStream<String, Event> deserializedStream = source.mapValues( ... ); > > > > KStream<Windowed<String>, Event> dedupedStream = > > deserializedStream.selectKey( ... ) > > .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class))) > > .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO)) > > .reduce((value1, value2) -> value2) > > .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded())) > > .toStream(); > > > > dedupedStream.selectKey( ... ) > > .mapValues( ... ) > > .filter(...)
> > .groupByKey(Grouped.with(Serdes.String(), new MessagetSerde())) > > .reduce((value1, value2) -> { > > long count1 = value1.getCount(); > > long count2 = value2.getCount(); > > value2.setCount(count1 + count2); > > return value2; > > } > > ) > > .toStream() > > .selectKey( ... ) > > .to(outputTopic); > > > > On Wed, Sep 25, 2019 at 10:14 PM Alex Brekken <brek...@gmail.com> wrote: > > > > > > Hi Xiyuan, just to clarify: after you restart the application (using the > > > same application ID as previously) there ARE new messages in the source > > > topic and your application IS consuming them, but you're not seeing any > > > output at the end? How are you configuring your suppress() call? Is it > > > possible that messages are being held there and not emitted further > > > downstream? Does commenting the suppress call cause data to flow all the > > > way through? In order to help further we might need to see your actual > > > topology code if that's possible. > > > > > > Alex > > > > > > On Wed, Sep 25, 2019 at 2:17 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote: > > > > > > > Hi, > > > > > > > > If I change the application id, it will start processing new messages, I > > > > assume? The old data will be dropped. But this solution will not work > > > > during production deployment, since we can't change the application id for > > > > each release. > > > > > > > > My code looks like below: > > > > > > > > builder.stream(topicName) > > > > .mapValues() > > > > stream.selectKey(selectKey A) > > > > .groupByKey(..) > > > > > > .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO)) > > > > .reduce((value1,value2) -> value2) > > > > .suppress > > > > .toStream() > > > > .selectKey(selectKey B) > > > > .mapValues() > > > > .filter() > > > > .groupByKey() > > > > .reduce > > > > .toStream() > > > > .to() > > > > > > > > It will create 5 internal topics: > > > > 03-repartition, 03-changelog, 09-changelog, 14-repartition, > > 14-changelog.
> > > > > > > > When I restart/redeploy the application, only 03-repartition has > > > > traffic and messages, but no out-traffic. Other internal topics have > > > > no traffic at all after restart/redeployment. > > > > It only works when I change the application ID. Should I include > > > > streams.cleanUp() before starting the stream each time? Or is anything else > > > > going wrong? > > > > > > > > Thanks a lot! > > > > > > > > On Wed, Sep 25, 2019 at 2:35 PM Boyang Chen < > > reluctanthero...@gmail.com> > > > > wrote: > > > > > > > > > > Hey Xiyuan, > > > > > > > > > > I would assume it's easier for us to help you by reading your > > application > > > > > with a full paste of code (a prototype). That changing the application id > > would > > > > work > > > > > suggests that re-processing all the data again shall work; do I > > understand > > > > > that correctly? > > > > > > > > > > Boyang > > > > > > > > > > On Wed, Sep 25, 2019 at 8:16 AM Xiyuan Hu <xiyuan.h...@gmail.com> > > wrote: > > > > > > > > > > > Hi, > > > > > > > > > > > > I'm running a Kafka Streams app (v2.1.0) with windowed > > functions (reduce > > > > > > and suppress). One thing I noticed is, every time I > > redeploy > > > > > > or restart the application, I have to change the application ID to > > a > > > > > > new one; otherwise, only the reduce-repartition internal topic has > > > > > > input traffic (and it has no out-traffic), and all other internal topics > > > > > > have no traffic at all. It looks like data just flows into the first > > > > > > internal repartition topic (reduce-repartition); the reduce-changelog > > > > > > has no traffic and no output traffic either. > > > > > > > > > > > > Does anyone know what's wrong with it? Changing the application Id and > > > > > > creating new internal topics each time seems not the right thing to > > go > > > > > > with.
> > > > > > > > > > > > I started the app like below: > > > > > > > > > > > > streams = new KafkaStreams(topology.getTopology(config), > > > > > > properties.getProperties()); > > > > > > streams.start(); > > > > > > > > > > > > Any help would be appreciated! Thanks! > > > > > > > > > > > >
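For reference, the second stage's reduce() in the topology quoted earlier in this thread is just a running sum of counts per key. A minimal plain-Java sketch of that merge logic (the Message class, its count field, and countPerKey are hypothetical stand-ins for the thread's actual value type, not the Streams API):

```java
import java.util.HashMap;
import java.util.Map;

public class CountingReduceSketch {
    // Stand-in for the message value type used in the thread's topology.
    static class Message {
        long count;
        Message(long count) { this.count = count; }
    }

    // Mirrors reduce((value1, value2) -> { value2.setCount(count1 + count2); return value2; }):
    // each new record for a key is merged into the running aggregate for that key.
    static Map<String, Message> countPerKey(String[] keys) {
        Map<String, Message> agg = new HashMap<>();
        for (String key : keys) {
            Message incoming = new Message(1); // each deduped record contributes a count of 1
            agg.merge(key, incoming, (current, next) -> {
                next.count = current.count + next.count;
                return next;
            });
        }
        return agg;
    }

    public static void main(String[] args) {
        // Keys in the "window start time + A" style described in the thread.
        Map<String, Message> out = countPerKey(new String[]{"0+A", "0+A", "0+A", "3600000+A"});
        System.out.println(out.get("0+A").count);        // 3
        System.out.println(out.get("3600000+A").count);  // 1
    }
}
```

With suppress() in place upstream, this reduce only ever sees one record per key per closed window; without it, every intermediate window update would flow in and be summed as well, which is why commenting out suppress() changes what reaches this step.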