[ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798017#comment-16798017 ]
Andrew Klopper commented on KAFKA-7895: --------------------------------------- Hi I am still seeing this behaviour using the 2.2.0 Maven artifacts from [https://repository.apache.org/content/groups/staging] With the topology below, I invariably get emissions of previously emitted windows for the same key on restart of my streams application, and sometimes the re-emitted windows have earlier timestamps and aggregated data that is consistent with the latter part of the window being lost (i.e., state seems to be resetting to an earlier version): {code:java} return sourceStream .groupByKey() .windowedBy(TimeWindows.of(rollupInterval).grace(config.getGracePeriodDuration())) .aggregate( () -> rollupFactory.createRollup(windowDuration), aggregator, Materialized.<String, R, WindowStore<Bytes, byte[]>>as(outputTopic + "_state") .withValueSerde(rollupSerde) ) .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()) .withName(outputTopic + "_suppress_state")) .toStream((stringWindowed, rollup) -> stringWindowed.key()); {code} I have tried disabling caching using: {code:java} streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); {code} but this does not appear to make a difference. Regards Andrew > Ktable supress operator emitting more than one record for the same key per > window > --------------------------------------------------------------------------------- > > Key: KAFKA-7895 > URL: https://issues.apache.org/jira/browse/KAFKA-7895 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.1.0, 2.1.1 > Reporter: prasanthi > Assignee: John Roesler > Priority: Major > Fix For: 2.2.0, 2.1.2 > > > Hi, We are using kstreams to get the aggregated counts per vendor(key) within > a specified window. > Here's how we configured the suppress operator to emit one final record per > key/window. > {code:java} > KTable<Windowed<Integer>, Long> windowedCount = groupedStream > .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L))) > .count(Materialized.with(Serdes.Integer(),Serdes.Long())) > .suppress(Suppressed.untilWindowCloses(unbounded())); > {code} > But we are getting more than one record for the same key/window as shown > below. > {code:java} > [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1039 > [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1162 > [KTABLE-TOSTREAM-0000000010]: [9@1549067040000/1549067100000], 6584 > [KTABLE-TOSTREAM-0000000010]: [88@1549067040000/1549067100000], 107 > [KTABLE-TOSTREAM-0000000010]: [108@1549067040000/1549067100000], 315 > [KTABLE-TOSTREAM-0000000010]: [119@1549067040000/1549067100000], 119 > [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 746 > [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 809{code} > Could you please take a look? > Thanks > > > Added by John: > Acceptance Criteria: > * add suppress to system tests, such that it's exercised with crash/shutdown > recovery, rebalance, etc. > ** [https://github.com/apache/kafka/pull/6278] > * make sure that there's some system test coverage with caching disabled. > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943 > * test with tighter time bounds with windows of say 30 seconds and use > system time without adding any extra time for verification > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944 -- This message was sent by Atlassian JIRA (v7.6.3#76005)