Dear Flink Community!

We have noticed a recent request for Hortonworks schema registry support
(FLINK-14577 <https://issues.apache.org/jira/browse/FLINK-14577>). We have
an implementation for it already, and we would be happy to contribute it
to Apache Flink.

You can find the documentation below[1]. Let us know your thoughts!

Best Regards,
Matyas

[1] Flink Avro Cloudera Registry User Guide
-----------------------------------------------------------

Add the following dependency to use the schema registry integration:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro-cloudera-registry</artifactId>
    <version>${flink.version}</version>
</dependency>


The schema registry can be plugged directly into the FlinkKafkaConsumer and
FlinkKafkaProducer using the appropriate schema:
- org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema
- org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema


Supported types
----------------------
- Avro Specific Record types
- Avro Generic Records
- Basic Java data types: byte[], Byte, Integer, Short, Double, Float, Long, String, Boolean

SchemaRegistrySerializationSchema
--------------------------------------------------
The serialization schema can be constructed using the included builder
object SchemaRegistrySerializationSchema.builder(..).

Required settings:
- Topic configuration when creating the builder. Can be static or dynamic (extracted from the data)
- RegistryAddress parameter on the builder to establish the connection

Optional settings:
- Arbitrary SchemaRegistry client configuration using the setConfig method (see the sketch after the example below)
- Key configuration for the produced Kafka messages
  - By specifying a KeySelector function that extracts the key from each record
  - By using a Tuple2 stream for (key, value) pairs directly
- Security configuration

Example:
KafkaSerializationSchema<ItemTransaction> schema = SchemaRegistrySerializationSchema
    .<ItemTransaction>builder(topic)
    .setRegistryAddress(registryAddress)
    .setKey(ItemTransaction::getItemId)
    .build();

// The "dummy" default topic is not used here; the topic is taken from the
// serialization schema built above.
FlinkKafkaProducer<ItemTransaction> sink = new FlinkKafkaProducer<>(
    "dummy", schema, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
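
The optional settings are chained on the same builder. Below is a minimal
sketch of the setConfig usage mentioned above; the Map-based signature and
the configuration key shown are our illustrative assumptions, not final API:

Map<String, Object> clientConfig = new HashMap<>();
// Hypothetical registry client setting, shown only for illustration
clientConfig.put("schema.registry.client.ssl.protocol", "TLS");

KafkaSerializationSchema<ItemTransaction> schema = SchemaRegistrySerializationSchema
    .<ItemTransaction>builder(topic)
    .setRegistryAddress(registryAddress)
    .setConfig(clientConfig)             // arbitrary SchemaRegistry client configuration
    .setKey(ItemTransaction::getItemId)  // key extracted from each record
    .build();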

SchemaRegistryDeserializationSchema
-----------------------------------------------------
The deserialization schema can be constructed using the included builder
object SchemaRegistryDeserializationSchema.builder(..).
When reading messages (and keys), we always have to specify the expected
Class<T> or record Schema of the input records so that Flink can perform
any necessary conversion between the data on Kafka and the expected type
(a generic-record sketch follows the example below).

Required settings:
- Class or Schema of the input messages depending on the data type
- RegistryAddress parameter on the builder to establish the connection

Optional settings:
- Arbitrary SchemaRegistry client configuration using the setConfig method
- Key configuration for the consumed Kafka messages
  - Should only be specified when we want to read the keys as well into a (key, value) stream
- Security configuration

Example:
KafkaDeserializationSchema<ItemTransaction> schema = SchemaRegistryDeserializationSchema
    .builder(ItemTransaction.class)
    .setRegistryAddress(registryAddress)
    .build();

// The consumer group is configured via kafkaProps ("group.id");
// FlinkKafkaConsumer does not take a separate groupId argument.
kafkaProps.setProperty("group.id", groupId);
FlinkKafkaConsumer<ItemTransaction> source =
    new FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps);
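
For generic records, the record Schema is passed to the builder instead of a
Class<T>, as described above. A minimal sketch, assuming a builder overload
that accepts an org.apache.avro.Schema (the itemSchemaString variable is a
made-up placeholder for an Avro schema definition):

Schema itemSchema = new Schema.Parser().parse(itemSchemaString);

KafkaDeserializationSchema<GenericRecord> schema = SchemaRegistryDeserializationSchema
    .builder(itemSchema)                 // record Schema instead of Class<T>
    .setRegistryAddress(registryAddress)
    .build();
FlinkKafkaConsumer<GenericRecord> source =
    new FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps);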
