[ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812787#comment-16812787
 ] 

John Roesler commented on KAFKA-7895:
-------------------------------------

No problem. Thanks for _your_ help!

The PR is being reviewed now. Once it's merged, I'll request it to be 
cherry-picked to all versions that have Suppress, and then request bugfix 
releases for them.

 

As an update, I've tracked down why I'm still seeing duplicates when I crash the 
application, even with EOS enabled: Streams flushes the state stores in no 
particular order. If there is a cached state store upstream of the suppression 
and Streams flushes the suppression buffer first and the cached store second, 
the cached data gets flushed through Suppress, which may emit results, but the 
suppression buffer's changelog is never updated to reflect them. The fix I have 
in mind is to flush the stores in topological order. In the meantime, disabling 
caching should eliminate this particular source of duplicates, if anyone is 
looking for a workaround.
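
For reference, here is a minimal sketch of that workaround (my illustration, not 
part of the original report): it disables the record cache either globally, via 
cache.max.bytes.buffering=0, or only for the store that feeds the suppression, 
via Materialized#withCachingDisabled(). The topic names, application id, and 
broker address are placeholders.

{code:java}
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class SuppressCachingWorkaround {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-workaround");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // Option 1: disable the record cache for the whole application.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("events", Consumed.with(Serdes.Integer(), Serdes.Long())) // placeholder topic
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMillis(5L)))
               // Option 2: disable caching only for the store that feeds the suppression.
               .count(Materialized.<Integer, Long, WindowStore<Bytes, byte[]>>with(
                               Serdes.Integer(), Serdes.Long())
                       .withCachingDisabled())
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               .toStream()
               .print(Printed.toSysOut());

        new KafkaStreams(builder.build(), props).start();
    }
}
{code}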

> Ktable supress operator emitting more than one record for the same key per 
> window
> ---------------------------------------------------------------------------------
>
>                 Key: KAFKA-7895
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7895
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0, 2.1.1
>            Reporter: prasanthi
>            Assignee: John Roesler
>            Priority: Major
>             Fix For: 2.2.0, 2.1.2
>
>
> Hi, we are using Kafka Streams to get the aggregated counts per vendor (key) 
> within a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window:
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>      .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>      .count(Materialized.with(Serdes.Integer(), Serdes.Long()))
>      .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1039
> [KTABLE-TOSTREAM-0000000010]: [131@1549067040000/1549067100000], 1162
> [KTABLE-TOSTREAM-0000000010]: [9@1549067040000/1549067100000], 6584
> [KTABLE-TOSTREAM-0000000010]: [88@1549067040000/1549067100000], 107
> [KTABLE-TOSTREAM-0000000010]: [108@1549067040000/1549067100000], 315
> [KTABLE-TOSTREAM-0000000010]: [119@1549067040000/1549067100000], 119
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 746
> [KTABLE-TOSTREAM-0000000010]: [154@1549067040000/1549067100000], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds (e.g., 30-second windows) and use system time 
> without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
