@Alexey Romanenko thanks so much for your
suggestions.
Actually, I found that the code below seems to work:
    KafkaIO
        .<Void, GenericRecord>write()
        .withBootstrapServers(bootstrapServers)
        .withTopic(topicName)
        .withValueSerializer((Class) KafkaAvroSerializer.class)
        .withProducerConfigUpdates(
            ImmutableMap.of("schema.registry.url", schemaRegistryUrl))
Thanks, and I hope more great improvements are coming in the future, as
you mentioned :)
From: Alexey Romanenko <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, December 9, 2020 at 9:08 AM
To: "[email protected]" <[email protected]>
Subject: Re: Quick question about KafkaIO.Write<K,V>
AFAIR, DeserializerProvider was added to KafkaIO together with Confluent
Schema Registry support in KafkaIO.Read, to provide a universal way to use
different deserializers (Local and ConfluentSchemaRegistry for the moment).
Regarding the Write part, I believe we can do a similar refactoring. Feel free
to provide a patch; we can help with review, testing, and advice.
For now, just an idea for a workaround (I didn't test it): you need to fetch
your schema from Schema Registry in advance yourself with SchemaRegistryClient,
create an Avro record for writing (e.g. GenericRecord), and then set
KafkaAvroSerializer as the ValueSerializer and specify "schema.registry.url" in
the producer properties.
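A rough, untested sketch of the workaround above, in case it helps. It assumes Beam 2.25 (where AvroCoder lives in org.apache.beam.sdk.coders) and the Confluent client libraries on the classpath. The broker and registry addresses, the topic, the "<topic>-value" subject name, and the "name" field are all hypothetical placeholders, not values from this thread.

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Collections;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.Create;

public class WriteAvroToKafkaSketch {
  @SuppressWarnings({"unchecked", "rawtypes"})
  public static void main(String[] args) throws Exception {
    // Hypothetical placeholders; replace with your own values.
    String bootstrapServers = "localhost:9092";
    String schemaRegistryUrl = "http://localhost:8081";
    String topicName = "my_topic";
    // Assumes the default subject naming of "<topic>-value".
    String subject = topicName + "-value";

    // Fetch the latest registered schema up front, outside the pipeline.
    SchemaRegistryClient client = new CachedSchemaRegistryClient(schemaRegistryUrl, 100);
    Schema schema =
        new Schema.Parser().parse(client.getLatestSchemaMetadata(subject).getSchema());

    // Build a record against that schema. Assumes a single string field
    // called "name"; a real schema will need all required fields set.
    GenericRecord record = new GenericData.Record(schema);
    record.put("name", "example");

    Pipeline pipeline = Pipeline.create();
    pipeline
        .apply(Create.of(record).withCoder(AvroCoder.of(schema)))
        .apply(
            KafkaIO.<Void, GenericRecord>write()
                .withBootstrapServers(bootstrapServers)
                .withTopic(topicName)
                // Raw cast because KafkaAvroSerializer is a Serializer<Object>.
                .withValueSerializer((Class) KafkaAvroSerializer.class)
                // The serializer reads the registry URL from the producer config.
                .withProducerConfigUpdates(
                    Collections.<String, Object>singletonMap(
                        "schema.registry.url", schemaRegistryUrl))
                // Write values only, since the key type is Void.
                .values());
    pipeline.run().waitUntilFinish();
  }
}
```

The schema is fetched once when the pipeline is constructed, so a schema registered later would only be picked up after the pipeline is rebuilt.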
On 8 Dec 2020, at 20:59, Tao Li <[email protected]> wrote:
Hi Beam community,
I have a quick question about the withValueSerializer() method of the
KafkaIO.Write<K,V> class:
https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/kafka/KafkaIO.Write.html
The withValueSerializer method does not support passing in a serializer
provider. Without that, I cannot use the Kafka schema registry to fetch the
schema for serialization.
However, the KafkaIO.Read<K,V> withKeyDeserializer method does support
specifying a deserializer provider:
https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withKeyDeserializer-org.apache.beam.sdk.io.kafka.DeserializerProvider-
Is this a gap for KafkaIO.Write<K,V> or is it by design? Is there a workaround
to specify the schema registry info for KafkaIO.Write<K,V>?
Thanks so much!