Thanks, Guozhang, I'll file a JIRA this afternoon.

--Fred

On Wed, May 18, 2016 at 11:51 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> Fred,
>
> You are right, only default serdes provided through configs are
> auto-configured today, but we could auto-configure other serdes passed
> alongside the topology builder as well. Do you want to file a JIRA to
> keep track of this?
>
> Guozhang
>
> On Wed, May 18, 2016 at 11:36 AM, Fred Patton <thoughtp...@gmail.com> wrote:
>
> > Thanks so much, Guozhang. It didn't work immediately when I tried it at
> > the end of the day, so I deferred until this morning. It turned out I
> > also needed to invoke configure() manually, or pass the props to the
> > deserializer constructor. I am providing the revised example in case it
> > helps anyone. Thank you, Liquan, for pointing out the need for the cast,
> > which turned out to be required as well.
> >
> > Regards,
> > Fred
> >
> >   def main(args: Array[String]) {
> >
> >     val schemaRegistry = new CachedSchemaRegistryClient(
> >       "http://localhost:8081",
> >       AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
> >
> >     val builder: KStreamBuilder = new KStreamBuilder
> >
> >     val streamingConfig = {
> >       val settings = new Properties
> >       settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams")
> >       settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> >       settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
> >       settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> >         "http://localhost:8081")
> >       settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
> >         "io.confluent.kafka.serializers.KafkaAvroSerializer")
> >       settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> >         "io.confluent.kafka.serializers.KafkaAvroDeserializer")
> >       settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> >         "io.confluent.kafka.serializers.KafkaAvroSerializer")
> >       settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> >         "io.confluent.kafka.serializers.KafkaAvroDeserializer")
> >       settings.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
> >         "true")
> >       settings
> >     }
> >
> >     import KeyValueImplicits._
> >
> >     val stringSerializer = new StringSerializer
> >     val stringDeserializer = new StringDeserializer
> >     val bcAccessLogSerializer =
> >       new SpecificAvroSerializer[BcAccessLog](schemaRegistry)
> >     val bcAccessLogDeserializer =
> >       new SpecificAvroDeserializer[BcAccessLog](schemaRegistry)
> >
> >     // Serde instances passed to the topology directly are not
> >     // auto-configured, so configure() has to be invoked by hand
> >     // (isKey = false, since these are value serdes).
> >     bcAccessLogSerializer.configure(
> >       streamingConfig.asInstanceOf[java.util.Map[String, String]], false)
> >     bcAccessLogDeserializer.configure(
> >       streamingConfig.asInstanceOf[java.util.Map[String, String]], false)
> >
> >     val rawSourceStream: KStream[String, String] =
> >       builder.stream(stringDeserializer,
> >                      stringDeserializer,
> >                      "raw-accesslog-" + testRun)
> >
> >     val filteredStream: KStream[String, BcAccessLog] =
> >       rawSourceStream.mapValues(parseAsJson)
> >         .filter(isCdcEvent)
> >         .map((key, value) => (getAccessLogKey(value), toBcAccessLog(value)))
> >
> >     filteredStream
> >       .through("accesslog-" + testRun,
> >                stringSerializer,
> >                bcAccessLogSerializer,
> >                stringDeserializer,
> >                bcAccessLogDeserializer)
> >       .mapValues(value => {
> >         println("filteredSourceStream value: " + value)
> >         value
> >       })
> >       .process(new CdcProcessorSupplier)
> >
> >     val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
> >     println("\nstart filtering BcAccessLog test run: " + testRun + "\n")
> >     stream.start()
> >   }
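> >
> > One more note: the example above uses the configure() route, but the
> > props can also go in through the constructor, as mentioned. A rough
> > sketch of that alternative (untested, and using the underlying
> > KafkaAvroDeserializer directly; note it deserializes to Object rather
> > than BcAccessLog, so the typed wrapper above is still nicer for the
> > KStream API):
> >
> >   import io.confluent.kafka.serializers.KafkaAvroDeserializer
> >
> >   // Props are handed over at construction time, so no separate
> >   // configure() call is needed; specific.avro.reader=true in
> >   // streamingConfig makes it decode to SpecificRecord instances.
> >   val avroDeserializer = new KafkaAvroDeserializer(
> >     schemaRegistry,
> >     streamingConfig.asInstanceOf[java.util.Map[String, Any]])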
> >
> > On Tue, May 17, 2016 at 5:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Fred,
> > >
> > > Thanks for reporting this. I looked through your code and found this
> > > is due to a bug in
> > > io.confluent.examples.streams.utils.SpecificAvroDeserializer.
> > >
> > > I will fix this bug in Confluent's repo. To work around it in the
> > > meantime, you can create your own SpecificAvroDeserializer with the
> > > correct logic by copy-pasting the Confluent class and applying the
> > > following one-line change:
> > >
> > > In the configure(Map<String, ?> configs, boolean isKey) function, add
> > > the following key-value pair to the configs before calling
> > > inner.configure:
> > >
> > >   configs.put("specific.avro.reader", "true");
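> > >
> > > If you would rather wrap the inner deserializer than copy the whole
> > > Java class, here is a rough Scala sketch of the same one-line fix
> > > (untested, and the class name is just for illustration):
> > >
> > >   import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
> > >   import io.confluent.kafka.serializers.KafkaAvroDeserializer
> > >   import org.apache.avro.specific.SpecificRecord
> > >   import org.apache.kafka.common.serialization.Deserializer
> > >
> > >   // Delegates to KafkaAvroDeserializer, but forces specific-record
> > >   // decoding by injecting specific.avro.reader=true in configure().
> > >   class FixedSpecificAvroDeserializer[T <: SpecificRecord](
> > >       client: SchemaRegistryClient) extends Deserializer[T] {
> > >
> > >     private val inner = new KafkaAvroDeserializer(client)
> > >
> > >     override def configure(configs: java.util.Map[String, _],
> > >                            isKey: Boolean): Unit = {
> > >       // Copy the caller's configs and add the missing flag before
> > >       // handing them to the inner deserializer.
> > >       val patched = new java.util.HashMap[String, Any]()
> > >       patched.putAll(configs.asInstanceOf[java.util.Map[String, Any]])
> > >       patched.put("specific.avro.reader", "true")
> > >       inner.configure(patched, isKey)
> > >     }
> > >
> > >     override def deserialize(topic: String, data: Array[Byte]): T =
> > >       inner.deserialize(topic, data).asInstanceOf[T]
> > >
> > >     override def close(): Unit = inner.close()
> > >   }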
> > >
> > > Guozhang
> > >
> > > On Tue, May 17, 2016 at 11:35 AM, Fred Patton <thoughtp...@gmail.com> wrote:
> > >
> > > > Before switching the example from to() to through(), things seemed
> > > > to work fine. Afterward, I get ClassCastExceptions from
> > > > GenericRecord to SpecificRecord. Hopefully, someone can point out
> > > > something quick and dumb so I can get my demo wrapped up. I never
> > > > get past the through() method. I am using the Kafka Streams tech
> > > > preview, with Avro classes compiled by AvroTools. Additionally, I
> > > > initially had a second, parallel application taking the output of
> > > > this to() invocation and continuing with the same logic; there was
> > > > no crash, but no code ever ran, not even the context
> > > > initialization. Any illumination is most appreciated.
> > > >
> > > > Here's the stack trace, followed by the offending bit of code...
> > > >
> > > > [info] CdcProcessorSupplier::get
> > > > [info] CdcProcessor::init - ctx:
> > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl@c5ff552
> > > > [info] CdcProcessor::close
> > > > [error] Exception in thread "StreamThread-1" java.lang.ClassCastException:
> > > > org.apache.avro.generic.GenericData$Record cannot be cast to
> > > > org.apache.avro.specific.SpecificRecord
> > > > [error] at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:51)
> > > > [error] at io.confluent.examples.streams.utils.SpecificAvroDeserializer.deserialize(SpecificAvroDeserializer.java:24)
> > > > [error] at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:41)
> > > > [error] at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:77)
> > > > [error] at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:113)
> > > > [error] at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:134)
> > > > [error] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:334)
> > > > [error] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:248)
> > > >
> > > >   def main(args: Array[String]) {
> > > >     val schemaRegistry = new CachedSchemaRegistryClient(
> > > >       "http://localhost:8081",
> > > >       AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
> > > >     val builder: KStreamBuilder = new KStreamBuilder
> > > >     val streamingConfig = {
> > > >       val settings = new Properties
> > > >       settings.put(StreamsConfig.JOB_ID_CONFIG, "kstreams")
> > > >       settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> > > >       settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181")
> > > >       settings.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG,
> > > >         "io.confluent.kafka.serializers.KafkaAvroSerializer")
> > > >       settings.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > > >         "io.confluent.kafka.serializers.KafkaAvroDeserializer")
> > > >       settings.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> > > >         "io.confluent.kafka.serializers.KafkaAvroSerializer")
> > > >       settings.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > > >         "io.confluent.kafka.serializers.KafkaAvroDeserializer")
> > > >       settings
> > > >     }
> > > >
> > > >     import KeyValueImplicits._
> > > >
> > > >     val stringSerializer = new StringSerializer
> > > >     val stringDeserializer = new StringDeserializer
> > > >     val bcAccessLogSerializer =
> > > >       new SpecificAvroSerializer[BcAccessLog](schemaRegistry)
> > > >     val bcAccessLogDeserializer =
> > > >       new SpecificAvroDeserializer[BcAccessLog](schemaRegistry)
> > > >
> > > >     val rawSourceStream: KStream[String, String] =
> > > >       builder.stream(stringDeserializer,
> > > >                      stringDeserializer,
> > > >                      "raw-accesslog-" + testRun)
> > > >
> > > >     val filteredStream: KStream[String, BcAccessLog] =
> > > >       rawSourceStream.mapValues(parseAsJson)
> > > >         .filter(isCdcEvent)
> > > >         .map((key, value) => (getAccessLogKey(value), toBcAccessLog(value)))
> > > >
> > > >     filteredStream
> > > >       .through("accesslog-" + testRun,
> > > >                stringSerializer,
> > > >                bcAccessLogSerializer,
> > > >                stringDeserializer,
> > > >                bcAccessLogDeserializer)
> > > >       .mapValues(value => {
> > > >         println("filteredSourceStream value: " + value)
> > > >         value
> > > >       })
> > > >       .process(new CdcProcessorSupplier)
> > > >
> > > >     val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
> > > >     println("\nstart filtering BcAccessLog test run: " + testRun + "\n")
> > > >     stream.start()
> > > >   }
> > > >
> > > > Regards,
> > > > Fred Patton
> > >
> > >
> > > --
> > > -- Guozhang
>
>
> --
> -- Guozhang