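For your first question: no, the schema registry is not mandatory for Avro serialization itself; you can use the Avro API directly. It is the Confluent serializer that requires "schema.registry.url", as your exception shows.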
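As a rough illustration of what "use the Avro API" could look like, here is a minimal sketch that encodes and decodes a record with Avro's own binary encoder, with no registry involved. The schema, the `PlainAvro` object and its field names are hypothetical and only for illustration; they are not taken from your application. You would still need to wrap functions like these in your own Serde for Kafka Streams.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Hypothetical helper: plain Avro binary encoding without a schema registry.
object PlainAvro {
  // Hypothetical schema; in practice this would be your own record schema.
  val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"LogRecord","fields":[
      |  {"name":"host","type":"string"},
      |  {"name":"bytes","type":"long"}
      |]}""".stripMargin)

  // Encode a record to raw Avro binary (no schema id or header is written).
  def serialize(record: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Decode raw Avro binary; the reader must already know the writer's schema.
  def deserialize(bytes: Array[Byte]): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    new GenericDatumReader[GenericRecord](schema)
      .read(null.asInstanceOf[GenericRecord], decoder)
  }
}
```

The trade-off is that both producer and consumer have to agree on the schema out of band, which is the problem the schema registry is meant to solve.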

On Mon, Jul 17, 2017 at 2:29 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Hi -
>
> I am using Avro Serialization in a Kafka Streams application through the
> following dependency ..
>
> "io.confluent" % "kafka-avro-serializer" % "3.2.2"
>
> My question is : Is schema registry mandatory for using Avro Serialization
> ? Because when I run the application I get the following exception where it
> complains that there is no default value for "schema.registry.url". My
> current settings for StreamsConfig are the following ..
>
> settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-log-processing-avro")
> settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
> settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
> settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[SpecificAvroSerde[LogRecordAvro]])
>
> .. and the exception ..
>
> 23:49:34.054 TKD [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator -
> User provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> kstream-log-processing-avro failed on partition assignment
> org.apache.kafka.streams.errors.StreamsException: Failed to configure value
> serde class com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerde
>   at org.apache.kafka.streams.StreamsConfig.valueSerde(StreamsConfig.java:594)
>   at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:58)
>   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:41)
>   at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:137)
>   at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>   at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>   at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>   at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>   at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>   at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> Caused by: io.confluent.common.config.ConfigException: Missing required
> configuration "schema.registry.url" which has no default value.
>   at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:241)
>   at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:76)
>   at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:51)
>   at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:33)
>   at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:49)
>   at com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerializer.configure(SpecificAvroSerializer.scala:21)
>   at com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerde.configure(SpecificAvroSerde.scala:18)
>   at org.apache.kafka.streams.StreamsConfig.valueSerde(StreamsConfig.java:591)
>   ... 17 common frames omitted
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg