Before switching the example from to() to through(), things seemed to work
fine. Afterward, I get ClassCastExceptions from GenericRecord to
SpecificRecord. Hopefully, someone can point out something quick and dumb
so I can get my demo wrapped up. I never get past the through() method. I
am using the Kafka Streams tech preview, and using Avro compiled from
AvroTools. Additionally, initially I had a second parallel application
taking the output of this to() invocation and continuing with the same
logic; there was no crash, but no code ever ran, such as initializing the
context. Any illumination is most appreciated.

Here's the stack trace followed by the offending bit of code...

[info] CdcProcessorSupplier::get
[info] CdcProcessor::init - ctx:
org.apache.kafka.streams.processor.internals.ProcessorContextImpl@c5ff552
[info] CdcProcessor::close
[error] Exception in thread "StreamThread-1" java.lang.ClassCastException:
org.apache.avro.generic.GenericData$Record cannot be cast to
org.apache.avro.specific.SpecificRecord
[error] at
io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:51)
[error] at
io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:24)
[error] at
org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:41)
[error] at
org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:77)
[error] at
org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:113)
[error] at
org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:134)
[error] at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:334)
[error] at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)

  /**
    * Entry point: assembles and starts the Kafka Streams topology.
    *
    * The topology reads string key/value records from the topic
    * `"raw-accesslog-" + testRun`, maps the values with `parseAsJson`, keeps the
    * records accepted by `isCdcEvent`, re-keys them with `getAccessLogKey` and
    * converts the values with `toBcAccessLog`. The result is sent through the
    * topic `"accesslog-" + testRun`, printed, and handed to a
    * `CdcProcessorSupplier`.
    *
    * @param args command-line arguments; not read by this method
    */
  def main(args: Array[String]): Unit = {
    val schemaRegistry = new CachedSchemaRegistryClient(
      "http://localhost:8081",
      AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
    val builder: KStreamBuilder = new KStreamBuilder

    // Job-wide defaults. Every source/sink below passes explicit (de)serializers,
    // so these Avro defaults only apply where none is given.
    val streamingConfig = {
      val settings = new Properties
      settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams")
      settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
      settings.put(
        StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroSerializer")
      settings.put(
        StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroDeserializer")
      settings.put(
        StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroSerializer")
      settings.put(
        StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroDeserializer")
      settings
    }

    import KeyValueImplicits._

    val stringSerializer = new StringSerializer
    val stringDeserializer = new StringDeserializer
    val bcAccessLogSerializer = new SpecificAvroSerializer[BcAccessLog](schemaRegistry)
    // NOTE(review): the reported ClassCastException (GenericData$Record -> SpecificRecord)
    // is thrown inside SpecificAvroDeserializer.deserialize when records are read back
    // from the through() topic. Presumably the wrapped Avro deserializer is not in
    // specific-reader mode ("specific.avro.reader") when built from only a registry
    // client -- confirm against the SpecificAvroDeserializer source.
    val bcAccessLogDeserializer = new SpecificAvroDeserializer[BcAccessLog](schemaRegistry)

    val rawSourceStream: KStream[String, String] =
      builder.stream(stringDeserializer, stringDeserializer, "raw-accesslog-" + testRun)

    val filteredStream: KStream[String, BcAccessLog] =
      rawSourceStream
        .mapValues(parseAsJson)
        .filter(isCdcEvent)
        .map((key, value) => (getAccessLogKey(value), toBcAccessLog(value)))

    filteredStream
      // Write to the intermediate topic and read it back with the given serdes.
      .through(
        "accesslog-" + testRun,
        stringSerializer,
        bcAccessLogSerializer,
        stringDeserializer,
        bcAccessLogDeserializer)
      // Debug trace only: print each value and pass it on unchanged.
      .mapValues(value => {
        println("filteredSourceStream value: " + value)
        value
      })
      .process(new CdcProcessorSupplier)

    val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    println("\nstart filtering BcAccessLog test run: " + testRun + "\n")
    stream.start()
  }

Regards,
Fred Patton

Reply via email to