Hi Matyas,

I think this would be a valuable addition. You may reuse some of the already available abstractions for writing an Avro deserialization schema based on a schema registry (have a look at RegistryAvroDeserializationSchema and SchemaCoder.SchemaCoderProvider). There is also an open PR for adding a similar serialization schema[1].
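For illustration, a minimal sketch of that reuse could look roughly like the
following (the class names and the registry lookup are placeholders, not the
actual Cloudera client API):

  import java.io.IOException;
  import java.io.InputStream;

  import org.apache.avro.Schema;
  import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
  import org.apache.flink.formats.avro.SchemaCoder;

  // Sketch: the registry-specific logic is reduced to a SchemaCoder that
  // resolves the writer schema from the header of each incoming message.
  public class HortonworksRegistryAvroDeserializationSchema<T>
          extends RegistryAvroDeserializationSchema<T> {

      protected HortonworksRegistryAvroDeserializationSchema(
              Class<T> recordClazz, Schema reader, String registryAddress) {
          super(recordClazz, reader, new HortonworksSchemaCoderProvider(registryAddress));
      }

      private static class HortonworksSchemaCoderProvider
              implements SchemaCoder.SchemaCoderProvider {

          private final String registryAddress;

          HortonworksSchemaCoderProvider(String registryAddress) {
              this.registryAddress = registryAddress;
          }

          @Override
          public SchemaCoder get() {
              return new SchemaCoder() {
                  @Override
                  public Schema readSchema(InputStream in) throws IOException {
                      // Placeholder: parse the schema id/version prefix from
                      // 'in' and fetch the writer schema from the registry at
                      // 'registryAddress' (client call omitted in this sketch).
                      throw new UnsupportedOperationException("registry lookup");
                  }
              };
          }
      }
  }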
My only concern is that I am not 100% sure what the consensus is on which connectors we want to adopt into the main repository and which we would prefer to be hosted separately and listed on the ecosystem webpage[2] (which I hope will be published soon). Whichever option is preferred, I could help review the code.

Best,
Dawid

[1] https://github.com/apache/flink/pull/8371
[2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html

On 05/11/2019 12:40, Őrhidi Mátyás wrote:
> Dear Flink Community!
>
> We have noticed a recent request for Hortonworks schema registry support
> (FLINK-14577 <https://issues.apache.org/jira/browse/FLINK-14577>). We
> already have an implementation for it, and we would be happy to
> contribute it to Apache Flink.
>
> You can find the documentation below[1]. Let us know your thoughts!
>
> Best Regards,
> Matyas
>
> [1] Flink Avro Cloudera Registry User Guide
> -----------------------------------------------------------
>
> Add the following dependency to use the schema registry integration:
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-avro-cloudera-registry</artifactId>
>     <version>${flink.version}</version>
>   </dependency>
>
> The schema registry can be plugged directly into the FlinkKafkaConsumer
> and FlinkKafkaProducer using the appropriate schema:
> - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema
> - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema
>
> Supported types
> ----------------------
> - Avro specific record types
> - Avro generic records
> - Basic Java data types: byte[], Byte, Integer, Short, Double, Float,
>   Long, String, Boolean
>
> SchemaRegistrySerializationSchema
> --------------------------------------------------
> The serialization schema can be constructed using the included builder
> object, SchemaRegistrySerializationSchema.builder(..).
>
> Required settings:
> - Topic configuration when creating the builder; can be static or
>   dynamic (extracted from the data)
> - RegistryAddress parameter on the builder to establish the connection
>
> Optional settings:
> - Arbitrary SchemaRegistry client configuration using the setConfig method
> - Key configuration for the produced Kafka messages:
>   - by specifying a KeySelector function that extracts the key from each
>     record
>   - by using a Tuple2 stream for (key, value) pairs directly
> - Security configuration
>
> Example:
>
>   KafkaSerializationSchema<ItemTransaction> schema =
>       SchemaRegistrySerializationSchema
>           .<ItemTransaction>builder(topic)
>           .setRegistryAddress(registryAddress)
>           .setKey(ItemTransaction::getItemId)
>           .build();
>
>   FlinkKafkaProducer<ItemTransaction> sink =
>       new FlinkKafkaProducer<>("dummy", schema, kafkaProps,
>           FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
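> For illustration, a usage sketch that wires the producer above into a
> job (ItemTransactionSource is a placeholder, not part of the connector):
>
>   StreamExecutionEnvironment env =
>       StreamExecutionEnvironment.getExecutionEnvironment();
>   DataStream<ItemTransaction> transactions =
>       env.addSource(new ItemTransactionSource()); // placeholder source
>   transactions.addSink(sink); // the FlinkKafkaProducer built above
>   env.execute("item-transaction-producer");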
> SchemaRegistryDeserializationSchema
> -----------------------------------------------------
> The deserialization schema can be constructed using the included builder
> object, SchemaRegistryDeserializationSchema.builder(..).
> When reading messages (and keys) we always have to specify the expected
> Class<T> or record Schema of the input records so that Flink can do any
> necessary conversion between the data on Kafka and what is expected.
>
> Required settings:
> - Class or Schema of the input messages, depending on the data type
> - RegistryAddress parameter on the builder to establish the connection
>
> Optional settings:
> - Arbitrary SchemaRegistry client configuration using the setConfig method
> - Key configuration for the consumed Kafka messages:
>   - should only be specified when we want to read the keys as well into
>     a (key, value) stream
> - Security configuration
>
> Example:
>
>   KafkaDeserializationSchema<ItemTransaction> schema =
>       SchemaRegistryDeserializationSchema
>           .builder(ItemTransaction.class)
>           .setRegistryAddress(registryAddress)
>           .build();
>
>   FlinkKafkaConsumer<ItemTransaction> source =
>       new FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps);
>   // The consumer group is configured via the properties, e.g.
>   // kafkaProps.setProperty("group.id", groupId);
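> For Avro generic records, a similar sketch should work, assuming the
> builder also accepts an org.apache.avro.Schema as the "Class or Schema"
> required setting above suggests (the exact overload is an assumption):
>
>   Schema itemSchema = new Schema.Parser().parse(itemSchemaString);
>   KafkaDeserializationSchema<GenericRecord> genericSchema =
>       SchemaRegistryDeserializationSchema
>           .builder(itemSchema)
>           .setRegistryAddress(registryAddress)
>           .build();
>   FlinkKafkaConsumer<GenericRecord> genericSource =
>       new FlinkKafkaConsumer<>(inputTopic, genericSchema, kafkaProps);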