[ 
https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Chauhan updated KAFKA-10049:
---------------------------------
    Description: 
 I want to make use of the _KTable-KTable_ Foreign Key join feature released in 
*_2.5.0_*, but I am facing an issue while running the code. 
{code:java}
 

 public static void main(String[] args) {

     Properties props = new Properties();
     props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"my-stream-processing-application-2");
     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
     props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
     props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new 
JSONSerdeComp<>().getClass());
     props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

     StreamsBuilder builder = new StreamsBuilder();
     KTable<String, OrderObject> ordersTable = builder.<String, 
OrderObject>table(TOPIC_Agora);
     KTable<String, StockMarketData> stockTable = builder.<String, 
StockMarketData>table(TOPIC_Stock_Data);

     KTable<String, EnrichedOrder> enriched = ordersTable.leftJoin(stockTable, 
OrderObject:: getSymbol, new ValueJoiner<OrderObject, StockMarketData, 
EnrichedOrder>() {

            @Override
            public EnrichedOrder apply(OrderObject order, StockMarketData 
stock) {
                EnrichedOrder enOrder = EnrichedOrder.builder()
                        .orderId(order.getOrderId())
                        .execPrice(order.getPrice())
                        .symbol(order.getSymbol())
                        .quanity(order.getQuanity())
                        .side(order.getSide())
                        .filledQty(order.getFilledQty())
                        .leaveQty(order.getLeaveQty())
                        .index(order.getIndex())
                        .vWaprelative(order.getVWaprelative())
                        .stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
                        .stockBid(stock!=null?stock.getBid().doubleValue():0.0)
                        
.stockLast(stock!=null?stock.getLast().doubleValue():0.0)
                        
.stockClose(stock!=null?stock.getClose().doubleValue():0.0)
                        .build();
                return enOrder;
            }
        } , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));

     enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() \{
         @Override
        public void apply(String arg0, EnrichedOrder arg1) {

             logger.info(String.format("key = %s, value = %s", arg0, arg1));
        }
    });

     KafkaStreams streams = new KafkaStreams(builder.build(), props);
     streams.start();

     Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
}}}



 

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.5.0</version>
    </dependency>

{code}
*+Exception:+*
{code:java}
18:49:31.525 
[my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
 ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - 
stream-thread 
[my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
 task [0_0] Failed to flush state store orders-STATE-STORE-0000000000: 
    org.apache.kafka.streams.errors.StreamsException: ClassCastException while 
producing data to a sink topic. A serializer (key: 
org.apache.kafka.common.serialization.StringSerializer / value: 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer)
 is not compatible to the actual key or value type (key type: java.lang.String 
/ value type: 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper). 
Change the default Serdes in StreamConfig or provide correct Serdes via method 
parameters (for example if using the DSL, `#to(String topic, Produced<K, V> 
produced)` with 
`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
        at 
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94) 
~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:157)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:71)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151) 
~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109) 
~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$flush$7(MeteredKeyValueStore.java:192)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:192)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:282)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:554)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:490)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
 [kafka-streams-2.5.0.jar:?]
    Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
com.messages.JSONSerdeCompatible
        at com.messages.JSONSerdeComp.serialize(JSONSerdeComp.java:1) 
~[classes/:?]
        at 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:79)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:51)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) 
~[kafka-clients-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:176)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89) 
~[kafka-streams-2.5.0.jar:?]
        ... 34 more
{code}

  was:
 I want to make use of the _KTable-KTable_ join feature released in *_2.5.0_*, 
but I am facing an issue while running the code.
{code:java}
 

 public static void main(String[] args) {

     Properties props = new Properties();
     props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"my-stream-processing-application-2");
     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
     props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
     props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new 
JSONSerdeComp<>().getClass());
     props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

     StreamsBuilder builder = new StreamsBuilder();
     KTable<String, OrderObject> ordersTable = builder.<String, 
OrderObject>table(TOPIC_Agora);
     KTable<String, StockMarketData> stockTable = builder.<String, 
StockMarketData>table(TOPIC_Stock_Data);

     KTable<String, EnrichedOrder> enriched = ordersTable.leftJoin(stockTable, 
OrderObject:: getSymbol, new ValueJoiner<OrderObject, StockMarketData, 
EnrichedOrder>() {

            @Override
            public EnrichedOrder apply(OrderObject order, StockMarketData 
stock) {
                EnrichedOrder enOrder = EnrichedOrder.builder()
                        .orderId(order.getOrderId())
                        .execPrice(order.getPrice())
                        .symbol(order.getSymbol())
                        .quanity(order.getQuanity())
                        .side(order.getSide())
                        .filledQty(order.getFilledQty())
                        .leaveQty(order.getLeaveQty())
                        .index(order.getIndex())
                        .vWaprelative(order.getVWaprelative())
                        .stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
                        .stockBid(stock!=null?stock.getBid().doubleValue():0.0)
                        
.stockLast(stock!=null?stock.getLast().doubleValue():0.0)
                        
.stockClose(stock!=null?stock.getClose().doubleValue():0.0)
                        .build();
                return enOrder;
            }
        } , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));

     enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() \{
         @Override
        public void apply(String arg0, EnrichedOrder arg1) {

             logger.info(String.format("key = %s, value = %s", arg0, arg1));
        }
    });

     KafkaStreams streams = new KafkaStreams(builder.build(), props);
     streams.start();

     Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
}}}



 

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.5.0</version>
    </dependency>

{code}

*+Exception:+*

{code}
18:49:31.525 
[my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
 ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - 
stream-thread 
[my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
 task [0_0] Failed to flush state store orders-STATE-STORE-0000000000: 
    org.apache.kafka.streams.errors.StreamsException: ClassCastException while 
producing data to a sink topic. A serializer (key: 
org.apache.kafka.common.serialization.StringSerializer / value: 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer)
 is not compatible to the actual key or value type (key type: java.lang.String 
/ value type: 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper). 
Change the default Serdes in StreamConfig or provide correct Serdes via method 
parameters (for example if using the DSL, `#to(String topic, Produced<K, V> 
produced)` with 
`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
        at 
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94) 
~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:157)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:71)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151) 
~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109) 
~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$flush$7(MeteredKeyValueStore.java:192)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:192)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:282)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:554)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:490)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
 [kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
 [kafka-streams-2.5.0.jar:?]
    Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
com.messages.JSONSerdeCompatible
        at com.messages.JSONSerdeComp.serialize(JSONSerdeComp.java:1) 
~[classes/:?]
        at 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:79)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:51)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) 
