Hi Anuj,

There is currently no way to dynamically change the topology. It would be good to know why your current approach is not working for you (is the restart taking too long? are the changes too frequent?).

So some ideas:
- Have some kind of submitter that restarts Flink automatically on a config change (assumes that restart time is not the issue).
- Your job could periodically check for a change and then fail. However, you need to fail in such a way that the topology is rebuilt. I guess it's close to a fatal error, which the driver then handles by restarting the job.
- Rewrite the job in such a way that you have only one sink in your job graph, which demultiplexes the events to several internal sinks. Then you could simply add a new internal sink whenever a new event type occurs.

The first option is the easiest and the last option the most versatile (you could even mix different sink types).

On Tue, Jun 23, 2020 at 5:34 AM aj <ajainje...@gmail.com> wrote:

> I am stuck on this. Please give some suggestions.
>
> On Tue, Jun 9, 2020, 21:40 aj <ajainje...@gmail.com> wrote:
>
>> Please help with this. Any suggestions?
>>
>> On Sat, Jun 6, 2020 at 12:20 PM aj <ajainje...@gmail.com> wrote:
>>
>>> Hello All,
>>>
>>> I am receiving a set of events in Avro format on different topics. I
>>> want to consume these and write them to S3 in Parquet format.
>>> I have written the job below, which creates a separate stream for each
>>> event and fetches its schema from the Confluent schema registry to create
>>> a Parquet sink for that event.
>>> This is working fine, but the only problem I am facing is that whenever a
>>> new event starts coming in, I have to change the YAML config and restart
>>> the job every time. Is there any way to avoid restarting the job and have
>>> it start consuming a new set of events?
>>>
>>> YAML config:
>>>
>>> !com.bounce.config.EventTopologyConfig
>>> eventsType:
>>>   - !com.bounce.config.EventConfig
>>>     event_name: "search_list_keyless"
>>>     schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
>>>     topic: "search_list_keyless"
>>>
>>>   - !com.bounce.config.EventConfig
>>>     event_name: "bike_search_details"
>>>     schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
>>>     topic: "bike_search_details"
>>>
>>>   - !com.bounce.config.EventConfig
>>>     event_name: "keyless_bike_lock"
>>>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
>>>     topic: "analytics-keyless"
>>>
>>>   - !com.bounce.config.EventConfig
>>>     event_name: "keyless_bike_unlock"
>>>     schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
>>>     topic: "analytics-keyless"
>>>
>>> checkPointInterval: 1200000
>>>
>>> topics: ["search_list_keyless","bike_search_details","analytics-keyless"]
>>>
>>> Sink code:
>>>
>>> YamlReader reader = new YamlReader(topologyConfig);
>>> EventTopologyConfig eventTopologyConfig =
>>>     reader.read(EventTopologyConfig.class);
>>>
>>> long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
>>> topics = eventTopologyConfig.getTopics();
>>>
>>> List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();
>>>
>>> CachedSchemaRegistryClient registryClient =
>>>     new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
>>>
>>> // One Kafka source reads all configured topics.
>>> FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer =
>>>     new FlinkKafkaConsumer<>(topics,
>>>         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>         properties);
>>>
>>> DataStream<GenericRecord> dataStream =
>>>     streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");
>>>
>>> try {
>>>     // One filter + Parquet sink per configured event type.
>>>     for (EventConfig eventConfig : eventTypesList) {
>>>
>>>         LOG.info("creating a stream for {}", eventConfig.getEvent_name());
>>>
>>>         final StreamingFileSink<GenericRecord> sink =
>>>             StreamingFileSink.forBulkFormat(
>>>                     path,
>>>                     ParquetAvroWriters.forGenericRecord(
>>>                         SchemaUtils.getSchema(eventConfig.getSchema_subject(),
>>>                             registryClient)))
>>>                 .withBucketAssigner(new EventTimeBucketAssigner())
>>>                 .build();
>>>
>>>         // Keep only the records that belong to this event type.
>>>         DataStream<GenericRecord> outStream = dataStream.filter(
>>>             (FilterFunction<GenericRecord>) genericRecord ->
>>>                 genericRecord != null
>>>                     && genericRecord.get(EVENT_NAME).toString()
>>>                         .equals(eventConfig.getEvent_name()));
>>>
>>>         outStream.addSink(sink)
>>>             .name(eventConfig.getSink_id())
>>>             .setParallelism(parallelism);
>>>     }
>>> } catch (Exception e) {
>>>     LOG.error("failed to build the topology", e);
>>> }
>>>
>>> --
>>> Thanks & Regards,
>>> Anuj Jain

--
Arvid Heise | Senior Java Developer
<https://www.ververica.com/>
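
To make the third idea (one sink that demultiplexes to several internal sinks) a bit more concrete, here is a minimal sketch in plain Java, without any Flink dependency. All names in it (`Event`, `EventWriter`, `InMemoryWriter`, `DemuxSink`) are hypothetical and only for illustration; in a real job the routing would probably live inside a custom sink function and each internal writer would be a Parquet writer built from the schema of that event.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical record type: an event name plus an opaque payload. */
final class Event {
    final String name;
    final String payload;

    Event(String name, String payload) {
        this.name = name;
        this.payload = payload;
    }
}

/** Hypothetical stand-in for one internal sink (e.g. one Parquet writer per event). */
interface EventWriter<T> {
    void write(T record);
}

/** Collects records in memory; a real writer would append to files instead. */
final class InMemoryWriter<T> implements EventWriter<T> {
    final List<T> records = new ArrayList<>();

    @Override
    public void write(T record) {
        records.add(record);
    }
}

/** A single sink that routes each record to an internal writer keyed by event name. */
final class DemuxSink<T> {
    private final Map<String, EventWriter<T>> writers = new HashMap<>();
    private final Function<T, String> eventNameOf;
    private final Function<String, EventWriter<T>> writerFactory;

    DemuxSink(Function<T, String> eventNameOf,
              Function<String, EventWriter<T>> writerFactory) {
        this.eventNameOf = eventNameOf;
        this.writerFactory = writerFactory;
    }

    /** Routes one record; creates the internal writer the first time a name is seen. */
    void invoke(T record) {
        String name = eventNameOf.apply(record);
        writers.computeIfAbsent(name, writerFactory).write(record);
    }

    int writerCount() {
        return writers.size();
    }

    EventWriter<T> writerFor(String name) {
        return writers.get(name);
    }
}

final class DemuxSinkDemo {
    public static void main(String[] args) {
        DemuxSink<Event> sink = new DemuxSink<Event>(
            e -> e.name,
            name -> new InMemoryWriter<Event>());

        sink.invoke(new Event("search_list_keyless", "a"));
        sink.invoke(new Event("keyless_bike_lock", "b"));
        // A previously unseen event name gets its own writer without a restart.
        sink.invoke(new Event("brand_new_event", "c"));

        System.out.println("internal writers: " + sink.writerCount());
    }
}
```

The sketch leaves out everything that makes this hard in practice: the internal writers would have to take part in checkpointing to keep the guarantees that StreamingFileSink gives you, and the schema lookup for a new event name would happen inside the factory.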