Hi Gyula,

By no means did I want to discourage this contribution. I agree we should treat this connector the same way as the Confluent schema registry connector. I only wanted to express my uncertainty about the general approach to new connector contributions.
As for the second point: do you mean that you are wrapping the KafkaAvroDeserializer/Serializer provided by the Cloudera/Hortonworks schema registry? Personally, I would very much prefer the SchemaCoder approach. Every registry-backed format boils down to two steps: (de)serializing the schema with the registry-specific protocol, and then (de)serializing the record itself. The SchemaCoder approach has the benefit that we can optimize the instantiation of Avro's readers and writers in a unified way. It is also easier to maintain, as there is a single point where the actual record (de)serialization happens, and it provides a unified way of instantiating the TypeInformation. Could you explain why you would prefer not to use this approach?
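To make those two steps concrete, here is a rough sketch of what a SchemaCoder-style implementation boils down to. The schema-id framing and the RegistryClient interface below are purely illustrative assumptions, not the actual Cloudera/Hortonworks wire format or client API:

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

/**
 * Sketch only. Step 1 is the registry-specific part (decode the framing and
 * resolve the writer schema); step 2 is plain Avro decoding that can live in
 * one shared place for all registries.
 */
public class RegistrySchemaCoderSketch {

    /** Minimal stand-in for a real schema registry client. */
    public interface RegistryClient {
        Schema getSchemaById(long schemaId) throws IOException;
    }

    private final RegistryClient client;

    public RegistrySchemaCoderSketch(RegistryClient client) {
        this.client = client;
    }

    /** Step 1: read the (assumed) schema-id prefix and look up the writer schema. */
    public Schema readSchema(InputStream in) throws IOException {
        long schemaId = new DataInputStream(in).readLong();
        return client.getSchemaById(schemaId);
    }

    /** Step 2: decode the actual record; identical regardless of the registry. */
    public GenericRecord readRecord(Schema writer, Schema reader, InputStream in) throws IOException {
        GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(writer, reader);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
        return datumReader.read(null, decoder);
    }
}

With that split, a registry-specific module only has to provide step 1, while the record decoding and the TypeInformation handling stay in the shared Avro format classes.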
Best,

Dawid

On 05/11/2019 14:48, Gyula Fóra wrote:
> Thanks Matyas for starting the discussion!
> I think this would be a very valuable addition to Flink, as many companies
> are already using the Hortonworks/Cloudera registry and it would enable
> them to connect to Flink easily.
>
> @Dawid:
> Regarding the implementation, this is a much more lightweight connector than
> what we have now for the Confluent registry and the PR you linked. It wraps
> the Cloudera registry directly, providing a very thin wrapper plus some
> enhanced functionality for handling Kafka message keys.
>
> As for the question of main repo vs. outside, I would prefer this to be
> included in the main repo, similar to the Confluent registry connector.
> Unless we decide to move all of these connectors out, I would like to take
> a consistent approach.
>
> Cheers,
> Gyula
>
>
> On Tue, Nov 5, 2019 at 1:44 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi Matyas,
>>
>> I think this would be a valuable addition. You may reuse some of the
>> already available abstractions for writing an Avro deserialization schema
>> based on a schema registry (have a look at RegistryDeserializationSchema
>> and SchemaCoderProvider). There is also an open PR for adding a similar
>> serialization schema [1].
>>
>> My only concern is that I am not 100% sure what the consensus is on which
>> connectors we want to adopt into the main repository and which we would
>> prefer to host separately and include on the ecosystem webpage [2]
>> (which I hope will be published soon).
>>
>> Whichever option is preferred, I could help review the code.
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://github.com/apache/flink/pull/8371
>>
>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html
>>
>> On 05/11/2019 12:40, Őrhidi Mátyás wrote:
>>> Dear Flink Community!
>>>
>>> We have noticed a recent request for Hortonworks schema registry support
>>> (FLINK-14577 <https://issues.apache.org/jira/browse/FLINK-14577>). We have
>>> an implementation for it already, and we would be happy to contribute it
>>> to Apache Flink.
>>>
>>> You can find the documentation below [1]. Let us know your thoughts!
>>>
>>> Best Regards,
>>> Matyas
>>>
>>> [1] Flink Avro Cloudera Registry User Guide
>>> -----------------------------------------------------------
>>>
>>> Add the following dependency to use the schema registry integration:
>>>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-avro-cloudera-registry</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>>
>>> The schema registry can be plugged directly into the FlinkKafkaConsumer
>>> and FlinkKafkaProducer using the appropriate schema:
>>> - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema
>>> - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema
>>>
>>> Supported types
>>> ----------------------
>>> - Avro Specific Record types
>>> - Avro Generic Records
>>> - Basic Java data types: byte[], Byte, Integer, Short, Double, Float,
>>>   Long, String, Boolean
>>>
>>> SchemaRegistrySerializationSchema
>>> --------------------------------------------------
>>> The serialization schema can be constructed using the included builder
>>> object SchemaRegistrySerializationSchema.builder(..).
>>>
>>> Required settings:
>>> - Topic configuration when creating the builder. Can be static or dynamic
>>>   (extracted from the data)
>>> - RegistryAddress parameter on the builder to establish the connection
>>>
>>> Optional settings:
>>> - Arbitrary SchemaRegistry client configuration using the setConfig method
>>> - Key configuration for the produced Kafka messages
>>>   - By specifying a KeySelector function that extracts the key from each
>>>     record
>>>   - Using a Tuple2 stream for (key, value) pairs directly
>>> - Security configuration
>>>
>>> Example:
>>>
>>> KafkaSerializationSchema<ItemTransaction> schema =
>>>     SchemaRegistrySerializationSchema
>>>         .<ItemTransaction>builder(topic)
>>>         .setRegistryAddress(registryAddress)
>>>         .setKey(ItemTransaction::getItemId)
>>>         .build();
>>> FlinkKafkaProducer<ItemTransaction> sink = new FlinkKafkaProducer<>(
>>>     "dummy", schema, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
>>>
>>> SchemaRegistryDeserializationSchema
>>> -----------------------------------------------------
>>> The deserialization schema can be constructed using the included builder
>>> object SchemaRegistryDeserializationSchema.builder(..).
>>> When reading messages (and keys) we always have to specify the expected
>>> Class<T> or record Schema of the input records so that Flink can do any
>>> necessary conversion between the data on Kafka and what is expected.
>>>
>>> Required settings:
>>> - Class or Schema of the input messages, depending on the data type
>>> - RegistryAddress parameter on the builder to establish the connection
>>>
>>> Optional settings:
>>> - Arbitrary SchemaRegistry client configuration using the setConfig method
>>> - Key configuration for the consumed Kafka messages
>>>   - Should only be specified when we want to read the keys as well into a
>>>     (key, value) stream
>>> - Security configuration
>>>
>>> Example:
>>>
>>> KafkaDeserializationSchema<ItemTransaction> schema =
>>>     SchemaRegistryDeserializationSchema
>>>         .builder(ItemTransaction.class)
>>>         .setRegistryAddress(registryAddress)
>>>         .build();
>>> FlinkKafkaConsumer<ItemTransaction> source = new FlinkKafkaConsumer<>(
>>>     inputTopic, schema, kafkaProps); // group.id is set via kafkaProps
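One more remark on the optional key handling described in the guide: for completeness, here is a sketch of how reading the keys into a (key, value) stream might look. The readKey(..) call is only a guess at the builder API, since the guide lists the option but does not show the call:

// Hypothetical readKey(..) builder call; everything else mirrors the
// deserialization example above. group.id is again set via kafkaProps.
KafkaDeserializationSchema<Tuple2<String, ItemTransaction>> schema =
    SchemaRegistryDeserializationSchema
        .builder(ItemTransaction.class)
        .readKey(String.class)                // assumed method name
        .setRegistryAddress(registryAddress)
        .build();
FlinkKafkaConsumer<Tuple2<String, ItemTransaction>> source =
    new FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps);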