[ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16763174#comment-16763174 ]
John Roesler commented on KAFKA-7895:
-------------------------------------

I think I figured it out.

The suppression operator depends on a guarantee from upstream windowed aggregations: once a window's grace period expires, they will not send any further events for that window. The windowed aggregations themselves correctly implement this guarantee, but record caches can violate it, since they hold onto events until the next commit/flush.

This suggests that a workaround would be to disable caching in the windowed aggregation prior to suppression, though I haven't tried it out. Depending on the specific topology, this might not be sufficient.

It just so happens that in all my tests I had disabled caching, probably with the idea of making the output deterministic. I didn't predict that it would affect correctness.

I have a fix, which changes the way stream-time is accounted for. I'm running the system tests now to be sure the fix is OK. Once a preliminary PR ([https://github.com/apache/kafka/pull/6231]) is merged and the system tests pass, I'll clean up my branch and send a PR. Once I submit the PR, maybe some intrepid folks out there can pull my branch and try it out.

Does this explanation make sense?

Thanks,
-John

> Ktable supress operator emitting more than one record for the same key per window
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.1.0, 2.1.1
> Reporter: prasanthi
> Assignee: John Roesler
> Priority: Major
>
> Hi, we are using kstreams to get the aggregated counts per vendor (key) within a specified window.
> Here's how we configured the suppress operator to emit one final record per key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>     .count(Materialized.with(Serdes.Integer(), Serdes.Long()))
>     .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window, as shown below.
> {code:java}
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1039
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1162
> [KTABLE-TOSTREAM-0000000010]: [9@1549067040000/1549067100000], 6584
> [KTABLE-TOSTREAM-0000000010]: [88@1549067040000/1549067100000], 107
> [KTABLE-TOSTREAM-0000000010]: [108@1549067040000/1549067100000], 315
> [KTABLE-TOSTREAM-0000000010]: [119@1549067040000/1549067100000], 119
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 746
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 809
> {code}
> Could you please take a look?
> Thanks

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
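The failure mode described in the comment can be illustrated with a small, self-contained toy model. This is a hypothetical sketch, not Kafka Streams code: the class `SuppressCacheToy`, the 60-unit tumbling windows with a 5-unit grace period, the single task-level stream time shared by both operators, and the insertion-order cache flush are all simplifying assumptions. The windowed count never accepts a record for a closed window, but when its updates sit in a cache until the next commit, a stale update for key `A` can reach the suppression step after it has already emitted a "final" result for that window, so `A` is emitted twice.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model (NOT Kafka's implementation) of a windowed count
// followed by a "final results only" suppression, with an optional record cache between them.
class SuppressCacheToy {
    static final long WINDOW_SIZE = 60; // toy tumbling windows [start, start + 60)
    static final long GRACE = 5;        // toy assumption: a window closes at end + grace

    static List<String> run(boolean cachingEnabled) {
        // "key:timestamp" input records plus COMMIT markers (toy time units).
        String[] events = {"A:10", "A:20", "COMMIT", "B:30", "A:40", "C:70", "COMMIT"};
        long streamTime = Long.MIN_VALUE;                   // assumption: one stream time shared by both operators
        Map<String, Long> counts = new LinkedHashMap<>();   // aggregation state: "key@windowStart" -> count
        Map<String, Long> cache = new LinkedHashMap<>();    // pending updates, forwarded only on commit
        Map<String, Long> buffer = new LinkedHashMap<>();   // suppression buffer
        List<String> emitted = new ArrayList<>();

        for (String event : events) {
            if (event.equals("COMMIT")) {
                // Flushing forwards the latest value per key/window, in insertion order (toy choice).
                for (Map.Entry<String, Long> e : cache.entrySet()) {
                    suppress(e.getKey(), e.getValue(), streamTime, buffer, emitted);
                }
                cache.clear();
                continue;
            }
            String[] parts = event.split(":");
            long ts = Long.parseLong(parts[1]);
            streamTime = Math.max(streamTime, ts);
            long windowStart = ts - ts % WINDOW_SIZE;
            // The aggregation itself honors the guarantee: records for closed windows are dropped.
            if (windowStart + WINDOW_SIZE + GRACE <= streamTime) {
                continue;
            }
            String windowedKey = parts[0] + "@" + windowStart;
            long count = counts.merge(windowedKey, 1L, Long::sum);
            if (cachingEnabled) {
                cache.put(windowedKey, count); // held back until the next commit
            } else {
                suppress(windowedKey, count, streamTime, buffer, emitted); // forwarded immediately
            }
        }
        return emitted;
    }

    // Buffers the latest value per window and emits every buffered window that has closed.
    // It trusts upstream never to send an update for a window after that window has closed.
    static void suppress(String windowedKey, long value, long streamTime,
                         Map<String, Long> buffer, List<String> emitted) {
        buffer.put(windowedKey, value);
        Iterator<Map.Entry<String, Long>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            String k = e.getKey();
            long windowStart = Long.parseLong(k.substring(k.indexOf('@') + 1));
            if (windowStart + WINDOW_SIZE + GRACE <= streamTime) {
                emitted.add(k + "=" + e.getValue());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("caching disabled: " + run(false));
        System.out.println("caching enabled:  " + run(true));
    }
}
```

In this model, the run with caching disabled emits `[A@0=3, B@0=1]`, while the run with caching enabled emits `[A@0=2, B@0=1, A@0=3]`: the first commit forwards `A@0=2` while the window is still open, and the update to 3 is held in the cache until after stream time has passed the window's close. In the real API, the untested workaround suggested above would correspond to disabling caching on the aggregation's store, for example via `Materialized#withCachingDisabled()`, or globally by setting `cache.max.bytes.buffering` to 0; as noted, depending on the topology this may not be sufficient.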