Thanks, Arvid.

I do not fully understand the above approach,
so currently, I am thinking to go with the envelope approach that you
suggested.

One more question I have if I do not want to keep schema in my consumer
project even its a single envelope schema. I want it to be fetched from the
schema registry and pass to my parquet-sink so that I always use the same
schema that is used by the producer.  Can you provide a sample code how can
i infer the schema from the generic record or get it from schema registry?


Regards,
Anuj



On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com> wrote:

> (Readded user mailing list)
>
> Hi Anuj,
>
> since I'd still recommend going with distinct sources/sinks, let me try to
> solve your issues in this mail. If that doesn't work out, I'd address your
> concerns about the envelope format later.
>
> In Flink, you can have several subtopologies in the same application.
>
> Thus, for each event type, you can add
> AvroSource(eventType) -> generic transformation/validation ->
> AvroSink(eventType)
> for each event.
>
> I'd put all avro schema in one project and use an avro plugin to generate
> the respective Java Classes. Then I'd simply create a map of Avro Schema
> (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA...) to topic name
> (event-a, event-b, ...).
> Next, I'd iterate over the list to add the respective subtopologies to env.
> Finally, execute everything.
>
> You have one project where all validations reside. But you'd have almost
> no overhead to process a given source of eventType. The downside of that
> approach is of course, that each new event type would require a
> redeployment, but that seems like what you'd want to do anyhow.
>
> Best,
>
> Arvid
>
> On Sat, Jan 18, 2020 at 2:08 PM aj <ajainje...@gmail.com> wrote:
>
>> Thanks, Arvid.
>>
>> 1. I like your approach as I can write a single consumer and put the data
>> in S3 in parquet format. The only challenge is there are extra columns that
>> always going to be null as at a time I will get one type of event.
>>
>> 2. if I go with a separate schema I am not sure how I can solve it using
>> a single generalize consumer. Till now what my understanding is I have to
>> write a consumer for each type of event. Each consumer will read the whole
>> data then filter the respective events from this and then I can pass this
>> stream to sink. But this does not look scalable solution as the new events
>> keep growing then I have to write a consumer for each new type.
>>
>>
>> DataStreamSource<GenericRecord> input = env
>>         .addSource(
>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>                         new
>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>                         config).setStartFromEarliest());
>>
>> Example :
>>
>> * 1st Consumer:*
>>                   DataStreamSource<GenericRecord> input = env.addSource(
>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>                         new
>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>                         config).setStartFromEarliest());
>> *                 DataStream<GenericRecord> aInput =
>> input.filter("event_name"= "a")*
>>
>> * 2nd Consumer:*
>>   DataStreamSource<GenericRecord> input = env.addSource(
>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>                         new
>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>                         config).setStartFromEarliest());
>> *                 DataStream<GenericRecord> bInput =
>> input.filter("event_name"= "b")*
>>
>>
>> Can you help me How I solve this using a single consumer as I do not want
>> to write a separate consumer for each type of schema?
>>
>> For example, this is my consumer that contains different types of records.
>>
>> DataStreamSource<GenericRecord> input = env
>>         .addSource(
>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>                         new
>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>                         config).setStartFromEarliest());
>>
>> Now I can not write this stream directly as there is no common schema of
>> records in this stream. So possible way I am thinking is
>>
>> 1. Can I create multiple streams from this stream using the key by on 
>> *"event_name"
>> *and then write each stream separately.
>>
>> Just wanna know is this possible ??
>>
>>
>> Thanks,
>> Anuj
>>
>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Anuj,
>>>
>>> I originally understood that you would like to store data in the same
>>> Kafka topic and also want to store it in the same parquet file. In the
>>> past, I mostly used schema registry with Kafka Streams, where you could
>>> only store a schema for a key and value respectively. To use different
>>> record types in the same kafka topic, you had to disable schema
>>> compatibility checks and just stored the schemas as different versions
>>> under the same subject.
>>>
>>> Your approach is much better. You can ensure full schema compatibility.
>>> Nevertheless, it still shares the same drawback that consumption is much
>>> harder (using GenericRecord is proof of that) if you want to read/write
>>> everything into the same place. Also you will never be able to write one
>>> consistent file, as they can only have one schema (both on Avro and
>>> Parquet).
>>> So you only have two options:
>>> * keep schemas separated, but then you also need to write separate files
>>> per record type.
>>> * have a common schema (either my outlined approach or any other wrapper
>>> schema).
>>> The approach with a common schema makes only sense if you want to write
>>> it into one table/kafka topic.
>>>
>>> However, in the last mail you pointed out that you actually want to
>>> store the record types separately. Then, you should keep everything
>>> separated. Then you should have a sink for each type each getting the
>>> respective schema. Note that you'd need to fetch the schema manually from
>>> the schema registry when creating the query as you would need to pass it to
>>> the sink.
>>>
>>> Btw, do you actually have a need to write all events into one Kafka
>>> topic? The only real use case is to preserve the time order per key.
>>> Everything else is much more complicated then storing events individually.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>>
>>> On Thu, Jan 16, 2020 at 3:39 PM aj <ajainje...@gmail.com> wrote:
>>>
>>>> Hi Arvid,
>>>> Thanks for the quick response. I am new to this Avro design so can you
>>>> please help me understand and design for my use case.
>>>>
>>>> I have use case like this :
>>>>
>>>> 1. we have an app where a lot of action happened from the user side.
>>>> 2. for each action we collect some set of information that
>>>> defined using some key-value pairs. This information we want to define as
>>>> proper schemas so that we maintain the proper format and not push random
>>>> data.
>>>> 3. So we are defining for each action a schema and register in the
>>>> schema registry using  topic+record.name as the subject .
>>>> 4. So I do not think the producer side has any issue as whenever we
>>>> push the event to Kafka we register a new schema with the above subject.
>>>>
>>>> Example :
>>>>
>>>> {
>>>> event_name : "a"
>>>> "timestamp":
>>>> "properties"  :[
>>>>   "key-1 : "val-1"
>>>>    "key-2 : "val-2"
>>>> ]
>>>> }
>>>>
>>>> {
>>>> event_name : "b"
>>>> "timestamp":
>>>> "properties"  :[
>>>>   "key-3 : "val-3"
>>>>    "key-4 : "val-4"
>>>> ]
>>>> }
>>>>
>>>> Now I  have a consumer that will parse the data by fetching the schema
>>>> from schema registry and deserialize in the generic record streams.
>>>>
>>>> Why you think it will break as I am always deserializing with writer
>>>> schema only.
>>>>
>>>> As you suggested to keep an envelope Avro schema and not separate
>>>> schema for each type of event that I am generating. I have some doubts
>>>> about that:
>>>>
>>>> 1. How I enforce a schema on each event as it subtypes in the main
>>>> schema. so when I am getting a JSON event of type "a" how I enforce and
>>>> convert it to subschema type of "a" and push to Kafka.
>>>> 2. I want to create a separate hive table for each of the events so
>>>> when I write this data and lets says I have 20 events than for 19 columns I
>>>> am getting null values always in data.
>>>>
>>>> Please help me in doing this right way. It will be a great help and
>>>> learning for me.
>>>>
>>>> Thanks,
>>>> Anuj
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi Anuj,
>>>>>
>>>>> you should always avoid having records with different schemas in the
>>>>> same topic/dataset. You will break the compatibility features of the 
>>>>> schema
>>>>> registry and your consumer/producer code is always hard to maintain.
>>>>>
>>>>> A common and scalable way to avoid it is to use some kind of envelope
>>>>> format.
>>>>>
>>>>> {
>>>>>   "namespace": "example",
>>>>>   "name": "Envelope",
>>>>>   "type": "record",
>>>>>   "fields": [
>>>>>     {
>>>>>       "name": "type1",
>>>>>       "type": ["null", {
>>>>>         "type": "record",
>>>>>         "fields": [ ... ]
>>>>>       }],
>>>>>       "default": null
>>>>>     },
>>>>>     {
>>>>>       "name": "type2",
>>>>>       "type": ["null", {
>>>>>         "type": "record",
>>>>>         "fields": [ ... ]
>>>>>       }],
>>>>>       "default": null
>>>>>     }
>>>>>   ]
>>>>> }
>>>>>
>>>>> This envelope is evolvable (arbitrary addition/removal of wrapped
>>>>> types, which by themselves can be evolved), and adds only a little 
>>>>> overhead
>>>>> (1 byte per subtype). The downside is that you cannot enforce that exactly
>>>>> one of the subtypes is set.
>>>>>
>>>>> This schema is fully compatible with the schema registry, so no need
>>>>> to parse anything manually.
>>>>>
>>>>> This schema can easily be used with Parquet. If you can't change the
>>>>> input format anymore, you can at least use that approach on your output.
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <ajainje...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have a use case where I am getting a different set of Avro records
>>>>>> in Kafka. I am using the schema registry to store Avro schema. One topic
>>>>>> can also have different types of records.
>>>>>>
>>>>>> Now I have created a GenericRecord Stream using kafkaAvroDeseralizer
>>>>>> by defining custom
>>>>>> Deserializer class like this
>>>>>>
>>>>>> @Override
>>>>>> public GenericRecord deserialize(
>>>>>> byte[] messageKey, byte[] message, String topic, int partition, long
>>>>>> offset) {
>>>>>> checkInitialized();
>>>>>> return (GenericRecord) inner.deserialize(topic, message);
>>>>>> }
>>>>>>
>>>>>> private void checkInitialized() {
>>>>>> if (inner == null) {
>>>>>> Map<String, Object> props = new HashMap<>();
>>>>>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>>>> registryUrl);
>>>>>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>>>>> false);
>>>>>> SchemaRegistryClient client =
>>>>>> new CachedSchemaRegistryClient(
>>>>>> registryUrl,
>>>>>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>>> inner = new KafkaAvroDeserializer(client, props);
>>>>>> }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> And this is my consumer code :
>>>>>>
>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>         .addSource(
>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>                         new
>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>                         config).setStartFromEarliest());
>>>>>>
>>>>>> Now I want to write this stream partition on
>>>>>> *event_name="a"/year=/month=/day=* in parquet format so that I can
>>>>>> expose hive tables directly on top of this data.
>>>>>> event_name is common field for all types of records that I am getting
>>>>>> in Kafka.
>>>>>> I am stuck as parquet writer needs a schema to write but my different
>>>>>> records have different schemas  So how do I write this stream in s3 in
>>>>>> above partition format.
>>>>>>
>>>>>>
>>>>>> Thanks & Regards,
>>>>>> Anuj Jain
>>>>>>
>>>>>>
>>>>>>
>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Thanks & Regards,
>>>> Anuj Jain
>>>> Mob. : +91- 8588817877
>>>> Skype : anuj.jain07
>>>> <http://www.oracle.com/>
>>>>
>>>>
>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>
>>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>> <http://www.oracle.com/>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Reply via email to