Hello Fred,

Thanks for reporting this. I looked through your code and found that this is
due to a bug in io.confluent.examples.streams.utils.SpecificAvroDeserializer.

I will fix this bug in Confluent's repo. In the meantime, you can work
around it by creating your own SpecificAvroDeserializer with the correct
logic: copy the Confluent class and make the following one-line change.

In the configure(Map<String, ?> configs, boolean isKey) method, add the
following key-value pair to the configs before calling inner.configure:

configs.put("specific.avro.reader", "true");


Guozhang

On Tue, May 17, 2016 at 11:35 AM, Fred Patton <thoughtp...@gmail.com> wrote:

> Before switching the example from to() to through(), things seemed to work
> fine. Afterward, I get ClassCastExceptions from GenericRecord to
> SpecificRecord. Hopefully, someone can point out something quick and dumb
> so I can get my demo wrapped up. I never get past the through() method. I
> am using the Kafka Streams tech preview, with Avro classes compiled from
> AvroTools. Additionally, when I initially had a second parallel application
> taking the output of this to() invocation and continuing with the same
> logic, there was no crash, but no code ever ran, not even initializing the
> context. Any illumination is most appreciated.
>
> Here's the stack trace followed by the offending bit of code...
>
> [info] CdcProcessorSupplier::get
> [info] CdcProcessor::init - ctx:
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl@c5ff552
> [info] CdcProcessor::close
> [error] Exception in thread "StreamThread-1" java.lang.ClassCastException:
> org.apache.avro.generic.GenericData$Record cannot be cast to
> org.apache.avro.specific.SpecificRecord
> [error] at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:51)
> [error] at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:24)
> [error] at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:41)
> [error] at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:77)
> [error] at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:113)
> [error] at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:134)
> [error] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:334)
> [error] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
>
>   def main(args: Array[String]) {
>     val schemaRegistry = new CachedSchemaRegistryClient(
>       "http://localhost:8081",
>       AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
>     val builder: KStreamBuilder = new KStreamBuilder
>     val streamingConfig = {
>       val settings = new Properties
>       settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams")
>       settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092")
>       settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> "localhost:2181")
>       settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
> "io.confluent.kafka.serializers.KafkaAvroSerializer")
>       settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>       settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> "io.confluent.kafka.serializers.KafkaAvroSerializer")
>       settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> "io.confluent.kafka.serializers.KafkaAvroDeserializer")
>       settings
>     }
>
>     import KeyValueImplicits._
>
>     val stringSerializer = new StringSerializer
>     val stringDeserializer = new StringDeserializer
>     val bcAccessLogSerializer = new
> SpecificAvroSerializer[BcAccessLog](schemaRegistry)
>     val bcAccessLogDeserializer = new
> SpecificAvroDeserializer[BcAccessLog](schemaRegistry)
>
>     val rawSourceStream: KStream[String, String] =
> builder.stream(stringDeserializer,
>       stringDeserializer,
>       "raw-accesslog-" + testRun)
>
>     val filteredStream: KStream[String, BcAccessLog] =
> rawSourceStream.mapValues(parseAsJson)
>       .filter(isCdcEvent)
>       .map((key, value) => (getAccessLogKey(value), toBcAccessLog(value)))
>
>     filteredStream
>       .through("accesslog-" + testRun,
>                stringSerializer,
>                bcAccessLogSerializer,
>                stringDeserializer,
>                bcAccessLogDeserializer)
>       .mapValues(value => {
>                    println("filteredSourceStream value: " + value)
>                    value
>                  })
>       .process(new CdcProcessorSupplier)
>
>     val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
>     println("\nstart filtering BcAccessLog test run: " + testRun + "\n")
>     stream.start()
>   }
>
> Regards,
> Fred Patton
>



-- 
-- Guozhang