~[kafka-clients-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:176)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111)
 ~[kafka-streams-2.5.0.jar:?]
        at 
org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89) 
~[kafka-streams-2.5.0.jar:?]
        ... 34 more
{code}


> KTable-KTable Foreign Key join throwing Serialization Exception 
> ----------------------------------------------------------------
>
>                 Key: KAFKA-10049
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10049
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Amit Chauhan
>            Priority: Blocker
>
>  I want to make use of the _KTable-KTable_ Foreign Key join feature released in 
> *_2.5.0_*, but I am facing an issue while running the code. 
> {code:java}
>  
>  public static void main(String[] args) {
>      Properties props = new Properties();
>      props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "my-stream-processing-application-2");
>      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>      props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass());
>      props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new 
> JSONSerdeComp<>().getClass());
>      props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
>      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>      StreamsBuilder builder = new StreamsBuilder();
>      KTable<String, OrderObject> ordersTable = builder.<String, 
> OrderObject>table(TOPIC_Agora);
>      KTable<String, StockMarketData> stockTable = builder.<String, 
> StockMarketData>table(TOPIC_Stock_Data);
>      KTable<String, EnrichedOrder> enriched = 
> ordersTable.leftJoin(stockTable, OrderObject:: getSymbol, new 
> ValueJoiner<OrderObject, StockMarketData, EnrichedOrder>() {
>             @Override
>             public EnrichedOrder apply(OrderObject order, StockMarketData 
> stock) {
>                 EnrichedOrder enOrder = EnrichedOrder.builder()
>                         .orderId(order.getOrderId())
>                         .execPrice(order.getPrice())
>                         .symbol(order.getSymbol())
>                         .quanity(order.getQuanity())
>                         .side(order.getSide())
>                         .filledQty(order.getFilledQty())
>                         .leaveQty(order.getLeaveQty())
>                         .index(order.getIndex())
>                         .vWaprelative(order.getVWaprelative())
>                         
> .stockAsk(stock!=null?stock.getAsk().doubleValue():0.0)
>                         
> .stockBid(stock!=null?stock.getBid().doubleValue():0.0)
>                         
> .stockLast(stock!=null?stock.getLast().doubleValue():0.0)
>                         
> .stockClose(stock!=null?stock.getClose().doubleValue():0.0)
>                         .build();
>                 return enOrder;
>             }
>         } , Materialized.with(Serdes.String(), new JSONSerdeComp<>()));
>      enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() \{
>          @Override
>         public void apply(String arg0, EnrichedOrder arg1) {
>              logger.info(String.format("key = %s, value = %s", arg0, arg1));
>         }
>     });
>      KafkaStreams streams = new KafkaStreams(builder.build(), props);
>      streams.start();
>      Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
> }}}
>  
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>kafka-clients</artifactId>
>         <version>2.5.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.kafka</groupId>
>         <artifactId>kafka-streams</artifactId>
>         <version>2.5.0</version>
>     </dependency>
> {code}
> *+Exception:+*
> {code:java}
> 18:49:31.525 
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
>  ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - 
> stream-thread 
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
>  task [0_0] Failed to flush state store orders-STATE-STORE-0000000000: 
>     org.apache.kafka.streams.errors.StreamsException: ClassCastException 
> while producing data to a sink topic. A serializer (key: 
> org.apache.kafka.common.serialization.StringSerializer / value: 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer)
>  is not compatible to the actual key or value type (key type: 
> java.lang.String / value type: 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper).
>  Change the default Serdes in StreamConfig or provide correct Serdes via 
> method parameters (for example if using the DSL, `#to(String topic, 
> Produced<K, V> produced)` with 
> `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
>         at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:157)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:71)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$flush$7(MeteredKeyValueStore.java:192)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:192)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:282)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:554)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:490)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>  [kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
>  [kafka-streams-2.5.0.jar:?]
>     Caused by: java.lang.ClassCastException: java.lang.String cannot be cast 
> to com.messages.JSONSerdeCompatible
>         at com.messages.JSONSerdeComp.serialize(JSONSerdeComp.java:1) 
> ~[classes/:?]
>         at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:79)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:51)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
>  ~[kafka-clients-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:176)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111)
>  ~[kafka-streams-2.5.0.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
>  ~[kafka-streams-2.5.0.jar:?]
>         ... 34 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to