(Re-added the user mailing list)

Hi Anuj,

since I'd still recommend going with distinct sources/sinks, let me try to
solve your issues in this mail. If that doesn't work out, I'll address your
concerns about the envelope format later.

In Flink, you can have several subtopologies in the same application. Thus,
for each event type, you can add

    AvroSource(eventType) -> generic transformation/validation -> AvroSink(eventType)

I'd put all Avro schemas in one project and use an Avro plugin to generate the
respective Java classes. Then I'd simply create a map of Avro schema
(GeneratedClassA.SCHEMA$, GeneratedClassB.SCHEMA$, ...) to topic name
(event-a, event-b, ...). Next, I'd iterate over the map to add the respective
subtopologies to env. Finally, execute everything.

You then have one project where all validations reside, yet almost no
overhead to process a given source of eventType. The downside of that
approach is, of course, that each new event type requires a redeployment,
but that seems like what you'd want to do anyhow.

Best,

Arvid

On Sat, Jan 18, 2020 at 2:08 PM aj <ajainje...@gmail.com> wrote:

> Thanks, Arvid.
>
> 1. I like your approach, as I can write a single consumer and put the data
> in S3 in Parquet format. The only challenge is that there are extra columns
> that are always going to be null, as at any one time I will get only one
> type of event.
>
> 2. If I go with separate schemas, I am not sure how I can solve it using a
> single generalized consumer. My understanding so far is that I have to
> write a consumer for each type of event. Each consumer will read the whole
> data, filter the respective events from it, and then pass this stream to
> the sink. But this does not look like a scalable solution: as new events
> keep being added, I have to write a consumer for each new type.
>
>     DataStreamSource<GenericRecord> input = env
>         .addSource(
>             new FlinkKafkaConsumer010<GenericRecord>(topics,
>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>                 config).setStartFromEarliest());
>
> Example:
>
> 1st consumer:
>
>     DataStreamSource<GenericRecord> input = env.addSource(
>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>             new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>             config).setStartFromEarliest());
>     // Avro string values are usually Utf8, so compare via String.valueOf
>     DataStream<GenericRecord> aInput =
>         input.filter(r -> "a".equals(String.valueOf(r.get("event_name"))));
>
> 2nd consumer:
>
>     DataStreamSource<GenericRecord> input = env.addSource(
>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>             new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>             config).setStartFromEarliest());
>     DataStream<GenericRecord> bInput =
>         input.filter(r -> "b".equals(String.valueOf(r.get("event_name"))));
>
> Can you help me solve this using a single consumer, as I do not want
> to write a separate consumer for each type of schema?
>
> For example, this is my consumer, which contains different types of records:
>
>     DataStreamSource<GenericRecord> input = env
>         .addSource(
>             new FlinkKafkaConsumer010<GenericRecord>(topics,
>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>                 config).setStartFromEarliest());
>
> Now I cannot write this stream directly, as there is no common schema for
> the records in this stream. So one possible way I am thinking of is:
>
> 1. Can I create multiple streams from this stream using keyBy on
> "event_name" and then write each stream separately?
>
> I just want to know whether this is possible.
>
> Thanks,
> Anuj
>
> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Anuj,
>>
>> I originally understood that you would like to store data in the same
>> Kafka topic and also want to store it in the same Parquet file. In the
>> past, I mostly used schema registry with Kafka Streams, where you could
>> only store a schema for a key and value respectively.
>> To use different record types in the same Kafka topic, you had to disable
>> schema compatibility checks and just store the schemas as different
>> versions under the same subject.
>>
>> Your approach is much better. You can ensure full schema compatibility.
>> Nevertheless, it still shares the same drawback that consumption is much
>> harder (using GenericRecord is proof of that) if you want to read/write
>> everything into the same place. Also, you will never be able to write one
>> consistent file, as a file can only have one schema (both in Avro and
>> Parquet).
>> So you only have two options:
>> * keep schemas separated, but then you also need to write separate files
>> per record type.
>> * have a common schema (either my outlined approach or any other wrapper
>> schema).
>> The approach with a common schema only makes sense if you want to write
>> it into one table/Kafka topic.
>>
>> However, in the last mail you pointed out that you actually want to store
>> the record types separately. Then you should keep everything separated:
>> have a sink for each type, each getting the respective schema. Note that
>> you'd need to fetch the schema manually from the schema registry when
>> creating the query, as you need to pass it to the sink.
>>
>> Btw, do you actually have a need to write all events into one Kafka
>> topic? The only real use case is to preserve the time order per key.
>> Everything else is much more complicated than storing events individually.
>>
>> Best,
>>
>> Arvid
>>
>>
>> On Thu, Jan 16, 2020 at 3:39 PM aj <ajainje...@gmail.com> wrote:
>>
>>> Hi Arvid,
>>> Thanks for the quick response. I am new to this Avro design, so can you
>>> please help me understand and design for my use case?
>>>
>>> I have a use case like this:
>>>
>>> 1. We have an app where a lot of actions happen on the user side.
>>> 2. For each action we collect a set of information that is defined using
>>> some key-value pairs.
>>> This information we want to define as proper schemas
>>> so that we maintain the proper format and do not push random data.
>>> 3. So we are defining a schema for each action and registering it in the
>>> schema registry using topic+record.name as the subject.
>>> 4. So I do not think the producer side has any issue, as whenever we push
>>> an event to Kafka we register a new schema with the above subject.
>>>
>>> Example:
>>>
>>> {
>>>   "event_name" : "a"
>>>   "timestamp":
>>>   "properties" :[
>>>     "key-1" : "val-1"
>>>     "key-2" : "val-2"
>>>   ]
>>> }
>>>
>>> {
>>>   "event_name" : "b"
>>>   "timestamp":
>>>   "properties" :[
>>>     "key-3" : "val-3"
>>>     "key-4" : "val-4"
>>>   ]
>>> }
>>>
>>> Now I have a consumer that will parse the data by fetching the schema
>>> from the schema registry and deserialize it into a GenericRecord stream.
>>>
>>> Why do you think it will break, as I am always deserializing with the
>>> writer schema only?
>>>
>>> You suggested keeping an envelope Avro schema rather than a separate
>>> schema for each type of event that I am generating. I have some doubts
>>> about that:
>>>
>>> 1. How do I enforce a schema on each event, as it is a subtype in the
>>> main schema? So when I get a JSON event of type "a", how do I enforce and
>>> convert it to the subschema type "a" and push it to Kafka?
>>> 2. I want to create a separate Hive table for each of the events, so when
>>> I write this data and, let's say, I have 20 events, then for 19 columns I
>>> am always getting null values in the data.
>>>
>>> Please help me do this the right way. It will be a great help and
>>> learning for me.
>>>
>>> Thanks,
>>> Anuj
>>>
>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Anuj,
>>>>
>>>> you should always avoid having records with different schemas in the
>>>> same topic/dataset. You will break the compatibility features of the
>>>> schema registry and your consumer/producer code is always hard to
>>>> maintain.
>>>>
>>>> A common and scalable way to avoid it is to use some kind of envelope
>>>> format.
>>>>
>>>> {
>>>>   "namespace": "example",
>>>>   "name": "Envelope",
>>>>   "type": "record",
>>>>   "fields": [
>>>>     {
>>>>       "name": "type1",
>>>>       "type": ["null", {
>>>>         "type": "record",
>>>>         "fields": [ ... ]
>>>>       }],
>>>>       "default": null
>>>>     },
>>>>     {
>>>>       "name": "type2",
>>>>       "type": ["null", {
>>>>         "type": "record",
>>>>         "fields": [ ... ]
>>>>       }],
>>>>       "default": null
>>>>     }
>>>>   ]
>>>> }
>>>>
>>>> This envelope is evolvable (arbitrary addition/removal of wrapped
>>>> types, which by themselves can be evolved), and adds only a little
>>>> overhead (1 byte per subtype). The downside is that you cannot enforce
>>>> that exactly one of the subtypes is set.
>>>>
>>>> This schema is fully compatible with the schema registry, so no need to
>>>> parse anything manually.
>>>>
>>>> This schema can easily be used with Parquet. If you can't change the
>>>> input format anymore, you can at least use that approach on your output.
>>>>
>>>> Best,
>>>>
>>>> Arvid
>>>>
>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <ajainje...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have a use case where I am getting a different set of Avro records
>>>>> in Kafka. I am using the schema registry to store the Avro schemas. One
>>>>> topic can also have different types of records.
>>>>>
>>>>> Now I have created a GenericRecord stream using KafkaAvroDeserializer
>>>>> by defining a custom deserializer class like this:
>>>>>
>>>>>     @Override
>>>>>     public GenericRecord deserialize(
>>>>>         byte[] messageKey, byte[] message, String topic, int partition,
>>>>>         long offset) {
>>>>>       checkInitialized();
>>>>>       return (GenericRecord) inner.deserialize(topic, message);
>>>>>     }
>>>>>
>>>>>     private void checkInitialized() {
>>>>>       if (inner == null) {
>>>>>         Map<String, Object> props = new HashMap<>();
>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>>>             registryUrl);
>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>>>>             false);
>>>>>         SchemaRegistryClient client =
>>>>>             new CachedSchemaRegistryClient(
>>>>>                 registryUrl,
>>>>>                 AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>>         inner = new KafkaAvroDeserializer(client, props);
>>>>>       }
>>>>>     }
>>>>>
>>>>> And this is my consumer code:
>>>>>
>>>>>     DataStreamSource<GenericRecord> input = env
>>>>>         .addSource(
>>>>>             new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>                 config).setStartFromEarliest());
>>>>>
>>>>> Now I want to write this stream partitioned on
>>>>> event_name="a"/year=/month=/day= in Parquet format so that I can
>>>>> expose Hive tables directly on top of this data.
>>>>> event_name is a common field for all types of records that I am getting
>>>>> in Kafka.
>>>>> I am stuck, as the Parquet writer needs a schema to write, but my
>>>>> different records have different schemas. So how do I write this stream
>>>>> to S3 in the above partition format?
>>>>>
>>>>>
>>>>> Thanks & Regards,
>>>>> Anuj Jain
>>>>>
>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>
>>>
>>> --
>>> Thanks & Regards,
>>> Anuj Jain
>>> Mob. : +91- 8588817877
>>> Skype : anuj.jain07
>>> <http://www.oracle.com/>
>>>
>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
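
P.S. A minimal sketch of the per-event-type loop described at the top of this mail, written in plain Java so that it runs without Flink on the classpath. The names SubtopologySketch, Subtopology, plan and partitionPath, the bucket path, and the choice of UTC for the date partitions are assumptions made for illustration and do not come from the thread. In a real job, each loop iteration would call env.addSource(...) for that topic, apply the shared validation, and add a sink created with that event type's schema; the partition helper would move into whatever component assigns output directories.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

class SubtopologySketch {

    /** Describes one source -> validation -> sink chain (hypothetical holder). */
    static final class Subtopology {
        final String eventType;
        final String sourceTopic;
        final String sinkBasePath;

        Subtopology(String eventType, String sourceTopic, String sinkBasePath) {
            this.eventType = eventType;
            this.sourceTopic = sourceTopic;
            this.sinkBasePath = sinkBasePath;
        }
    }

    /**
     * Builds one subtopology description per event type.
     * With Flink, this loop is where each source and sink would be
     * added to env, followed by a single env.execute() after the loop.
     */
    static List<Subtopology> plan(Map<String, String> topicByEventType, String outputRoot) {
        List<Subtopology> result = new ArrayList<>();
        for (Map.Entry<String, String> e : topicByEventType.entrySet()) {
            String sinkPath = outputRoot + "/event_name=" + e.getKey();
            result.add(new Subtopology(e.getKey(), e.getValue(), sinkPath));
        }
        return result;
    }

    /** Hive-style partition directory for one record; UTC is an assumption. */
    static String partitionPath(String eventName, long epochMillis) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        return String.format(Locale.ROOT, "event_name=%s/year=%04d/month=%02d/day=%02d",
                eventName, t.getYear(), t.getMonthValue(), t.getDayOfMonth());
    }

    public static void main(String[] args) {
        // Insertion-ordered map of event type -> topic name (example values).
        Map<String, String> topics = new LinkedHashMap<>();
        topics.put("a", "event-a");
        topics.put("b", "event-b");

        for (Subtopology s : plan(topics, "s3://my-bucket/events")) {
            System.out.println(s.sourceTopic + " -> " + s.sinkBasePath);
        }
        // 1579305600000L is 2020-01-18T00:00:00Z
        System.out.println(partitionPath("a", 1579305600000L));
    }
}
```

Adding a new event type then means adding one map entry (plus its schema) and redeploying, which matches the trade-off mentioned above.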