[ https://issues.apache.org/jira/browse/KAFKA-4612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15819174#comment-15819174 ]
Matthias J. Sax commented on KAFKA-4612: ---------------------------------------- If you add {{.through()}} there will be not internally created repartitioning topic because this it not created by {{selectKey}} (it only labels its output as {{requiredRepartitioning}}) but by the following {{leftJoin}}. If you add {{through}} the flag {{requiresRepartitioning}} will be {{false}} for {{leftLeft}} input and thus no internal topic is created. Yes, {{map}}, {{flatMap}}, and {{transform}} have the same issue. > Simple Kafka Streams app causes "ClassCastException: java.lang.String cannot > be cast to [B" > ------------------------------------------------------------------------------------------- > > Key: KAFKA-4612 > URL: https://issues.apache.org/jira/browse/KAFKA-4612 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.1.1 > Environment: Virtual Machine using Debian 8 + Confluent Platform > 3.1.1. > Reporter: Kurt Ostfeld > Attachments: KafkaIsolatedBug.tar.gz > > > I've attached a minimal single source file project that reliably reproduces > this issue. > This project does the following: > 1) Create test input data. Produces a single random (String,String) record > into two diferent topics "topicInput" and "topicTable" > 2) Creates and runs a Kafka Streams application: > val kafkaTable: KTable[String, String] = builder.table(Serdes.String, > Serdes.String, "topicTable", "topicTable") > val incomingRecords: KStream[String, String] = > builder.stream(Serdes.String, Serdes.String, "topicInput") > val reKeyedRecords: KStream[String, String] = > incomingRecords.selectKey((k, _) => k) > val joinedRecords: KStream[String, String] = > reKeyedRecords.leftJoin(kafkaTable, (s1: String, _: String) => s1) > joinedRecords.to(Serdes.String, Serdes.String, "topicOutput") > This reliably generates the following error: > [error] (StreamThread-1) java.lang.ClassCastException: java.lang.String > cannot be cast to [B > java.lang.ClassCastException: java.lang.String cannot be cast to [B > at > org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:18) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:63) > at > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) > at > org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) > at > org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) > One caveat: I'm running this on a Confluent Platform 3.1.1 instance which > uses Kafka 0.10.1.0 since there is no newer Confluent Platform available. The > Kafka Streams project is built using "kafka-clients" and "kafka-streams" > version 0.10.1.1. If I use 0.10.1.0, I reliably hit bug > https://issues.apache.org/jira/browse/KAFKA-4355. I am not sure if there is > any issue using 0.10.1.1 libraries with a Confluent Platform running Kafka > 0.10.1.0. I will obviously try the next Confluent Platform binary when it is > available. -- This message was sent by Atlassian JIRA (v6.3.4#6332)