[ https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Leah Thomas resolved KAFKA-10417. --------------------------------- Resolution: Fixed > suppress() with cogroup() throws ClassCastException > --------------------------------------------------- > > Key: KAFKA-10417 > URL: https://issues.apache.org/jira/browse/KAFKA-10417 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.6.0 > Reporter: Wardha Perinkada Kattu > Assignee: Leah Thomas > Priority: Critical > Labels: kafka-streams > Fix For: 2.8.0, 2.7.1 > > > Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` > throws `ClassCastException` > Works fine without the `suppress()` > Code block tested - > {code:java} > val stream1 = requestStreams.merge(successStreams).merge(errorStreams) > .groupByKey(Grouped.with(Serdes.String(), > serdesConfig.notificationSerde())) > val streams2 = confirmationStreams > .groupByKey(Grouped.with(Serdes.String(), > serdesConfig.confirmationsSerde())) > val cogrouped = > stream1.cogroup(notificationAggregator).cogroup(streams2, > confirmationsAggregator) > > .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong()))) > .aggregate({ null }, Materialized.`as`<String, > NotificationMetric, WindowStore<Bytes, > ByteArray>>("time-windowed-aggregated-stream-store") > > .withValueSerde(serdesConfig.notificationMetricSerde())) > .suppress(Suppressed.untilWindowCloses(unbounded())) > .toStream() > {code} > Exception thrown is: > {code:java} > Caused by: java.lang.ClassCastException: class > org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to > class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier > (org.apache.kafka.streams.kstream.internals.PassThrough and > org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in > unnamed module of loader 'app') > {code} > [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress] -- This message was sent by Atlassian Jira (v8.3.4#803005)