[ https://issues.apache.org/jira/browse/BEAM-13854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17492406#comment-17492406 ]
Matt Casters commented on BEAM-13854:
-------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 8164daf0be..e5087e6420 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -468,6 +468,19 @@ import org.slf4j.LoggerFactory;
* );
* }</pre>
*
+ * <p>To produce Avro values you can use class {@link io.confluent.kafka.serializers.KafkaAvroSerializer}. To make
+ * this class work with {@link KafkaIO#write()} and method withValueSerializer() make sure to erase the generic types
+ * by casting to (Class):
+ *
+ * <pre>{@code
+ * KafkaIO.<Long, String>write()
+ * ...
+ * .withValueSerializer((Class)KafkaAvroSerializer.class)
+ * .withProducerConfigUpdates( <Map with schema registry configuration details> )
+ * ...
+ * }</pre>
+ *
+ *
* <p>Often you might want to write just values without any keys to Kafka. Use {@code values()} to
* write records with default empty(null) key:
*
> Document casting trick for Avro value serializer in KafkaIO
> -----------------------------------------------------------
>
> Key: BEAM-13854
> URL: https://issues.apache.org/jira/browse/BEAM-13854
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kafka
> Affects Versions: 2.36.0
> Reporter: Matt Casters
> Priority: P3
> Fix For: 2.37.0
>
>
> Suppose we want to write Avro values to Kafka with, for example, the following
> code:
> {code:java}
> KafkaIO.Write<String, GenericRecord> stringsToKafka =
>     KafkaIO.<String, GenericRecord>write()
>         .withBootstrapServers(bootstrapServers)
>         .withTopic(topic)
>         .withKeySerializer(StringSerializer.class)
>         .withValueSerializer(KafkaAvroSerializer.class)
>         .withProducerConfigUpdates(producerConfigUpdates);{code}
> The KafkaAvroSerializer.class argument can't be passed here directly, even
> though the same class can normally be set as a Producer option:
> value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
> So the question is which class we should pass, or how to cast it. The cast that
> IntelliJ IDEA suggests doesn't compile.
> In the end the answer is simply:
> {code:java}
> .withValueSerializer((Class)KafkaAvroSerializer.class) {code}
> I think it's worth documenting this little trick more clearly in the Javadoc
> of KafkaIO to prevent others from bumping into the same issue.
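
For anyone wondering why the raw cast is needed at all, here is a minimal self-contained sketch of what is probably going on. It does not use the real Kafka, Beam, or Confluent classes: Serializer, ObjectSerializer, and Write below are hypothetical stand-ins. The assumption is that KafkaAvroSerializer is declared against Object rather than the record type, while withValueSerializer() expects a Class of a serializer for exactly the value type V, so the two generic types never line up without erasing them.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins only; none of these are the real Kafka/Beam/Confluent types.
class RawCastSketch {

  // Same shape as a Kafka-style serializer interface.
  interface Serializer<T> {
    byte[] serialize(String topic, T data);
  }

  // Stand-in for KafkaAvroSerializer, assumed to implement Serializer<Object>.
  static class ObjectSerializer implements Serializer<Object> {
    @Override
    public byte[] serialize(String topic, Object data) {
      return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
    }
  }

  // Stand-in for the writer: it only accepts serializer classes typed for V.
  static class Write<V> {
    private Class<? extends Serializer<V>> valueSerializer;

    Write<V> withValueSerializer(Class<? extends Serializer<V>> cls) {
      this.valueSerializer = cls;
      return this;
    }

    byte[] write(String topic, V value) {
      try {
        // Instantiate the configured serializer reflectively, then use it.
        Serializer<V> s = valueSerializer.getDeclaredConstructor().newInstance();
        return s.serialize(topic, value);
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException(e);
      }
    }
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  static Write<String> configure() {
    // Does not compile: Class<ObjectSerializer> is not a
    // Class<? extends Serializer<String>>.
    // return new Write<String>().withValueSerializer(ObjectSerializer.class);

    // Compiles with an unchecked warning: the raw Class erases the generic types.
    return new Write<String>().withValueSerializer((Class) ObjectSerializer.class);
  }

  static String demo() {
    return new String(configure().write("topic", "hello"), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The cast only silences the compiler. It is safe at runtime here because a serializer that accepts Object can also handle the narrower value type.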