Hello Fred,

Thanks for reporting this. I looked through your code and found this is due to a bug in io.confluent.examples.streams.utils.SpecificAvroDeserializer.

I will fix this bug in Confluent's repo. To work around it in the meantime, you can create your own SpecificAvroDeserializer with the correct logic by copy-pasting the Confluent class and making the following one-line change: in the configure(Map<String, ?> configs, boolean isKey) function, add the following key-value pair to the configs before calling inner.configure:

    configs.put("specific.avro.reader", "true");
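For illustration, here is a rough sketch of what the changed method could look like in your copy of the class. MySpecificAvroDeserializer is just a placeholder name for your copy, "inner" is the wrapped KafkaAvroDeserializer field from the example class, and I copy the configs into a new HashMap here only because the incoming map may not be mutable; the rest of the copied methods stay as they are in the original class.

    import java.util.HashMap;
    import java.util.Map;

    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import org.apache.avro.specific.SpecificRecord;
    import org.apache.kafka.common.serialization.Deserializer;

    // Sketch only: a copy of the example SpecificAvroDeserializer with the
    // one-line change applied in configure().
    public class MySpecificAvroDeserializer<T extends SpecificRecord> implements Deserializer<T> {

        private final KafkaAvroDeserializer inner;

        public MySpecificAvroDeserializer(SchemaRegistryClient client) {
            inner = new KafkaAvroDeserializer(client);
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // Copy the configs into a mutable map and add the extra entry so the
            // inner KafkaAvroDeserializer returns SpecificRecord instances
            // instead of GenericData.Record.
            Map<String, Object> withSpecificReader = new HashMap<>(configs);
            withSpecificReader.put("specific.avro.reader", "true");
            inner.configure(withSpecificReader, isKey);
        }

        @SuppressWarnings("unchecked")
        @Override
        public T deserialize(String topic, byte[] bytes) {
            return (T) inner.deserialize(topic, bytes);
        }

        @Override
        public void close() {
            inner.close();
        }
    }

You would then construct it with your schema registry client and pass it to through() exactly as you do with the current deserializer.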
Guozhang

On Tue, May 17, 2016 at 11:35 AM, Fred Patton <thoughtp...@gmail.com> wrote:
> Before switching the example from to() to through(), things seemed to work
> fine. Afterward, I get ClassCastExceptions from GenericRecord to
> SpecificRecord. Hopefully, someone can point out something quick and dumb
> so I can get my demo wrapped up. I never get past the through() method. I
> am using the Kafka Streams tech preview, and using Avro compiled from
> AvroTools. Additionally, I initially had a second parallel application
> taking the output of this to() invocation and continuing with the same
> logic; there was no crash, but no code ever ran, such as initializing the
> context. Any illumination is most appreciated.
>
> Here's the stack trace followed by the offending bit of code...
>
> [info] CdcProcessorSupplier::get
> [info] CdcProcessor::init - ctx: org.apache.kafka.streams.processor.internals.ProcessorContextImpl@c5ff552
> [info] CdcProcessor::close
> [error] Exception in thread "StreamThread-1" java.lang.ClassCastException:
>   org.apache.avro.generic.GenericData$Record cannot be cast to
>   org.apache.avro.specific.SpecificRecord
> [error]   at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:51)
> [error]   at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:24)
> [error]   at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:41)
> [error]   at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:77)
> [error]   at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:113)
> [error]   at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:134)
> [error]   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:334)
> [error]   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
>
> def main(args: Array[String]) {
>   val schemaRegistry = new CachedSchemaRegistryClient("http://localhost:8081",
>     AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
>   val builder: KStreamBuilder = new KStreamBuilder
>   val streamingConfig = {
>     val settings = new Properties
>     settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams")
>     settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
>     settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
>     settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
>     settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>     settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
>     settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>     settings
>   }
>
>   import KeyValueImplicits._
>
>   val stringSerializer = new StringSerializer
>   val stringDeserializer = new StringDeserializer
>   val bcAccessLogSerializer = new SpecificAvroSerializer[BcAccessLog](schemaRegistry)
>   val bcAccessLogDeserializer = new SpecificAvroDeserializer[BcAccessLog](schemaRegistry)
>
>   val rawSourceStream: KStream[String, String] =
>     builder.stream(stringDeserializer,
>       stringDeserializer,
>       "raw-accesslog-" + testRun)
>
>   val filteredStream: KStream[String, BcAccessLog] =
>     rawSourceStream.mapValues(parseAsJson)
>       .filter(isCdcEvent)
>       .map((key, value) => (getAccessLogKey(value), toBcAccessLog(value)))
>
>   filteredStream
>     .through("accesslog-" + testRun,
>       stringSerializer,
>       bcAccessLogSerializer,
>       stringDeserializer,
>       bcAccessLogDeserializer)
>     .mapValues(value => {
>       println("filteredSourceStream value: " + value)
>       value
>     })
>     .process(new CdcProcessorSupplier)
>
>   val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
>   println("\nstart filtering BcAccessLog test run: " + testRun + "\n")
>   stream.start()
> }
>
> Regards,
> Fred Patton

--
-- Guozhang