Hi -

I am using Avro serialization in a Kafka Streams application through the
following dependency:

"io.confluent"  % "kafka-avro-serializer" % "3.2.2"

My question is: is Schema Registry mandatory for using Avro serialization?
When I run the application I get the exception below, which complains that
the required configuration "schema.registry.url" is missing and has no
default value. My current settings for StreamsConfig are the following:

   settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-log-processing-avro")
   settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokers)
   settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName)
   settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[SpecificAvroSerde[LogRecordAvro]])

and the exception:

23:49:34.054 TKD [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group kstream-log-processing-avro failed on partition assignment
org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerde
    at org.apache.kafka.streams.StreamsConfig.valueSerde(StreamsConfig.java:594)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:58)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:41)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:137)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
    at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:241)
    at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:76)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:51)
    at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:33)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:49)
    at com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerializer.configure(SpecificAvroSerializer.scala:21)
    at com.lightbend.fdp.sample.kstream.serializers.SpecificAvroSerde.configure(SpecificAvroSerde.scala:18)
    at org.apache.kafka.streams.StreamsConfig.valueSerde(StreamsConfig.java:591)
    ... 17 common frames omitted
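
For reference, the trace suggests that the serde is configured from the
StreamsConfig settings, so supplying the missing key there might look roughly
like the sketch below. This is only a sketch under assumptions: the object
name AvroSerdeSettings and the URL http://localhost:8081 are hypothetical, it
assumes a Schema Registry instance is actually running at that address, and
it assumes the custom SpecificAvroSerde forwards the config map unchanged to
KafkaAvroSerializer. Plain string keys are used so that the snippet compiles
without any Kafka dependency.

```scala
import java.util.Properties

object AvroSerdeSettings {
  // Assumption: a Schema Registry instance reachable at this address.
  val assumedRegistryUrl = "http://localhost:8081"

  // Builds the application settings, including the key named in the exception.
  def build(brokers: String): Properties = {
    val settings = new Properties()
    // Same values as StreamsConfig.APPLICATION_ID_CONFIG / BOOTSTRAP_SERVERS_CONFIG
    settings.put("application.id", "kstream-log-processing-avro")
    settings.put("bootstrap.servers", brokers)
    // The key that KafkaAvroSerializer reports as missing
    settings.put("schema.registry.url", assumedRegistryUrl)
    settings
  }
}
```

Whether Avro serialization can be used without a registry at all is a separate
matter; the sketch only addresses the missing key for the Confluent serializer.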

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
