Of course, IMO it would also be fine not to force developers to use withKeySerializer() / withValueSerializer() in the first place. That way you could configure the Kafka serializer classes the standard way, using properties as per the Kafka Consumer/Producer documentation.
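As a rough illustration of that idea, this is what property-based configuration looks like in plain Kafka producer terms (the broker and schema-registry addresses below are placeholders, not anything from this thread):

```java
import java.util.Properties;

public class ProducerPropsSketch {
    public static Properties producerProps() {
        Properties props = new Properties();
        // Standard Kafka producer configuration: the serializer classes are
        // plain string properties, so no generics get in the way.
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081"); // placeholder
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("value.serializer"));
    }
}
```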
Just an idea.

Matt

On Wed, Feb 9, 2022 at 3:40 PM Kenneth Knowles <k...@apache.org> wrote:

> Just a quick type check: is it the case that a Serializer<Object> is
> expected to be able to properly serde any subclass of Object? More
> generally, that any Serializer<? super V> should be able to properly serde
> V? Typically this isn't the case. Not saying we shouldn't make the proposed
> change, but it could result in surprises.
>
> Another possibility, based on experience with coders: I would highlight
> three types of serde that could apply to Serializer as well as to Coder:
>
> 1. handles just a single type (VarIntCoder, etc.)
> 2. lossy/converts concrete types because it is allowed (ListCoder works
> for any list, but does *not* restore the original concrete subclass)
> 3. generic/tagging (SerializableCoder, which restores the concrete subclass)
>
> The API in KafkaIO is right for types 1 and 2 but too strict for type 3.
> The new API is great for type 3, but potentially dangerous for types 2 and
> 1 (though for type 1 it will mostly be irrelevant).
>
> We could have a separate entry point for type 3, like
> .withGenericValueCoder(Serializer<? super V>), that makes it very clear
> that if you call this one you have to pass a Serializer that tags the
> concrete subclass and restores it. Often, the only relevant type will be
> Serializer<Object>, so we could even make that the parameter type.
>
> Kenn
>
> On Tue, Feb 8, 2022 at 10:57 AM Brian Hulette <bhule...@google.com> wrote:
>
>> The issue here is that KafkaAvroSerializer implements Serializer<Object>,
>> and not Serializer<GenericRecord> [1]. So you need to erase the type to
>> force it. I think Moritz's suggestion is actually to update the signature
>> here [2] to make the type parameter `? super V`, so that a
>> Serializer<Object> will be acceptable. That change would be preferable to
>> updating the docs.
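The variance issue Brian describes can be reproduced with stand-in types (minimal mock-ups with the same generic shape, not the real Kafka/Beam classes):

```java
// Stand-in for org.apache.kafka.common.serialization.Serializer.
interface Serializer<T> { byte[] serialize(T value); }

// Stands in for GenericRecord.
class RecordType {}

// Stands in for KafkaAvroSerializer: it implements Serializer<Object>.
class ObjectSerializer implements Serializer<Object> {
    public byte[] serialize(Object value) { return new byte[0]; }
}

public class VarianceSketch {
    // Current-style signature: accepts only a Serializer<V> exactly.
    static <V> void strict(Class<? extends Serializer<V>> clazz) {}

    // Proposed signature: any Serializer<? super V> is acceptable.
    static <V> void relaxed(Class<? extends Serializer<? super V>> clazz) {}

    public static void main(String[] args) {
        // With the strict bound, ObjectSerializer.class does not compile
        // for V = RecordType:
        // VarianceSketch.<RecordType>strict(ObjectSerializer.class);  // error
        //
        // With `? super V` it compiles, because ObjectSerializer is a
        // Serializer<Object> and Object is a supertype of RecordType.
        VarianceSketch.<RecordType>relaxed(ObjectSerializer.class);
        System.out.println("ok");
    }
}
```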
>>
>> [1] https://github.com/confluentinc/schema-registry/blob/cb6ca203ea9d26c84ed1e92d072f793f7bc2417b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java#L27
>> [2] https://github.com/apache/beam/blob/963c04ae6d503a97b9a169f07750c5d8c33fdd6c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2379
>>
>> On Tue, Feb 8, 2022 at 9:05 AM Sachin Agarwal <sachi...@google.com> wrote:
>>
>>> Thanks both, that's great.
>>>
>>> On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <matt.cast...@neo4j.com> wrote:
>>>
>>>> Thanks a lot, Moritz. Your suggestion worked immediately.
>>>>
>>>> You sort of get put on the wrong track, since my favorite IDE suggests:
>>>>
>>>> .withValueSerializer((Class<? extends Serializer<GenericRecord>>)
>>>> KafkaAvroSerializer.class)
>>>>
>>>> ... which simply doesn't compile for me:
>>>>
>>>> incompatible types:
>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot
>>>> be converted to java.lang.Class<? extends
>>>> org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>
>>>>
>>>> It sort of puts you on the wrong footing, hence my question.
>>>> If you don't mind, I'll simply create a PR to amend the Javadoc for KafkaIO:
>>>>
>>>> https://issues.apache.org/jira/browse/BEAM-13854
>>>>
>>>> Easier to figure out was AvroCoder.of(schema), but it might make sense
>>>> to document that in the same context as well.
>>>>
>>>> Thanks again!
>>>>
>>>> Cheers,
>>>> Matt
>>>>
>>>> On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote:
>>>>
>>>>> Just having a quick look, it looks like the respective interface in
>>>>> KafkaIO should rather look like this to support KafkaAvroSerializer,
>>>>> which is a Serializer<Object>:
>>>>>
>>>>> public Write<K, V> withValueSerializer(Class<? extends Serializer<? super V>> valueSerializer)
>>>>>
>>>>> Thoughts?
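Both Matt's compile error and the raw-cast workaround from earlier in the thread can be reproduced with the same kind of stand-in types (again, mock-ups rather than the real Kafka/Beam classes):

```java
// Stand-ins: the generic shapes match the real classes, the names do not.
interface Serializer<T> { byte[] serialize(T value); }
class RecordType {}
class ObjectSerializer implements Serializer<Object> {
    public byte[] serialize(Object value) { return new byte[0]; }
}

public class RawCastSketch {
    // Mirrors the shape of KafkaIO's current withValueSerializer signature.
    static <V> String withValueSerializer(Class<? extends Serializer<V>> clazz) {
        return clazz.getSimpleName();
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    public static void main(String[] args) {
        // The typed cast the IDE suggests is rejected as inconvertible types,
        // because ObjectSerializer is provably a Serializer<Object>, never a
        // Serializer<RecordType>:
        // withValueSerializer(
        //     (Class<? extends Serializer<RecordType>>) ObjectSerializer.class);

        // The raw (Class) cast compiles (with an unchecked warning) because
        // it erases the type argument entirely:
        System.out.println(withValueSerializer((Class) ObjectSerializer.class));
    }
}
```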
>>>>>
>>>>> Cheers, Moritz
>>>>>
>>>>> *From:* Moritz Mack <mm...@talend.com>
>>>>> *Date:* Tuesday, 8. February 2022 at 15:55
>>>>> *To:* dev@beam.apache.org <dev@beam.apache.org>, matt.cast...@neo4j.com <matt.cast...@neo4j.com>
>>>>> *Subject:* Re: KafkaIO.write and Avro
>>>>>
>>>>> Hi Matt,
>>>>>
>>>>> Unfortunately, the types don’t play well when using
>>>>> KafkaAvroSerializer. It currently requires a cast :/
>>>>>
>>>>> The following will work:
>>>>>
>>>>> write.withValueSerializer((Class) KafkaAvroSerializer.class)
>>>>>
>>>>> This seems to be the cause of repeated confusion, so probably worth
>>>>> improving the user experience here!
>>>>>
>>>>> Cheers,
>>>>> Moritz
>>>>>
>>>>> *From:* Matt Casters <matt.cast...@neo4j.com>
>>>>> *Date:* Tuesday, 8. February 2022 at 14:17
>>>>> *To:* Beam Development List <dev@beam.apache.org>
>>>>> *Subject:* KafkaIO.write and Avro
>>>>>
>>>>> Dear Beams,
>>>>>
>>>>> When sending Avro values to Kafka, say GenericRecord, you typically
>>>>> specify the option value.serializer as
>>>>> "io.confluent.kafka.serializers.KafkaAvroSerializer". This, along with
>>>>> a bunch of other options for authentication and so on, verifies the
>>>>> schema stored in the Avro record against a schema registry.
>>>>> Unfortunately, I couldn't figure out how to pass this serializer class
>>>>> to KafkaIO.write(), as it's not acceptable to the withValueSerializer()
>>>>> method.
>>>>>
>>>>> For KafkaIO.read() we made a specific provision in the form of the
>>>>> class ConfluentSchemaRegistryDeserializer, but it doesn't look like we
>>>>> covered the producer side of Avro values yet.
>>>>>
>>>>> I'd be happy to dive into the code to add proper support for a
>>>>> Confluent schema registry in KafkaIO.write(), but I was just wondering
>>>>> if there was something I might have overlooked. It's hard to find
>>>>> samples or documentation on producing Avro messages with Beam.
>>>>>
>>>>> Thanks in advance,
>>>>> Matt
>>>>
>>>> --
>>>> Neo4j Chief Solutions Architect
>>>> ✉ matt.cast...@neo4j.com