Thanks Matyas for starting the discussion!
I think this would be a very valuable addition to Flink as many companies
are already using the Hortonworks/Cloudera registry and it would enable
them to connect to Flink easily.

@Dawid:
Regarding the implementation, this is a much more lightweight connector than
what we have now for the Confluent registry and the PR you linked. It
wraps the Cloudera registry directly, providing a very thin wrapper plus some
enhanced functionality for handling Kafka message keys.
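
To illustrate what the key handling amounts to, here is a rough plain-Java
sketch. It is not the connector's code: KeyedRecordSketch, KeyedRecord and
toRecord are hypothetical names, ItemTransaction is a stand-in for the type
used in the user guide below, and UTF-8 strings replace the registry-based
serialization the real connector would presumably use.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical sketch, not the connector's code: all names here are made up.
class KeyedRecordSketch {

    // Stand-in for the ItemTransaction type used in the user guide examples.
    static final class ItemTransaction {
        final String itemId;
        final int quantity;

        ItemTransaction(String itemId, int quantity) {
            this.itemId = itemId;
            this.quantity = quantity;
        }
    }

    // Stand-in for a Kafka record: a topic plus serialized key and value.
    static final class KeyedRecord {
        final String topic;
        final byte[] key;
        final byte[] value;

        KeyedRecord(String topic, byte[] key, byte[] value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    // Applies the key selector to the element, then serializes key and value.
    // UTF-8 strings are only a placeholder for schema-based serialization.
    static KeyedRecord toRecord(String topic,
                                ItemTransaction tx,
                                Function<ItemTransaction, String> keySelector) {
        byte[] key = keySelector.apply(tx).getBytes(StandardCharsets.UTF_8);
        byte[] value = (tx.itemId + ":" + tx.quantity).getBytes(StandardCharsets.UTF_8);
        return new KeyedRecord(topic, key, value);
    }

    public static void main(String[] args) {
        KeyedRecord record = toRecord(
                "transactions", new ItemTransaction("item-42", 3), tx -> tx.itemId);
        System.out.println(new String(record.key, StandardCharsets.UTF_8)
                + " -> " + new String(record.value, StandardCharsets.UTF_8));
    }
}
```

The point is only that the key is derived per record by a user-supplied
function, which is the role setKey(ItemTransaction::getItemId) plays in the
example from the user guide.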

As for the question of main repo vs. outside, I would prefer this to be
included in the main repo, similar to the Confluent registry connector.
Unless we decide to move all of these connectors out I would like to take a
consistent approach.

Cheers,
Gyula


On Tue, Nov 5, 2019 at 1:44 PM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Matyas,
>
> I think this would be a valuable addition. You may reuse some of the
> already available abstractions for writing an Avro deserialization schema
> based on a schema registry (have a look at RegistryDeserializationSchema
> and SchemaCoderProvider). There is also an open PR for adding a
> similar serialization schema[1].
>
> The only concern is that I am not 100% sure what the consensus is on
> which connectors we want to adopt into the main repository and which
> we would prefer to be hosted separately and included in the ecosystem
> webpage[2] (that I hope will be published soon).
>
> Whichever option is preferred, I can help review the code.
>
> Best,
>
> Dawid
>
> [1] https://github.com/apache/flink/pull/8371
>
> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html
>
> On 05/11/2019 12:40, Őrhidi Mátyás wrote:
> > Dear Flink Community!
> >
> > We have noticed a recent request for Hortonworks schema registry support
> > (FLINK-14577 <https://issues.apache.org/jira/browse/FLINK-14577>). We have
> > an implementation for it already, and we would be happy to contribute it to
> > Apache Flink.
> >
> > You can find the documentation below[1]. Let us know your thoughts!
> >
> > Best Regards,
> > Matyas
> >
> > [1] Flink Avro Cloudera Registry User Guide
> > -----------------------------------------------------------
> >
> > Add the following dependency to use the schema registry integration:
> > <dependency>
> >     <groupId>org.apache.flink</groupId>
> >     <artifactId>flink-avro-cloudera-registry</artifactId>
> >     <version>${flink.version}</version>
> > </dependency>
> >
> >
> > The schema registry can be plugged directly into the FlinkKafkaConsumer and
> > FlinkKafkaProducer using the appropriate schema:
> > - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema
> > - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema
> >
> >
> > Supported types
> > ----------------------
> > - Avro Specific Record types
> > - Avro Generic Records
> > - Basic Java Data types: byte[], Byte, Integer, Short, Double, Float, Long,
> > String, Boolean
> >
> > SchemaRegistrySerializationSchema
> > --------------------------------------------------
> > The serialization schema can be constructed using the included builder
> > object SchemaRegistrySerializationSchema.builder(..).
> >
> > Required settings:
> > - Topic configuration when creating the builder. Can be static or dynamic
> > (extracted from the data)
> > - RegistryAddress parameter on the builder to establish the connection
> >
> > Optional settings:
> > - Arbitrary SchemaRegistry client configuration using the setConfig method
> > - Key configuration for the produced Kafka messages
> >  - By specifying a KeySelector function that extracts the key from each
> > record
> >  - Using a Tuple2 stream for (key, value) pairs directly
> > - Security configuration
> >
> > Example:
> > KafkaSerializationSchema<ItemTransaction> schema =
> > SchemaRegistrySerializationSchema
> >     .<ItemTransaction>builder(topic)
> >     .setRegistryAddress(registryAddress)
> >     .setKey(ItemTransaction::getItemId)
> >     .build();
> > FlinkKafkaProducer<ItemTransaction> sink = new
> > FlinkKafkaProducer<>("dummy", schema, kafkaProps,
> > FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
> >
> > SchemaRegistryDeserializationSchema
> > -----------------------------------------------------
> > The deserialization schema can be constructed using the included builder
> > object SchemaRegistryDeserializationSchema.builder(..).
> > When reading messages (and keys) we always have to specify the expected
> > Class<T> or record Schema of the input records so that Flink can do any
> > necessary conversion between the data on Kafka and what is expected.
> >
> > Required settings:
> > - Class or Schema of the input messages depending on the data type
> > - RegistryAddress parameter on the builder to establish the connection
> >
> > Optional settings:
> > - Arbitrary SchemaRegistry client configuration using the setConfig method
> > - Key configuration for the consumed Kafka messages
> >  - Should only be specified when we want to read the keys as well into a
> > (key, value) stream
> > - Security configuration
> >
> > Example:
> > KafkaDeserializationSchema<ItemTransaction> schema =
> > SchemaRegistryDeserializationSchema
> >    .builder(ItemTransaction.class)
> >    .setRegistryAddress(registryAddress)
> >    .build();
> > // The consumer group is set through the "group.id" entry in kafkaProps.
> > FlinkKafkaConsumer<ItemTransaction> source = new
> > FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps);
> >
>
>
