AFAIR, DeserializerProvider was added to KafkaIO together with Confluent 
Schema Registry support in KafkaIO.Read, to provide a universal way to use 
different Deserializers (Local and ConfluentSchemaRegistry for the moment).

Regarding the Write part, I believe we can do a similar refactoring. Feel free 
to provide a patch; we can help with review, testing and advice. 

For now, here is an idea for a workaround (I didn't test it): fetch your schema 
from Schema Registry in advance yourself with SchemaRegistryClient, use it to 
create the Avro records to write (e.g. GenericRecord), then set 
KafkaAvroSerializer as the ValueSerializer and specify "schema.registry.url" in 
the producer properties. 
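
A minimal sketch of the producer properties part of that workaround, equally 
untested. It uses only the JDK so it can be run on its own. The class name, the 
helper method and the registry address are hypothetical placeholders; only the 
"schema.registry.url" key comes from the suggestion above.

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerConfigSketch {
    // Hypothetical registry address; replace it with your own.
    static final String REGISTRY_URL = "http://localhost:8081";

    /** Builds the extra producer properties the workaround relies on. */
    static Map<String, Object> producerConfig(String registryUrl) {
        Map<String, Object> config = new HashMap<>();
        // Tells KafkaAvroSerializer where to find the Schema Registry.
        config.put("schema.registry.url", registryUrl);
        return config;
    }

    public static void main(String[] args) {
        // Print the properties that would be handed to the Kafka producer.
        System.out.println(producerConfig(REGISTRY_URL));
    }
}
```

The idea is to hand this map to KafkaIO.Write as producer config updates (I 
believe withProducerConfigUpdates() is the method for that, but please check 
the javadoc for your Beam version) and to set KafkaAvroSerializer with 
withValueSerializer(). Since KafkaAvroSerializer is declared as a serializer 
of Object, the class argument may need a cast to compile.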

> On 8 Dec 2020, at 20:59, Tao Li <[email protected]> wrote:
> 
> Hi Beam community,
>  
> I got a quick question about withValueSerializer() method of 
> KafkaIO.Write<K,V> class: 
> https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/kafka/KafkaIO.Write.html
>  
> The withValueSerializer method does not support passing in a serializer 
> provider. The problem with lacking that functionality is that I cannot use 
> Kafka schema registry to fetch the schema for serialization.
>  
> However at the same time, the KafkaIO.Read<K,V> withKeyDeserializer method 
> supports specifying a deserializer provider: 
> https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withKeyDeserializer-org.apache.beam.sdk.io.kafka.DeserializerProvider-
>  
> Is this a gap for KafkaIO.Write<K,V> or is it by design? Is there a 
> workaround to specify the schema registry info for KafkaIO.Write<K,V>?
>  
> Thanks so much!
