[ https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17119197#comment-17119197 ]
Adam Bellemare commented on KAFKA-10049: ---------------------------------------- Found the bug. The SubscriptionWrapperSerde wraps a Key Serde, but it is being overwritten by a valueSerde during init. Here's the line where the key is overwritten: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java#L68] Here's the init where it calls setIfUnset (notice that it is passing in the valSerializer and valDeserializer, whereas SubscriptionWrapperSerde is expecting to be passed in a keySerde). [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java#L72 https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java#L92|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java#L72] I'll take a look at fixing it, but the workflow is fuzzy to me, so it may take a bit of time for me to understand it. > KTable-KTable Foreign Key join throwing Serialization Exception > ---------------------------------------------------------------- > > Key: KAFKA-10049 > URL: https://issues.apache.org/jira/browse/KAFKA-10049 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.5.0, 2.6.0 > Reporter: Amit Chauhan > Assignee: John Roesler > Priority: Blocker > > I want to make use of _KTable-KTable_ Foreign Key join feature released in > *_2.5.0_* but facing issue while running the code. > {code:java} > > public static void main(String[] args) { > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, > "my-stream-processing-application-2"); > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); > props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new > JSONSerdeComp<>().getClass()); > props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp"); > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); > StreamsBuilder builder = new StreamsBuilder(); > KTable<String, OrderObject> ordersTable = builder.<String, > OrderObject>table(TOPIC_Agora); > KTable<String, StockMarketData> stockTable = builder.<String, > StockMarketData>table(TOPIC_Stock_Data); > KTable<String, EnrichedOrder> enriched = > ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new > ValueJoiner<OrderObject, StockMarketData, EnrichedOrder>() { > @Override > public EnrichedOrder apply(OrderObject order, StockMarketData > stock) { > EnrichedOrder enOrder = EnrichedOrder.builder() > .orderId(order.getOrderId()) > .execPrice(order.getPrice()) > .symbol(order.getSymbol()) > .quanity(order.getQuanity()) > .side(order.getSide()) > .filledQty(order.getFilledQty()) > .leaveQty(order.getLeaveQty()) > .index(order.getIndex()) > .vWaprelative(order.getVWaprelative()) > > .stockAsk(stock!=null?stock.getAsk().doubleValue():0.0) > > .stockBid(stock!=null?stock.getBid().doubleValue():0.0) > > .stockLast(stock!=null?stock.getLast().doubleValue():0.0) > > .stockClose(stock!=null?stock.getClose().doubleValue():0.0) > .build(); > return enOrder; > } > } , Materialized.with(Serdes.String(), new JSONSerdeComp<>())); > enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() \{ > @Override > public void apply(String arg0, EnrichedOrder arg1) { > logger.info(String.format("key = %s, value = %s", arg0, arg1)); > } > }); > KafkaStreams streams = new KafkaStreams(builder.build(), props); > streams.start(); > Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close())); > }}} > > <dependency> > <groupId>org.apache.kafka</groupId> > <artifactId>kafka-clients</artifactId> > <version>2.5.0</version> > </dependency> > <dependency> > <groupId>org.apache.kafka</groupId> > <artifactId>kafka-streams</artifactId> > <version>2.5.0</version> > </dependency> > {code} > *+Exception:+* > {code:java} > 18:49:31.525 > [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1] > ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - > stream-thread > [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1] > task [0_0] Failed to flush state store orders-STATE-STORE-0000000000: > org.apache.kafka.streams.errors.StreamsException: ClassCastException > while producing data to a sink topic. A serializer (key: > org.apache.kafka.common.serialization.StringSerializer / value: > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer) > is not compatible to the actual key or value type (key type: > java.lang.String / value type: > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper). > Change the default Serdes in StreamConfig or provide correct Serdes via > method parameters (for example if using the DSL, `#to(String topic, > Produced<K, V> produced)` with > `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`). > at > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:157) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:71) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$flush$7(MeteredKeyValueStore.java:192) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:192) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:282) > [kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177) > [kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:554) > [kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:490) > [kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478) > [kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226) > [kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543) > [kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977) > [kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823) > [kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) > [kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) > [kafka-streams-2.5.0.jar:?] > Caused by: java.lang.ClassCastException: java.lang.String cannot be cast > to com.messages.JSONSerdeCompatible > at com.messages.JSONSerdeComp.serialize(JSONSerdeComp.java:1) > ~[classes/:?] > at > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:79) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:51) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) > ~[kafka-clients-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:176) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111) > ~[kafka-streams-2.5.0.jar:?] > at > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89) > ~[kafka-streams-2.5.0.jar:?] > ... 34 more > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)