Hi Anuj,

you should always avoid having records with different schemas in the same
topic/dataset. You will break the compatibility features of the schema
registry and your consumer/producer code is always hard to maintain.

A common and scalable way to avoid it is to use some kind of envelope
format.

{
  "namespace": "example",
  "name": "Envelope",
  "type": "record",
  "fields": [
    {
      "name": "type1",
      "type": ["null", {
        "type": "record",
        "fields": [ ... ]
      }],
      "default": null
    },
    {
      "name": "type2",
      "type": ["null", {
        "type": "record",
        "fields": [ ... ]
      }],
      "default": null
    }
  ]
}

This envelope is evolvable (arbitrary addition/removal of wrapped types,
which by themselves can be evolved), and adds only a little overhead (1
byte per subtype). The downside is that you cannot enforce that exactly one
of the subtypes is set.

This schema is fully compatible with the schema registry, so no need to
parse anything manually.

This schema can easily be used with Parquet. If you can't change the input
format anymore, you can at least use that approach on your output.

Best,

Arvid

On Thu, Jan 16, 2020 at 2:53 PM aj <ajainje...@gmail.com> wrote:

> Hi All,
>
> I have a use case where I am getting a different set of Avro records in
> Kafka. I am using the schema registry to store Avro schema. One topic can
> also have different types of records.
>
> Now I have created a GenericRecord Stream using kafkaAvroDeseralizer by
> defining custom
> Deserializer class like this
>
> @Override
> public GenericRecord deserialize(
> byte[] messageKey, byte[] message, String topic, int partition, long
> offset) {
> checkInitialized();
> return (GenericRecord) inner.deserialize(topic, message);
> }
>
> private void checkInitialized() {
> if (inner == null) {
> Map<String, Object> props = new HashMap<>();
> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> registryUrl);
> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
> SchemaRegistryClient client =
> new CachedSchemaRegistryClient(
> registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
> inner = new KafkaAvroDeserializer(client, props);
> }
> }
>
>
> And this is my consumer code :
>
> DataStreamSource<GenericRecord> input = env
>         .addSource(
>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>                         new
> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>                         config).setStartFromEarliest());
>
> Now I want to write this stream partition on
> *event_name="a"/year=/month=/day=* in parquet format so that I can expose
> hive tables directly on top of this data.
> event_name is common field for all types of records that I am getting in
> Kafka.
> I am stuck as parquet writer needs a schema to write but my different
> records have different schemas  So how do I write this stream in s3 in
> above partition format.
>
>
> Thanks & Regards,
> Anuj Jain
>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>

Reply via email to