[ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16763174#comment-16763174
 ] 

John Roesler commented on KAFKA-7895:
-------------------------------------

I think I figured it out.

The suppression operator depends on a guarantee from upstream windowed 
aggregations that they will not send any further events for a window once its 
grace period expires. The windowed aggregations themselves correctly implement 
this guarantee, but record caches can violate it: they hold on to events until 
the next commit/flush, so a cached update can be forwarded downstream after 
its window's grace period has already expired.
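
To make the mechanism concrete, here is a minimal toy model in plain Java. It 
is a sketch under stated assumptions, not the Kafka Streams implementation: 
ToySuppress, its stream-time bookkeeping, and the delivery order are all 
hypothetical and chosen only for illustration. The window size, grace period, 
key, and counts are borrowed from the report. The model buffers the latest 
value per key/window and emits it once the stream time it has observed passes 
window end plus grace.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SuppressToyModel {
    static final long WINDOW_MS = 60_000L; // 1-minute windows, as in the report
    static final long GRACE_MS = 5L;       // 5 ms grace period, as in the report

    /** Toy suppression buffer. Hypothetical; not the Kafka Streams code. */
    static final class ToySuppress {
        // "key@windowStart" -> {windowEnd, latest count}
        private final Map<String, long[]> buffer = new LinkedHashMap<>();
        private long streamTime = Long.MIN_VALUE;
        final List<String> emitted = new ArrayList<>();

        void process(String key, long windowStart, long timestamp, long count) {
            // Stream time only moves forward, driven by the records seen here.
            streamTime = Math.max(streamTime, timestamp);
            buffer.put(key + "@" + windowStart,
                       new long[] {windowStart + WINDOW_MS, count});
            // Emit and drop every buffered entry whose window has closed.
            Iterator<Map.Entry<String, long[]>> it = buffer.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, long[]> e = it.next();
                if (e.getValue()[0] + GRACE_MS <= streamTime) {
                    emitted.add(e.getKey() + "=" + e.getValue()[1]);
                    it.remove();
                }
            }
        }
    }

    /** Replays three updates; optionally delays one as a cache might. */
    static List<String> run(boolean cacheDelaysUpdate) {
        ToySuppress suppress = new ToySuppress();
        suppress.process("131", 0L, 10L, 1039L);
        if (!cacheDelaysUpdate) {
            // Guarantee holds: the update arrives before the window closes.
            suppress.process("131", 0L, 20L, 1162L);
        }
        // A record from the next window pushes stream time past 60_005.
        suppress.process("9", 60_000L, 60_010L, 1L);
        if (cacheDelaysUpdate) {
            // Guarantee violated: same update, delivered after the close.
            suppress.process("131", 0L, 20L, 1162L);
        }
        return suppress.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [131@0=1162]
        System.out.println(run(true));  // prints [131@0=1039, 131@0=1162]
    }
}
```

In the second run the delayed update lands in a window the toy buffer already 
considers closed, so it is emitted a second time. That has the same shape as 
the duplicate 1039/1162 lines in the report.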

This suggests that a workaround would be to disable caching in the windowed 
aggregation prior to suppression, but I didn't try it out. Depending on the 
specific topology, this might not be sufficient.
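
A minimal sketch of that workaround, untested against this bug as noted 
above. It takes the blunt route of turning off record caching for the whole 
application by setting cache.max.bytes.buffering (the key behind 
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) to 0. The class and method 
names are hypothetical, and the other required settings (application id, 
bootstrap servers) are deliberately left out. Disabling the cache only on the 
aggregation's store via Materialized#withCachingDisabled() is the narrower 
alternative.

```java
import java.util.Properties;

public class DisableCachingSketch {
    /** Builds only the cache-related part of a Kafka Streams configuration. */
    static Properties streamsProps() {
        Properties props = new Properties();
        // 0 bytes of cache means updates are forwarded downstream as they are
        // produced instead of being held until the next commit/flush.
        props.put("cache.max.bytes.buffering", 0L);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps());
    }
}
```

Note that with caching off, every intermediate update is forwarded to 
downstream operators, so this trades extra downstream traffic for the 
ordering guarantee that suppression relies on.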

It just so happens that in all my tests, I'd disabled caching, probably with 
the idea of making the output deterministic. I didn't predict that it would 
affect correctness.

I have a fix that changes the way stream-time is accounted for. I'm running 
the system tests now to be sure the fix is ok. Once a preliminary PR 
(https://github.com/apache/kafka/pull/6231) is merged and the system tests 
pass, I'll clean up my branch and send a PR.

Once I submit the PR, maybe some intrepid folks out there can pull my branch 
and try it out.

Does this explanation make sense?

Thanks,

-John

> Ktable supress operator emitting more than one record for the same key per 
> window
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-7895
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7895
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0, 2.1.1
>            Reporter: prasanthi
>            Assignee: John Roesler
>            Priority: Major
>
> Hi, we are using Kafka Streams to get the aggregated counts per vendor (key) 
> within a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>      .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>      .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>      .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1039
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1162
> [KTABLE-TOSTREAM-0000000010]: [9@1549067040000/1549067100000], 6584
> [KTABLE-TOSTREAM-0000000010]: [88@1549067040000/1549067100000], 107
> [KTABLE-TOSTREAM-0000000010]: [108@1549067040000/1549067100000], 315
> [KTABLE-TOSTREAM-0000000010]: [119@1549067040000/1549067100000], 119
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 746
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 809
> {code}
> Could you please take a look?
> Thanks



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
