[ 
https://issues.apache.org/jira/browse/BEAM-13854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17492406#comment-17492406
 ] 

Matt Casters commented on BEAM-13854:
-------------------------------------

diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 8164daf0be..e5087e6420 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -468,6 +468,19 @@ import org.slf4j.LoggerFactory;
  *   );
  * }</pre>
  *
+ * <p>To produce Avro values you can use class \{@link 
io.confluent.kafka.serializers.KafkaAvroSerializer}. To make
+ * this class work with \{@link KafkaIO#write()} and method 
withValueSerializer() make sure to erase the generic types
+ * by casting to (Class):
+ *
+ * <pre>{@code
+ *   KafkaIO.<Long, String>write()
+ *     ...
+ *     .withValueSerializer((Class)KafkaAvroSerializer.class)
+ *     .withProducerConfigUpdates( <Map with schema registry configuration 
details> )
+ *     ...
+ * }</pre>
+ *
+ *
  * <p>Often you might want to write just values without any keys to Kafka. Use 
\{@code values()} to
  * write records with default empty(null) key:
  *

> Document casting trick for Avro value serializer in KafkaIO
> -----------------------------------------------------------
>
>                 Key: BEAM-13854
>                 URL: https://issues.apache.org/jira/browse/BEAM-13854
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>    Affects Versions: 2.36.0
>            Reporter: Matt Casters
>            Priority: P3
>             Fix For: 2.37.0
>
>
> Consider we want to write Avro values to Kafka with for example the following 
> code:
> {code:java}
> KafkaIO.Write<String, GenericRecord> stringsToKafka =
>  KafkaIO.<String, GenericRecord>write()
>   .withBootstrapServers(bootstrapServers)
>   .withTopic(topic)
>   .withKeySerializer(StringSerializer.class)
>   .withValueSerializer(KafkaAvroSerializer.class)
>   .withProducerConfigUpdates(producerConfigUpdates);{code}
>  The KafkaAvroSerializer.class argument can't be passed as would normally be 
> the case in Producer option: 
> value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
> So the question then is which class should we pass or how to cast. IntelliJ 
> IDEA suggests a cast which doesn't compile.
> In the end the answer is simply:
> {code:java}
>   .withValueSerializer((Class)KafkaAvroSerializer.class) {code}
> I think it's worth documenting this little trick more clearly in the Javadoc 
> of KafkaIO to prevent others from bumping into the same issue. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to