You might want to try temporarily commenting out the suppress() call, just to see whether it's the cause of the issue. That said, what is the goal of this topology? It looks like you're trying to produce a count per key at the end. Are the windowedBy() and suppress() steps there just to eliminate duplicates, or do you need the final results grouped by the 60-minute window?
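For intuition on why suppress() can appear to swallow everything after a restart, here is a toy, non-Kafka sketch of the untilWindowCloses semantics (the SuppressSimulation class and its names are invented for illustration, not Kafka Streams API): a window's result is only released once stream time, the highest record timestamp seen so far, reaches the window end plus the grace period. With grace(Duration.ZERO) and 60-minute windows, nothing is emitted until a new record arrives with a timestamp at or past the end of the buffered window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of suppress(untilWindowCloses(...)): results are buffered per
// window and released only when "stream time" (max timestamp seen) passes
// the window end plus the grace period.
public class SuppressSimulation {
    static final long WINDOW_MS = 60 * 60 * 1000L; // 60-minute windows
    static final long GRACE_MS = 0L;               // grace(Duration.ZERO)

    long streamTime = Long.MIN_VALUE;
    // Latest value per window start, mimicking reduce((v1, v2) -> v2)
    final Map<Long, String> buffer = new HashMap<>();

    // Feed one record; returns any window results released by this record.
    List<String> process(long timestampMs, String value) {
        long windowStart = (timestampMs / WINDOW_MS) * WINDOW_MS;
        buffer.put(windowStart, value); // keep only the latest value (dedup)
        streamTime = Math.max(streamTime, timestampMs);

        List<String> emitted = new ArrayList<>();
        buffer.entrySet().removeIf(e -> {
            long windowEnd = e.getKey() + WINDOW_MS;
            if (streamTime >= windowEnd + GRACE_MS) {
                emitted.add(e.getValue());
                return true; // window closed: emit and drop from buffer
            }
            return false;    // window still open: keep holding the result
        });
        return emitted;
    }
}
```

In this model, records written before a restart stay buffered until some later record pushes stream time past their window's end, which matches the "no output after redeploy" symptom if new input is sparse or carries old timestamps.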
Alex

On Wed, Sep 25, 2019 at 9:26 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> Hi Alex,
> Thanks for the reply!
>
> Yes. After deploying with the same application ID, the source topic has
> new messages and the application is consuming them, but there is no
> output at the end.
> The suppress call is:
> .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>
> The topology is like below:
>
> final KStream<String, byte[]> source = builder.stream(inputTopic);
> KStream<String, Event> deserializedStream = source.mapValues( ... );
>
> KStream<Windowed<String>, Event> dedupedStream =
>     deserializedStream.selectKey( ... )
>         .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Event.class)))
>         .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
>         .reduce((value1, value2) -> value2)
>         .suppress(untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>         .toStream();
>
> dedupedStream.selectKey( ... )
>     .mapValues( ... )
>     .filter( ... )
>     .groupByKey(Grouped.with(Serdes.String(), new MessagetSerde()))
>     .reduce((value1, value2) -> {
>         long count1 = value1.getCount();
>         long count2 = value2.getCount();
>         value2.setCount(count1 + count2);
>         return value2;
>     })
>     .toStream()
>     .selectKey( ... )
>     .to(outputTopic);
>
> On Wed, Sep 25, 2019 at 10:14 PM Alex Brekken <brek...@gmail.com> wrote:
> >
> > Hi Xiyuan, just to clarify: after you restart the application (using the
> > same application ID as previously) there ARE new messages in the source
> > topic and your application IS consuming them, but you're not seeing any
> > output at the end? How are you configuring your suppress() call? Is it
> > possible that messages are being held there and not emitted further
> > downstream? Does commenting out the suppress call cause data to flow all
> > the way through? In order to help further we might need to see your
> > actual topology code, if that's possible.
> >
> > Alex
> >
> > On Wed, Sep 25, 2019 at 2:17 PM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> > >
> > > Hi,
> > >
> > > If I change the application ID, it will start processing new messages,
> > > I assume? The old data will be dropped. But this solution will not
> > > work for production deployments, since we can't change the application
> > > ID for each release.
> > >
> > > My code looks like below:
> > >
> > > builder.stream(topicName)
> > >     .mapValues( ... )
> > >
> > > stream.selectKey(selectKey A)
> > >     .groupByKey( ... )
> > >     .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
> > >     .reduce((value1, value2) -> value2)
> > >     .suppress( ... )
> > >     .toStream()
> > >     .selectKey(selectKey B)
> > >     .mapValues( ... )
> > >     .filter( ... )
> > >     .groupByKey( ... )
> > >     .reduce( ... )
> > >     .toStream()
> > >     .to( ... )
> > >
> > > It will create 5 internal topics:
> > > 03-repartition, 03-changelog, 09-changelog, 14-repartition, 14-changelog.
> > >
> > > When I restart/redeploy the application, only 03-repartition has
> > > traffic and messages, but no out-traffic. The other internal topics
> > > have no traffic at all after restart/redeployment.
> > > It only works when I change the application ID. Should I call
> > > streams.cleanUp() before starting the streams each time? Or is
> > > something else going wrong?
> > >
> > > Thanks a lot!
> > >
> > > On Wed, Sep 25, 2019 at 2:35 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > >
> > > > Hey Xiyuan,
> > > >
> > > > I would assume it's easier for us to help you by reading your
> > > > application with a full paste of the code (a prototype). That
> > > > changing the application ID works suggests that re-processing all
> > > > the data again works; do I understand that correctly?
> > > >
> > > > Boyang
> > > >
> > > > On Wed, Sep 25, 2019 at 8:16 AM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > I'm running a Kafka Streams app (v2.1.0) with a windowed function
> > > > > (reduce and suppress).
> > > > > One thing I noticed is, every time I redeploy
> > > > > or restart the application, I have to change the application ID to
> > > > > a new one; otherwise, only the reduce-repartition internal topic
> > > > > has input traffic (and it has no out-traffic), and all the other
> > > > > internal topics have no traffic at all. It looks like data just
> > > > > flows into the first internal repartition topic
> > > > > (reduce-repartition); the reduce-changelog has no traffic and
> > > > > there is no output traffic either.
> > > > >
> > > > > Does anyone know what's wrong with it? Changing the application ID
> > > > > and creating new internal topics each time doesn't seem like the
> > > > > right way to go.
> > > > >
> > > > > I start the app like below:
> > > > >
> > > > > streams = new KafkaStreams(topology.getTopology(config),
> > > > >     properties.getProperties());
> > > > > streams.start();
> > > > >
> > > > > Any help would be appreciated! Thanks!
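On the streams.cleanUp() question in the quoted thread: cleanUp() deletes the instance's local state directory, so state is rebuilt from the changelog topics on the next start; it is intended for development or explicit resets, not for every production deployment. Below is a hedged sketch of a typical startup, not the poster's actual code; the application ID, bootstrap servers, and the buildTopology() helper are placeholders standing in for the thread's topology.getTopology(config) and properties.getProperties().

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class AppMain {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Keep the application ID stable across deployments so offsets and
        // state are reused instead of rebuilt under a fresh set of internal topics.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");          // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        Topology topology = buildTopology(); // placeholder for the thread's topology
        KafkaStreams streams = new KafkaStreams(topology, props);

        // streams.cleanUp(); // only for local testing/resets: wipes local state
        streams.start();

        // Close cleanly on shutdown so state stores and offsets are flushed.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    static Topology buildTopology() {
        // Build and return the topology as posted in the thread.
        return null; // sketch only
    }
}
```

With a stable application ID, a restart should resume from committed offsets; if output still stalls only at the suppress() stage, checking whether new input actually advances stream time past the open windows is a reasonable next step.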