[ 
https://issues.apache.org/jira/browse/KAFKA-8199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16845185#comment-16845185
 ] 

ASF GitHub Bot commented on KAFKA-8199:
---------------------------------------

vvcephei commented on pull request #6781: KAFKA-8199: Implement ValueGetter for 
Suppress
URL: https://github.com/apache/kafka/pull/6781
 
 
   See also https://github.com/apache/kafka/pull/6684
   
   KTable processors must be supplied with a KTableProcessorSupplier, which in 
turn requires implementing a ValueGetter, for use with joins and groupings.
   
   For suppression, a correct view includes only the previously emitted values, 
not the currently buffered ones. This change therefore also pushes the Change 
value type into the suppression buffer's interface, so that the buffer can 
capture the prior value upon first buffering (which is also the previously 
emitted value).
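
As a rough illustration of the idea above, here is a minimal, self-contained sketch. It does not use the real Kafka Streams classes: `SuppressionViewSketch`, its nested `Change` and `Buffered` types, and the `put`/`get`/`flush` methods are all hypothetical names chosen for this example. It only shows why remembering the old value on first buffering is enough to serve a view of previously emitted values.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a suppression buffer with a value view.
// This is NOT the actual Kafka Streams suppression buffer interface.
final class SuppressionViewSketch<K, V> {

    // Stand-in for a change record: the new value plus the value it replaces.
    static final class Change<T> {
        final T newValue;
        final T oldValue;

        Change(T newValue, T oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Buffered entry: the value seen downstream before buffering started,
    // and the latest not-yet-emitted value.
    private static final class Buffered<T> {
        final T priorValue;
        T latestValue;

        Buffered(T priorValue, T latestValue) {
            this.priorValue = priorValue;
            this.latestValue = latestValue;
        }
    }

    private final Map<K, Buffered<V>> buffer = new HashMap<>();
    private final Map<K, V> emitted = new HashMap<>();

    // Buffer an update. Only the first buffering of a key records the old
    // value, because that old value is what was last emitted downstream.
    void put(K key, Change<V> change) {
        Buffered<V> entry = buffer.get(key);
        if (entry == null) {
            buffer.put(key, new Buffered<>(change.oldValue, change.newValue));
        } else {
            entry.latestValue = change.newValue;
        }
    }

    // The "value getter" view: never exposes a currently buffered value.
    V get(K key) {
        Buffered<V> entry = buffer.get(key);
        return entry != null ? entry.priorValue : emitted.get(key);
    }

    // Emit the buffered value for a key, making it visible through get().
    void flush(K key) {
        Buffered<V> entry = buffer.remove(key);
        if (entry == null) {
            return;
        }
        if (entry.latestValue == null) {
            emitted.remove(key);
        } else {
            emitted.put(key, entry.latestValue);
        }
    }
}
```

In this sketch, a key that is updated several times while buffered keeps returning its pre-buffering value from `get` until `flush` is called for it.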
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ClassCastException when trying to groupBy after suppress
> --------------------------------------------------------
>
>                 Key: KAFKA-8199
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8199
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0
>            Reporter: Bill Bejeck
>            Assignee: Jose Lopez
>            Priority: Major
>             Fix For: 2.3.0
>
>
> A topology with a groupBy after a suppress operation results in a 
> ClassCastException.
> The following sample topology
> {noformat}
> Properties properties = new Properties();
> properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appid");
> properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost");
> StreamsBuilder builder = new StreamsBuilder();
> builder.<String, String>stream("topic")
>     .groupByKey()
>     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
>     .count()
>     .suppress(Suppressed.untilTimeLimit(Duration.ofHours(1), BufferConfig.unbounded()))
>     .groupBy((k, v) -> KeyValue.pair(k, v))
>     .count()
>     .toStream();
> builder.build(properties);
> {noformat}
> results in this exception:
> {noformat}
> java.lang.ClassCastException: 
> org.apache.kafka.streams.kstream.internals.KTableImpl$$Lambda$4/2084435065 
> cannot be cast to 
> org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier{noformat}
>  
>  
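
The exception message above suggests that a lambda-backed supplier was cast to a more specific supplier type. The following self-contained sketch reproduces that general mechanism in plain Java. The interfaces `ProcessorSupplier` and `TableProcessorSupplier` here are hypothetical stand-ins declared inside the example, not the real Kafka Streams types, and the sketch makes no claim about the actual internals beyond what the stack trace shows.

```java
// Hypothetical illustration of the failure mode; not Kafka Streams code.
final class ClassCastSketch {

    // Stand-in for a general supplier that a lambda can implement.
    interface ProcessorSupplier {
        String get();
    }

    // Stand-in for the more specific supplier that table operations expect.
    interface TableProcessorSupplier extends ProcessorSupplier {
        String view();
    }

    // A lambda only implements the functional interface it is assigned to,
    // so the result is a ProcessorSupplier but not a TableProcessorSupplier.
    static ProcessorSupplier lambdaSupplier() {
        return () -> "suppress";
    }

    // An explicit implementation of the specific interface.
    static ProcessorSupplier tableSupplier() {
        return new TableProcessorSupplier() {
            @Override
            public String get() {
                return "suppress";
            }

            @Override
            public String view() {
                return "view of suppress";
            }
        };
    }

    // Mimics a downstream operation that assumes the specific type.
    static boolean castFails(ProcessorSupplier supplier) {
        try {
            TableProcessorSupplier table = (TableProcessorSupplier) supplier;
            table.view();
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("lambda cast fails: " + castFails(lambdaSupplier()));
        System.out.println("explicit cast fails: " + castFails(tableSupplier()));
    }
}
```

Under these assumptions, the cast fails for the lambda and succeeds for the explicit implementation, which matches the shape of the reported error.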



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
