Of course, IMO it would be fine as well to not force developers to use
withKeySerializer() / withValueSerializer() in the first place.
That way you could configure the Kafka serializer classes via properties
in the standard way, as per the Kafka Consumer/Producer documentation.
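For illustration, a properties-based setup would look something like this (a plain sketch; the broker address and schema registry URL are placeholders):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    public static Properties avroProducerConfig() {
        Properties props = new Properties();
        // Standard Kafka producer configuration keys, as per the
        // Producer documentation; no Beam-specific API involved.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(avroProducerConfig().getProperty("value.serializer"));
    }
}
```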

Just an idea.
Matt

On Wed, Feb 9, 2022 at 3:40 PM Kenneth Knowles <k...@apache.org> wrote:

> Just a quick type check: is it the case that a Serializer<Object> is
> expected to be able to properly serde any subclass of Object? More
> generally that any Serializer<? super V> should be able to properly serde
> V? Typically this isn't the case. Not saying we shouldn't make the proposed
> change, but it could result in surprises.
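> To make that concrete, here is a sketch using a hypothetical stand-in for
> the Serializer interface (not the real Kafka one): a Serializer<Number>
> type-checks as a Serializer<? super Double>, yet silently loses information:

```java
import java.nio.charset.StandardCharsets;

public class SupertypeSerdeSketch {
    // Hypothetical stand-in for Kafka's Serializer interface.
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // A Serializer<Number> that keeps only the integer part. It type-checks
    // wherever a Serializer<? super Double> is expected, yet it does not
    // faithfully serialize a Double.
    static class IntPartSerializer implements Serializer<Number> {
        public byte[] serialize(Number data) {
            return Integer.toString(data.intValue()).getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        Serializer<? super Double> s = new IntPartSerializer(); // compiles fine
        // 3.75 goes in, "3" comes out: type-correct but lossy.
        System.out.println(new String(s.serialize(3.75), StandardCharsets.UTF_8));
    }
}
```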
>
> Another possibility based on experience with coders, I would highlight
> three types of serde that could apply to Serializer as well as it does to
> Coder:
>
> 1. handles just a single type (VarIntCoder, etc)
> 2. lossy/converts concrete types because it is allowed (ListCoder works
> for any list, but does *not* restore the original concrete subclass)
> 3. generic/tagging (SerializableCoder which restores the concrete subclass)
>
> The API in KafkaIO is right for types 1 and 2 but too strict for type 3.
> The new API is great for type 3 but potentially dangerous for types 1 and 2
> (though for type 1 it will mostly be irrelevant).
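> As a sketch of the difference between types 2 and 3: plain Java
> serialization behaves like the tagging case (it records and restores the
> concrete class), while copying elements into a fresh ArrayList mimics the
> lossy ListCoder case. Both helpers below are illustrative stand-ins, not
> the real Beam coders:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class TaggingVsLossySketch {
    // Type 3 ("generic/tagging", like SerializableCoder): Java serialization
    // records the concrete class and restores it on decode.
    static Object roundTripTagged(Serializable value) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return ois.readObject();
        }
    }

    // Type 2 ("lossy/converts", like ListCoder): only the elements survive,
    // so decoding yields a fixed concrete type regardless of the input subclass.
    static List<String> roundTripLossy(List<String> value) {
        return new ArrayList<>(value); // stand-in for element-wise encode/decode
    }

    public static void main(String[] args) throws Exception {
        LinkedList<String> input = new LinkedList<>(List.of("a", "b"));
        System.out.println(roundTripTagged(input).getClass().getSimpleName());
        System.out.println(roundTripLossy(input).getClass().getSimpleName());
    }
}
```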
>
> We could have a separate entrypoint for type 3, like
> .withGenericValueCoder(Serializer<? super V>) that makes it very clear that
> if you call this one you have to pass a Serializer that tags the concrete
> subclass and restores it. Often, the only relevant type will be
> Serializer<Object> so we could even make that the parameter type.
>
> Kenn
>
> On Tue, Feb 8, 2022 at 10:57 AM Brian Hulette <bhule...@google.com> wrote:
>
>> The issue here is that KafkaAvroSerializer implements Serializer<Object>,
>> and not Serializer<GenericRecord> [1]. So you need to erase the type to
>> force it. I think Moritz's suggestion is actually to update the signature
>> here [2] to make the type parameter `? super V`, so that a
>> Serializer<Object> will be acceptable. That change would be preferable to
>> updating the docs.
>>
>> [1]
>> https://github.com/confluentinc/schema-registry/blob/cb6ca203ea9d26c84ed1e92d072f793f7bc2417b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java#L27
>> [2]
>> https://github.com/apache/beam/blob/963c04ae6d503a97b9a169f07750c5d8c33fdd6c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2379
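>> The variance point can be checked with plain generics. The types below are
>> hypothetical stand-ins for the Kafka/Beam classes, but the signatures mirror
>> the proposed change:

```java
public class SuperVSignatureSketch {
    interface Serializer<T> { byte[] serialize(T data); }

    static class GenericRecord { }

    // Stand-in for KafkaAvroSerializer, which implements Serializer<Object>.
    static class AvroLikeSerializer implements Serializer<Object> {
        public byte[] serialize(Object data) { return new byte[0]; }
    }

    // Current-style signature: accepts only Class<? extends Serializer<V>>.
    static <V> void withValueSerializerStrict(Class<? extends Serializer<V>> c) { }

    // Proposed signature: `? super V` also admits a Serializer<Object>.
    static <V> void withValueSerializerRelaxed(Class<? extends Serializer<? super V>> c) { }

    public static void main(String[] args) {
        // Does not compile for V = GenericRecord:
        // SuperVSignatureSketch.<GenericRecord>withValueSerializerStrict(AvroLikeSerializer.class);

        // Compiles: AvroLikeSerializer.class conforms to
        // Class<? extends Serializer<? super GenericRecord>>.
        SuperVSignatureSketch.<GenericRecord>withValueSerializerRelaxed(AvroLikeSerializer.class);
        System.out.println("ok");
    }
}
```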
>>
>> On Tue, Feb 8, 2022 at 9:05 AM Sachin Agarwal <sachi...@google.com>
>> wrote:
>>
>>> Thanks both, that's great -
>>>
>>> On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <matt.cast...@neo4j.com>
>>> wrote:
>>>
>>>> Thanks a lot Moritz.  Your suggestion worked immediately.
>>>>
>>>> You sort of get on the wrong track since my favorite IDE suggests:
>>>>
>>>> .withValueSerializer((Class<? extends Serializer<GenericRecord>>)
>>>> KafkaAvroSerializer.class)
>>>>
>>>> ... which simply doesn't even compile for me.
>>>>
>>>>  incompatible types:
>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot
>>>> be converted to java.lang.Class<? extends
>>>> org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>
>>>>
>>>> It sort of puts you on the wrong footing, hence my question.
>>>> If you don't mind I'll simply create a PR to amend the Javadoc for
>>>> KafkaIO.
>>>>
>>>> https://issues.apache.org/jira/browse/BEAM-13854
>>>>
>>>> AvroCoder.of(schema) was easier to figure out, but it might make sense
>>>> to document it in the same context as well.
>>>>
>>>> Thanks again!
>>>>
>>>> Cheers,
>>>> Matt
>>>>
>>>>
>>>> On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote:
>>>>
>>>>> Just having a quick look, it looks like the respective interface in
>>>>> KafkaIO should rather look like this to support KafkaAvroSerializer, which
>>>>> is a Serializer<Object>:
>>>>>
>>>>>
>>>>>
>>>>> public Write<K, V>  withValueSerializer(Class<? extends Serializer<? super
>>>>> V>> valueSerializer)
>>>>>
>>>>>
>>>>>
>>>>> Thoughts?
>>>>>
>>>>> Cheers, Moritz
>>>>>
>>>>>
>>>>>
>>>>> *From: *Moritz Mack <mm...@talend.com>
>>>>> *Date: *Tuesday, 8. February 2022 at 15:55
>>>>> *To: *dev@beam.apache.org <dev@beam.apache.org>,
>>>>> matt.cast...@neo4j.com <matt.cast...@neo4j.com>
>>>>> *Subject: *Re: KafkaIO.write and Avro
>>>>>
>>>>> Hi Matt,
>>>>>
>>>>>
>>>>>
>>>>> Unfortunately, the types don’t play well when using
>>>>> KafkaAvroSerializer. It currently requires a cast :/
>>>>>
>>>>> The following will work:
>>>>>
>>>>> write.withValueSerializer((Class)KafkaAvroSerializer.class))
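>>>>> For illustration, the raw (Class) cast works because it erases the type
>>>>> argument, so the compiler accepts the assignment with only an unchecked
>>>>> warning. A self-contained sketch with hypothetical stand-ins for the
>>>>> Kafka types:

```java
public class RawCastSketch {
    interface Serializer<T> { byte[] serialize(T data); }

    static class GenericRecord { }

    // Stand-in for KafkaAvroSerializer (a Serializer<Object>).
    static class AvroLikeSerializer implements Serializer<Object> {
        public byte[] serialize(Object data) { return new byte[0]; }
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    public static void main(String[] args) {
        // The raw (Class) cast drops the type argument, so this assignment
        // compiles where Class<? extends Serializer<GenericRecord>> is expected.
        Class<? extends Serializer<GenericRecord>> c = (Class) AvroLikeSerializer.class;
        System.out.println(c.getSimpleName());
    }
}
```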
>>>>>
>>>>>
>>>>>
>>>>> This seems to be the cause of repeated confusion, so probably worth
>>>>> improving the user experience here!
>>>>>
>>>>>
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Moritz
>>>>>
>>>>>
>>>>>
>>>>> *From: *Matt Casters <matt.cast...@neo4j.com>
>>>>> *Date: *Tuesday, 8. February 2022 at 14:17
>>>>> *To: *Beam Development List <dev@beam.apache.org>
>>>>> *Subject: *KafkaIO.write and Avro
>>>>>
>>>>>
>>>>> Dear Beams,
>>>>>
>>>>>
>>>>>
>>>>> When sending Avro values to Kafka, say GenericRecord, you typically
>>>>> specify option value.serializer as
>>>>> "io.confluent.kafka.serializers.KafkaAvroSerializer".  This along with a
>>>>> bunch of other options for authentication and so on verifies the schema
>>>>> stored in the Avro record with a schema registry.   Unfortunately, I
>>>>> couldn't figure out how to pass this serializer class to KafkaIO.write(),
>>>>> as it's not acceptable to the withValueSerializer() method.
>>>>>
>>>>>
>>>>>
>>>>> For KafkaIO.read() we made a specific provision in the form of
>>>>> class ConfluentSchemaRegistryDeserializer but it doesn't look like we
>>>>> covered the producer side of Avro values yet.
>>>>>
>>>>>
>>>>>
>>>>> I'd be happy to dive into the code to add proper support for a
>>>>> Confluent schema registry in KafkaIO.write() but I was just wondering if
>>>>> there was something I might have overlooked.  It's hard to find samples or
>>>>> documentation on producing Avro messages with Beam.
>>>>>
>>>>>
>>>>>
>>>>> Thanks in advance,
>>>>>
>>>>> Matt
>>>>>
>>>>>
>>>>>
>>>>> *As a recipient of an email from Talend, your contact personal data
>>>>> will be on our systems. Please see our privacy notice (updated August 2020)
>>>>> at Talend, Inc. <https://www.talend.com/contacts-privacy-policy/>*
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Neo4j Chief Solutions Architect
>>>> *✉   *matt.cast...@neo4j.com
>>>>
>>>>
>>>>
>>>>

-- 
Neo4j Chief Solutions Architect
*✉   *matt.cast...@neo4j.com
