Thanks Dawid and Gyula, we will create a pull request then with the proposed changes, where we can further elaborate on the dependencies.
Regards,
Matyas

On Thu, Nov 14, 2019 at 11:10 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi Gyula,
>
> First of all, sorry for the delayed response.
>
> I see the argument for handling metadata from Kafka headers. I hadn't noticed that the schema you are proposing is actually a KafkaDeserializationSchema, which means it works only with Kafka.
>
> I still believe it would be really beneficial for the community to have a more general registry schema, but if we want to support the schema being encoded in the record's metadata, it would require a rework of the hierarchy of the (Connector)DeserializationSchemas, which I guess should be discussed separately.
>
> Having said that, I tend to agree with you that it would make sense to add the thin wrapper as an initial version, especially as you are suggesting to hide the implementation details behind a builder. Some comments on the design:
>
> * I would make it more explicit in the entry point that this works with the Cloudera (Hortonworks) schema registry (maybe something like ClouderaRegistryDeserializationSchema.builder()).
>
> * I would make it somehow more explicit that it constructs only *Kafka* (De)serializationSchemas.
>
> * We should consider the dependencies design. This schema, in contrast to the Confluent one, would pull in Kafka consumer dependencies. If we add a schema that could deserialize data from other systems, we should not pull in the Kafka dependencies automatically.
>
> Best,
>
> Dawid
>
> On 06/11/2019 11:32, Gyula Fóra wrote:
>
> > Hi Dawid,
> >
> > In general I agree that if we can provide a completely unified way of handling these registries, that would be great, but I wonder whether that makes sense in the long term. While the Cloudera schema registry only supports Avro at the moment, it aims to support other formats in the future, and accessing this functionality will probably rely on using those specific serializer/deserializer implementations. This might not be a valid concern at this point though :)
> >
> > The reason why we went with wrapping the KafkaAvroDeserializer/Serializer directly for now is that it was super simple to do, and the current SchemaCoder approach lacks a lot of flexibility/functionality.
> >
> > The schema itself doesn't always come from the serialized data (I believe in this case it is either stored in the serialized data or the Kafka record metadata), and we also want to be able to handle Kafka message keys. I guess these could be solved by making the deserialization logic Kafka specific and exposing the ConsumerRecord, but that would completely change the current SchemaCoder-related interfaces.
> >
> > Cheers,
> > Gyula
> >
> > On Wed, Nov 6, 2019 at 10:17 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> >
>> Hi Gyula,
>>
>> I did not want to discourage this contribution. I do agree we should treat this connector equally to the Confluent schema registry one. I just wanted to express my uncertainty about the general approach to new connector contributions; by no means did I want to discourage this contribution.
>>
>> As for the second point: do you mean that you are wrapping the KafkaAvroDeserializer/Serializer provided by the Cloudera/Hortonworks schema registry?
>>
>> Personally I would very much prefer using the SchemaCoder approach. All schemas boil down to two steps: (de)serializing the schema with the registry-specific protocol + (de)serializing the record itself. I think the approach with SchemaCoder has the benefit that we can optimize the instantiation of Avro's readers and writers in a unified way. It's also easier to maintain, as we have just a single point where the actual record (de)serialization happens. It also provides a unified way of instantiating the TypeInformation. Could you give some explanation of why you would prefer not to use this approach?
>>
>> Best,
>>
>> Dawid
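For illustration of the SchemaCoder approach discussed above, here is a minimal sketch of what the deserialization side could look like. It assumes a wire format where each record is prefixed with an 8-byte schema id, a placeholder RegistryLookup interface standing in for the actual Cloudera registry client, and the single-method readSchema(InputStream) contract of org.apache.flink.formats.avro.SchemaCoder at the time of this thread (later versions may require more):

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;

import org.apache.avro.Schema;
import org.apache.flink.formats.avro.SchemaCoder;

// Sketch only: a registry-backed SchemaCoder that reads a schema id from the
// start of each serialized record and resolves it against a schema registry.
public class RegistrySchemaCoderSketch implements SchemaCoder {

    // Hypothetical lookup abstraction standing in for the real registry client.
    public interface RegistryLookup extends Serializable {
        String getSchemaText(long schemaId) throws IOException;
    }

    private final RegistryLookup registry;

    public RegistrySchemaCoderSketch(RegistryLookup registry) {
        this.registry = registry;
    }

    @Override
    public Schema readSchema(InputStream in) throws IOException {
        DataInputStream dataIn = new DataInputStream(in);
        // Assumed wire format: an 8-byte schema id prefix, followed by the Avro payload.
        long schemaId = dataIn.readLong();
        // The remaining bytes of 'in' are the Avro-encoded record and are consumed
        // by the surrounding registry deserialization schema, not here.
        return new Schema.Parser().parse(registry.getSchemaText(schemaId));
    }
}

A SchemaCoder.SchemaCoderProvider would then supply instances of such a coder to the registry deserialization schema; whether this model can also cover schemas carried in Kafka record metadata and message keys is exactly the open question in this thread.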
>>
>> On 05/11/2019 14:48, Gyula Fóra wrote:
>>
>> > Thanks Matyas for starting the discussion!
>> > I think this would be a very valuable addition to Flink, as many companies are already using the Hortonworks/Cloudera registry, and it would enable them to connect to Flink easily.
>> >
>> > @Dawid:
>> > Regarding the implementation, this is a much more lightweight connector than what we have now for the Confluent registry and the PR you linked. It wraps the Cloudera registry directly, providing a very thin wrapper plus some enhanced functionality for handling Kafka message keys.
>> >
>> > As for the question of main repo vs. outside, I would prefer this to be included in the main repo, similar to the Confluent registry connector. Unless we decide to move all of these connectors out, I would like to take a consistent approach.
>> >
>> > Cheers,
>> > Gyula
>> >
>> > On Tue, Nov 5, 2019 at 1:44 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>> >
>> >> Hi Matyas,
>> >>
>> >> I think this would be a valuable addition. You may reuse some of the already available abstractions for writing an Avro deserialization schema based on a schema registry (have a look at RegistryDeserializationSchema and SchemaCoderProvider). There is also an open PR for adding a similar serialization schema[1].
>> >>
>> >> The only concern is that I am not 100% sure what the consensus is on which connectors we want to adopt into the main repository and which we would prefer to be hosted separately and included in the ecosystem webpage[2] (which I hope will be published soon).
>> >>
>> >> Whichever option is preferred, I could help review the code.
>> >>
>> >> Best,
>> >>
>> >> Dawid
>> >>
>> >> [1] https://github.com/apache/flink/pull/8371
>> >>
>> >> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html
>> >>
>> >> On 05/11/2019 12:40, Őrhidi Mátyás wrote:
>> >>> Dear Flink Community!
>> >>>
>> >>> We have noticed a recent request for Hortonworks schema registry support (FLINK-14577 <https://issues.apache.org/jira/browse/FLINK-14577>). We have an implementation for it already, and we would be happy to contribute it to Apache Flink.
>> >>>
>> >>> You can find the documentation below[1]. Let us know your thoughts!
>> >>>
>> >>> Best Regards,
>> >>> Matyas
>> >>>
>> >>> [1] Flink Avro Cloudera Registry User Guide
>> >>> -----------------------------------------------------------
>> >>>
>> >>> Add the following dependency to use the schema registry integration:
>> >>>
>> >>> <dependency>
>> >>>   <groupId>org.apache.flink</groupId>
>> >>>   <artifactId>flink-avro-cloudera-registry</artifactId>
>> >>>   <version>${flink.version}</version>
>> >>> </dependency>
>> >>>
>> >>> The schema registry can be plugged directly into the FlinkKafkaConsumer and FlinkKafkaProducer using the appropriate schema:
>> >>> - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema
>> >>> - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema
>> >>>
>> >>> Supported types
>> >>> ----------------------
>> >>> - Avro Specific Record types
>> >>> - Avro Generic Records
>> >>> - Basic Java data types: byte[], Byte, Integer, Short, Double, Float, Long, String, Boolean
>> >>>
>> >>> SchemaRegistrySerializationSchema
>> >>> --------------------------------------------------
>> >>> The serialization schema can be constructed using the included builder object SchemaRegistrySerializationSchema.builder(..).
>> >>>
>> >>> Required settings:
>> >>> - Topic configuration when creating the builder. Can be static or dynamic (extracted from the data)
>> >>> - RegistryAddress parameter on the builder to establish the connection
>> >>>
>> >>> Optional settings:
>> >>> - Arbitrary SchemaRegistry client configuration using the setConfig method
>> >>> - Key configuration for the produced Kafka messages
>> >>>   - By specifying a KeySelector function that extracts the key from each record
>> >>>   - Using a Tuple2 stream for (key, value) pairs directly
>> >>> - Security configuration
>> >>>
>> >>> Example:
>> >>>
>> >>> KafkaSerializationSchema<ItemTransaction> schema = SchemaRegistrySerializationSchema
>> >>>     .<ItemTransaction>builder(topic)
>> >>>     .setRegistryAddress(registryAddress)
>> >>>     .setKey(ItemTransaction::getItemId)
>> >>>     .build();
>> >>> FlinkKafkaProducer<ItemTransaction> sink = new FlinkKafkaProducer<>(
>> >>>     "dummy", schema, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
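For illustration, a minimal sketch of wiring the serialization example above into a streaming job might look as follows. It assumes ItemTransaction is an Avro-generated class (hence the no-arg constructor used as placeholder input) and reuses the sink variable from the example:

// Sketch only: producing ItemTransaction records through the registry-backed serializer.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Placeholder input; any DataStream<ItemTransaction> would do here.
DataStream<ItemTransaction> transactions = env.fromElements(new ItemTransaction());

// 'sink' is the FlinkKafkaProducer built in the example above.
transactions.addSink(sink);

env.execute("Write transactions via the schema registry");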
>> >>>
>> >>> SchemaRegistryDeserializationSchema
>> >>> -----------------------------------------------------
>> >>> The deserialization schema can be constructed using the included builder object SchemaRegistryDeserializationSchema.builder(..).
>> >>> When reading messages (and keys) we always have to specify the expected Class<T> or record Schema of the input records so that Flink can do any necessary conversion between the data on Kafka and what is expected.
>> >>>
>> >>> Required settings:
>> >>> - Class or Schema of the input messages, depending on the data type
>> >>> - RegistryAddress parameter on the builder to establish the connection
>> >>>
>> >>> Optional settings:
>> >>> - Arbitrary SchemaRegistry client configuration using the setConfig method
>> >>> - Key configuration for the consumed Kafka messages
>> >>>   - Should only be specified when we want to read the keys as well into a (key, value) stream
>> >>> - Security configuration
>> >>>
>> >>> Example:
>> >>>
>> >>> KafkaDeserializationSchema<ItemTransaction> schema = SchemaRegistryDeserializationSchema
>> >>>     .builder(ItemTransaction.class)
>> >>>     .setRegistryAddress(registryAddress)
>> >>>     .build();
>> >>> // The consumer group id is passed through the Kafka properties.
>> >>> kafkaProps.setProperty("group.id", groupId);
>> >>> FlinkKafkaConsumer<ItemTransaction> source = new FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps);
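Similarly, a minimal sketch of consuming with the deserialization example above, reusing the source variable from the example; the print() sink is only a placeholder:

// Sketch only: reading registry-encoded ItemTransaction records from Kafka.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 'source' is the FlinkKafkaConsumer built in the example above.
DataStream<ItemTransaction> transactions = env.addSource(source);

// Placeholder sink; a real job would continue processing the stream here.
transactions.print();

env.execute("Read transactions via the schema registry");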