Hi all,

We have a single Kafka topic that receives two different types of messages, both of them Avro. Because of that, when reading from Kafka I used GenericRecord as the value type:

    KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers(bootstrapServers)
        .withTopic(topic)
        .withConsumerConfigUpdates(ImmutableMap.of(
            SerdeConfig.REGISTRY_URL, PipelineUtil.getSchemaURL(),
            ConsumerConfig.GROUP_ID_CONFIG, consumerGroup,
            SerdeConfig.CHECK_PERIOD_MS, TimeUnit.DAYS.toMillis(1)))
        .withKeyDeserializer(StringDeserializer.class)

I am not sure how to define the *withValueDeserializer* and the coder. I tried to read the message as GenericRecord, but it fails with:

    "Could not extract the Kafka Deserializer type from class io.apicurio.registry.serde.avro.AvroKafkaDeserialize"

I am using Apicurio as the schema registry.

Thanks,
Sigalit