Hi Dawid, In general I agree if we can provide a completely unified way of handling this registries that would be great but I wonder if that makes sense in the long term. While the cloudera schema registry only supports Avro at the moment, it aims to support other formats in the future, and accessing this functionality will probably rely on using those specific serializer/deserializer implementations. This might not be a valid concern at this point though :)
The reason why we went with wrapping the KafkaAvroDeserializer/Serializer directly now, is that it was super simple to do and the current SchemaCoder approach lacks a lot of flexibility/functionality. The schema itself doesn't always come from the serialized data (I believe in this case it is either stored in the serialized data or the kafka record metadata) and also we want to be able to handle kafka message keys. I guess these could be solved by making the deserialization logic Kafka specific and exposing the ConsumerRecord but that would completely change the current schemacoder related interfaces. Cheers, Gyula On Wed, Nov 6, 2019 at 10:17 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote: > Hi Gyula, > > I did not want to discourage this contribution. I do agree we should > treat this connector equally to the confluent's schema registry. I just > wanted to express my uncertainty about general approach to new > connectors contributions. By no means I wanted to discourage this > contribution. > > As for the second point. Do you mean that you are wrapping the > KafkaAvroDeserializer/Serializer provided by cloudera/hortonworks schema > registry? > > Personally I would very much prefer using the SchemaCoder approach. All > schemas boil down to two steps. (De)Serializing the schema with registry > specific protocol + (de)serializing the record itself. I think the > approach with SchemaCoder has the benefit that we can optimize > instantiation of Avro's readers and writers in a unified way. It's also > easier to maintain as we have just a single point where the actual > record (de)serialization happens. It also provides a unified way of > instantiating the TypeInformation. Could you give some explanation why > would you prefer not to use this approach? > > Best, > > Dawid > > On 05/11/2019 14:48, Gyula Fóra wrote: > > Thanks Matyas for starting the discussion! > > I think this would be a very valuable addition to Flink as many companies > > are already using the Hortonworks/Cloudera registry and it would enable > > them to connect to Flink easily. > > > > @Dawid: > > Regarding the implementation this a much more lightweight connector than > > what we have now for the Confluent registry and the PR you linked. This > > wraps the cloudera registry directly, providing a very thin wrapper + > some > > enhanced functionality regarding handling of Kafka messages keys. > > > > As for the question of main repo outside, I would prefer this to be > > included in the main repo, similar to the Confluent registry connector. > > Unless we decide to move all of these connectors out I would like to > take a > > consistent approach. > > > > Cheers, > > Gyula > > > > > > On Tue, Nov 5, 2019 at 1:44 PM Dawid Wysakowicz <dwysakow...@apache.org> > > wrote: > > > >> Hi Matyas, > >> > >> I think this would be a valuable addition. You may reuse some of the > >> already available abstractions for writing avro deserialization schema > >> based on a schema registry (have a look at RegistryDeserializationSchema > >> and SchemaCoderProvider). There is also an opened PR for adding a > >> similar serialization schema[1]. > >> > >> The only concern is that I am not 100% sure what is the consensus on > >> which connectors do we want to adapt into the main repository and which > >> would we prefer to be hosted separately and included in the ecosystem > >> webpage[2] (that I hope will be published soon). > >> > >> Whatever option will be preferred I could help review the code. > >> > >> Best, > >> > >> Dawid > >> > >> [1] https://github.com/apache/flink/pull/8371 > >> > >> [2] > >> > >> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html > >> > >> On 05/11/2019 12:40, Őrhidi Mátyás wrote: > >>> Dear Flink Community! > >>> > >>> We have noticed a recent request for Hortonworks schema registry > support > >> ( > >>> FLINK-14577 <https://issues.apache.org/jira/browse/FLINK-14577>). We > >> have > >>> an implementation for it already, and we would be happy to contribute > it > >> to > >>> Apache Flink. > >>> > >>> You can find the documentation below[1]. Let us know your thoughts! > >>> > >>> Best Regards, > >>> Matyas > >>> > >>> [1] Flink Avro Cloudera Registry User Guide > >>> ----------------------------------------------------------- > >>> > >>> Add the following dependency to use the schema registry integration: > >>> <dependency> > >>> <groupId>org.apache.flink</groupId> > >>> <artifactId>flink-avro-cloudera-registry</artifactId> > >>> <version>${flink.version}</version> > >>> </dependency> > >>> > >>> > >>> The schema registry can be plugged directly into the FlinkKafkaConsumer > >> and > >>> FlinkKafkaProducer using the appropriate schema: > >>> - > >>> > >> > org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema > >>> - > >>> > >> > org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema > >>> > >>> Supported types > >>> ---------------------- > >>> - Avro Specific Record types > >>> - Avro Generic Records > >>> - Basic Java Data types: byte[], Byte, Integer, Short, Double, Float, > >> Long, > >>> String, Boolean > >>> > >>> SchemaRegistrySerializationSchema > >>> -------------------------------------------------- > >>> The serialization schema can be constructed using the included builder > >>> object SchemaRegistrySerializationSchema.builder(..). > >>> > >>> Required settings: > >>> - Topic configuration when creating the builder. Can be static or > dynamic > >>> (extracted from the data) > >>> - RegistryAddress parameter on the builder to establish the connection > >>> > >>> Optional settings: > >>> - Arbitrary SchemaRegistry client configuration using the setConfig > >> method > >>> - Key configuration for the produced Kafka messages > >>> - By specifying a KeySelector function that extracts the key from each > >>> record > >>> - Using a Tuple2 stream for (key, value) pairs directly > >>> - Security configuration > >>> > >>> Example: > >>> KafkaSerializationSchema<ItemTransaction> schema = > >>> SchemaRegistrySerializationSchema > >>> .<ItemTransaction>builder(topic) > >>> .setRegistryAddress(registryAddress) > >>> .setKey(ItemTransaction::getItemId) > >>> .build(); > >>> FlinkKafkaProducer<ItemTransaction> sink = new > >>> FlinkKafkaProducer<>("dummy", schema, kafkaProps, > >>> FlinkKafkaProducer.Semantic.AT_LEAST_ONCE); > >>> > >>> SchemaRegistryDeserializationSchema > >>> ----------------------------------------------------- > >>> The deserialization schema can be constructed using the included > builder > >>> object SchemaRegistryDeserializationSchema.builder(..). > >>> When reading messages (and keys) we always have to specify the expected > >>> Class<T> or record Schema of the input records so that Flink can do any > >>> necessary conversion between the data on Kafka and what is expected. > >>> > >>> Required settings: > >>> - Class or Schema of the input messages depending on the data type > >>> - RegistryAddress parameter on the builder to establish the connection > >>> > >>> Optional settings: > >>> - Arbitrary SchemaRegistry client configuration using the setConfig > >> method > >>> - Key configuration for the consumed Kafka messages > >>> - Should only be specified when we want to read the keys as well into > a > >>> (key, value) stream > >>> - Security configuration > >>> > >>> Example: > >>> KafkaDeserializationSchema<ItemTransaction> schema = > >>> SchemaRegistryDeserializationSchema > >>> .builder(ItemTransaction.class) > >>> .setRegistryAddress(registryAddress) > >>> .build(); > >>> FlinkKafkaConsumer<ItemTransaction> source = new > >>> FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps, groupdId); > >>> > >> > >