[ https://issues.apache.org/jira/browse/KAFKA-4270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15678967#comment-15678967 ]
Mykola Polonskyi commented on KAFKA-4270:
-----------------------------------------

Hello [~damianguy],
I tried to do the remapping with doAggregate, from
{code}
cardId -> card(val userId, val cardId)
{code}
to
{code}
userId -> card(val userId, val cardId)
{code}
and then add the aggregated card to user(val userId, val setOfCards).
It looks like a one-to-many relation, I think.

> ClassCast for Agregation
> ------------------------
>
>                 Key: KAFKA-4270
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4270
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Mykola Polonskyi
>            Assignee: Damian Guy
>            Priority: Critical
>              Labels: architecture
>
> With a serde defined for the intermediate topic of the aggregation, I get a
> ClassCastException: from the custom class to ByteArray.
> While debugging I saw that the defined serde isn't used when creating the
> sinkNode (inside
> `org.apache.kafka.streams.kstream.internals.KGroupedTableImpl#doAggregate`).
> Instead of the serde defined in the aggregation call, a default impl with
> empty stubs instead of implementations is used.
> {code:kotlin}
> userTable.join(
>     skicardsTable.groupBy { key, value ->
>         KeyValue(value.skicardInfo.ownerId, value.skicardInfo) }
>         .aggregate(
>             { mutableSetOf<SkicardInfo>() },
>             { ownerId, skicardInfo, accumulator ->
>                 accumulator.put(skicardInfo) },
>             { ownerId, skicardInfo, accumulator ->
>                 accumulator },
>             skicardByOwnerIdSerde,
>             skicardByOwnerIdTopicName
>         ),
>     { userCreatedOrUpdated, skicardInfoSet ->
>         UserWithSkicardsCreatedOrUpdated(userCreatedOrUpdated.user, skicardInfoSet) }
> ).to(
>     userWithSkicardsTable
> )
> {code}
> I think the current behavior of `doAggregate` when setting up serdes and/or
> stores should be changed, because it is incorrect in release 0.10.0.1-cp1 too.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
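
For illustration only, the one-to-many regrouping described in the comment (cardId -> card becoming userId -> set of cards) can be sketched in plain Kotlin, without Kafka Streams and without any serdes. Everything here is an assumption made for the sketch: the two-field `SkicardInfo` stand-in and the helper names `addCard`, `removeCard` and `regroupByOwner` are hypothetical and do not come from the issue. It only mimics the adder/subtractor contract of a table aggregation, where each function returns the updated aggregate and the subtractor retracts the old value when a card changes owner; it does not reproduce the ClassCastException.

```kotlin
// Hypothetical stand-in for the SkicardInfo class used in the report.
data class SkicardInfo(val ownerId: String, val cardId: String)

// Adder: returns the aggregate with the new card included.
fun addCard(ownerId: String, card: SkicardInfo, acc: Set<SkicardInfo>): Set<SkicardInfo> =
    acc + card

// Subtractor: returns the aggregate with the retracted card removed.
fun removeCard(ownerId: String, card: SkicardInfo, acc: Set<SkicardInfo>): Set<SkicardInfo> =
    acc - card

// Replays updates keyed by cardId and maintains the per-owner sets.
// An update for an already known cardId first retracts the previous value
// from its old owner, then adds the new value to its (possibly new) owner.
fun regroupByOwner(updates: List<Pair<String, SkicardInfo>>): Map<String, Set<SkicardInfo>> {
    val latestByCardId = mutableMapOf<String, SkicardInfo>()
    val cardsByOwner = mutableMapOf<String, Set<SkicardInfo>>()
    for ((cardId, card) in updates) {
        val previous = latestByCardId.put(cardId, card)
        if (previous != null) {
            val oldSet = cardsByOwner[previous.ownerId] ?: emptySet()
            cardsByOwner[previous.ownerId] = removeCard(previous.ownerId, previous, oldSet)
        }
        val currentSet = cardsByOwner[card.ownerId] ?: emptySet()
        cardsByOwner[card.ownerId] = addCard(card.ownerId, card, currentSet)
    }
    return cardsByOwner
}

fun main() {
    val updates = listOf(
        "c1" to SkicardInfo("u1", "c1"),
        "c2" to SkicardInfo("u1", "c2"),
        "c1" to SkicardInfo("u2", "c1") // card c1 moves from user u1 to user u2
    )
    println(regroupByOwner(updates))
}
```

In this sketch the subtractor matters as soon as a card changes owner: without it, the card would stay in the old owner's set as well as appearing in the new one.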