Hi Brian,

My point mainly is that KafkaIO (and other IOs as well) tends to be restrictive towards the API user. In this case, for example, errors are thrown at runtime if you don't set the serializers using the Beam API [1]. While that helps the inexperienced Kafka user, which is great, it blocked me from getting the job done.
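Here is a minimal, hypothetical sketch of the behavior asked for in this thread. It uses an explicit Beam-level serializer if one was set. Otherwise it falls back to the standard Kafka `key.serializer`/`value.serializer` producer properties, and it fails only when neither source provides one. `SerializerResolution` and `resolve` are invented names for illustration. This is not how KafkaIO currently validates its configuration; only the two property names are real Kafka settings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical sketch (not KafkaIO code): resolve a serializer class name, preferring an
 * explicit Beam-level setting and otherwise falling back to the standard Kafka producer property.
 */
final class SerializerResolution {
  // Standard Kafka producer property names.
  static final String KEY_SERIALIZER = "key.serializer";
  static final String VALUE_SERIALIZER = "value.serializer";

  private SerializerResolution() {}

  static String resolve(
      String property, Optional<String> beamSetting, Map<String, Object> producerConfig) {
    // 1. An explicit withKeySerializer()/withValueSerializer()-style setting wins.
    if (beamSetting.isPresent()) {
      return beamSetting.get();
    }
    // 2. Otherwise accept plain Kafka configuration; Kafka allows a Class or a class name here.
    Object configured = producerConfig.get(property);
    if (configured instanceof Class) {
      return ((Class<?>) configured).getName();
    }
    if (configured != null) {
      return configured.toString();
    }
    // 3. Fail only when neither source provides a serializer.
    throw new IllegalStateException(
        property + " is not set via the Beam API or the producer config");
  }

  public static void main(String[] args) {
    Map<String, Object> producerConfig = new HashMap<>();
    producerConfig.put(VALUE_SERIALIZER, "io.confluent.kafka.serializers.KafkaAvroSerializer");
    // No Beam-level value serializer, so the producer property is used.
    System.out.println(resolve(VALUE_SERIALIZER, Optional.empty(), producerConfig));
  }
}
```

Letting the Beam-level setting win preserves the precedence Brian describes in his reply, where serde settings passed through withProducerConfigUpdates are overwritten. The fallback removes the hard runtime failure for users who configure Kafka the usual way.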
Hope this clarifies things.

Cheers,
Matt

[1] https://github.com/apache/beam/blob/8b213c617ef8cf3a077bb0002b6b0fec8e85cb05/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2500

On Wed, Feb 9, 2022 at 8:05 PM Brian Hulette <bhule...@google.com> wrote:

> If the normal way is just to set the producer config, we do have an API for that, e.g. withProducerConfigUpdates [1]. It's just not well-defined what takes precedence, and in fact it looks like we will just overwrite any serde configuration specified in this way [2].
>
> [1] https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.Write.html#withProducerConfigUpdates-java.util.Map-
> [2] https://github.com/apache/beam/blob/df907de8519e6a23bb6b016ff8593f103e739e61/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L711
>
> On Wed, Feb 9, 2022 at 10:36 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>
>> +Ismael
>>
>> Doing it the "normal" way, especially for Kafka, may require some additional non-obvious steps (well, of course they can be documented). So I'd prefer a more user-friendly API around it, like the one we have for reading Avro messages with a schema stored in Confluent Schema Registry. That one just extends the current API by adding a new method, "withValueDeserializer(DeserializerProvider<V>)", and provides a new ConfluentSchemaRegistryDeserializerProvider class that encapsulates all the business logic. So I'd suggest following the same approach for the KafkaIO write part.
>>
>> Any thoughts on this?
>>
>> PS: For those (including me) who are curious why KafkaIO has coders and serdes at the same time, this Jira [1] may be interesting to read (I just found it recently).
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-1573
>>
>> —
>> Alexey
>>
>> On 9 Feb 2022, at 17:15, Kenneth Knowles <k...@apache.org> wrote:
>>
>> Good point.
>> Doing things the "normal" way for users of the storage system is a good on-ramp. Conversely, having a "normal Beam" way is good for people who use Beam more than Kafka. Can we have both easily?
>>
>> Kenn
>>
>> On Wed, Feb 9, 2022 at 6:50 AM Matt Casters <matt.cast...@neo4j.com> wrote:
>>
>>> Of course, IMO it would also be fine not to force developers to use withKeySerializer() / withValueSerializer() in the first place. That way you could use the standard way of configuring the Kafka serializer classes using properties, as per the Kafka Consumer/Producer documentation.
>>>
>>> Just an idea.
>>> Matt
>>>
>>> On Wed, Feb 9, 2022 at 3:40 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> Just a quick type check: is it the case that a Serializer<Object> is expected to be able to properly serde any subclass of Object? More generally, that any Serializer<? super V> should be able to properly serde V? Typically this isn't the case. I'm not saying we shouldn't make the proposed change, but it could result in surprises.
>>>>
>>>> Drawing on experience with coders, I would highlight three types of serde that apply to Serializer just as they do to Coder:
>>>>
>>>> 1. handles just a single type (VarIntCoder, etc.)
>>>> 2. lossy/converts concrete types because it is allowed (ListCoder works for any list, but does *not* restore the original concrete subclass)
>>>> 3. generic/tagging (SerializableCoder, which restores the concrete subclass)
>>>>
>>>> The current API in KafkaIO is right for types 1 and 2 but too strict for type 3. The new API, on the other hand, is great for type 3 but potentially dangerous for types 2 and 1 (though for type 1 it will mostly be irrelevant).
>>>>
>>>> We could have a separate entry point for type 3, like .withGenericValueCoder(Serializer<? super V>), that makes it very clear that if you call it you have to pass a Serializer that tags the concrete subclass and restores it. Often the only relevant type will be Serializer<Object>, so we could even make that the parameter type.
>>>>
>>>> Kenn
>>>>
>>>> On Tue, Feb 8, 2022 at 10:57 AM Brian Hulette <bhule...@google.com> wrote:
>>>>
>>>>> The issue here is that KafkaAvroSerializer implements Serializer<Object>, and not Serializer<GenericRecord> [1]. So you need to erase the type to force it. I think Moritz's suggestion is actually to update the signature here [2] to make the type parameter `? super V`, so that a Serializer<Object> will be acceptable. That change would be preferable to updating the docs.
>>>>>
>>>>> [1] https://github.com/confluentinc/schema-registry/blob/cb6ca203ea9d26c84ed1e92d072f793f7bc2417b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java#L27
>>>>> [2] https://github.com/apache/beam/blob/963c04ae6d503a97b9a169f07750c5d8c33fdd6c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2379
>>>>>
>>>>> On Tue, Feb 8, 2022 at 9:05 AM Sachin Agarwal <sachi...@google.com> wrote:
>>>>>
>>>>>> Thanks both, that's great -
>>>>>>
>>>>>> On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <matt.cast...@neo4j.com> wrote:
>>>>>>
>>>>>>> Thanks a lot, Moritz. Your suggestion worked immediately.
>>>>>>>
>>>>>>> You sort of get on the wrong track, since my favorite IDE suggests:
>>>>>>>
>>>>>>> .withValueSerializer((Class<? extends Serializer<GenericRecord>>) KafkaAvroSerializer.class)
>>>>>>>
>>>>>>> ... which simply doesn't even compile for me:
>>>>>>>
>>>>>>> incompatible types: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot be converted to java.lang.Class<? extends org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>
>>>>>>>
>>>>>>> It sort of puts you on the wrong footing, hence my question. If you don't mind, I'll simply create a PR to amend the Javadoc for KafkaIO.
>>>>>>>
>>>>>>> https://issues.apache.org/jira/browse/BEAM-13854
>>>>>>>
>>>>>>> AvroCoder.of(schema) was easier to figure out, but it might make sense to document that in the same context as well.
>>>>>>>
>>>>>>> Thanks again!
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Matt
>>>>>>>
>>>>>>> On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote:
>>>>>>>
>>>>>>>> Just having a quick look, it looks like the respective interface in KafkaIO should rather look like this to support KafkaAvroSerializer, which is a Serializer<Object>:
>>>>>>>>
>>>>>>>> public Write<K, V> withValueSerializer(Class<? extends Serializer<? super V>> valueSerializer)
>>>>>>>>
>>>>>>>> Thoughts?
>>>>>>>>
>>>>>>>> Cheers, Moritz
>>>>>>>>
>>>>>>>> *From: *Moritz Mack <mm...@talend.com>
>>>>>>>> *Date: *Tuesday, 8. February 2022 at 15:55
>>>>>>>> *To: *dev@beam.apache.org <dev@beam.apache.org>, matt.cast...@neo4j.com <matt.cast...@neo4j.com>
>>>>>>>> *Subject: *Re: KafkaIO.write and Avro
>>>>>>>>
>>>>>>>> Hi Matt,
>>>>>>>>
>>>>>>>> Unfortunately, the types don't play well when using KafkaAvroSerializer. It currently requires a cast :/
>>>>>>>>
>>>>>>>> The following will work:
>>>>>>>>
>>>>>>>> write.withValueSerializer((Class) KafkaAvroSerializer.class)
>>>>>>>>
>>>>>>>> This seems to be the cause of repeated confusion, so it's probably worth improving the user experience here!
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Moritz
>>>>>>>>
>>>>>>>> *From: *Matt Casters <matt.cast...@neo4j.com>
>>>>>>>> *Date: *Tuesday, 8. February 2022 at 14:17
>>>>>>>> *To: *Beam Development List <dev@beam.apache.org>
>>>>>>>> *Subject: *KafkaIO.write and Avro
>>>>>>>>
>>>>>>>> Dear Beams,
>>>>>>>>
>>>>>>>> When sending Avro values to Kafka, say GenericRecord, you typically set the value.serializer option to "io.confluent.kafka.serializers.KafkaAvroSerializer". This, along with a bunch of other options for authentication and so on, verifies the schema stored in the Avro record against a schema registry. Unfortunately, I couldn't figure out how to pass this serializer class to KafkaIO.write(), as it isn't accepted by the withValueSerializer() method.
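The type problem discussed in this thread can be reproduced without Kafka or Beam on the classpath. In this plain-JDK sketch, `Serializer` is a simplified stand-in for Kafka's interface. `ObjectSerializer` plays the role of KafkaAvroSerializer (a `Serializer<Object>`), and `String` replaces `GenericRecord`. `WriteSketch` is a hypothetical builder that only mirrors the two signatures under discussion. None of these are the real KafkaIO or Confluent classes.

```java
import java.nio.charset.StandardCharsets;

/** Simplified stand-in for org.apache.kafka.common.serialization.Serializer (one method only). */
interface Serializer<T> {
  byte[] serialize(String topic, T data);
}

/** Stand-in for a serializer such as KafkaAvroSerializer that implements Serializer<Object>. */
class ObjectSerializer implements Serializer<Object> {
  @Override
  public byte[] serialize(String topic, Object data) {
    return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
  }
}

/** Hypothetical builder that only mirrors the two method signatures discussed in the thread. */
class WriteSketch<V> {
  Class<?> valueSerializer;

  // Shape of the current signature: the class must be a Serializer<V> exactly.
  WriteSketch<V> withValueSerializerExact(Class<? extends Serializer<V>> cls) {
    this.valueSerializer = cls;
    return this;
  }

  // Shape proposed in the thread: any Serializer of V or of a supertype of V.
  WriteSketch<V> withValueSerializer(Class<? extends Serializer<? super V>> cls) {
    this.valueSerializer = cls;
    return this;
  }
}

class SignatureDemo {
  // new WriteSketch<String>().withValueSerializerExact(ObjectSerializer.class) does not compile,
  // because a Serializer<Object> is not a Serializer<String> (generics are invariant).

  @SuppressWarnings({"rawtypes", "unchecked"})
  static WriteSketch<String> withRawCast() {
    // The workaround from the thread: a raw Class switches off the generic check (unchecked).
    return new WriteSketch<String>().withValueSerializerExact((Class) ObjectSerializer.class);
  }

  static WriteSketch<String> withSuperBound() {
    // With `? super V`, Serializer<Object> qualifies as a Serializer<? super String>: no cast.
    return new WriteSketch<String>().withValueSerializer(ObjectSerializer.class);
  }

  public static void main(String[] args) {
    System.out.println(withRawCast().valueSerializer.getSimpleName());
    System.out.println(withSuperBound().valueSerializer.getSimpleName());
  }
}
```

Because the `? super V` bound accepts any serializer of a supertype, it also accepts serializers that cannot faithfully round-trip every `V`. That is the caveat Kenn raises about single-type and lossy serdes.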
>>>>>>>>
>>>>>>>> For KafkaIO.read() we made a specific provision in the form of the class ConfluentSchemaRegistryDeserializerProvider, but it doesn't look like we covered the producer side of Avro values yet.
>>>>>>>>
>>>>>>>> I'd be happy to dive into the code to add proper support for a Confluent schema registry in KafkaIO.write(), but I was just wondering if there was something I might have overlooked. It's hard to find samples or documentation on producing Avro messages with Beam.
>>>>>>>>
>>>>>>>> Thanks in advance,
>>>>>>>> Matt
>>>>>>>>
>>>>>>>> *As a recipient of an email from Talend, your contact personal data will be on our systems. Please see our privacy notice (updated August 2020) at Talend, Inc. <https://www.talend.com/contacts-privacy-policy/>*
>>>>>>>
>>>>>>> --
>>>>>>> Neo4j Chief Solutions Architect
>>>>>>> ✉ matt.cast...@neo4j.com
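Kenn's distinction between lossy (type 2) and tagging (type 3) serdes can be illustrated with a plain-JDK sketch. The lossy encoder only writes list elements, roughly how ListCoder behaves. Java serialization stands in for the class-tagging behavior of SerializableCoder. Neither function is Beam or Kafka code, and `SerdeKinds` and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

final class SerdeKinds {
  private SerdeKinds() {}

  // "Type 2" (lossy): writes only the elements, so decoding always yields an ArrayList,
  // whatever List implementation was encoded (similar in spirit to ListCoder).
  static byte[] encodeLossy(List<String> list) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(list.size());
      for (String element : list) {
        out.writeUTF(element);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static List<String> decodeLossy(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      int size = in.readInt();
      List<String> result = new ArrayList<>(size);
      for (int i = 0; i < size; i++) {
        result.add(in.readUTF());
      }
      return result;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // "Type 3" (tagging): Java serialization records the concrete class in the stream,
  // so decoding restores it (similar in spirit to SerializableCoder).
  static byte[] encodeTagged(Serializable value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static Object decodeTagged(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    LinkedList<String> original = new LinkedList<>(Arrays.asList("a", "b"));
    // Same elements either way, but only the tagging round trip keeps the concrete class.
    System.out.println(decodeLossy(encodeLossy(original)).getClass().getSimpleName()); // ArrayList
    System.out.println(decodeTagged(encodeTagged(original)).getClass().getSimpleName()); // LinkedList
  }
}
```

Both round trips return equal elements, but only the tagged one gives back a `LinkedList`. A lossy serde accepted through a `? super V` bound would compile for a more specific `V` and still hand back a different concrete type. A dedicated entry point for type 3 serdes, as Kenn suggests, would make that expectation explicit.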