Thanks, Matyas, for starting the discussion! I think this would be a very valuable addition to Flink, as many companies are already using the Hortonworks/Cloudera registry, and it would make it easy for them to connect it to Flink.
@Dawid: Regarding the implementation, this is a much more lightweight connector than what we have now for the Confluent registry and the PR you linked. It wraps the Cloudera registry directly, providing a very thin wrapper plus some enhanced functionality for handling Kafka message keys.

As for the question of main repo vs. outside, I would prefer this to be included in the main repo, similar to the Confluent registry connector. Unless we decide to move all of these connectors out, I would like to take a consistent approach.

Cheers,
Gyula

On Tue, Nov 5, 2019 at 1:44 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi Matyas,
>
> I think this would be a valuable addition. You may reuse some of the
> already available abstractions for writing an Avro deserialization schema
> based on a schema registry (have a look at RegistryDeserializationSchema
> and SchemaCoderProvider). There is also an open PR for adding a
> similar serialization schema[1].
>
> The only concern is that I am not 100% sure what the consensus is on
> which connectors we want to accept into the main repository and which
> we would prefer to be hosted separately and included in the ecosystem
> webpage[2] (that I hope will be published soon).
>
> Whichever option is preferred, I can help review the code.
>
> Best,
>
> Dawid
>
> [1] https://github.com/apache/flink/pull/8371
>
> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html
>
> On 05/11/2019 12:40, Őrhidi Mátyás wrote:
> > Dear Flink Community!
> >
> > We have noticed a recent request for Hortonworks schema registry support
> > (FLINK-14577 <https://issues.apache.org/jira/browse/FLINK-14577>). We have
> > an implementation for it already, and we would be happy to contribute it
> > to Apache Flink.
> >
> > You can find the documentation below[1]. Let us know your thoughts!
> >
> > Best Regards,
> > Matyas
> >
> > [1] Flink Avro Cloudera Registry User Guide
> > -----------------------------------------------------------
> >
> > Add the following dependency to use the schema registry integration:
> >
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-avro-cloudera-registry</artifactId>
> >   <version>${flink.version}</version>
> > </dependency>
> >
> > The schema registry can be plugged directly into the FlinkKafkaConsumer and
> > FlinkKafkaProducer using the appropriate schema:
> > - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema
> > - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema
> >
> > Supported types
> > ----------------------
> > - Avro Specific Record types
> > - Avro Generic Records
> > - Basic Java data types: byte[], Byte, Integer, Short, Double, Float, Long,
> >   String, Boolean
> >
> > SchemaRegistrySerializationSchema
> > --------------------------------------------------
> > The serialization schema can be constructed using the included builder
> > object SchemaRegistrySerializationSchema.builder(..).
> >
> > Required settings:
> > - Topic configuration when creating the builder. Can be static or dynamic
> >   (extracted from the data)
> > - RegistryAddress parameter on the builder to establish the connection
> >
> > Optional settings:
> > - Arbitrary SchemaRegistry client configuration using the setConfig method
> > - Key configuration for the produced Kafka messages
> >   - By specifying a KeySelector function that extracts the key from each record
> >   - Using a Tuple2 stream for (key, value) pairs directly
> > - Security configuration
> >
> > Example:
> >
> > KafkaSerializationSchema<ItemTransaction> schema =
> >     SchemaRegistrySerializationSchema
> >         .<ItemTransaction>builder(topic)
> >         .setRegistryAddress(registryAddress)
> >         .setKey(ItemTransaction::getItemId)
> >         .build();
> > FlinkKafkaProducer<ItemTransaction> sink = new FlinkKafkaProducer<>(
> >     "dummy", schema, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
> >
> > SchemaRegistryDeserializationSchema
> > -----------------------------------------------------
> > The deserialization schema can be constructed using the included builder
> > object SchemaRegistryDeserializationSchema.builder(..).
> > When reading messages (and keys), we always have to specify the expected
> > Class<T> or record Schema of the input records so that Flink can do any
> > necessary conversion between the data on Kafka and what is expected.
> >
> > Required settings:
> > - Class or Schema of the input messages, depending on the data type
> > - RegistryAddress parameter on the builder to establish the connection
> >
> > Optional settings:
> > - Arbitrary SchemaRegistry client configuration using the setConfig method
> > - Key configuration for the consumed Kafka messages
> >   - Should only be specified when we want to read the keys as well into a
> >     (key, value) stream
> > - Security configuration
> >
> > Example:
> >
> > KafkaDeserializationSchema<ItemTransaction> schema =
> >     SchemaRegistryDeserializationSchema
> >         .builder(ItemTransaction.class)
> >         .setRegistryAddress(registryAddress)
> >         .build();
> > FlinkKafkaConsumer<ItemTransaction> source = new FlinkKafkaConsumer<>(
> >     inputTopic, schema, kafkaProps, groupId);
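
For anyone who wants to try this out, below is a rough end-to-end sketch of how the deserialization side from the guide could be wired into a job. It only uses the builder calls named above (builder, setRegistryAddress, setConfig, build); the assumption that setConfig accepts a map of client properties, the registry/broker addresses, and the ItemTransaction Avro class from the example are placeholders, not confirmed API details.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

public class ItemTransactionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder addresses; in practice these come from the application configuration.
        String registryAddress = "http://registry-host:7788/api/v1";
        String inputTopic = "transaction.log.1";

        // Assumption: arbitrary SchemaRegistry client settings are passed as a map via setConfig.
        Map<String, Object> clientConfig = new HashMap<>();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "broker:9092");
        kafkaProps.setProperty("group.id", "item-transaction-job");

        // Build the deserialization schema for a specific-record type, as in the guide.
        // ItemTransaction is the generated Avro class used in the example above.
        KafkaDeserializationSchema<ItemTransaction> schema =
            SchemaRegistryDeserializationSchema
                .builder(ItemTransaction.class)
                .setRegistryAddress(registryAddress)
                .setConfig(clientConfig)
                .build();

        DataStream<ItemTransaction> transactions =
            env.addSource(new FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps));

        transactions.print();
        env.execute("Schema Registry example");
    }
}

The serialization side would be wired analogously: build a SchemaRegistrySerializationSchema (with setKey or a Tuple2 stream for keyed output) and hand it to a FlinkKafkaProducer, as in the guide's first example.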