Before I switched the example from to() to through(), things seemed to work fine. Since the switch, I get a ClassCastException from GenericRecord to SpecificRecord, and I never get past the through() method. I am using the Kafka Streams tech preview, with Avro classes compiled with AvroTools. Initially I also had a second application running in parallel that took the output of the to() invocation and continued with the same logic; it did not crash, but none of its code ever ran, not even the context initialization. Hopefully someone can point out something quick and dumb so I can get my demo wrapped up. Any illumination is most appreciated.

Here's the stack trace, followed by the offending bit of code:

[info] CdcProcessorSupplier::get
[info] CdcProcessor::init - ctx: org.apache.kafka.streams.processor.internals.ProcessorContextImpl@c5ff552
[info] CdcProcessor::close
[error] Exception in thread "StreamThread-1" java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.avro.specific.SpecificRecord
[error]     at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:51)
[error]     at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:24)
[error]     at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:41)
[error]     at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:77)
[error]     at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:113)
[error]     at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:134)
[error]     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:334)
[error]     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)

def main(args: Array[String]) {
  val schemaRegistry = new CachedSchemaRegistryClient("http://localhost:8081",
    AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)

  val builder: KStreamBuilder = new KStreamBuilder

  val streamingConfig = {
    val settings = new Properties
    settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams")
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
    settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
    settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer")
    settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer")
    settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer")
    settings
  }

  import KeyValueImplicits._

  val stringSerializer = new StringSerializer
  val stringDeserializer = new StringDeserializer
  val bcAccessLogSerializer = new SpecificAvroSerializer[BcAccessLog](schemaRegistry)
  val bcAccessLogDeserializer = new SpecificAvroDeserializer[BcAccessLog](schemaRegistry)

  val rawSourceStream: KStream[String, String] =
    builder.stream(stringDeserializer, stringDeserializer, "raw-accesslog-" + testRun)

  val filteredStream: KStream[String, BcAccessLog] =
    rawSourceStream.mapValues(parseAsJson)
      .filter(isCdcEvent)
      .map((key, value) => (getAccessLogKey(value), toBcAccessLog(value)))

  filteredStream
    .through("accesslog-" + testRun,
      stringSerializer, bcAccessLogSerializer,
      stringDeserializer, bcAccessLogDeserializer)
    .mapValues(value => {
      println("filteredSourceStream value: " + value)
      value
    })
    .process(new CdcProcessorSupplier)

  val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
  println("\nstart filtering BcAccessLog test run: " + testRun + "\n")
  stream.start()
}

Regards,
Fred Patton