Thanks Dawid, Gyula

We will then create a pull request with the proposed changes, where we can
further elaborate on the dependencies.

Regards,
Matyas

On Thu, Nov 14, 2019 at 11:10 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Gyula,
>
> First of all sorry for the delayed response.
>
> I see the argument for handling metadata from Kafka headers. I hadn't
> noticed that the schema you are proposing is actually a
> KafkaDeserializationSchema, which means it works only with Kafka.
>
> I still believe it would be really beneficial for the community to have a
> more general registry schema, but supporting a schema encoded in the
> record's metadata would require a rework of the
> (Connector)DeserializationSchema hierarchy, which I guess should be
> discussed separately.
>
> Having said that, I tend to agree with you that it would make sense to add
> the thin wrapper as an initial version, especially as you are suggesting
> hiding the implementation details behind a builder. Some comments on the
> design:
>
> * I would make it more explicit in the entry point that this works with the
> Cloudera (Hortonworks) schema registry (maybe something like
> ClouderaRegistryDeserializationSchema.builder(); see the sketch after these
> points)
>
> * I would also make it more explicit that it constructs only a *Kafka*
> (De)serializationSchema.
>
> * We should consider the dependency design. This schema, in contrast to
> the Confluent one, would pull in Kafka consumer dependencies. If we later
> add a schema that can deserialize data from other systems, we should not
> pull in the Kafka dependencies automatically.
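>
> To make the naming point a bit more concrete, an entry point along those
> lines could read roughly like this (the class name below is only a naming
> suggestion for the discussion, not an existing API; ItemTransaction and
> registryAddress are borrowed from the examples in the proposal):
>
> KafkaDeserializationSchema<ItemTransaction> schema =
>     ClouderaRegistryKafkaDeserializationSchema
>         .builder(ItemTransaction.class)
>         .setRegistryAddress(registryAddress)
>         .build();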
>
> Best,
>
> Dawid
> On 06/11/2019 11:32, Gyula Fóra wrote:
>
> Hi Dawid,
>
> In general I agree that a completely unified way of handling these
> registries would be great, but I wonder if that makes sense in the long
> term. While the Cloudera schema registry only supports Avro at the moment,
> it aims to support other formats in the future, and accessing that
> functionality will probably rely on using the registry's specific
> serializer/deserializer implementations. This might not be a valid concern
> at this point though :)
>
> The reason we went with wrapping the KafkaAvroDeserializer/Serializer
> directly for now is that it was very simple to do, and the current
> SchemaCoder approach lacks a lot of flexibility/functionality.
>
> The schema itself doesn't always come from the serialized data (I believe
> in this case it is stored either in the serialized data or in the Kafka
> record metadata), and we also want to be able to handle Kafka message keys.
> I guess these could be solved by making the deserialization logic Kafka
> specific and exposing the ConsumerRecord, but that would completely change
> the current SchemaCoder-related interfaces.
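>
> Just to illustrate what exposing the ConsumerRecord could mean in practice,
> here is a rough sketch of a Kafka-specific deserialization schema that picks
> the writer schema id out of the record headers before decoding the Avro
> payload. The header name, the id-to-schema map standing in for the registry
> client, and the class name are all made up for this sketch, not the proposed
> implementation:
>
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.io.DecoderFactory;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.common.header.Header;
>
> import java.nio.ByteBuffer;
> import java.util.Map;
>
> public class HeaderAwareAvroDeserializationSchema
>         implements KafkaDeserializationSchema<GenericRecord> {
>
>     // Stand-in for a registry lookup: schema id -> schema JSON (assumption).
>     private final Map<Integer, String> schemaJsonById;
>
>     public HeaderAwareAvroDeserializationSchema(Map<Integer, String> schemaJsonById) {
>         this.schemaJsonById = schemaJsonById;
>     }
>
>     @Override
>     public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
>         // Hypothetical header carrying the writer schema id.
>         Header idHeader = record.headers().lastHeader("value.schema.id");
>         int schemaId = ByteBuffer.wrap(idHeader.value()).getInt();
>
>         // A real implementation would cache parsed schemas and readers.
>         Schema writerSchema = new Schema.Parser().parse(schemaJsonById.get(schemaId));
>         GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema);
>         return reader.read(null, DecoderFactory.get().binaryDecoder(record.value(), null));
>     }
>
>     @Override
>     public boolean isEndOfStream(GenericRecord nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<GenericRecord> getProducedType() {
>         return TypeInformation.of(GenericRecord.class);
>     }
> }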
>
> Cheers,
> Gyula
>
> On Wed, Nov 6, 2019 at 10:17 AM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi Gyula,
>>
>> I did not want to discourage this contribution. I do agree we should
>> treat this connector the same as the Confluent schema registry one. I just
>> wanted to express my uncertainty about the general approach to new
>> connector contributions; by no means did I want to discourage this
>> contribution.
>>
>> As for the second point: do you mean that you are wrapping the
>> KafkaAvroDeserializer/Serializer provided by the Cloudera/Hortonworks
>> schema registry?
>>
>> Personally I would very much prefer using the SchemaCoder approach. All
>> such schemas boil down to two steps: (de)serializing the schema with the
>> registry-specific protocol, and (de)serializing the record itself. I think
>> the SchemaCoder approach has the benefit that we can optimize the
>> instantiation of Avro's readers and writers in a unified way. It's also
>> easier to maintain, as we have just a single point where the actual
>> record (de)serialization happens, and it provides a unified way of
>> instantiating the TypeInformation. Could you explain why you would
>> prefer not to use this approach?
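>>
>> As a registry-agnostic illustration of those two steps (the 4-byte schema
>> id prefix and the map standing in for the registry client are assumptions
>> of this sketch, not the actual wire format of any registry):
>>
>> import org.apache.avro.Schema;
>> import org.apache.avro.generic.GenericDatumReader;
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.avro.io.DecoderFactory;
>>
>> import java.io.ByteArrayInputStream;
>> import java.io.DataInputStream;
>> import java.io.IOException;
>> import java.util.Map;
>>
>> public class TwoStepDeserializer {
>>
>>     // Stand-in for the registry client: schema id -> schema JSON (assumption).
>>     private final Map<Integer, String> schemaJsonById;
>>     private final Schema readerSchema;
>>
>>     public TwoStepDeserializer(Map<Integer, String> schemaJsonById, Schema readerSchema) {
>>         this.schemaJsonById = schemaJsonById;
>>         this.readerSchema = readerSchema;
>>     }
>>
>>     public GenericRecord deserialize(byte[] message) throws IOException {
>>         ByteArrayInputStream in = new ByteArrayInputStream(message);
>>
>>         // Step 1: resolve the writer schema via the registry-specific
>>         // protocol (here: an illustrative 4-byte id prefix).
>>         int schemaId = new DataInputStream(in).readInt();
>>         Schema writerSchema = new Schema.Parser().parse(schemaJsonById.get(schemaId));
>>
>>         // Step 2: deserialize the record itself with the resolved schema.
>>         GenericDatumReader<GenericRecord> reader =
>>             new GenericDatumReader<>(writerSchema, readerSchema);
>>         return reader.read(null, DecoderFactory.get().binaryDecoder(in, null));
>>     }
>> }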
>>
>> Best,
>>
>> Dawid
>>
>> On 05/11/2019 14:48, Gyula Fóra wrote:
>> > Thanks Matyas for starting the discussion!
>> > I think this would be a very valuable addition to Flink, as many companies
>> > are already using the Hortonworks/Cloudera registry and it would enable
>> > them to connect to Flink easily.
>> >
>> > @Dawid:
>> > Regarding the implementation, this is a much more lightweight connector
>> > than what we have now for the Confluent registry and the PR you linked. It
>> > wraps the Cloudera registry directly, providing a very thin wrapper plus
>> > some enhanced functionality for handling Kafka message keys.
>> >
>> > As for the question of main repo vs. outside, I would prefer this to be
>> > included in the main repo, similar to the Confluent registry connector.
>> > Unless we decide to move all of these connectors out, I would like to take
>> > a consistent approach.
>> >
>> > Cheers,
>> > Gyula
>> >
>> >
>> > On Tue, Nov 5, 2019 at 1:44 PM Dawid Wysakowicz <dwysakow...@apache.org>
>> > wrote:
>> >
>> >> Hi Matyas,
>> >>
>> >> I think this would be a valuable addition. You may reuse some of the
>> >> already available abstractions for writing an Avro deserialization schema
>> >> based on a schema registry (have a look at RegistryDeserializationSchema
>> >> and SchemaCoderProvider). There is also an open PR for adding a similar
>> >> serialization schema [1].
>> >>
>> >> The only concern is that I am not 100% sure what the consensus is on
>> >> which connectors we want to adopt into the main repository and which we
>> >> would prefer to be hosted separately and included on the ecosystem
>> >> webpage [2] (which I hope will be published soon).
>> >>
>> >> Whichever option is preferred, I can help review the code.
>> >>
>> >> Best,
>> >>
>> >> Dawid
>> >>
>> >> [1] https://github.com/apache/flink/pull/8371
>> >>
>> >> [2]
>> >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html
>> >>
>> >> On 05/11/2019 12:40, Őrhidi Mátyás wrote:
>> >>> Dear Flink Community!
>> >>>
>> >>> We have noticed a recent request for Hortonworks schema registry support
>> >>> (FLINK-14577 <https://issues.apache.org/jira/browse/FLINK-14577>). We have
>> >>> an implementation for it already, and we would be happy to contribute it
>> >>> to Apache Flink.
>> >>>
>> >>> You can find the documentation below[1]. Let us know your thoughts!
>> >>>
>> >>> Best Regards,
>> >>> Matyas
>> >>>
>> >>> [1] Flink Avro Cloudera Registry User Guide
>> >>> -----------------------------------------------------------
>> >>>
>> >>> Add the following dependency to use the schema registry integration:
>> >>> <dependency>
>> >>>     <groupId>org.apache.flink</groupId>
>> >>>     <artifactId>flink-avro-cloudera-registry</artifactId>
>> >>>     <version>${flink.version}</version>
>> >>> </dependency>
>> >>>
>> >>>
>> >>> The schema registry can be plugged directly into the FlinkKafkaConsumer
>> >>> and FlinkKafkaProducer using the appropriate schema:
>> >>> - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema
>> >>> - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema
>> >>>
>> >>> Supported types
>> >>> ----------------------
>> >>> - Avro Specific Record types
>> >>> - Avro Generic Records
>> >>> - Basic Java data types: byte[], Byte, Integer, Short, Double, Float,
>> >>>   Long, String, Boolean
>> >>>
>> >>> SchemaRegistrySerializationSchema
>> >>> --------------------------------------------------
>> >>> The serialization schema can be constructed using the included builder
>> >>> object SchemaRegistrySerializationSchema.builder(..).
>> >>>
>> >>> Required settings:
>> >>> - Topic configuration when creating the builder. Can be static or
>> >>>   dynamic (extracted from the data)
>> >>> - RegistryAddress parameter on the builder to establish the connection
>> >>>
>> >>> Optional settings:
>> >>> - Arbitrary SchemaRegistry client configuration using the setConfig method
>> >>> - Key configuration for the produced Kafka messages
>> >>>   - By specifying a KeySelector function that extracts the key from each record
>> >>>   - Using a Tuple2 stream for (key, value) pairs directly
>> >>> - Security configuration
>> >>>
>> >>> Example:
>> >>> KafkaSerializationSchema<ItemTransaction> schema =
>> >>>     SchemaRegistrySerializationSchema
>> >>>         .<ItemTransaction>builder(topic)
>> >>>         .setRegistryAddress(registryAddress)
>> >>>         .setKey(ItemTransaction::getItemId)
>> >>>         .build();
>> >>>
>> >>> FlinkKafkaProducer<ItemTransaction> sink = new FlinkKafkaProducer<>(
>> >>>     "dummy", schema, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
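>> >>>
>> >>> Wiring the sink into a job is then plain Flink code; assuming the
>> >>> pipeline already has a DataStream<ItemTransaction> called transactions:
>> >>>
>> >>> transactions.addSink(sink);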
>> >>>
>> >>> SchemaRegistryDeserializationSchema
>> >>> -----------------------------------------------------
>> >>> The deserialization schema can be constructed using the included builder
>> >>> object SchemaRegistryDeserializationSchema.builder(..).
>> >>> When reading messages (and keys) we always have to specify the expected
>> >>> Class<T> or record Schema of the input records so that Flink can do any
>> >>> necessary conversion between the data on Kafka and what is expected.
>> >>>
>> >>> Required settings:
>> >>> - Class or Schema of the input messages depending on the data type
>> >>> - RegistryAddress parameter on the builder to establish the connection
>> >>>
>> >>> Optional settings:
>> >>> - Arbitrary SchemaRegistry client configuration using the setConfig method
>> >>> - Key configuration for the consumed Kafka messages
>> >>>   - Should only be specified when we want to read the keys as well into a
>> >>>     (key, value) stream
>> >>> - Security configuration
>> >>>
>> >>> Example:
>> >>> KafkaDeserializationSchema<ItemTransaction> schema =
>> >>>     SchemaRegistryDeserializationSchema
>> >>>         .builder(ItemTransaction.class)
>> >>>         .setRegistryAddress(registryAddress)
>> >>>         .build();
>> >>>
>> >>> // The consumer group id is configured via kafkaProps ("group.id").
>> >>> FlinkKafkaConsumer<ItemTransaction> source =
>> >>>     new FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps);
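>> >>>
>> >>> The source is attached to the job as usual; assuming env is the job's
>> >>> StreamExecutionEnvironment:
>> >>>
>> >>> DataStream<ItemTransaction> transactions = env.addSource(source);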
>> >>>
>> >>
>>
>>
