Hello All,

I am receiving a set of events in Avro format on different topics. I want to consume these and write them to S3 in Parquet format. I have written the job below, which creates a separate stream for each event and fetches its schema from the Confluent Schema Registry to build a Parquet sink for that event. This is working fine, but the one problem I am facing is that whenever a new event type starts coming in, I have to change the YAML config and restart the job. Is there any way to avoid restarting the job so that it starts consuming a new set of events automatically?
YAML config :

!com.bounce.config.EventTopologyConfig
eventsType:
  - !com.bounce.config.EventConfig
    event_name: "search_list_keyless"
    schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
    topic: "search_list_keyless"
  - !com.bounce.config.EventConfig
    event_name: "bike_search_details"
    schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
    topic: "bike_search_details"
  - !com.bounce.config.EventConfig
    event_name: "keyless_bike_lock"
    schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
    topic: "analytics-keyless"
  - !com.bounce.config.EventConfig
    event_name: "keyless_bike_unlock"
    schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
    topic: "analytics-keyless"
checkPointInterval: 1200000
topics: ["search_list_keyless","bike_search_details","analytics-keyless"]

*Sink code :*

YamlReader reader = new YamlReader(topologyConfig);
EventTopologyConfig eventTopologyConfig = reader.read(EventTopologyConfig.class);

long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
topics = eventTopologyConfig.getTopics();
List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();

CachedSchemaRegistryClient registryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);

FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topics,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl), properties);

DataStream<GenericRecord> dataStream =
        streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");

try {
    for (EventConfig eventConfig : eventTypesList) {
        LOG.info("Creating a stream for {}", eventConfig.getEvent_name());

        // One Parquet sink per event type, using the event's schema from the registry.
        final StreamingFileSink<GenericRecord> sink = StreamingFileSink.forBulkFormat(path,
                ParquetAvroWriters.forGenericRecord(
                        SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();

        // Route only the records of this event type to its sink.
        DataStream<GenericRecord> outStream = dataStream.filter((FilterFunction<GenericRecord>) genericRecord ->
                genericRecord != null
                        && genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()));

        outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
    }
} catch (Exception e) {
    e.printStackTrace();
}
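For reference, KafkaGenericAvroDeserializationSchema resolves each record's writer schema from the Schema Registry and emits a GenericRecord. A trimmed-down sketch of that idea, built around Confluent's KafkaAvroDeserializer:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Collections;

public class KafkaGenericAvroDeserializationSchema implements KafkaDeserializationSchema<GenericRecord> {

    private final String schemaRegistryUrl;
    private transient KafkaAvroDeserializer inner;

    public KafkaGenericAvroDeserializationSchema(String schemaRegistryUrl) {
        this.schemaRegistryUrl = schemaRegistryUrl;
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> record) {
        // The Confluent deserializer is not serializable, so create it lazily on the task.
        if (inner == null) {
            inner = new KafkaAvroDeserializer();
            inner.configure(Collections.singletonMap("schema.registry.url", schemaRegistryUrl), false);
        }
        // Looks up the writer schema in the registry and returns a GenericRecord.
        return (GenericRecord) inner.deserialize(record.topic(), record.value());
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // Falls back to Flink's generic type handling for GenericRecord.
        return TypeExtractor.getForClass(GenericRecord.class);
    }
}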
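SchemaUtils.getSchema just looks the subject up in the registry and parses the returned Avro schema, roughly like this (assuming the latest registered version is the one wanted):

import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;

public class SchemaUtils {

    // Fetch the latest registered schema for the subject and parse it into an Avro Schema.
    public static Schema getSchema(String subject, SchemaRegistryClient registryClient) {
        try {
            SchemaMetadata latest = registryClient.getLatestSchemaMetadata(subject);
            return new Schema.Parser().parse(latest.getSchema());
        } catch (Exception e) {
            throw new RuntimeException("Could not fetch schema for subject " + subject, e);
        }
    }
}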
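EventTimeBucketAssigner decides the output directory for each record; a simplified sketch that buckets by event name plus an hourly event-time partition ("event_ts" is only a placeholder for whatever timestamp field the events actually carry):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class EventTimeBucketAssigner implements BucketAssigner<GenericRecord, String> {

    private static final DateTimeFormatter HOURLY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(GenericRecord record, Context context) {
        // "event_ts" is a placeholder for the record's timestamp field (epoch millis assumed).
        Object ts = record.get("event_ts");
        long epochMillis = ts != null ? Long.parseLong(ts.toString()) : context.currentProcessingTime();
        // One directory per event type (same field as EVENT_NAME in the filter), sub-partitioned by hour.
        return record.get("event_name") + "/dt=" + HOURLY.format(Instant.ofEpochMilli(epochMillis));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

--
Thanks & Regards,
Anuj Jain
<http://www.cse.iitm.ac.in/%7Eanujjain/>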