Hi Gyula,

I did not want to discourage this contribution. I do agree we should
treat this connector the same way as the Confluent schema registry one.
I just wanted to express my uncertainty about the general approach to
new connector contributions.

As for the second point: do you mean that you are wrapping the
KafkaAvroDeserializer/Serializer provided by the Cloudera/Hortonworks
schema registry?

Personally, I would very much prefer using the SchemaCoder approach. All
schema registry integrations boil down to two steps: (de)serializing the
schema with a registry-specific protocol, and (de)serializing the record
itself. I think the approach with SchemaCoder has the benefit that we can
optimize the instantiation of Avro's readers and writers in a unified
way. It is also easier to maintain, as we have just a single point where
the actual record (de)serialization happens. It also provides a unified
way of instantiating the TypeInformation. Could you explain why you
would prefer not to use this approach?
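
To illustrate the two steps, here is a minimal, JDK-only sketch. It is
not Flink's actual code: the SchemaCoder interface below is a
hypothetical simplification, the in-memory "registry" with a 4-byte
schema id header is made up for the example, and plain strings stand in
for Avro schemas and records. The point is only that the registry
protocol lives behind one small interface, while the record
(de)serialization is written once and shared.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class SchemaCoderSketch {

    /** Step 1 (registry specific): only knows how to write/read the schema reference. */
    interface SchemaCoder {
        void writeSchema(String schema, OutputStream out) throws IOException;
        String readSchema(InputStream in) throws IOException;
    }

    /** Hypothetical registry protocol: a 4-byte big-endian schema id in front of each record. */
    static final class InMemoryIdCoder implements SchemaCoder {
        private final Map<String, Integer> idsBySchema = new HashMap<>();
        private final Map<Integer, String> schemasById = new HashMap<>();

        @Override
        public void writeSchema(String schema, OutputStream out) throws IOException {
            Integer id = idsBySchema.get(schema);
            if (id == null) {
                // "Register" an unseen schema under the next free id.
                id = idsBySchema.size() + 1;
                idsBySchema.put(schema, id);
                schemasById.put(id, schema);
            }
            new DataOutputStream(out).writeInt(id);
        }

        @Override
        public String readSchema(InputStream in) throws IOException {
            int id = new DataInputStream(in).readInt();
            String schema = schemasById.get(id);
            if (schema == null) {
                throw new IOException("Unknown schema id " + id);
            }
            return schema;
        }
    }

    /** Result of decoding one message: the writer schema plus the record payload. */
    static final class Decoded {
        final String schema;
        final String record;

        Decoded(String schema, String record) {
            this.schema = schema;
            this.record = record;
        }
    }

    /** Step 2 (shared): the record is serialized in one place, whatever the registry is. */
    static byte[] serialize(SchemaCoder coder, String schema, String record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        coder.writeSchema(schema, out);                      // registry-specific header
        out.write(record.getBytes(StandardCharsets.UTF_8));  // record bytes (stand-in for Avro)
        return out.toByteArray();
    }

    static Decoded deserialize(SchemaCoder coder, byte[] message) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(message);
        String writerSchema = coder.readSchema(in);          // consumes only the header
        byte[] payload = new byte[in.available()];           // whatever is left is the record
        in.read(payload, 0, payload.length);
        return new Decoded(writerSchema, new String(payload, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        SchemaCoder coder = new InMemoryIdCoder();
        byte[] message = serialize(coder, "ItemTransaction-v1", "item-42");
        Decoded decoded = deserialize(coder, message);
        System.out.println(decoded.schema + " -> " + decoded.record);
    }
}
```

Supporting another registry would then mean providing another
SchemaCoder implementation, while serialize/deserialize (and any
reader/writer caching added there) stay untouched.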

Best,

Dawid

On 05/11/2019 14:48, Gyula Fóra wrote:
> Thanks Matyas for starting the discussion!
> I think this would be a very valuable addition to Flink as many companies
> are already using the Hortonworks/Cloudera registry and it would enable
> them to connect to Flink easily.
>
> @Dawid:
> Regarding the implementation, this is a much more lightweight connector than
> what we have now for the Confluent registry and the PR you linked. This
> wraps the Cloudera registry directly, providing a very thin wrapper + some
> enhanced functionality for handling Kafka message keys.
>
> As for the question of the main repo vs. outside, I would prefer this to be
> included in the main repo, similar to the Confluent registry connector.
> Unless we decide to move all of these connectors out, I would like to take a
> consistent approach.
>
> Cheers,
> Gyula
>
>
> On Tue, Nov 5, 2019 at 1:44 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi Matyas,
>>
>> I think this would be a valuable addition. You may reuse some of the
>> already available abstractions for writing an Avro deserialization schema
>> based on a schema registry (have a look at RegistryDeserializationSchema
>> and SchemaCoderProvider). There is also an open PR for adding a
>> similar serialization schema[1].
>>
>> The only concern is that I am not 100% sure what the consensus is on
>> which connectors we want to adopt into the main repository and which
>> we would prefer to be hosted separately and included in the ecosystem
>> webpage[2] (that I hope will be published soon).
>>
>> Whichever option is preferred, I could help review the code.
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://github.com/apache/flink/pull/8371
>>
>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html
>>
>> On 05/11/2019 12:40, Őrhidi Mátyás wrote:
>>> Dear Flink Community!
>>>
>>> We have noticed a recent request for Hortonworks schema registry support
>>> (FLINK-14577 <https://issues.apache.org/jira/browse/FLINK-14577>). We have
>>> an implementation for it already, and we would be happy to contribute it to
>>> Apache Flink.
>>>
>>> You can find the documentation below[1]. Let us know your thoughts!
>>>
>>> Best Regards,
>>> Matyas
>>>
>>> [1] Flink Avro Cloudera Registry User Guide
>>> -----------------------------------------------------------
>>>
>>> Add the following dependency to use the schema registry integration:
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-avro-cloudera-registry</artifactId>
>>>     <version>${flink.version}</version>
>>> </dependency>
>>>
>>>
>>> The schema registry can be plugged directly into the FlinkKafkaConsumer and
>>> FlinkKafkaProducer using the appropriate schema:
>>> - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema
>>> - org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema
>>>
>>> Supported types
>>> ----------------------
>>> - Avro Specific Record types
>>> - Avro Generic Records
>>> - Basic Java Data types: byte[], Byte, Integer, Short, Double, Float, Long,
>>> String, Boolean
>>>
>>> SchemaRegistrySerializationSchema
>>> --------------------------------------------------
>>> The serialization schema can be constructed using the included builder
>>> object SchemaRegistrySerializationSchema.builder(..).
>>>
>>> Required settings:
>>> - Topic configuration when creating the builder. Can be static or dynamic
>>> (extracted from the data)
>>> - RegistryAddress parameter on the builder to establish the connection
>>>
>>> Optional settings:
>>> - Arbitrary SchemaRegistry client configuration using the setConfig method
>>> - Key configuration for the produced Kafka messages
>>>  - By specifying a KeySelector function that extracts the key from each
>>> record
>>>  - Using a Tuple2 stream for (key, value) pairs directly
>>> - Security configuration
>>>
>>> Example:
>>> KafkaSerializationSchema<ItemTransaction> schema =
>>> SchemaRegistrySerializationSchema
>>>     .<ItemTransaction>builder(topic)
>>>     .setRegistryAddress(registryAddress)
>>>     .setKey(ItemTransaction::getItemId)
>>>     .build();
>>> FlinkKafkaProducer<ItemTransaction> sink = new
>>> FlinkKafkaProducer<>("dummy", schema, kafkaProps,
>>> FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
>>>
>>> SchemaRegistryDeserializationSchema
>>> -----------------------------------------------------
>>> The deserialization schema can be constructed using the included builder
>>> object SchemaRegistryDeserializationSchema.builder(..).
>>> When reading messages (and keys) we always have to specify the expected
>>> Class<T> or record Schema of the input records so that Flink can do any
>>> necessary conversion between the data on Kafka and what is expected.
>>>
>>> Required settings:
>>> - Class or Schema of the input messages depending on the data type
>>> - RegistryAddress parameter on the builder to establish the connection
>>>
>>> Optional settings:
>>> - Arbitrary SchemaRegistry client configuration using the setConfig method
>>> - Key configuration for the consumed Kafka messages
>>>  - Should only be specified when we want to read the keys as well into a
>>> (key, value) stream
>>> - Security configuration
>>>
>>> Example:
>>> KafkaDeserializationSchema<ItemTransaction> schema =
>>> SchemaRegistryDeserializationSchema
>>>    .builder(ItemTransaction.class)
>>>    .setRegistryAddress(registryAddress)
>>>    .build();
>>> // The consumer group is taken from the "group.id" entry in kafkaProps.
>>> FlinkKafkaConsumer<ItemTransaction> source = new
>>> FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps);
>>>
>>
