Please help with this. Any suggestions?

On Sat, Jun 6, 2020 at 12:20 PM aj <ajainje...@gmail.com> wrote:
> Hello All,
>
> I am receiving a set of events in Avro format on different topics. I want
> to consume these and write them to S3 in Parquet format.
> I have written the job below, which creates a separate stream for each
> event and fetches its schema from the Confluent Schema Registry to create
> a Parquet sink for that event.
> This is working fine, but the only problem I am facing is that whenever a
> new event starts coming in, I have to change the YAML config and restart
> the job every time. Is there any way to avoid restarting the job so that
> it starts consuming a new set of events automatically?
>
> YAML config:
>
> !com.bounce.config.EventTopologyConfig
> eventsType:
>   - !com.bounce.config.EventConfig
>     event_name: "search_list_keyless"
>     schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
>     topic: "search_list_keyless"
>
>   - !com.bounce.config.EventConfig
>     event_name: "bike_search_details"
>     schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
>     topic: "bike_search_details"
>
>   - !com.bounce.config.EventConfig
>     event_name: "keyless_bike_lock"
>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
>     topic: "analytics-keyless"
>
>   - !com.bounce.config.EventConfig
>     event_name: "keyless_bike_unlock"
>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
>     topic: "analytics-keyless"
>
> checkPointInterval: 1200000
>
> topics: ["search_list_keyless", "bike_search_details", "analytics-keyless"]
>
> *Sink code:*
>
> YamlReader reader = new YamlReader(topologyConfig);
> EventTopologyConfig eventTopologyConfig = reader.read(EventTopologyConfig.class);
>
> long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
> topics = eventTopologyConfig.getTopics();
>
> List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();
>
> CachedSchemaRegistryClient registryClient =
>         new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
>
> FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(topics,
>         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>         properties);
>
> DataStream<GenericRecord> dataStream =
>         streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");
>
> try {
>     for (EventConfig eventConfig : eventTypesList) {
>
>         LOG.info("creating a stream for {}", eventConfig.getEvent_name());
>
>         final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>                 .forBulkFormat(path,
>                         ParquetAvroWriters.forGenericRecord(
>                                 SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
>                 .withBucketAssigner(new EventTimeBucketAssigner())
>                 .build();
>
>         DataStream<GenericRecord> outStream =
>                 dataStream.filter((FilterFunction<GenericRecord>) genericRecord ->
>                         genericRecord != null
>                                 && genericRecord.get(EVENT_NAME).toString()
>                                         .equals(eventConfig.getEvent_name()));
>
>         outStream.addSink(sink)
>                 .name(eventConfig.getSink_id())
>                 .setParallelism(parallelism);
>     }
> } catch (Exception e) {
>     e.printStackTrace();
> }
>
> --
> Thanks & Regards,
> Anuj Jain
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>

--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07

<http://www.oracle.com/>
<http://www.cse.iitm.ac.in/%7Eanujjain/>
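
The EventTimeBucketAssigner used by the sink above is referenced but not shown. A minimal sketch of what such an assigner might look like, assuming the records carry an epoch-millis "timestamp" field (that field name is an assumption about the schema, not taken from the original post):

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class EventTimeBucketAssigner implements BucketAssigner<GenericRecord, String> {

    // Buckets records into date partitions like "dt=2020-06-06", derived from event time.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("'dt='yyyy-MM-dd").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(GenericRecord record, Context context) {
        // "timestamp" is an assumed epoch-millis field; fall back to processing time if absent.
        Object ts = record.get("timestamp");
        long epochMillis = ts != null ? Long.parseLong(ts.toString()) : context.currentProcessingTime();
        return FORMAT.format(Instant.ofEpochMilli(epochMillis));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        // Flink's built-in serializer for String bucket IDs.
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}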
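
Similarly, SchemaUtils.getSchema is referenced but not shown. A possible sketch, assuming it simply fetches the latest schema version registered for the subject from the Confluent Schema Registry (the class and method names come from the post above; the body is an assumption):

import java.io.IOException;

import org.apache.avro.Schema;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

public class SchemaUtils {

    // Fetches the latest registered schema for the subject and parses it into an Avro Schema.
    public static Schema getSchema(String subject, CachedSchemaRegistryClient registryClient)
            throws IOException, RestClientException {
        SchemaMetadata metadata = registryClient.getLatestSchemaMetadata(subject);
        return new Schema.Parser().parse(metadata.getSchema());
    }
}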