Hi Fred, You probably need to cast the configs to Map<String, Object> before adding the specific.avro.reader config.
Thanks, Liquan On Tue, May 17, 2016 at 5:29 PM, Guozhang Wang <wangg...@gmail.com> wrote: > Hello Fred, > > Thanks for reporting this. I looked through your code and found this is due > to a bug in io.confluent.examples.streams.utils.SpecificAvroDeserializer. > > I will fix this bug in Confluent's repo, as for you to work around it, you > can create your own SpecificAvroDeserializer with the correct logic by > copy-past Confluent class with the following one-line change: > > In configure(Map<String, ?> configs, boolean isKey) function, add the > following key-value pair to the configs before calling inner.configure: > > configs.put("specific.avro.reader", "true"); > > > Guozhang > > On Tue, May 17, 2016 at 11:35 AM, Fred Patton <thoughtp...@gmail.com> > wrote: > > > Before, switching the example from to() to through() things seemed to > work > > fine. Afterward, I get ClassCastExceptions from GenericRecord to > > SpecificRecord. Hopefully, someone can point out something quick and dumb > > so I can get my demo wrapped up. I never get past the through() method. I > > am using the Kafka Streams tech preview, and using Avro compiled from > > AvroTools. Additionally, initially I had a second parallel application > > taking the output of this to() invocation and continuing with the same > > logic, there was no crash, but no code ever ran such as initializing the > > context. Any illumination is most appreciated. > > > > Here's the stack trace followed by the offending bit code... > > > > [info] CdcProcessorSupplier::get > > [info] CdcProcessor::init - ctx: > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl@c5ff552 > > [info] CdcProcessor::close > > [error] Exception in thread "StreamThread-1" > java.lang.ClassCastException: > > org.apache.avro.generic.GenericData$Record cannot be cast to > > org.apache.avro.specific.SpecificRecord > > [error] at > > > > > io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:51) > > [error] at > > > > > io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:24) > > [error] at > > > > > org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:41) > > [error] at > > > > > org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:77) > > [error] at > > > > > org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:113) > > [error] at > > > > > org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:134) > > [error] at > > > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:334) > > [error] at > > > > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248) > > > > def main(args: Array[String]) { > > val schemaRegistry = new CachedSchemaRegistryClient(" > > http://localhost:8081", > > AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT) > > val builder: KStreamBuilder = new KStreamBuilder > > val streamingConfig = { > > val settings = new Properties > > settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams") > > settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > > "localhost:9092") > > settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, > > "localhost:2181") > > settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, > > "io.confluent.kafka.serializers.KafkaAvroSerializer") > > settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, > > "io.confluent.kafka.serializers.KafkaAvroDeserializer") > > settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, > > "io.confluent.kafka.serializers.KafkaAvroSerializer") > > settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > > "io.confluent.kafka.serializers.KafkaAvroDeserializer") > > settings > > } > > > > import KeyValueImplicits._ > > > > val stringSerializer = new StringSerializer > > val stringDeserializer = new StringDeserializer > > val bcAccessLogSerializer = new > > SpecificAvroSerializer[BcAccessLog](schemaRegistry) > > val bcAccessLogDeserializer = new > > SpecificAvroDeserializer[BcAccessLog](schemaRegistry) > > > > val rawSourceStream: KStream[String, String] = > > builder.stream(stringDeserializer, > > stringDeserializer, > > "raw-accesslog-" + testRun) > > > > val filteredStream: KStream[String, BcAccessLog] = > > rawSourceStream.mapValues(parseAsJson) > > .filter(isCdcEvent) > > .map((key, value) => (getAccessLogKey(value), > toBcAccessLog(value))) > > > > filteredStream > > .through("accesslog-" + testRun, > > stringSerializer, > > bcAccessLogSerializer, > > stringDeserializer, > > bcAccessLogDeserializer) > > .mapValues(value => { > > println("filteredSourceStream value: " + value) > > value > > }) > > .process(new CdcProcessorSupplier) > > > > val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig) > > println("\nstart filtering BcAccessLog test run: " + testRun + "\n") > > stream.start() > > } > > > > Regards, > > Fred Patton > > > > > > -- > -- Guozhang > -- Liquan Pei Software Engineer, Confluent Inc