I am stuck on this. Please give some suggestions.

On Tue, Jun 9, 2020, 21:40 aj <ajainje...@gmail.com> wrote:

> Please help with this. Any suggestions?
>
> On Sat, Jun 6, 2020 at 12:20 PM aj <ajainje...@gmail.com> wrote:
>
>> Hello All,
>>
>> I am receiving a set of events in Avro format on different topics. I want
>> to consume these and write them to S3 in Parquet format.
>> I have written the job below, which creates a separate stream for each event
>> and fetches its schema from the Confluent Schema Registry to create a
>> Parquet sink for that event.
>> This works fine, but the one problem I am facing is that whenever a new
>> event starts coming in, I have to change the YAML config and restart the job
>> every time. Is there any way to avoid restarting the job and have it start
>> consuming the new set of events?
>>
>>
>> YAML config :
>>
>> !com.bounce.config.EventTopologyConfig
>> eventsType:
>>   - !com.bounce.config.EventConfig
>>     event_name: "search_list_keyless"
>>     schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
>>     topic: "search_list_keyless"
>>
>>   - !com.bounce.config.EventConfig
>>     event_name: "bike_search_details"
>>     schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
>>     topic: "bike_search_details"
>>
>>   - !com.bounce.config.EventConfig
>>     event_name: "keyless_bike_lock"
>>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
>>     topic: "analytics-keyless"
>>
>>   - !com.bounce.config.EventConfig
>>     event_name: "keyless_bike_unlock"
>>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
>>     topic: "analytics-keyless"
>>
>>
>> checkPointInterval: 1200000
>>
>> topics: ["search_list_keyless","bike_search_details","analytics-keyless"]
>>
>>
>>
>>
>>
>> *Sink code :*
>>
>> YamlReader reader = new YamlReader(topologyConfig);
>> EventTopologyConfig eventTopologyConfig = reader.read(EventTopologyConfig.class);
>>
>> long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
>> topics = eventTopologyConfig.getTopics();
>> List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();
>>
>> CachedSchemaRegistryClient registryClient =
>>         new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
>>
>> // One source reads every configured topic as GenericRecord.
>> FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer<>(
>>         topics,
>>         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>         properties);
>>
>> DataStream<GenericRecord> dataStream =
>>         streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");
>>
>> try {
>>     for (EventConfig eventConfig : eventTypesList) {
>>         // The {} placeholder is required for the event name to appear in the log.
>>         LOG.info("creating a stream for {}", eventConfig.getEvent_name());
>>
>>         // One Parquet sink per event, built from that event's registry schema.
>>         final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>>                 .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
>>                         SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
>>                 .withBucketAssigner(new EventTimeBucketAssigner())
>>                 .build();
>>
>>         // Keep only the records that belong to this event.
>>         DataStream<GenericRecord> outStream = dataStream.filter(
>>                 (FilterFunction<GenericRecord>) genericRecord ->
>>                         genericRecord != null
>>                                 && genericRecord.get(EVENT_NAME).toString()
>>                                         .equals(eventConfig.getEvent_name()));
>>
>>         outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
>>     }
>> } catch (Exception e) {
>>     e.printStackTrace();
>> }
>>
>>
>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> <http://www.oracle.com/>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>
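
For reference, the per-event routing in the quoted job comes down to a predicate on the event-name field. Below is a minimal sketch of that check in isolation, not the job itself. It assumes a plain Map as a stand-in for GenericRecord so that it runs without Flink or Avro, and it assumes the field is called "event_name" (the quoted code only shows a constant named EVENT_NAME). The class name EventFilterSketch and the method forEvent are hypothetical. Unlike the quoted filter, this version also returns false when the field is missing instead of throwing a NullPointerException.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in: a Map plays the role of GenericRecord.
final class EventFilterSketch {

    // Assumed field name; the quoted job reads it from a constant called EVENT_NAME.
    static final String EVENT_NAME = "event_name";

    // Returns a predicate that accepts only records whose event-name field
    // equals expectedName. A null record or a missing field yields false.
    static Predicate<Map<String, Object>> forEvent(String expectedName) {
        return record -> {
            if (record == null) {
                return false;
            }
            Object name = record.get(EVENT_NAME);
            return name != null && expectedName.equals(name.toString());
        };
    }

    public static void main(String[] args) {
        Map<String, Object> lock = new HashMap<>();
        lock.put(EVENT_NAME, "keyless_bike_lock");

        // A record that carries no event-name field at all.
        Map<String, Object> unnamed = new HashMap<>();

        Predicate<Map<String, Object>> isLock = forEvent("keyless_bike_lock");
        System.out.println("lock record matches: " + isLock.test(lock));
        System.out.println("unnamed record matches: " + isLock.test(unnamed));
        System.out.println("null record matches: " + isLock.test(null));
    }
}
```

In the real job the same body would sit inside the FilterFunction<GenericRecord> lambda, one predicate per configured event.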
