[ https://issues.apache.org/jira/browse/KAFKA-8199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16845185#comment-16845185 ]
ASF GitHub Bot commented on KAFKA-8199: --------------------------------------- vvcephei commented on pull request #6781: KAFKA-8199: Implement ValueGetter for Suppress URL: https://github.com/apache/kafka/pull/6781 See also https://github.com/apache/kafka/pull/6684 KTable processors must be supplied with a KTableProcessorSupplier, which in turn requires implementing a ValueGetter, for use with joins and groupings. For suppression, a correct view only includes the previously emitted values (not the currently buffered ones), so this change also involves pushing the Change value type into the suppression buffer's interface, so that it can get the prior value upon first buffering (which is also the previously emitted value). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ClassCastException when trying to groupBy after suppress > -------------------------------------------------------- > > Key: KAFKA-8199 > URL: https://issues.apache.org/jira/browse/KAFKA-8199 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.1.0 > Reporter: Bill Bejeck > Assignee: Jose Lopez > Priority: Major > Fix For: 2.3.0 > > > A topology with a groupBy after a suppress operation results in a > ClassCastException > The following sample topology > {noformat} > Properties properties = new Properties(); > properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "appid"); > properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost"); > StreamsBuilder builder = new StreamsBuilder(); > builder.<String, String>stream("topic") > .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(30))).count() > .suppress(Suppressed.untilTimeLimit(Duration.ofHours(1), > BufferConfig.unbounded())) > .groupBy((k, v) -> KeyValue.pair(k,v)).count().toStream(); > builder.build(properties); > {noformat} > results in this exception: > {noformat} > java.lang.ClassCastException: > org.apache.kafka.streams.kstream.internals.KTableImpl$$Lambda$4/2084435065 > cannot be cast to > org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier{noformat} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)