Hi all
we have a single kafka topic which is used to receive 2 different types of
messages.
These 2 messages are Avro.
So when reading messages from kafka i used the GenericRecord

KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers(bootstrapServers)
        .withTopic(topic)
        .withConsumerConfigUpdates(ImmutableMap.of(
                SerdeConfig.REGISTRY_URL, PipelineUtil.getSchemaURL(),
                ConsumerConfig.GROUP_ID_CONFIG, consumerGroup,
                SerdeConfig.CHECK_PERIOD_MS, TimeUnit.DAYS.toMillis(1)
        ))
        .withKeyDeserializer(StringDeserializer.class)

I am not sure how to define the *withValueDeserializer* and coder.

i tried to read the message as GenericRecord but it fails with

 "Could not extract the Kafka Deserializer type from class
io.apicurio.registry.serde.avro.AvroKafkaDeserialize"

i am using apicurio as the schema registry


Thanks

Sigalit

Reply via email to