Dear Flink Community!

We have noticed a recent request for Hortonworks schema registry support (FLINK-14577 <https://issues.apache.org/jira/browse/FLINK-14577>). We have an implementation for it already, and we would be happy to contribute it to Apache Flink.
You can find the documentation below [1]. Let us know your thoughts!

Best Regards,
Matyas

[1] Flink Avro Cloudera Registry User Guide
-----------------------------------------------------------

Add the following dependency to use the schema registry integration:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro-cloudera-registry</artifactId>
      <version>${flink.version}</version>
    </dependency>

The schema registry can be plugged directly into the FlinkKafkaConsumer and FlinkKafkaProducer using the appropriate schema:

 - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema
 - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema

Supported types
----------------------

 - Avro Specific Record types
 - Avro Generic Records
 - Basic Java data types: byte[], Byte, Integer, Short, Double, Float, Long, String, Boolean

SchemaRegistrySerializationSchema
--------------------------------------------------

The serialization schema can be constructed using the included builder object, SchemaRegistrySerializationSchema.builder(..).

Required settings:
 - Topic configuration when creating the builder; can be static or dynamic (extracted from the data)
 - RegistryAddress parameter on the builder to establish the connection

Optional settings:
 - Arbitrary SchemaRegistry client configuration using the setConfig method (see the configuration sketch at the end of this guide)
 - Key configuration for the produced Kafka messages
   - By specifying a KeySelector function that extracts the key from each record
   - By using a Tuple2 stream for (key, value) pairs directly
 - Security configuration

Example:

    KafkaSerializationSchema<ItemTransaction> schema = SchemaRegistrySerializationSchema
        .<ItemTransaction>builder(topic)
        .setRegistryAddress(registryAddress)
        .setKey(ItemTransaction::getItemId)
        .build();

    // The topic is supplied by the serialization schema, so the default
    // topic passed to the producer is only a placeholder.
    FlinkKafkaProducer<ItemTransaction> sink = new FlinkKafkaProducer<>(
        "dummy", schema, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

SchemaRegistryDeserializationSchema
-----------------------------------------------------

The deserialization schema can be constructed using the included builder object, SchemaRegistryDeserializationSchema.builder(..).

When reading messages (and keys), we always have to specify the expected Class<T> or record Schema of the input records so that Flink can do any necessary conversion between the data on Kafka and what is expected.

Required settings:
 - Class or Schema of the input messages, depending on the data type (a GenericRecord sketch follows the example below)
 - RegistryAddress parameter on the builder to establish the connection

Optional settings:
 - Arbitrary SchemaRegistry client configuration using the setConfig method
 - Key configuration for the consumed Kafka messages
   - Should only be specified when we want to read the keys as well into a (key, value) stream
 - Security configuration

Example:

    KafkaDeserializationSchema<ItemTransaction> schema = SchemaRegistryDeserializationSchema
        .builder(ItemTransaction.class)
        .setRegistryAddress(registryAddress)
        .build();

    // The consumer group is configured through kafkaProps ("group.id");
    // the constructor itself does not take a group id parameter.
    FlinkKafkaConsumer<ItemTransaction> source = new FlinkKafkaConsumer<>(
        inputTopic, schema, kafkaProps);
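For Generic Records, the expected Avro Schema is handed to the builder instead of a Class. Below is a minimal sketch under that assumption; the inline schema literal and its field names are purely illustrative, and the exact builder signature may differ:

    // Reading GenericRecords: pass the expected record Schema to the builder
    // (the "Class or Schema" requirement above). The schema and its fields
    // are made up for illustration.
    org.apache.avro.Schema itemSchema = new org.apache.avro.Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"ItemTransaction\",\"fields\":["
        + "{\"name\":\"itemId\",\"type\":\"string\"},"
        + "{\"name\":\"quantity\",\"type\":\"long\"}]}");

    KafkaDeserializationSchema<GenericRecord> genericSchema = SchemaRegistryDeserializationSchema
        .builder(itemSchema)
        .setRegistryAddress(registryAddress)
        .build();

    FlinkKafkaConsumer<GenericRecord> genericSource = new FlinkKafkaConsumer<>(
        inputTopic, genericSchema, kafkaProps);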
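The setConfig method mentioned under the optional settings carries arbitrary SchemaRegistry client configuration. A sketch of how this could look; the property keys and the Map parameter type are assumptions for illustration, not the definitive API:

    // Hypothetical client properties; the real keys come from the
    // SchemaRegistryClient configuration of the registry in use.
    Map<String, Object> clientConfig = new HashMap<>();
    clientConfig.put("schema.registry.client.ssl.trustStorePath", "/path/to/truststore.jks");  // assumed key
    clientConfig.put("schema.registry.client.ssl.trustStorePassword", "changeit");             // assumed key

    KafkaSerializationSchema<ItemTransaction> configuredSchema = SchemaRegistrySerializationSchema
        .<ItemTransaction>builder(topic)
        .setRegistryAddress(registryAddress)
        .setConfig(clientConfig)  // assumed to take a Map<String, Object>
        .build();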
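Finally, to show how the two schemas plug into a complete job, here is a small wiring sketch that reuses the source and sink built in the examples above (the getQuantity accessor is hypothetical, added only to make the pipeline non-trivial):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Records are deserialized by the registry-backed schema on the way in
    // and serialized, together with their keys, on the way out.
    env.addSource(source)
        .filter(t -> t.getQuantity() > 0)  // hypothetical accessor, for illustration
        .addSink(sink);

    env.execute("Schema registry round trip");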