I am stuck on this. Please give some suggestions.

On Tue, Jun 9, 2020, 21:40 aj <ajainje...@gmail.com> wrote:
Please help with this. Any suggestions?

On Sat, Jun 6, 2020 at 12:20 PM aj <ajainje...@gmail.com> wrote:

Hello All,

I am receiving a set of events in Avro format on different topics. I want to consume these and write them to S3 in Parquet format.

I have written the job below, which creates a separate stream for each event type and fetches its schema from the Confluent Schema Registry in order to create a Parquet sink for that event. This works fine; the only problem I am facing is that whenever a new event type starts coming in, I have to change the YAML config and restart the job. Is there any way to avoid the restart, so that the job starts consuming a new set of events automatically?

YAML config:

!com.bounce.config.EventTopologyConfig
eventsType:
  - !com.bounce.config.EventConfig
    event_name: "search_list_keyless"
    schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
    topic: "search_list_keyless"

  - !com.bounce.config.EventConfig
    event_name: "bike_search_details"
    schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
    topic: "bike_search_details"

  - !com.bounce.config.EventConfig
    event_name: "keyless_bike_lock"
    schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
    topic: "analytics-keyless"

  - !com.bounce.config.EventConfig
    event_name: "keyless_bike_unlock"
    schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
    topic: "analytics-keyless"

checkPointInterval: 1200000

topics: ["search_list_keyless", "bike_search_details", "analytics-keyless"]

Sink code:

YamlReader reader = new YamlReader(topologyConfig);
EventTopologyConfig eventTopologyConfig = reader.read(EventTopologyConfig.class);

long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
topics = eventTopologyConfig.getTopics();
List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();

// Schema registry client used to look up each event's Avro schema (cache size 1000).
CachedSchemaRegistryClient registryClient =
        new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);

// A single consumer subscribed to all configured topics, producing Avro GenericRecords.
FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topics,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl), properties);

DataStream<GenericRecord> dataStream =
        streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");

try {
    for (EventConfig eventConfig : eventTypesList) {

        LOG.info("creating a stream for {}", eventConfig.getEvent_name());

        // One Parquet sink per event type, writing with the schema fetched
        // from the registry and bucketing by event time.
        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                        SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();

        // Route only this event type's records to its sink.
        DataStream<GenericRecord> outStream = dataStream.filter(genericRecord ->
                genericRecord != null
                        && genericRecord.get(EVENT_NAME).toString()
                                .equals(eventConfig.getEvent_name()));

        outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
    }
} catch (Exception e) {
    e.printStackTrace();
}
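For reference, the YAML above is bound to plain config beans by YamlReader (yamlbeans). A minimal sketch of what these classes look like — the field names are taken from the YAML keys and the getters used in the job; sink_id is an assumption, since the sink code calls getSink_id() but the field does not appear in the YAML shown:

package com.bounce.config;

import java.util.List;

// EventTopologyConfig.java -- root object of the YAML config.
public class EventTopologyConfig {
    private List<EventConfig> eventsType;
    private long checkPointInterval;
    private List<String> topics;

    public List<EventConfig> getEventsType() { return eventsType; }
    public void setEventsType(List<EventConfig> eventsType) { this.eventsType = eventsType; }
    public long getCheckPointInterval() { return checkPointInterval; }
    public void setCheckPointInterval(long checkPointInterval) { this.checkPointInterval = checkPointInterval; }
    public List<String> getTopics() { return topics; }
    public void setTopics(List<String> topics) { this.topics = topics; }
}

// EventConfig.java (separate file) -- one entry of eventsType.
public class EventConfig {
    private String event_name;
    private String schema_subject;
    private String topic;
    private String sink_id; // assumed: read by getSink_id() in the sink code

    public String getEvent_name() { return event_name; }
    public void setEvent_name(String event_name) { this.event_name = event_name; }
    public String getSchema_subject() { return schema_subject; }
    public void setSchema_subject(String schema_subject) { this.schema_subject = schema_subject; }
    public String getTopic() { return topic; }
    public void setTopic(String topic) { this.topic = topic; }
    public String getSink_id() { return sink_id; }
    public void setSink_id(String sink_id) { this.sink_id = sink_id; }
}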
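KafkaGenericAvroDeserializationSchema wraps Confluent's KafkaAvroDeserializer so the consumer emits GenericRecords. A rough sketch, not the exact class: the inner deserializer is created lazily because it is not serializable, and the produced type here falls back to Flink's generic (Kryo) serialization rather than a schema-aware Avro type info:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Collections;

public class KafkaGenericAvroDeserializationSchema
        implements KafkaDeserializationSchema<GenericRecord> {

    private final String schemaRegistryUrl;
    private transient KafkaAvroDeserializer inner; // not serializable, so built on first use

    public KafkaGenericAvroDeserializationSchema(String schemaRegistryUrl) {
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> record) {
        if (inner == null) {
            inner = new KafkaAvroDeserializer();
            inner.configure(
                    Collections.singletonMap("schema.registry.url", schemaRegistryUrl),
                    false); // false = value deserializer, not key
        }
        return (GenericRecord) inner.deserialize(record.topic(), record.value());
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false; // unbounded stream
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeInformation.of(GenericRecord.class);
    }
}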
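SchemaUtils.getSchema just fetches the latest schema registered under a subject; a minimal sketch using the Confluent client API:

import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;

public class SchemaUtils {

    // Fetches the latest registered Avro schema for the given subject.
    public static Schema getSchema(String subject, SchemaRegistryClient registryClient) {
        try {
            SchemaMetadata metadata = registryClient.getLatestSchemaMetadata(subject);
            return new Schema.Parser().parse(metadata.getSchema());
        } catch (Exception e) {
            throw new RuntimeException("Failed to fetch schema for subject " + subject, e);
        }
    }
}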
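EventTimeBucketAssigner derives the bucket path from the record itself. A rough sketch, assuming each record carries its event name and an epoch-millis timestamp — the field names "event_name" and "timestamp" are illustrative:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EventTimeBucketAssigner implements BucketAssigner<GenericRecord, String> {

    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("'dt='yyyy-MM-dd'/hour='HH").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(GenericRecord element, Context context) {
        // Bucket by event name plus the event time carried in the record.
        String eventName = element.get("event_name").toString();
        long epochMillis = (long) element.get("timestamp");
        return eventName + "/" + FORMATTER.format(Instant.ofEpochMilli(epochMillis));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}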
--
Thanks & Regards,
Anuj Jain
Mob: +91-8588817877
Skype: anuj.jain07

<http://www.cse.iitm.ac.in/%7Eanujjain/>