[ https://issues.apache.org/jira/browse/KAFKA-4270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Damian Guy resolved KAFKA-4270. ------------------------------- Resolution: Cannot Reproduce Closing this as haven't been able to reproduce and no further information provided > ClassCast for Agregation > ------------------------ > > Key: KAFKA-4270 > URL: https://issues.apache.org/jira/browse/KAFKA-4270 > Project: Kafka > Issue Type: Bug > Components: streams > Reporter: Mykola Polonskyi > Assignee: Damian Guy > Priority: Critical > Labels: architecture > > With defined serdes for intermediate topic in aggregation catch the > ClassCastException: from custom class to the ByteArray. > In debug I saw that defined serde isn't used for creation sinkNode (incide > `org.apache.kafka.streams.kstream.internals.KGroupedTableImpl#doAggregate`) > Instead defined serde inside aggregation call is used default Impl with empty > plugs instead of implementations > {code:koltin} > userTable.join( > skicardsTable.groupBy { key, value -> > KeyValue(value.skicardInfo.ownerId, value.skicardInfo) } > .aggregate( > { mutableSetOf<SkicardInfo>() }, > { ownerId, skicardInfo, accumulator -> > accumulator.put(skicardInfo) }, > { ownerId, skicardInfo, accumulator -> > accumulator }, > skicardByOwnerIdSerde, > skicardByOwnerIdTopicName > ), > { userCreatedOrUpdated, skicardInfoSet -> > UserWithSkicardsCreatedOrUpdated(userCreatedOrUpdated.user, skicardInfoSet) } > ).to( > userWithSkicardsTable > ) > {code} > I think current behavior of `doAggregate` with serdes and/or stores setting > up should be changed because that is incorrect in release 0.10.0.1-cp1 to. -- This message was sent by Atlassian JIRA (v6.3.15#6346)