Hi - I am using Avro Serialization in a Kafka Streams application through the following dependency ..
"io.confluent" % "kafka-avro-serializer" % "3.2.2" My question is : Is schema registry mandatory for using Avro Serialization ? Because when I run the application I get the following exception where it complains that there is no default value for "schema.registry.url". My current settings for StreamsConfig are the following .. settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-log-processing-avro") settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers) settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName) settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[SpecificAvroSerde[LogRecordAvro]]) .. and the exception .. 23:49:34.054 TKD [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group kstream-log-processing-avro failed on partition assignment org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerde at org.apache.kafka.streams.StreamsConfig.valueSerde(StreamsConfig.java:594) at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:58) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:41) at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:137) at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value. at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:241) at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:76) at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:51) at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:33) at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:49) at com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerializer.configure(SpecificAvroSerializer.scala:21) at com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerde.configure(SpecificAvroSerde.scala:18) at org.apache.kafka.streams.StreamsConfig.valueSerde(StreamsConfig.java:591) ... 17 common frames omitted regards. -- Debasish Ghosh http://manning.com/ghosh2 http://manning.com/ghosh Twttr: @debasishg Blog: http://debasishg.blogspot.com Code: http://github.com/debasishg