Thanks a lot Moritz.  Your suggestion worked immediately.

You sort of get on the wrong track since my favorite IDE suggests:

.withValueSerializer((Class<? extends Serializer<GenericRecord>>)
KafkaAvroSerializer.class)

... which simply doesn't even compile for me.

 incompatible types:
java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot
be converted to java.lang.Class<? extends
org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>

It sort of puts you on the wrong footing hence my question.
If you don't mind I'll simply create a PR to amend the Javadoc for KafkaIO.

https://issues.apache.org/jira/browse/BEAM-13854

Easier to figure out was AvroCoder.of(schema) but it might make sense to
document that in the same context as well.

Thanks again!

Cheers,
Matt


On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote:

> Just having a quick look, it looks like the respective interface in
> KafkaIO should rather look like this to support KafkaAvroSerializer, which
> is a Serializer<Object>:
>
>
>
> public Write<K, V>  withValueSerializer(Class<? extends Serializer<? super
> V>> valueSerializer)
>
>
>
> Thoughts?
>
> Cheers, Moritz
>
>
>
> *From: *Moritz Mack <mm...@talend.com>
> *Date: *Tuesday, 8. February 2022 at 15:55
> *To: *dev@beam.apache.org <dev@beam.apache.org>, matt.cast...@neo4j.com <
> matt.cast...@neo4j.com>
> *Subject: *Re: KafkaIO.write and Avro
>
> Hi Matt, Unfortunately, the types don’t play well when using
> KafkaAvroSerializer. It currently requires a cast :/ The following will
> work: write.withValueSerializer((Class)KafkaAvroSerializer.class)) ‍ ‍ ‍ ‍
> ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍ ‍
> ZjQcmQRYFpfptBannerStart
>
> This Message Is From an External Sender
>
> This message came from outside your organization.
>
> Exercise caution when opening attachments or clicking any links.
>
> ZjQcmQRYFpfptBannerEnd
>
> Hi Matt,
>
>
>
> Unfortunately, the types don’t play well when using KafkaAvroSerializer.
> It currently requires a cast :/
>
> The following will work:
>
> write.withValueSerializer((Class)KafkaAvroSerializer.class))
>
>
>
> This seems to be the cause of repeated confusion, so probably worth
> improving the user experience here!
>
>
>
> Cheers,
>
> Moritz
>
>
>
> *From: *Matt Casters <matt.cast...@neo4j.com>
> *Date: *Tuesday, 8. February 2022 at 14:17
> *To: *Beam Development List <dev@beam.apache.org>
> *Subject: *KafkaIO.write and Avro
>
> Dear Beams, When sending Avro values to Kafka, say GenericRecord, you
> typically specify option value.serializer as
> "io.confluent.kafka.serializers.KafkaAvroSerializer".  This along with a
> bunch of other options for authentication ZjQcmQRYFpfptBannerStart
>
> This Message Is From an External Sender
>
> This message came from outside your organization.
>
> Exercise caution when opening attachments or clicking any links.
>
> ZjQcmQRYFpfptBannerEnd
>
> Dear Beams,
>
>
>
> When sending Avro values to Kafka, say GenericRecord, you typically
> specify option value.serializer as
> "io.confluent.kafka.serializers.KafkaAvroSerializer".  This along with a
> bunch of other options for authentication and so on verifies the schema
> stored in the Avro record with a schema registry.   Unfortunately, I
> couldn't figure out how to pass this serializer class to KafkaIO.write() as
> it's not acceptable to the withValueSerializer() method.
>
>
>
> For KafkaIO.read() we made a specific provision in the form of
> class ConfluentSchemaRegistryDeserializer but it doesn't look like we
> covered the producer side of Avro values yet.
>
>
>
> I'd be happy to dive into the code to add proper support for a Confluent
> schema registry in KafkaIO.write() but I was just wondering if there was
> something I might have overlooked.  It's hard to find samples or
> documentation on producing Avro messages with Beam.
>
>
>
> Thanks in advance,
>
> Matt
>
>
>
> *As a recipient of an email from Talend, your contact personal data will
> be on our systems. Please see our privacy notice (updated August 2020) at
> Talend, Inc. <https://www.talend.com/contacts-privacy-policy/>*
>
>
>
> *As a recipient of an email from Talend, your contact personal data will
> be on our systems. Please see our privacy notice (updated August 2020) at
> Talend, Inc. <https://www.talend.com/contacts-privacy-policy/>*
>
>
>

-- 
Neo4j Chief Solutions Architect
*✉   *matt.cast...@neo4j.com

Reply via email to